课程地址:https://www.rabbitmq.com/tutorials/tutorial-four-java

在父项目 rabbitmq-foundation-project 中创建一个 module rabbitmq-routing-project

这次的文件比前几节都多,因为要测试带有不同路由键的消息分别去往不同的 Consumer 服务。

BaseConfig.java
还是老样子,提取一个基础配置类

package com.runbrick;

import com.rabbitmq.client.ConnectionFactory;

/**
 * Connection settings shared by the producer and the consumers, which inherit
 * {@link #getConnectionFactory()} by extending this class.
 */
public class BaseConfig {

    private static final String HOST = "localhost";
    private static final int PORT = 45672;
    private static final String USERNAME = "user";
    private static final String PASSWORD = "123qwe";
    private static final String VIRTUAL_HOST = "test01";

    /**
     * Creates a {@link ConnectionFactory} populated with the host, port, credentials
     * and virtual host defined by this class.
     *
     * @return a newly created, configured factory; every call returns a fresh instance
     */
    public static ConnectionFactory getConnectionFactory() {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername(USERNAME);
        connectionFactory.setPassword(PASSWORD);
        connectionFactory.setVirtualHost(VIRTUAL_HOST);
        return connectionFactory;
    }
}

Producer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.TimeoutException;

/**
 * Publishes a fixed number of log-style messages to the {@code routing-log} direct
 * exchange, choosing the routing key of each message at random.
 */
public class Producer extends BaseConfig {
    private final static String EXCHANGE_NAME = "routing-log";

    /** Routing keys a message may be published with. */
    private static final String[] ROUTING_KEYS = {"error", "info", "warning"};

    /** Number of messages sent per run. */
    private static final int MESSAGE_COUNT = 10;

    /**
     * Declares the exchange, publishes {@link #MESSAGE_COUNT} messages and closes the
     * channel and connection.
     *
     * @param args unused
     * @throws RuntimeException if connecting, declaring, publishing or closing fails
     *                          with an {@link IOException} or {@link TimeoutException};
     *                          the original exception is preserved as the cause
     */
    public static void main(String[] args) {

        ConnectionFactory factory = getConnectionFactory();

        // try-with-resources closes the channel first, then the connection.
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                // Math.random() is in [0, 1), so the index is always within the array.
                String routingKey = ROUTING_KEYS[(int) (Math.random() * ROUTING_KEYS.length)];
                String message = String.format("[%s],%s", routingKey, new Date());
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        } catch (IOException | TimeoutException e) {
            // Unchecked failures (e.g. the client's runtime exceptions) are deliberately not
            // caught here: they propagate and terminate the program with a non-zero status
            // instead of being printed and swallowed.
            throw new RuntimeException("Failed to publish to exchange '" + EXCHANGE_NAME + "'", e);
        }
    }
}

InfoConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Prints every message that arrives on the {@code routing-log} exchange with the
 * routing key {@code info}.
 */
public class InfoConsumer extends BaseConfig {
    private final static String EXCHANGE_NAME = "routing-log";
    private final static String ROUTING_KEY = "info";

    /**
     * Connects to the broker, binds a server-named queue to the exchange and starts
     * consuming. The connection and channel are not closed by this method.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = getConnectionFactory().newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Ask the broker for a queue with a generated name,
        // something like amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ.
        String queueName = channel.queueDeclare().getQueue();

        // The generated queue must be bound to the exchange to receive anything.
        channel.queueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);

        channel.basicConsume(
                queueName,
                true,
                (consumerTag, delivery) ->
                        System.out.println(" [x] info log '" + decode(delivery.getBody()) + "'"),
                consumerTag -> {});
    }

    /** Decodes a message body as UTF-8 text. */
    private static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }
}

ErrorConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Prints every message that arrives on the {@code routing-log} exchange with the
 * routing key {@code error}.
 */
public class ErrorConsumer extends BaseConfig {
    private final static String EXCHANGE_NAME = "routing-log";

