跟着官网学RabbitMQ-HelloWorld
课程地址:https://www.rabbitmq.com/tutorials/tutorial-one-java
前提条件是已经按照我的教程 “准备工作” 启动了 RabbitMQ 或者自己整了一个服务
在父项目 rabbitmq-foundation-project
中创建一个 module rabbitmq-hello-world-project
,因为之前已经在父级 maven 中添加了 RabbitMQ 相关的坐标了子项目中就不用添加了
在项目中添加 Producer.java 和 Consumer.java 两个类
Producer.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
} catch (Exception e) {
e.printStackTrace();
}
}
}
Consumer.java
package com.runbrick;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
}, consumerTag -> {});
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
}
}
上面这两个类是官网最简单的例子,主要是实现发消息和收消息的功能。先别管代码干啥的,直接运行这两个例子的 main ,先看下结果。
可以看到 Producer 发送了一个消息,Consumer 收到了一个消息。下面来简单的讲一下这里面关键代码的用途
private final static String QUEUE_NAME = "hello";
这个是定义了一个统一的队列名称,会通过这个队列传输内容,一条接一条的消息通过通道送到了 Consumer 手中。
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
这一部分的内容很简单就是用来连接RabbitMQ服务、创建了一个 channel (Channel channel = connection.createChannel())的。可以看到在连接时候会创建一个 channel 的
// 源代码 queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
这个主要的作用就是告诉 RabbitMQ 请帮我生成一个队列,如果这个队列不存在则会创建一个队列,如果存在的话会检查提供的参数(比如是否持久化、是否排他等)是否与已存在的队列匹配。如果匹配,则什么也不做,操作成功返回;如果不匹配,则会报错并关闭 Channel。queueDeclare
是一个同步方法,它会等待服务器的响应。
queue
(队列名称)
- 类型: 字符串
- 作用: 指定队列的名称。生产者发送消息时需要知道这个名称,消费者监听时也需要知道这个名称。
- 特殊情况: 如果你传递一个空字符串
""
,RabbitMQ 会为你创建一个非持久的、排他的、自动删除的、由系统生成的唯一名称的队列。这个功能在实现某些临时模式(如 RPC 请求-响应模式中的回调队列)时非常有用。
durable
(是否持久化)
- 类型: 布尔值 (
true
/false
) 作用: 决定了队列本身的元数据(名称、属性等)是否要被持久化存储。
true
(持久化): 队列的元数据会被保存到磁盘上。即使 RabbitMQ 服务器重启,这个队列也依然存在。false
(非持久化/内存): 队列仅存在于内存中。服务器重启后,这个队列就会丢失。
- 重要提醒: 队列持久化不等于消息持久化。一个持久化的队列,如果收到了非持久化的消息,在服务器重启后,队列虽然还在,但里面的消息会丢失。要让消息也持久化,需要在发送消息时(
basic.publish
)设置消息的delivery_mode
属性为 2。
exclusive
(是否排他)
- 类型: 布尔值 (
true
/false
) 作用: 设置队列是否为当前连接独占。
true
(排他): 这个队列只对首次声明它的连接(Connection)可见,并且在该连接关闭后,队列会自动删除。其他连接无法访问这个队列。false
(非排他): 所有连接都可以访问这个队列。
- 应用场景: 适用于一个客户端需要一个临时、私有的队列的场景。
autoDelete
(是否自动删除)
- 类型: 布尔值 (
true
/false
) 作用: 设置队列在“不再被使用”时是否自动删除。
true
(自动删除): 当最后一个消费者(Consumer)取消订阅后,该队列会自动被删除。注意,是最后一个消费者断开,而不是连接断开。false
(不自动删除): 即使没有消费者,队列也会一直存在。
与
exclusive
的区别:exclusive
的生命周期与连接绑定。autoDelete
的生命周期与消费者绑定。
arguments
(其他参数)
- 类型: 字典/Map
作用: 用于设置一些高级、可选的队列属性。常见的参数有:
x-message-ttl
: 消息存活时间 (Time-To-Live)。设置队列中消息的过期时间(毫秒)。到期后消息会被丢弃或成为死信。x-expires
: 队列过期时间。设置队列在多长时间(毫秒)没有被使用(没有消费者、没有basic.get
等)后自动删除。x-max-length
: 队列最大长度。限制队列中能存放的消息数量。超出后,队首的旧消息会被丢弃或成为死信。x-max-length-bytes
: 队列最大容量。限制队列中所有消息体的总大小(字节)。x-dead-letter-exchange
: 死信交换机。当消息变成“死信”(如过期、被拒绝、队列超长)时,会被投递到这个指定的交换机。x-dead-letter-routing-key
: 死信路由键。配合死信交换机使用,指定死信消息的路由键。x-max-priority
: 最大优先级。将队列设置为优先级队列,并指定最高优先级。
String message = "Hello World!";
//源代码: basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
刚才我们已经搞定了消息中的连接服务、创建一个通道、创建一个队列。这时候就可以将消息转为 byte 推给 RabbitMQ了。不过这时候还有一个漏掉一个知识点那就是 exchange ,在 channel.basicPublish 的第一个参数我们并没有添加任何的参数,这个参数就是 exchange 不过官网的教程并没有在第一课就讲这个 exchange 所以也容我卖一个关子。不过可以看在启动 Producer 的时候 RabbitMQ Management 中的 Exchange 菜单里面有一行的数据变化了。
即使不配置任何的 exchange 在推送的时候也是使用了 exchange 的。这里可能会有个误解 生产者直接将消息发送到队列(Queue)。实际上,生产者是将消息发送到交换机(Exchange) 。如果在不配 exchange 参数的时候消息去到了 默认交换机(Default Exchange)
exchange
(交换机名称)
- 类型: 字符串
- 作用: 指定消息要发送到哪个交换机。
特殊情况:默认交换机(Default Exchange) :
- 如果你为这个参数提供一个空字符串
""
,消息会被发送到一个特殊的、无名的默认交换机。 - 这个默认交换机是一个预先声明好的
direct
类型的交换机。它的特殊规则是:routingKey
必须与一个已存在的队列名称完全匹配,消息才会被直接投递到那个队列。 - 这是最简单的一种消息模型,常用于“Hello World”或简单的任务队列场景。
- 如果你为这个参数提供一个空字符串
routingKey
(路由键)
- 类型: 字符串
- 作用: 这就像是信封上的“地址”。交换机用它来决定消息的去向。
路由键的意义完全取决于交换机的类型:
- Direct Exchange: 路由键需要与队列和交换机之间的绑定键(Binding Key)完全匹配。
- Fanout Exchange: 此参数被忽略。交换机会将消息广播到所有与它绑定的队列。
- Topic Exchange: 路由键是一个带有点号
.
分隔的字符串(如stock.usd.nyse
),用于和支持通配符(*
,#
)的绑定键进行模式匹配。 - Headers Exchange: 此参数被忽略。交换机根据消息
properties
中的headers
属性进行匹配。
properties
(消息属性)
- 类型: 一个包含消息元数据(Metadata)的对象/字典。
- 作用: 这就是消息的“信封”,上面可以附加各种额外信息和指令,非常重要。
常用属性:
delivery_mode
: 投递模式。这是实现消息持久化的关键。2
: 持久化(Persistent)。消息会被保存到磁盘。即使 RabbitMQ 重启,消息也不会丢失(前提是它被路由到了一个持久化的队列)。1
或不设置: 非持久化(Transient)。消息仅存在于内存中,服务器重启后会丢失。
content_type
: 消息体的MIME类型,如application/json
。方便消费者知道如何解析消息体。reply_to
: 通常用于 RPC(远程过程调用)模式,指定一个队列名,让接收方知道应该将响应消息发送到哪里。correlation_id
: 也用于 RPC 模式,用来将响应与原始请求进行关联。expiration
: 设置单条消息的过期时间(毫秒)。headers
: 用于headers
交换机匹配。priority
: 设置消息的优先级(0-N)。需要队列支持优先级(通过x-max-priority
参数声明)。
body
(消息体)
- 类型: 字节数组 (Byte Array)
- 作用: 这是消息的实际内容,也就是信里的“正文”。RabbitMQ 对消息体的内容一无所知,它只把它当作一串二进制数据来传输。你需要自己负责序列化(如转为 JSON、Protobuf)和反序列化。
//源代码: basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
}, consumerTag -> {});
上一步是在 Producer 中推送了消息,这一步就要 Consumer 来接受消息了。 这段代码的逻辑就是我订阅了 QUEUE_NAME 的队列,如果有消息请告诉我。
queue
(队列名称)
- 类型: 字符串
- 作用: 指定要从哪个队列进行消费。这个队列必须已经存在。
on_message_callback
(消息处理回调函数)
- 类型: 函数/方法
- 作用: 这是
basicConsume
的核心。你需要提供一个函数,当 RabbitMQ 推送消息过来时,客户端库会自动调用这个函数。你所有的业务逻辑(比如处理订单、发送邮件、写入数据库)都应该写在这个函数里。 - 这个回调函数通常会接收几个重要的参数,我们稍后详细讲解。
auto_ack
(是否自动确认)
- 类型: 布尔值 (
true
/false
) 作用: 这是消费者可靠性的命脉,极其重要!它决定了消息何时被视为“成功消费”。
true
(自动确认): 不推荐在生产环境中使用。当 RabbitMQ 将消息发送给消费者后,它会立即在队列中将该消息标记为删除,而不关心消费者是否真的成功处理了它。如果你的消费者在处理消息的过程中崩溃了,这条消息就永久丢失了。false
(手动确认): 强烈推荐。RabbitMQ 将消息发送给消费者后,会将消息置于“未确认(Unacked)”状态。它会等待消费者在完成所有业务逻辑后,显式地发送一个“确认回执(Acknowledgement)”。只有收到这个回执,RabbitMQ 才会将消息从队列中删除。如果消费者崩溃或连接断开而没有发送回执,RabbitMQ 会将该消息重新放回队列,交给其他消费者处理,从而保证了消息不会丢失。
exclusive
(是否排他消费)
- 类型: 布尔值 (
true
/false
) - 作用: 如果设置为
true
,则表示只有这一个消费者可以从该队列消费。如果此时有其他消费者尝试连接到这个队列,会失败。
consumer_tag
(消费者标签)
- 类型: 字符串
- 作用: 为这个消费者指定一个唯一的标识符。如果你不提供,客户端库会自动生成一个。这个标签主要用于后续需要取消订阅(
basic.cancel
)的场景。
这时候这个基本课程就学完了。