跟着官网学RabbitMQ-Routing(路由)
课程地址:https://www.rabbitmq.com/tutorials/tutorial-four-java
在父项目 rabbitmq-foundation-project
中创建一个 module rabbitmq-routing-project
,
这次文件要比前几个都多,要测试不同路由键去往不同的 Consumer服务
BaseConfig.java
还是老样子,提取一个基础配置类
package com.runbrick;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Shared connection settings for the routing examples.
 *
 * <p>The producer and consumer classes extend this class so that they can call
 * {@link #getConnectionFactory()} without qualifying it.
 */
public class BaseConfig {

    /**
     * Creates a {@link ConnectionFactory} configured with the broker's host, port,
     * credentials and virtual host.
     *
     * @return a newly created factory; each call returns a separate instance
     */
    public static ConnectionFactory getConnectionFactory() {
        final ConnectionFactory rabbitFactory = new ConnectionFactory();

        // Network endpoint of the broker.
        // NOTE(review): 45672 is not the usual AMQP port (5672); presumably a mapped container port — confirm.
        rabbitFactory.setHost("localhost");
        rabbitFactory.setPort(45672);

        // Login and the virtual host to connect to.
        rabbitFactory.setUsername("user");
        rabbitFactory.setPassword("123qwe");
        rabbitFactory.setVirtualHost("test01");

        return rabbitFactory;
    }
}
Producer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeoutException;
/**
 * Publishes ten messages to the direct exchange {@code routing-log}, each with a
 * routing key chosen at random from {@code error}, {@code info} and {@code warning}.
 */
public class Producer extends BaseConfig {
    private static final String EXCHANGE_NAME = "routing-log";

    /**
     * Opens a connection and channel, declares the exchange, publishes the messages
     * and closes both resources again.
     *
     * @param args unused
     * @throws RuntimeException wrapping the {@link IOException} or {@link TimeoutException}
     *         raised while connecting, publishing or closing; other unchecked exceptions
     *         propagate unchanged instead of being printed and swallowed
     */
    public static void main(String[] args) {
        ConnectionFactory factory = getConnectionFactory();
        // try-with-resources closes the channel first, then the connection.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
            String[] routingKeys = new String[]{"error", "info", "warning"};
            for (int i = 0; i < 10; i++) {
                // Math.random() is in [0, 1), so the cast yields a valid index.
                String routingKey = routingKeys[(int) (Math.random() * routingKeys.length)];
                String message = String.format("[%s],%s", routingKey, new Date());
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            // Keep the cause so the original failure stays visible in the stack trace.
            throw new RuntimeException("Failed to publish to exchange '" + EXCHANGE_NAME + "'", e);
        }
    }
}
InfoConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Consumer that binds a server-named queue to the {@code routing-log} exchange
 * with the binding key {@code info} and prints every message it receives.
 */
public class InfoConsumer extends BaseConfig {
    private static final String EXCHANGE_NAME = "routing-log";

    /**
     * Connects, declares the exchange and a fresh queue, binds the queue and starts consuming.
     *
     * @param args unused
     * @throws IOException      propagated from the connection and channel operations
     * @throws TimeoutException propagated from opening the connection
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // The connection and channel are not closed in this method;
        // presumably so the process keeps receiving deliveries — confirm.
        Connection conn = getConnectionFactory().newConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Let the broker generate the queue name, e.g. amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ.
        String generatedQueue = ch.queueDeclare().getQueue();

        // The generated queue must be bound to the exchange under the key this consumer wants.
        ch.queueBind(generatedQueue, EXCHANGE_NAME, "info");

        ch.basicConsume(
                generatedQueue,
                true,
                (tag, delivery) -> {
                    String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    System.out.println(" [x] info log '" + text + "'");
                },
                tag -> { });
    }
}
ErrorConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Consumer that binds a server-named queue to the {@code routing-log} exchange
 * with the binding key {@code error} and prints every message it receives.
 */
public class ErrorConsumer extends BaseConfig {
    private static final String EXCHANGE_NAME = "routing-log";

    /**
     * Connects, declares the exchange and a fresh queue, binds the queue and starts consuming.
     *
     * @param args unused
     * @throws IOException      propagated from the connection and channel operations
     * @throws TimeoutException propagated from opening the connection
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // The connection and channel are not closed in this method;
        // presumably so the process keeps receiving deliveries — confirm.
        Connection conn = getConnectionFactory().newConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Let the broker generate the queue name, e.g. amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ.
        String generatedQueue = ch.queueDeclare().getQueue();

        // The generated queue must be bound to the exchange under the key this consumer wants.
        ch.queueBind(generatedQueue, EXCHANGE_NAME, "error");

        ch.basicConsume(
                generatedQueue,
                true,
                (tag, delivery) -> {
                    String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    System.out.println(" [x] error log '" + text + "'");
                },
                tag -> { });
    }
}
WarningConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * Consumer that binds a server-named queue to the {@code routing-log} exchange
 * with the binding key {@code warning} and prints every message it receives.
 */
public class WarningConsumer extends BaseConfig {
    private static final String EXCHANGE_NAME = "routing-log";

    /**
     * Connects, declares the exchange and a fresh queue, binds the queue and starts consuming.
     *
     * @param args unused
     * @throws IOException      propagated from the connection and channel operations
     * @throws TimeoutException propagated from opening the connection
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // The connection and channel are not closed in this method;
        // presumably so the process keeps receiving deliveries — confirm.
        Connection conn = getConnectionFactory().newConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Let the broker generate the queue name, e.g. amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ.
        String generatedQueue = ch.queueDeclare().getQueue();

        // The generated queue must be bound to the exchange under the key this consumer wants.
        ch.queueBind(generatedQueue, EXCHANGE_NAME, "warning");

        ch.basicConsume(
                generatedQueue,
                true,
                (tag, delivery) -> {
                    String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    System.out.println(" [x] warning log '" + text + "'");
                },
                tag -> { });
    }
}
上节课 channel.exchangeDeclare()
使用的是 FANOUT
交换机,这节课改用了 DIRECT(直连)交换机。
// Routing keys to choose from; one is picked at random for each message.
String[] routingKeys = new String[]{"error", "info", "warning"};
// Publish 10 messages in total.
for (int i = 0; i < 10; i++) {
// Math.random() is in [0, 1), so the cast yields an index in 0..length-1.
String routingKey = routingKeys[(int) (Math.random() * routingKeys.length)];
// Message text has the form "[<routingKey>],<current date>".
String message = String.format("[%s],%s", routingKey, new Date());
// Publish to the exchange with the chosen routing key; the body is UTF-8 encoded.
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
这段代码循环 10 次,每次从 routingKeys 中随机选取一个路由键来发送消息,这样不同的 Consumer 就能各自接收到对应路由键的消息了。
// First argument: the exchange to publish to; second argument: the routing key.
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
在前面的教程里,basicPublish 的第二个参数填的是队列名(queueName),现在又填 routingKey,可能会让人感到奇怪。其实 channel.basicPublish
的第二个参数永远都是 routingKey
(路由键)。
之所以它看起来既能填写 routingKey
又能填写 queueName
,是因为 RabbitMQ 中存在一个特殊的、匿名的默认交换机(Default Exchange) 。
当你使用这个默认交换机时(即 basicPublish 的第一个参数传空字符串 ""),它的路由规则非常特殊:它会把消息直接路由到与 routingKey
同名的队列(Queue)中。
// One binding call per consumer class; only the third argument (the routing key) differs.
// InfoConsumer
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
// ErrorConsumer
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
// WarningConsumer
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
每个 Consumer 的 channel.queueBind
调用都差不多,只是最后一个参数不同。这最后一个参数也是 routingKey
:把队列以指定的 routingKey 绑定到 EXCHANGE 上,就能只接收 Producer 以该 routingKey
发送的消息了。
还是要先启动每个 Consumer,再启动 Producer:队列是 Consumer 启动时才临时创建并绑定到交换机上的,如果 Producer 先发送,消息没有匹配的队列可以路由,就接收不到了。
可以看到在 Producer 发送了 10条不同 routingKey 的消息后,每个Consumer 都接收到了需要的消息