跟着官网学RabbitMQ-PublisherConfirms(发布者确认)
课程地址:https://www.rabbitmq.com/tutorials/tutorial-seven-java
在父项目 rabbitmq-foundation-project
中创建一个 module rabbitmq-publish-confirm
我们这里只做异步确认的代码,不做同步的代码演示。
发布者确认模式确保的是消息已经安全地到达 RabbitMQ Broker。它解决的是“生产者-Broker”之间的可靠性问题。
Producer.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;
public class Producer {
private final static String QUEUE_NAME = "publish-confirm";
private final static int MESSAGE_COUNT = 10;
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 准备一个倒数闩锁,计数为要发送的消息数量
final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
// 开启发布确认模式
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// ConfirmCallback ackCallback, ConfirmCallback nackCallback
channel.addConfirmListener(
(deliveryTag, multiple) -> {
System.out.println(multiple);
System.out.println("ack" + deliveryTag);
latch.countDown();
},
(deliveryTag, multiple) -> {
System.out.println(multiple);
System.out.println("nack" + deliveryTag);
latch.countDown();
}
);
System.out.println("开始发送 " + MESSAGE_COUNT + " 条消息...");
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "Hello World! line:" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
boolean allConfirmed = latch.await(60, TimeUnit.SECONDS); // 等待最多60秒
if (allConfirmed) {
System.out.println("所有消息都已成功收到确认!");
} else {
System.err.println("超时!不是所有消息都收到了确认。");
}
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Consumer.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.*;
import java.util.function.BooleanSupplier;
public class Producer {
private final static String QUEUE_NAME = "publish-confirm";
private final static int MESSAGE_COUNT = 10;
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 开启发布确认模式
channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// ConfirmCallback ackCallback, ConfirmCallback nackCallback
channel.addConfirmListener(
(deliveryTag, multiple) -> {
if(multiple){
System.out.println("批量ack" + deliveryTag);
}else{
System.out.println("单条ack" + deliveryTag);
}
},
(deliveryTag, multiple) -> {
if(multiple){
System.out.println("批量nack" + deliveryTag);
}else{
System.out.println("单条nack" + deliveryTag);
}
}
);
System.out.println("开始发送 " + MESSAGE_COUNT + " 条消息...");
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = "Hello World! line:" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
TimeUnit.SECONDS.sleep(30);
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
} catch (Exception e) {
e.printStackTrace();
}
}
}
这个代码很简单,就是 Producer 发送了消息我需要之道是不是发送到了,如果没有发送到我应该做什么处理,如果发送到了我应该做什么处理。它和前面的消费者确认没有什么关系,只是两个同步进行保证了消息的可靠
可以看下面的流程图
- 生产者 → 发送消息 → RabbitMQ Broker
- Broker 将消息写入磁盘/同步到镜像节点。
RabbitMQ Broker → 发送 ACK → 生产者
- (到此,发布者确认完成。生产者可以安心了,它的任务结束了。)
- (一段时间后...)
- RabbitMQ Broker → 推送消息 → 消费者
- 消费者执行业务逻辑(例如:更新数据库)。
消费者 → 发送 ACK → RabbitMQ Broker
- (到此,消费者确认完成。)
- RabbitMQ Broker → 从队列中删除消息
来测试下
可以看到这个确认模式会出现之前的消息一起给你发确认导致 3、4、5、6、7、8、9 没有出现,这时候要处理之前的所有没处理过得消息。推荐使用 channel.getNextPublishSeqNo()
获取下一条消息的唯一NO,存起来这样就能实现消息确认了。