课程地址:https://www.rabbitmq.com/tutorials/tutorial-one-java

前提条件是已经按照我的教程 “准备工作” 启动了 RabbitMQ 或者自己整了一个服务

在父项目 rabbitmq-foundation-project 中创建一个 module rabbitmq-hello-world-project,因为之前已经在父级 maven 中添加了 RabbitMQ 相关的坐标了子项目中就不用添加了

在项目中添加 Producer.java 和 Consumer.java 两个类

Producer.java

package com.runbrick;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";


    public static void main(String[] args) {

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer.java

package com.runbrick;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        }, consumerTag -> {});

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    }
}

上面这两个类是官网最简单的例子,主要是实现发消息和收消息的功能。先别管代码干啥的,直接运行这两个例子的 main ,先看下结果。

2025-07-23T08:59:00.png

可以看到 Producer 发送了一个消息,Consumer 收到了一个消息。下面来简单的讲一下这里面关键代码的用途

 private final static String QUEUE_NAME = "hello";

这个是定义了一个统一的队列名称,会通过这个队列传输内容,一条接一条的消息通过通道送到了 Consumer 手中。

2025-07-23T08:59:05.png

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01"); 

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

这一部分的内容很简单就是用来连接RabbitMQ服务、创建了一个 channel (Channel channel = connection.createChannel())的。可以看到在连接时候会创建一个 channel 的

2025-07-23T08:59:12.png

// 源代码 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

这个主要的作用就是告诉 RabbitMQ 请帮我生成一个队列,如果这个队列不存在则会创建一个队列,如果存在的话会检查提供的参数(比如是否持久化、是否排他等)是否与已存在的队列匹配。如果匹配,则什么也不做,操作成功返回;如果不匹配,则会报错并关闭 Channel。queueDeclare 是一个同步方法,它会等待服务器的响应。

  1. queue (队列名称)
  • 类型: 字符串
  • 作用: 指定队列的名称。生产者发送消息时需要知道这个名称,消费者监听时也需要知道这个名称。
  • 特殊情况: 如果你传递一个空字符串 "",RabbitMQ 会为你创建一个非持久的、排他的、自动删除的、由系统生成的唯一名称的队列。这个功能在实现某些临时模式(如 RPC 请求-响应模式中的回调队列)时非常有用。
  1. durable (是否持久化)
  • 类型: 布尔值 (true / false)
  • 作用: 决定了队列本身的元数据(名称、属性等)是否要被持久化存储。

    • true (持久化): 队列的元数据会被保存到磁盘上。即使 RabbitMQ 服务器重启,这个队列也依然存在。
    • false (非持久化/内存): 队列仅存在于内存中。服务器重启后,这个队列就会丢失。
  • 重要提醒: 队列持久化不等于消息持久化。一个持久化的队列,如果收到了非持久化的消息,在服务器重启后,队列虽然还在,但里面的消息会丢失。要让消息也持久化,需要在发送消息时(basic.publish)设置消息的 delivery_mode 属性为 2。
  1. exclusive (是否排他)
  • 类型: 布尔值 (true / false)
  • 作用: 设置队列是否为当前连接独占。

    • true (排他): 这个队列只对首次声明它的连接(Connection)可见,并且在该连接关闭后,队列会自动删除。其他连接无法访问这个队列。
    • false (非排他): 所有连接都可以访问这个队列。
  • 应用场景: 适用于一个客户端需要一个临时、私有的队列的场景。
  1. autoDelete (是否自动删除)
  • 类型: 布尔值 (true / false)
  • 作用: 设置队列在“不再被使用”时是否自动删除。

    • true (自动删除): 当最后一个消费者(Consumer)取消订阅后,该队列会自动被删除。注意,是最后一个消费者断开,而不是连接断开。
    • false (不自动删除): 即使没有消费者,队列也会一直存在。
  • exclusive 的区别:

    • exclusive 的生命周期与连接绑定。
    • autoDelete 的生命周期与消费者绑定。
  1. arguments (其他参数)
  • 类型: 字典/Map
  • 作用: 用于设置一些高级、可选的队列属性。常见的参数有:

    • x-message-ttl: 消息存活时间 (Time-To-Live)。设置队列中消息的过期时间(毫秒)。到期后消息会被丢弃或成为死信。
    • x-expires: 队列过期时间。设置队列在多长时间(毫秒)没有被使用(没有消费者、没有 basic.get 等)后自动删除。
    • x-max-length: 队列最大长度。限制队列中能存放的消息数量。超出后,队首的旧消息会被丢弃或成为死信。
    • x-max-length-bytes: 队列最大容量。限制队列中所有消息体的总大小(字节)。
    • x-dead-letter-exchange: 死信交换机。当消息变成“死信”(如过期、被拒绝、队列超长)时,会被投递到这个指定的交换机。
    • x-dead-letter-routing-key: 死信路由键。配合死信交换机使用,指定死信消息的路由键。
    • x-max-priority: 最大优先级。将队列设置为优先级队列,并指定最高优先级。
String message = "Hello World!";
//源代码: basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

刚才我们已经搞定了消息中的连接服务、创建一个通道、创建一个队列。这时候就可以将消息转为 byte 推给 RabbitMQ了。不过这时候还有一个漏掉一个知识点那就是 exchange ,在 channel.basicPublish 的第一个参数我们并没有添加任何的参数,这个参数就是 exchange 不过官网的教程并没有在第一课就讲这个 exchange 所以也容我卖一个关子。不过可以看在启动 Producer 的时候 RabbitMQ Management 中的 Exchange 菜单里面有一行的数据变化了。

