课程地址:https://www.rabbitmq.com/tutorials/tutorial-two-java

官网的代码是通过 cmd 传参来发送消息的我直接写了一个循环,如果你喜欢发送消息是使用 java -jar xxx.jar args 的话需要你自行修改。

老规矩 在父项目 rabbitmq-foundation-project 中创建一个 module rabbitmq-work-queues-project

直接上代码,这次我们需要一个 Producer 和两个 Consumer(其实代码一样为了起两个 Consumer 服务省事)

Producer.java

package com.runbrick;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer extends BaseConfig {
    private final static String QUEUE_NAME = "work-queue";

    public static void main(String[] args) {

        ConnectionFactory factory = getConnectionFactory();

        try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {

            // 这个里给队列声明一个持久化队列确保队列在 RabbitMQ 节点重启后仍然存在
            // 当 RabbitMQ 退出或崩溃时,除非明确设置,否则它将遗忘队列和消息。要确保消息不丢失,需要满足两个条件:必须将队列和消息都标记为持久化。
            // 因为你要是跟着教程走,会修改队列的配置。
            // 但你之前已经有相同的队列了这时候不会修改成功的,那么你需要先删除这个队列或者修改 QUEUE_NAME 为新的名称
            // 不然就会报错  inequivalent arg 'durable' for queue 'work-queue' in vhost 'test01': received 'true' but current is 'false',
            // 记得这里改了之后也要修改 消费者的两个类:Consumer、Consumer2
            boolean durable = true;
            channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

            for (int i = 0; i < 10; i++) {
                int poll = (int) Math.floor(Math.random() * 10);
                String repeat = ".".repeat(poll);

                String message = String.format("Hello World %d%s", i, repeat);
                // 如果 durable = true 这里的 basicPublish 的 props 参数也要配置一下 MessageProperties.PERSISTENT_TEXT_PLAIN
                // 得告诉一下 RabbitMQ 这是一个持久化消息
                // MessageProperties.PERSISTENT_TEXT_PLAIN 可以自己配置,这配置里面最主要的是 deliveryMode = 2
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer.java

package com.runbrick;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 一些一样的代码统一挪到了 BaseConfig 类中,为了避免代码重复,简洁好看
 */
public class Consumer extends BaseConfig {
    private final static String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        // 这个配置,保证一个消费者一次只能消费一个消息
        channel.basicQos(1);

        Boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, getDeliverCallback(channel), consumerTag -> {});
    }
}

Consumer2.java

package com.runbrick;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 这个地方的代码和 Consumer2 是一样的,只需要研究 Consumer 即可
 */
public class Consumer2 extends BaseConfig {
    private final static String QUEUE_NAME = "work-queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        // 这个配置,保证一个消费者一次只能消费一个消息
        channel.basicQos(1);

        Boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, getDeliverCallback(channel), consumerTag -> {});
    }
}

BaseConfig.java

package com.runbrick;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class BaseConfig {

    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01");
        return factory;
    }


    /**
     * 模拟延时处理任务
     * @param task
     * @throws InterruptedException
     */
    public static void doWork(String task) throws InterruptedException {
        for (char ch : task.toCharArray()) {
            if (ch == '.') Thread.sleep(1000);
        }
    }

    public static DeliverCallback getDeliverCallback(Channel channel) {
        return (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                System.out.println(" [x] Done");
                // 这里需要确认消息,否则消息会重复消费
                // 这里不需要批量确认,因为一个消费者一次只能消费一个消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
    }
}

这次相对于上一课的代码改了以下代码

  1. channel.basicQos(1) 保证一个消费者一次只能消费一个消息,而且还有通过 doWork 中的 Thread.sleep(1000)。 这样才能实现一个 Producer 狂发,另外两个 Consumer 才可以雨露均沾。
  2. channel.queueDeclare() durable 配置为 "true" , 代表持久化队列确保队列在 RabbitMQ 节点重启后仍然存在
  3. channel.basicPublish()props 要配置为 MessageProperties.PERSISTENT_TEXT_PLAIN 这样才能真正实现持久化队列
  4. channel.basicConsume() 中的 ack 改为 false 这样就能在 Consumer 收到消息后可以手动告诉 RabbitMQ 我收到了消息并且处理的消息
  5. channel.basicAck() 既然前面将 ack 改为了 false 在 Consumer 收到消息后就必须通过这个代码确认消息.这里是消费者确认

测试一下代码,先执行一下 Producer 发送一堆消息到 rabbitmq 中,这时候可以看到有10条消息等待被消费

2025-07-28T01:31:43.png

在运行 Consumer 可以看到消息在一点点的被消费

2025-07-28T01:31:49.png

这个虽然有两个 Consumer 不过还是通过一个 Queue 拿数据,只不过是两个 Consumer 在争抢获取数据谁拿到是谁的。

可以通过在发送消息后看 RM ,走的还是默认的 direct 交换机

2025-07-28T01:31:54.png

标签: java, rabbitmq

添加新评论