Table of Contents
2.Modify application.properties configuration" >2.Modify application.properties configuration
3. Define Exchange and Queue" >3. Define Exchange and Queue
2. Message sending confirmation" >2. Message sending confirmation
1. ConfirmCallback confirmation mode " >1. ConfirmCallback confirmation mode
2、 ReturnCallback 退回模式" >2、 ReturnCallback 退回模式
三、消息接收确认" >三、消息接收确认
1、basicAck" >1、basicAck
2、basicNack" >2、basicNack
3、basicReject" >3、basicReject
四、测试" >四、测试
五、踩坑日志" >五、踩坑日志
1、不消息确认" >1、不消息确认
2、消息无限投递" >2、消息无限投递
Home Java javaTutorial How to confirm with message in springboot + rabbitmq

How to confirm with message in springboot + rabbitmq

Jul 01, 2020 pm 05:35 PM
java rabbitmq springboot

How to confirm with message in springboot + rabbitmq

Recently, the department has called on everyone to organize more technology sharing sessions, saying that it is to activate the company's technical atmosphere, but I, who have seen through everything long ago, know that this T M is just for brushingKPI. However, having said that, this is indeed a good thing. Instead of having those boring wrangling meetings, doing more technical exchanges is still very helpful for personal growth.

So I took the initiative to sign up to participate in the sharing, cough cough cough~, it’s really not for that KPI, I just want to learn with everyone!

How to confirm with message in springboot + rabbitmq

Related learning recommendations: Java video tutorial

What I am sharing this time isspringboot rabbitmq How to implement the message confirmation mechanism, and some pitfall experience in actual development. In fact, the overall content is relatively simple. Sometimes things are so magical, and the simpler things are. The easier it is to make mistakes.

It can be seen that after using RabbitMQ, our business links have obviously become longer. Although the decoupling between systems has been achieved, the scenarios that may cause message loss have also increased. For example:

  • Message producer-> rabbitmq server (message sending failed)

  • rabbitmq server's own failure caused message loss

  • Message consumer - > rabbitmq service (failed to consume messages)

How to confirm with message in springboot + rabbitmq
So if you can’t use middleware, try not to use it. , if you use it just for the sake of use, it will only increase your troubles. After the message confirmation mechanism is turned on, although the accurate delivery of messages is guaranteed to a large extent, due to frequent confirmation interactions, rabbitmq the overall efficiency becomes low and the throughput drops seriously. Messages that are not very important are really not appreciated. It is recommended that you use the message confirmation mechanism.


Let’s first implement the springboot rabbitmq message confirmation mechanism, and then make a detailed analysis of the problems encountered.

1. Prepare the environment

1.Introduce rabbitmq dependency package

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId></dependency>
Copy after login

In the configuration, message confirmation of sender and consumer needs to be enabled.

spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
Copy after login

Define switchconfirmTestExchange and queueconfirm_test_queue, and bind the queue to on the switch.

@Configurationpublic class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }}
Copy after login

rabbitmq's message confirmation is divided into two parts: message sending confirmation and message receiving confirmation.

How to confirm with message in springboot + rabbitmq

Sending message confirmation: used to confirm the producerproducer In the process of sending the message to broker, the switch exchange on broker and then delivering it to the queue queue, check whether the message is successfully delivered.

Messages from producer to rabbitmq broker have a confirmCallback confirmation mode.

There is a returnCallback fallback mode for message delivery failure from exchange to queue.

We can use these two Callback to ensure 100% delivery of the message.

As long as the message is received by rabbitmq broker, the confirmCallback callback will be triggered.

@Slf4j
@Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    @Override    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息发送异常!");
        } else {
            log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }}
Copy after login

Implement the interface ConfirmCallback and rewrite its confirm() method. There are three parameters in the method correlationData, ack, cause.

  • correlationData: There is only one id attribute inside the object, which is used to indicate the uniqueness of the current message.
  • ack: The status of message delivery to broker, true indicates success.
  • cause: Indicates the reason for delivery failure.

But the message being received by broker can only mean that it has arrived at the MQ server, and there is no guarantee that the message will be delivered to the target queue. So you need to use returnCallback next.

如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

@Slf4j
@Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }}
Copy after login

实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

下边是具体的消息发送,在rabbitTemplate中设置 ConfirmReturn 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id10000000000

@Autowired    private RabbitTemplate rabbitTemplate;

    @Autowired    private ConfirmCallbackService confirmCallbackService;

    @Autowired    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * 确保消息发送失败后可以重新返回到队列中
         * 注意:yml需要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消费者确认收到消息后,手动ack回执回调处理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投递到队列失败回调处理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 发送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }
Copy after login

消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 {

    @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具体业务

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {

            if (message.getMessageProperties().getRedelivered()) {

                log.error("消息已重复处理失败,拒绝再次接收...");

                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
            } else {

                log.error("消息即将再次返回队列处理...");

                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }}
Copy after login

消费消息有三种回执方法,我们来分析一下每种方法的含义。

basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

void basicAck(long deliveryTag, boolean multiple)
Copy after login

deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)
Copy after login

deliveryTag:表示消息投递序号。

multiple:是否批量确认。

requeue:值为 true 消息将重新入队列。

basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

