课程地址:https://www.rabbitmq.com/tutorials/tutorial-seven-java

在父项目 rabbitmq-foundation-project 中创建一个 module rabbitmq-publish-confirm

我们这里只做异步确认的代码,不做同步的代码演示。

发布者确认模式确保的是消息已经安全地到达 RabbitMQ Broker。它解决的是“生产者-Broker”之间的可靠性问题。

Producer.java

package com.runbrick;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class Producer {
    private final static String QUEUE_NAME    = "publish-confirm";
    private final static int    MESSAGE_COUNT = 10;

    public static void main(String[] args) {


        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 准备一个倒数闩锁,计数为要发送的消息数量
            final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
            // 开启发布确认模式
            channel.confirmSelect();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // ConfirmCallback ackCallback, ConfirmCallback nackCallback
            channel.addConfirmListener(
                    (deliveryTag, multiple) -> {
                        System.out.println(multiple);
                        System.out.println("ack" + deliveryTag);
                        latch.countDown();
                    },
                    (deliveryTag, multiple) -> {
                        System.out.println(multiple);
                        System.out.println("nack" + deliveryTag);
                        latch.countDown();
                    }
            );

            System.out.println("开始发送 " + MESSAGE_COUNT + " 条消息...");

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "Hello World! line:" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }

            boolean allConfirmed = latch.await(60, TimeUnit.SECONDS); // 等待最多60秒

            if (allConfirmed) {
                System.out.println("所有消息都已成功收到确认!");
            } else {
                System.err.println("超时!不是所有消息都收到了确认。");
            }

        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer.java

package com.runbrick;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;

public class Producer {
    private final static String QUEUE_NAME    = "publish-confirm";
    private final static int    MESSAGE_COUNT = 10;

    public static void main(String[] args) {


        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 开启发布确认模式
            channel.confirmSelect();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // ConfirmCallback ackCallback, ConfirmCallback nackCallback
            channel.addConfirmListener(
                    (deliveryTag, multiple) -> {
                        if(multiple){
                            System.out.println("批量ack" + deliveryTag);
                        }else{
                            System.out.println("单条ack" + deliveryTag);
                        }

                    },
                    (deliveryTag, multiple) -> {
                        if(multiple){
                            System.out.println("批量nack" + deliveryTag);
                        }else{
                            System.out.println("单条nack" + deliveryTag);
                        }
                    }
            );

            System.out.println("开始发送 " + MESSAGE_COUNT + " 条消息...");

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String message = "Hello World! line:" + i;
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }

            TimeUnit.SECONDS.sleep(30);


        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

这个代码很简单,就是 Producer 发送了消息我需要之道是不是发送到了,如果没有发送到我应该做什么处理,如果发送到了我应该做什么处理。它和前面的消费者确认没有什么关系,只是两个同步进行保证了消息的可靠

可以看下面的流程图

  1. 生产者 → 发送消息 → RabbitMQ Broker
  2. Broker 将消息写入磁盘/同步到镜像节点。
  3. RabbitMQ Broker → 发送 ACK → 生产者

    • (到此,发布者确认完成。生产者可以安心了,它的任务结束了。)
  4. (一段时间后...)
  5. RabbitMQ Broker → 推送消息 → 消费者
  6. 消费者执行业务逻辑(例如:更新数据库)。
  7. 消费者 → 发送 ACK → RabbitMQ Broker

    • (到此,消费者确认完成。)
  8. RabbitMQ Broker → 从队列中删除消息

来测试下

2025-07-28T05:20:42.png

可以看到这个确认模式会出现之前的消息一起给你发确认导致 3、4、5、6、7、8、9 没有出现,这时候要处理之前的所有没处理过得消息。推荐使用 channel.getNextPublishSeqNo() 获取下一条消息的唯一NO,存起来这样就能实现消息确认了。

标签: java, rabbitmq

添加新评论