跟着官网学RabbitMQ-WorkQueues
课程地址:https://www.rabbitmq.com/tutorials/tutorial-two-java
官网的代码是通过 cmd 传参来发送消息的我直接写了一个循环,如果你喜欢发送消息是使用 java -jar xxx.jar args 的话需要你自行修改。
老规矩 在父项目 rabbitmq-foundation-project
中创建一个 module rabbitmq-work-queues-project
直接上代码,这次我们需要一个 Producer
和两个 Consumer
(其实代码一样为了起两个 Consumer 服务省事)
Producer.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer extends BaseConfig {
private final static String QUEUE_NAME = "work-queue";
public static void main(String[] args) {
ConnectionFactory factory = getConnectionFactory();
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
// 这个里给队列声明一个持久化队列确保队列在 RabbitMQ 节点重启后仍然存在
// 当 RabbitMQ 退出或崩溃时,除非明确设置,否则它将遗忘队列和消息。要确保消息不丢失,需要满足两个条件:必须将队列和消息都标记为持久化。
// 因为你要是跟着教程走,会修改队列的配置。
// 但你之前已经有相同的队列了这时候不会修改成功的,那么你需要先删除这个队列或者修改 QUEUE_NAME 为新的名称
// 不然就会报错 inequivalent arg 'durable' for queue 'work-queue' in vhost 'test01': received 'true' but current is 'false',
// 记得这里改了之后也要修改 消费者的两个类:Consumer、Consumer2
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
for (int i = 0; i < 10; i++) {
int poll = (int) Math.floor(Math.random() * 10);
String repeat = ".".repeat(poll);
String message = String.format("Hello World %d%s", i, repeat);
// 如果 durable = true 这里的 basicPublish 的 props 参数也要配置一下 MessageProperties.PERSISTENT_TEXT_PLAIN
// 得告诉一下 RabbitMQ 这是一个持久化消息
// MessageProperties.PERSISTENT_TEXT_PLAIN 可以自己配置,这配置里面最主要的是 deliveryMode = 2
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Consumer.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 一些一样的代码统一挪到了 BaseConfig 类中,为了避免代码重复,简洁好看
*/
public class Consumer extends BaseConfig {
private final static String QUEUE_NAME = "work-queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 这个配置,保证一个消费者一次只能消费一个消息
channel.basicQos(1);
Boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, getDeliverCallback(channel), consumerTag -> {});
}
}
Consumer2.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 这个地方的代码和 Consumer2 是一样的,只需要研究 Consumer 即可
*/
public class Consumer2 extends BaseConfig {
private final static String QUEUE_NAME = "work-queue";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
// 这个配置,保证一个消费者一次只能消费一个消息
channel.basicQos(1);
Boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, getDeliverCallback(channel), consumerTag -> {});
}
}
BaseConfig.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class BaseConfig {
public static ConnectionFactory getConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
return factory;
}
/**
* 模拟延时处理任务
* @param task
* @throws InterruptedException
*/
public static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
public static DeliverCallback getDeliverCallback(Channel channel) {
return (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
System.out.println(" [x] Done");
// 这里需要确认消息,否则消息会重复消费
// 这里不需要批量确认,因为一个消费者一次只能消费一个消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
}
}
这次相对于上一课的代码改了以下代码
channel.basicQos(1)
保证一个消费者一次只能消费一个消息,而且还有通过 doWork 中的 Thread.sleep(1000)。 这样才能实现一个 Producer 狂发,另外两个 Consumer 才可以雨露均沾。channel.queueDeclare()
durable 配置为 "true" , 代表持久化队列确保队列在 RabbitMQ 节点重启后仍然存在channel.basicPublish()
的 props 要配置为 MessageProperties.PERSISTENT_TEXT_PLAIN 这样才能真正实现持久化队列channel.basicConsume()
中的 ack 改为 false 这样就能在 Consumer 收到消息后可以手动告诉 RabbitMQ 我收到了消息并且处理的消息channel.basicAck()
既然前面将 ack 改为了 false 在 Consumer 收到消息后就必须通过这个代码确认消息.这里是消费者确认
测试一下代码,先执行一下 Producer 发送一堆消息到 rabbitmq 中,这时候可以看到有10条消息等待被消费
在运行 Consumer 可以看到消息在一点点的被消费
这个虽然有两个 Consumer 不过还是通过一个 Queue 拿数据,只不过是两个 Consumer 在争抢获取数据谁拿到是谁的。
可以通过在发送消息后看 RM ,走的还是默认的 direct 交换机