课程地址:https://www.rabbitmq.com/tutorials/tutorial-five-java

在父项目 rabbitmq-foundation-project 中创建一个 module rabbitmq-topic-project

这次文件更多了。

BaseConfig.java

package com.runbrick;

import com.rabbitmq.client.ConnectionFactory;

public class BaseConfig {
    public final static String EXCHANGE_NAME = "topic-log";

    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(45672);
        factory.setUsername("user");
        factory.setPassword("123qwe");
        factory.setVirtualHost("test01");
        return factory;
    }
}

Producer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class Producer extends BaseConfig {

    public static void main(String[] args) {

        ConnectionFactory factory = getConnectionFactory();

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // exchangeDeclare 的 type 类型有 direct 、 topic 、 headers 和 fanout 这几个
            // 这个教程里面需要使用 topic(主题)
            // 需要对 routing_key 进行匹配分别不同的队列进行匹配
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            String[] routingKeys = new String[]{
                    "error.mysql", "error.system",
                    "info.mysql", "info.system",
                    "warning.mysql", "warning.system"
            };
            for (int i = 0; i < 20; i++) {
                String routingKey = routingKeys[(int) (Math.random() * routingKeys.length)];
                String message = String.format("[%s],%s", routingKey, new Date());
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            throw new RuntimeException(e);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

InfoSystemConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class InfoSystemConsumer extends BaseConfig {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info.system");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] info log '" + message + "'");
        }, consumerTag -> {});
    }
}

InfoMysqlConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class InfoMysqlConsumer extends BaseConfig {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info.mysql");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] info log '" + message + "'");
        }, consumerTag -> {});
    }
}

WarningMysqlConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WarningMysqlConsumer extends BaseConfig {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning.mysql");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] warning log '" + message + "'");
        }, consumerTag -> {});
    }
}

WarningSystemConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class WarningSystemConsumer extends BaseConfig {

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning.system");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] warning log '" + message + "'");
        }, consumerTag -> {});
    }
}

ErrorSystemConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ErrorSystemConsumer extends BaseConfig {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error.system");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] error system log '" + message + "'");
        }, consumerTag -> {});
    }
}

ErrorMysqlConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ErrorMysqlConsumer extends BaseConfig {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error.mysql");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] error mysql log '" + message + "'");
        }, consumerTag -> {});
    }
}

ErrorConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ErrorConsumer extends BaseConfig {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        // 创建一个随机的队列名称
        // 生成队列的名称类似 amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ 这种
        String QUEUE_NAME = channel.queueDeclare().getQueue();
        // 需要绑定这个生成的 QUEUE_NAME 到 EXCHANGE 上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error.*");
        channel.basicConsume(QUEUE_NAME, true, (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] error all log '" + message + "'");
        }, consumerTag -> {});
    }
}

这个和上一节课没什么区别主要更换的 TOPIC 交换机。主要的是Fanout 不支持 routingKey 它会忽略这个参数,Topic 可以通过这个参数发送消息给不同的 Consumer。

TOPIC 的 routingKey 中可以使用两种特殊的通配符:

  • * (星号) :匹配不多不少一个单词

    • bindingKeyA.*.C 的队列,能匹配到 routingKeyA.B.C 的消息,但不能匹配到 A.B.D.CA.C
  • # (井号) :匹配零个或多个单词

    • bindingKeyA.# 的队列,能匹配到 routingKeyAA.BA.B.C 的消息。
    • bindingKey#.C 的队列,能匹配到 routingKeyCA.CA.B.C 的消息。

这样就实现了 Consumer 通过配置不同的 routingKey (error.*error.systemerror.mysql ) 来监听不同的颗粒度、业务消息。

把所有业务启动测试一下,还是 Consumer 先启动。在 producer 发送了消息后 Consumer 根据不同的 routingKey 拿到了不同的消息

2025-07-28T03:05:24.png

标签: java, rabbitmq

添加新评论