跟着官网学RabbitMQ-Topics(主题)
课程地址:https://www.rabbitmq.com/tutorials/tutorial-five-java
在父项目 rabbitmq-foundation-project
中创建一个 module rabbitmq-topic-project
,
这次文件更多了。
BaseConfig.java
package com.runbrick;
import com.rabbitmq.client.ConnectionFactory;
public class BaseConfig {
public final static String EXCHANGE_NAME = "topic-log";
public static ConnectionFactory getConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(45672);
factory.setUsername("user");
factory.setPassword("123qwe");
factory.setVirtualHost("test01");
return factory;
}
}
Producer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class Producer extends BaseConfig {
public static void main(String[] args) {
ConnectionFactory factory = getConnectionFactory();
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// exchangeDeclare 的 type 类型有 direct 、 topic 、 headers 和 fanout 这几个
// 这个教程里面需要使用 topic(主题)
// 需要对 routing_key 进行匹配分别不同的队列进行匹配
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String[] routingKeys = new String[]{
"error.mysql", "error.system",
"info.mysql", "info.system",
"warning.mysql", "warning.system"
};
for (int i = 0; i < 20; i++) {
String routingKey = routingKeys[(int) (Math.random() * routingKeys.length)];
String message = String.format("[%s],%s", routingKey, new Date());
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException | TimeoutException e) {
throw new RuntimeException(e);
} catch (Exception e) {
e.printStackTrace();
}
}
}
InfoSystemConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class InfoSystemConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info.system");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] info log '" + message + "'");
}, consumerTag -> {});
}
}
InfoMysqlConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class InfoMysqlConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info.mysql");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] info log '" + message + "'");
}, consumerTag -> {});
}
}
WarningMysqlConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WarningMysqlConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning.mysql");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] warning log '" + message + "'");
}, consumerTag -> {});
}
}
WarningSystemConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WarningSystemConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning.system");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] warning log '" + message + "'");
}, consumerTag -> {});
}
}
ErrorSystemConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ErrorSystemConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error.system");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] error system log '" + message + "'");
}, consumerTag -> {});
}
}
ErrorMysqlConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ErrorMysqlConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error.mysql");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] error mysql log '" + message + "'");
}, consumerTag -> {});
}
}
ErrorConsumer.java
package com.runbrick;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class ErrorConsumer extends BaseConfig {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = getConnectionFactory();
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 创建一个随机的队列名称
// 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
String QUEUE_NAME = channel.queueDeclare().getQueue();
// 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error.*");
channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] error all log '" + message + "'");
}, consumerTag -> {});
}
}
这个和上一节课没什么区别主要更换的 TOPIC 交换机。主要的是Fanout 不支持 routingKey 它会忽略这个参数,Topic 可以通过这个参数发送消息给不同的 Consumer。
TOPIC 的 routingKey
中可以使用两种特殊的通配符:
*
(星号) :匹配不多不少一个单词。bindingKey
为A.*.C
的队列,能匹配到routingKey
为A.B.C
的消息,但不能匹配到A.B.D.C
或A.C
。
#
(井号) :匹配零个或多个单词。bindingKey
为A.#
的队列,能匹配到routingKey
为A
、A.B
和A.B.C
的消息。bindingKey
为#.C
的队列,能匹配到routingKey
为C
、A.C
和A.B.C
的消息。
这样就实现了 Consumer 通过配置不同的 routingKey (error.*
、error.system
、error.mysql
) 来监听不同的颗粒度、业务消息。
把所有业务启动测试一下,还是 Consumer 先启动。在 producer 发送了消息后 Consumer 根据不同的 routingKey 拿到了不同的消息