    /**
     * Connects to the broker, binds a server-named queue to the exchange and starts
     * consuming. The connection and channel are not closed by this method.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = getConnectionFactory();
        Connection brokerConnection = connectionFactory.newConnection();
        Channel brokerChannel = brokerConnection.createChannel();

        brokerChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Create a queue with a random, broker-generated name
        // (it looks like amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ).
        String generatedQueue = brokerChannel.queueDeclare().getQueue();

        // Bind the generated queue to the exchange for this consumer's routing key.
        brokerChannel.queueBind(generatedQueue, EXCHANGE_NAME, "error");

        brokerChannel.basicConsume(generatedQueue, true, (tag, incoming) -> {
            byte[] payload = incoming.getBody();
            String text = new String(payload, StandardCharsets.UTF_8);
            String line = " [x] error log '" + text + "'";
            System.out.println(line);
        }, cancelledTag -> {});
    }
}

WarningConsumer.java

package com.runbrick;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * Prints every message that arrives on the {@code routing-log} exchange with the
 * routing key {@code warning}.
 */
public class WarningConsumer extends BaseConfig {
    private final static String EXCHANGE_NAME = "routing-log";

    /**
     * Opens a channel, binds a server-named queue to the exchange and starts
     * consuming. The connection and channel are not closed by this method.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection or channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel ch = openChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // The broker picks the queue name; generated names look like
        // amq.gen-MqMWK6-HQ7yZb0vhLGcMxQ.
        String boundQueue = ch.queueDeclare().getQueue();

        // Attach the generated queue to the exchange under the "warning" key.
        ch.queueBind(boundQueue, EXCHANGE_NAME, "warning");

        ch.basicConsume(
                boundQueue,
                true,
                (consumerTag, delivery) -> {
                    String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
                    System.out.println(" [x] warning log '" + body + "'");
                },
                consumerTag -> {});
    }

    /** Opens a new connection from the shared factory and creates a channel on it. */
    private static Channel openChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = getConnectionFactory();
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}

上节课 channel.exchangeDeclare() 使用的是 FANOUT 交换机,这节课改用了 DIRECT 交换机。DIRECT 交换机只会把消息投递到绑定键(binding key)与消息的 routingKey 完全一致的队列。

        // Candidate routing keys; one of them is picked at random for each message.
        String[] routingKeys = new String[]{"error", "info", "warning"};
            // Send ten messages in total.
            for (int i = 0; i < 10; i++) {
                // Math.random() is in [0, 1), so the cast yields a valid index into routingKeys.
                String routingKey = routingKeys[(int) (Math.random() * routingKeys.length)];
                // Message text has the form "[<routingKey>],<current date>".
                String message = String.format("[%s],%s", routingKey, new Date());
                // Publish to the exchange with the chosen routing key; the body is the UTF-8 encoded message.
                channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }

这段代码循环 10 次,每次从 routingKeys 中随机选取一个路由键来发送消息,这样绑定了不同路由键的 Consumer 就都能接收到各自的数据了。

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes(StandardCharsets.UTF_8));

在前面的教程里,basicPublish 的第二个参数填的是队列名(queueName),现在又填 routingKey,可能会让人感到奇怪。其实 channel.basicPublish 的第二个参数永远都是 routingKey(路由键)。

之所以它看起来既能填写 routingKey 又能填写 queueName,是因为 RabbitMQ 中存在一个特殊的、匿名的默认交换机(Default Exchange),它的名称是空字符串 ""。

当你使用这个默认交换机时,它的路由规则非常特殊:每个队列都会自动以自己的队列名作为绑定键绑定到默认交换机上,所以它会把消息直接路由到与 routingKey 同名的队列(Queue)中。

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

每个 Consumer 的 channel.queueBind 都差不多,只是最后一个参数不同。这最后一个参数也是 routingKey(在绑定时也叫绑定键 binding key):队列通过它绑定到 EXCHANGE 上,就只会收到 Producer 以相同 routingKey 发送的消息。

还是要先启动每个 Consumer,再启动 Producer。否则消息发出时还没有任何队列绑定到交换机上,消息会被直接丢弃,Consumer 之后再启动也接收不到。

2025-07-28T02:36:40.png

2025-07-28T02:36:45.png

可以看到,在 Producer 发送了 10 条不同 routingKey 的消息后,每个 Consumer 都只接收到了自己需要的消息。

标签: java, rabbitmq

添加新评论