2025-07-23T08:59:44.png

即使不配置任何的 exchange 在推送的时候也是使用了 exchange 的。这里可能会有个误解 生产者直接将消息发送到队列(Queue)。实际上,生产者是将消息发送到交换机(Exchange) 。如果在不配 exchange 参数的时候消息去到了 默认交换机(Default Exchange)

  1. exchange (交换机名称)
  • 类型: 字符串
  • 作用: 指定消息要发送到哪个交换机。
  • 特殊情况:默认交换机(Default Exchange) :

    • 如果你为这个参数提供一个空字符串 "" ,消息会被发送到一个特殊的、无名的默认交换机
    • 这个默认交换机是一个预先声明好的 direct 类型的交换机。它的特殊规则是:routingKey 必须与一个已存在的队列名称完全匹配,消息才会被直接投递到那个队列。
    • 这是最简单的一种消息模型,常用于“Hello World”或简单的任务队列场景。
  1. routingKey (路由键)
  • 类型: 字符串
  • 作用: 这就像是信封上的“地址”。交换机用它来决定消息的去向。
  • 路由键的意义完全取决于交换机的类型:

    • Direct Exchange: 路由键需要与队列和交换机之间的绑定键(Binding Key)完全匹配
    • Fanout Exchange: 此参数被忽略。交换机会将消息广播到所有与它绑定的队列。
    • Topic Exchange: 路由键是一个带有点号 . 分隔的字符串(如 stock.usd.nyse),用于和支持通配符(*, #)的绑定键进行模式匹配。
    • Headers Exchange: 此参数被忽略。交换机根据消息 properties 中的 headers 属性进行匹配。
  1. properties (消息属性)
  • 类型: 一个包含消息元数据(Metadata)的对象/字典。
  • 作用: 这就是消息的“信封”,上面可以附加各种额外信息和指令,非常重要。
  • 常用属性:

    • delivery_mode: 投递模式。这是实现消息持久化的关键。

      • 2: 持久化(Persistent)。消息会被保存到磁盘。即使 RabbitMQ 重启,消息也不会丢失(前提是它被路由到了一个持久化的队列)。
      • 1 或不设置: 非持久化(Transient)。消息仅存在于内存中,服务器重启后会丢失。
    • content_type: 消息体的MIME类型,如 application/json。方便消费者知道如何解析消息体。
    • reply_to: 通常用于 RPC(远程过程调用)模式,指定一个队列名,让接收方知道应该将响应消息发送到哪里。
    • correlation_id: 也用于 RPC 模式,用来将响应与原始请求进行关联。
    • expiration: 设置单条消息的过期时间(毫秒)。
    • headers: 用于 headers 交换机匹配。
    • priority: 设置消息的优先级(0-N)。需要队列支持优先级(通过 x-max-priority 参数声明)。
  1. body (消息体)
  • 类型: 字节数组 (Byte Array)
  • 作用: 这是消息的实际内容,也就是信里的“正文”。RabbitMQ 对消息体的内容一无所知,它只把它当作一串二进制数据来传输。你需要自己负责序列化(如转为 JSON、Protobuf)和反序列化。
//源代码: basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
 channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        }, consumerTag -> {});

上一步是在 Producer 中推送了消息,这一步就要 Consumer 来接受消息了。 这段代码的逻辑就是我订阅了 QUEUE_NAME 的队列,如果有消息请告诉我。

  1. queue (队列名称)
  • 类型: 字符串
  • 作用: 指定要从哪个队列进行消费。这个队列必须已经存在。
  1. on_message_callback (消息处理回调函数)
  • 类型: 函数/方法
  • 作用: 这是 basicConsume核心。你需要提供一个函数,当 RabbitMQ 推送消息过来时,客户端库会自动调用这个函数。你所有的业务逻辑(比如处理订单、发送邮件、写入数据库)都应该写在这个函数里。
  • 这个回调函数通常会接收几个重要的参数,我们稍后详细讲解。
  1. auto_ack (是否自动确认)
  • 类型: 布尔值 (true / false)
  • 作用: 这是消费者可靠性的命脉,极其重要!它决定了消息何时被视为“成功消费”。

    • true (自动确认): 不推荐在生产环境中使用。当 RabbitMQ 将消息发送给消费者后,它会立即在队列中将该消息标记为删除,而不关心消费者是否真的成功处理了它。如果你的消费者在处理消息的过程中崩溃了,这条消息就永久丢失了
    • false (手动确认): 强烈推荐。RabbitMQ 将消息发送给消费者后,会将消息置于“未确认(Unacked)”状态。它会等待消费者在完成所有业务逻辑后,显式地发送一个“确认回执(Acknowledgement)”。只有收到这个回执,RabbitMQ 才会将消息从队列中删除。如果消费者崩溃或连接断开而没有发送回执,RabbitMQ 会将该消息重新放回队列,交给其他消费者处理,从而保证了消息不会丢失。
  1. exclusive (是否排他消费)
  • 类型: 布尔值 (true / false)
  • 作用: 如果设置为 true,则表示只有这一个消费者可以从该队列消费。如果此时有其他消费者尝试连接到这个队列,会失败。
  1. consumer_tag (消费者标签)
  • 类型: 字符串
  • 作用: 为这个消费者指定一个唯一的标识符。如果你不提供,客户端库会自动生成一个。这个标签主要用于后续需要取消订阅(basic.cancel)的场景。

这时候这个基本课程就学完了。

标签: java, rabbitmq

添加新评论