void basicReject(long deliveryTag, boolean requeue)
Copy after login

deliveryTag:表示消息投递序号。

requeue:值为 true 消息将重新入队列。

发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。
How to confirm with message in springboot + rabbitmq
用抓包工具Wireshark 观察一下rabbitmq amqp协议交互的变化,也多了 ack 的过程。
How to confirm with message in springboot + rabbitmq

这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。
How to confirm with message in springboot + rabbitmq

在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

@RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消费者 2 号收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
Copy after login

但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

How to confirm with message in springboot + rabbitmq

本地的CPU被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。

How to confirm with message in springboot + rabbitmq
而且rabbitmq management 只有一条未被确认的消息。

How to confirm with message in springboot + rabbitmq

经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新发送消息到队尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));
Copy after login

但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

3、重复消费

如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis 将消息持久化,通过再消息中的唯一性属性校验。

The above is the detailed content of How to confirm with message in springboot + rabbitmq. For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Roblox: Bubble Gum Simulator Infinity - How To Get And Use Royal Keys
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
Nordhold: Fusion System, Explained
4 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
Mandragora: Whispers Of The Witch Tree - How To Unlock The Grappling Hook
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

Hot Topics

Java Tutorial
1670
14
PHP Tutorial
1274
29
C# Tutorial
1256
24
PHP: A Key Language for Web Development PHP: A Key Language for Web Development Apr 13, 2025 am 12:08 AM

PHP is a scripting language widely used on the server side, especially suitable for web development. 1.PHP can embed HTML, process HTTP requests and responses, and supports a variety of databases. 2.PHP is used to generate dynamic web content, process form data, access databases, etc., with strong community support and open source resources. 3. PHP is an interpreted language, and the execution process includes lexical analysis, grammatical analysis, compilation and execution. 4.PHP can be combined with MySQL for advanced applications such as user registration systems. 5. When debugging PHP, you can use functions such as error_reporting() and var_dump(). 6. Optimize PHP code to use caching mechanisms, optimize database queries and use built-in functions. 7

PHP vs. Python: Understanding the Differences PHP vs. Python: Understanding the Differences Apr 11, 2025 am 12:15 AM

PHP and Python each have their own advantages, and the choice should be based on project requirements. 1.PHP is suitable for web development, with simple syntax and high execution efficiency. 2. Python is suitable for data science and machine learning, with concise syntax and rich libraries.

Break or return from Java 8 stream forEach? Break or return from Java 8 stream forEach? Feb 07, 2025 pm 12:09 PM

Java 8 introduces the Stream API, providing a powerful and expressive way to process data collections. However, a common question when using Stream is: How to break or return from a forEach operation? Traditional loops allow for early interruption or return, but Stream's forEach method does not directly support this method. This article will explain the reasons and explore alternative methods for implementing premature termination in Stream processing systems. Further reading: Java Stream API improvements Understand Stream forEach The forEach method is a terminal operation that performs one operation on each element in the Stream. Its design intention is

PHP vs. Other Languages: A Comparison PHP vs. Other Languages: A Comparison Apr 13, 2025 am 12:19 AM

PHP is suitable for web development, especially in rapid development and processing dynamic content, but is not good at data science and enterprise-level applications. Compared with Python, PHP has more advantages in web development, but is not as good as Python in the field of data science; compared with Java, PHP performs worse in enterprise-level applications, but is more flexible in web development; compared with JavaScript, PHP is more concise in back-end development, but is not as good as JavaScript in front-end development.

PHP vs. Python: Core Features and Functionality PHP vs. Python: Core Features and Functionality Apr 13, 2025 am 12:16 AM

PHP and Python each have their own advantages and are suitable for different scenarios. 1.PHP is suitable for web development and provides built-in web servers and rich function libraries. 2. Python is suitable for data science and machine learning, with concise syntax and a powerful standard library. When choosing, it should be decided based on project requirements.

PHP's Impact: Web Development and Beyond PHP's Impact: Web Development and Beyond Apr 18, 2025 am 12:10 AM

PHPhassignificantlyimpactedwebdevelopmentandextendsbeyondit.1)ItpowersmajorplatformslikeWordPressandexcelsindatabaseinteractions.2)PHP'sadaptabilityallowsittoscaleforlargeapplicationsusingframeworkslikeLaravel.3)Beyondweb,PHPisusedincommand-linescrip

PHP: The Foundation of Many Websites PHP: The Foundation of Many Websites Apr 13, 2025 am 12:07 AM

The reasons why PHP is the preferred technology stack for many websites include its ease of use, strong community support, and widespread use. 1) Easy to learn and use, suitable for beginners. 2) Have a huge developer community and rich resources. 3) Widely used in WordPress, Drupal and other platforms. 4) Integrate tightly with web servers to simplify development deployment.

PHP vs. Python: Use Cases and Applications PHP vs. Python: Use Cases and Applications Apr 17, 2025 am 12:23 AM

PHP is suitable for web development and content management systems, and Python is suitable for data science, machine learning and automation scripts. 1.PHP performs well in building fast and scalable websites and applications and is commonly used in CMS such as WordPress. 2. Python has performed outstandingly in the fields of data science and machine learning, with rich libraries such as NumPy and TensorFlow.

See all articles