RabbitMQ 概述

什么是 RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器基于 AMQP 协议实现它用于在分布式系统中存储转发消息具有高可靠性灵活的路由可扩展性等特点

  • 核心功能
    • 消息传递异步处理解耦系统
    • 路由机制支持多种交换器类型
    • 持久化保证消息不丢失
    • 集群支持高可用和负载均衡
  • 主要优势
    • 可靠性支持消息确认持久化
    • 灵活性多种交换器类型路由规则灵活
    • 易用性提供多种客户端库
    • 社区活跃文档丰富生态完善

RabbitMQ 的核心概念

核心组件

  • Producer生产者
    • 发送消息的应用程序
    • 将消息发送到 Exchange
  • Consumer消费者
    • 接收并处理消息的应用程序
    • 从 Queue 中获取消息
  • Exchange交换器
    • 接收生产者发送的消息
    • 根据路由键将消息路由到队列
  • Queue队列
    • 存储消息的容器
    • 消费者从队列中获取消息
  • Binding绑定
    • Exchange 和 Queue 之间的连接关系
    • 定义路由规则
  • Routing Key路由键
    • 生产者发送消息时指定的键
    • Exchange 根据路由键决定消息去向
  • Virtual Host虚拟主机
    • 逻辑隔离的消息环境
    • 类似数据库中的 schema

交换器类型

  • Direct Exchange直连交换器
    • 精确匹配路由键
    • 一对一或一对多
  • Fanout Exchange扇出交换器
    • 广播模式忽略路由键
    • 发送给所有绑定的队列
  • Topic Exchange主题交换器
    • 模式匹配路由键
    • 支持通配符 *#
  • Headers Exchange头交换器
    • 根据消息头属性匹配
    • 较少使用

RabbitMQ 的工作原理

消息流转过程

1
2
3
4
5
1. Producer 发送消息到 Exchange
2. Exchange 根据 Routing Key 和 Binding 规则路由消息
3. 消息被存储到匹配的 Queue 中
4. Consumer 从 Queue 中获取并处理消息
5. Consumer 发送 ACK 确认消息已处理

工作模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
简单模式
Producer → Queue → Consumer

工作队列模式
Producer → Queue → Multiple Consumers竞争消费

发布订阅模式
Producer → Fanout Exchange → Multiple Queues → Multiple Consumers

路由模式
Producer → Direct Exchange → Queue (based on routing key) → Consumer

主题模式
Producer → Topic Exchange → Queue (based on pattern) → Consumer

环境搭建

安装 RabbitMQ

Docker 安装推荐

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 拉取镜像
docker pull rabbitmq:3.12-management

# 运行容器带管理界面
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:3.12-management

# 访问管理界面
# http://localhost:15672
# 用户名admin
# 密码admin123

Linux 安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Ubuntu/Debian
sudo apt-get install rabbitmq-server

# CentOS/RHEL
sudo yum install rabbitmq-server

# 启动服务
sudo systemctl start rabbitmq-server

# 设置开机自启
sudo systemctl enable rabbitmq-server

# 启用管理插件
sudo rabbitmq-plugins enable rabbitmq_management

Windows 安装

下载地址https://www.rabbitmq.com/install-windows.html

步骤

  1. 安装 ErlangRabbitMQ 依赖
  2. 下载 RabbitMQ 安装包
  3. 运行安装程序
  4. 启用管理插件rabbitmq-plugins enable rabbitmq_management

macOS 安装

1
2
3
4
5
6
7
8
# 使用 Homebrew
brew install rabbitmq

# 启动服务
brew services start rabbitmq

# 启用管理插件
rabbitmq-plugins enable rabbitmq_management

基本配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 添加用户
rabbitmqctl add_user username password

# 设置用户角色
rabbitmqctl set_user_tags username administrator

# 设置权限
rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

# 查看用户列表
rabbitmqctl list_users

# 查看队列
rabbitmqctl list_queues

# 查看交换器
rabbitmqctl list_exchanges

验证安装

1
2
3
4
5
6
7
8
# 检查服务状态
rabbitmqctl status

# 查看节点信息
rabbitmqctl node_health_check

# 访问管理界面
# http://localhost:15672

核心概念详解

交换器Exchange

Direct Exchange

1
2
3
4
5
6
7
8
9
10
11
// 声明直连交换器
channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT, true);

// 声明队列
channel.queueDeclare("direct_queue", true, false, false, null);

// 绑定队列到交换器
channel.queueBind("direct_queue", "direct_exchange", "routing.key");

// 发送消息
channel.basicPublish("direct_exchange", "routing.key", null, message.getBytes());

Fanout Exchange

1
2
3
4
5
6
7
8
9
10
11
12
13
// 声明扇出交换器
channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT, true);

// 声明多个队列
channel.queueDeclare("fanout_queue1", true, false, false, null);
channel.queueDeclare("fanout_queue2", true, false, false, null);

// 绑定队列路由键无效
channel.queueBind("fanout_queue1", "fanout_exchange", "");
channel.queueBind("fanout_queue2", "fanout_exchange", "");

// 发送消息
channel.basicPublish("fanout_exchange", "", null, message.getBytes());

Topic Exchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 声明主题交换器
channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC, true);

// 声明队列
channel.queueDeclare("topic_queue1", true, false, false, null);
channel.queueDeclare("topic_queue2", true, false, false, null);

// 绑定队列使用通配符
// * 匹配一个单词
// # 匹配零个或多个单词
channel.queueBind("topic_queue1", "topic_exchange", "*.orange.*");
channel.queueBind("topic_queue2", "topic_exchange", "*.*.rabbit");

// 发送消息
channel.basicPublish("topic_exchange", "quick.orange.rabbit", null, message.getBytes());

Headers Exchange

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 声明头交换器
channel.exchangeDeclare("headers_exchange", BuiltinExchangeType.HEADERS, true);

// 声明队列
channel.queueDeclare("headers_queue", true, false, false, null);

// 绑定队列根据消息头匹配
Map<String, Object> headers = new HashMap<>();
headers.put("x-match", "all"); // all: 全部匹配, any: 任意匹配
headers.put("format", "json");
headers.put("type", "order");

channel.queueBind("headers_queue", "headers_exchange", "", headers);

// 发送消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.headers(headers)
.build();
channel.basicPublish("headers_exchange", "", properties, message.getBytes());

队列Queue

队列属性

属性 说明 默认值
durable 是否持久化 false
exclusive 是否独占 false
autoDelete 是否自动删除 false
arguments 其他参数 null
1
2
3
4
5
6
7
8
9
10
11
// 声明持久化队列
channel.queueDeclare("persistent_queue", true, false, false, null);

// 声明临时队列
channel.queueDeclare("temporary_queue", false, false, true, null);

// 声明带参数的队列
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 60000); // 消息 TTL 60秒
args.put("x-max-length", 1000); // 最大消息数
channel.queueDeclare("limited_queue", true, false, false, args);

消息属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 构建消息属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.contentType("application/json") // 内容类型
.contentEncoding("UTF-8") // 编码
.deliveryMode(2) // 持久化1: 非持久化, 2: 持久化
.priority(5) // 优先级0-9
.correlationId("corr-id-123") // 关联 ID
.replyTo("response_queue") // 回复队列
.expiration("60000") // 过期时间毫秒
.messageId("msg-id-456") // 消息 ID
.timestamp(new Date()) // 时间戳
.userId("producer") // 用户 ID
.appId("my-app") // 应用 ID
.build();

channel.basicPublish("exchange", "routing_key", properties, message.getBytes());

Java 客户端使用

添加依赖

1
2
3
4
5
6
<!-- RabbitMQ Java Client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.18.0</version>
</dependency>

简单模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class SimpleProducer {

private static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin123");

// 创建连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.example.consumer;

import com.rabbitmq.client.*;

public class SimpleConsumer {

private static final String QUEUE_NAME = "simple_queue";

public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin123");

// 创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 定义消费者
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};

// 消费消息自动 ACK
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
}
}

工作队列模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class WorkProducer {

private static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 声明持久化队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 发送多条消息
for (int i = 1; i <= 10; i++) {
String message = "Message " + i;

// 设置消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.build();

channel.basicPublish("", QUEUE_NAME, properties, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");

// 模拟延迟
Thread.sleep(100);
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.example.consumer;

import com.rabbitmq.client.*;

public class WorkConsumer {

private static final String QUEUE_NAME = "work_queue";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 设置预取计数公平分发
channel.basicQos(1);

// 定义消费者
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");

try {
// 模拟处理时间
doWork(message);
} finally {
System.out.println(" [x] Done");
// 手动 ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};

// 消费消息手动 ACK
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
}

private static void doWork(String task) throws InterruptedException {
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}

发布订阅模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class PublishProducer {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 声明扇出交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

// 发送消息
String message = "Info: System started";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.example.consumer;

import com.rabbitmq.client.*;

public class PublishConsumer {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

// 绑定队列到交换器
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

路由模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RoutingProducer {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 声明直连交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 发送不同级别的消息
String[] severities = {"info", "warning", "error"};
for (String severity : severities) {
String message = severity.toUpperCase() + ": Log message";
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.example.consumer;

import com.rabbitmq.client.*;

public class RoutingConsumer {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

// 绑定队列只接收 error 级别
String[] bindings = {"error"};
for (String binding : bindings) {
channel.queueBind(queueName, EXCHANGE_NAME, binding);
}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

主题模式

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.example.producer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class TopicProducer {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 声明主题交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 发送不同路由键的消息
String[] routingKeys = {
"kern.critical",
"auth.info",
"auth.error",
"kern.info"
};

for (String routingKey : routingKeys) {
String message = "Message with routing key: " + routingKey;
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.example.consumer;

import com.rabbitmq.client.*;

public class TopicConsumer {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 声明交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 创建临时队列
String queueName = channel.queueDeclare().getQueue();

// 绑定队列接收所有 kern 相关的消息
String bindingKey = "kern.*";
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

System.out.println(" [*] Waiting for messages with binding key: " + bindingKey);

// 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};

channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});
}
}

Spring Boot 整合

添加依赖

1
2
3
4
5
<!-- Spring Boot AMQP -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin123
virtual-host: /
# 连接超时
connection-timeout: 15000
# 发布者确认
publisher-confirm-type: correlated
publisher-returns: true
# 模板配置
template:
mandatory: true
# 监听器配置
listener:
simple:
acknowledge-mode: manual # 手动 ACK
concurrency: 5 # 最小并发数
max-concurrency: 10 # 最大并发数
prefetch: 1 # 预取计数

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package com.example.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

// 直连交换器示例
public static final String DIRECT_EXCHANGE = "direct_exchange";
public static final String DIRECT_QUEUE = "direct_queue";
public static final String DIRECT_ROUTING_KEY = "direct.routing.key";

// 扇出交换器示例
public static final String FANOUT_EXCHANGE = "fanout_exchange";
public static final String FANOUT_QUEUE1 = "fanout_queue1";
public static final String FANOUT_QUEUE2 = "fanout_queue2";

// 主题交换器示例
public static final String TOPIC_EXCHANGE = "topic_exchange";
public static final String TOPIC_QUEUE1 = "topic_queue1";
public static final String TOPIC_QUEUE2 = "topic_queue2";

/**
* 声明直连交换器
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}

/**
* 声明直连队列
*/
@Bean
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE).build();
}

/**
* 绑定直连队列到交换器
*/
@Bean
public Binding directBinding(DirectExchange directExchange, Queue directQueue) {
return BindingBuilder.bind(directQueue)
.to(directExchange)
.with(DIRECT_ROUTING_KEY);
}

/**
* 声明扇出交换器
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE, true, false);
}

/**
* 声明扇出队列1
*/
@Bean
public Queue fanoutQueue1() {
return QueueBuilder.durable(FANOUT_QUEUE1).build();
}

/**
* 声明扇出队列2
*/
@Bean
public Queue fanoutQueue2() {
return QueueBuilder.durable(FANOUT_QUEUE2).build();
}

/**
* 绑定扇出队列1
*/
@Bean
public Binding fanoutBinding1(FanoutExchange fanoutExchange, Queue fanoutQueue1) {
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}

/**
* 绑定扇出队列2
*/
@Bean
public Binding fanoutBinding2(FanoutExchange fanoutExchange, Queue fanoutQueue2) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}

/**
* 声明主题交换器
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}

/**
* 声明主题队列1
*/
@Bean
public Queue topicQueue1() {
return QueueBuilder.durable(TOPIC_QUEUE1).build();
}

/**
* 声明主题队列2
*/
@Bean
public Queue topicQueue2() {
return QueueBuilder.durable(TOPIC_QUEUE2).build();
}

/**
* 绑定主题队列1匹配 *.orange.*
*/
@Bean
public Binding topicBinding1(TopicExchange topicExchange, Queue topicQueue1) {
return BindingBuilder.bind(topicQueue1)
.to(topicExchange)
.with("*.orange.*");
}

/**
* 绑定主题队列2匹配 *.*.rabbit
*/
@Bean
public Binding topicBinding2(TopicExchange topicExchange, Queue topicQueue2) {
return BindingBuilder.bind(topicQueue2)
.to(topicExchange)
.with("*.*.rabbit");
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.example.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送直连消息
*/
public void sendDirectMessage(String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.DIRECT_EXCHANGE,
RabbitMQConfig.DIRECT_ROUTING_KEY,
message
);
System.out.println(" [x] Sent Direct: '" + message + "'");
}

/**
* 发送扇出消息
*/
public void sendFanoutMessage(String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.FANOUT_EXCHANGE,
"",
message
);
System.out.println(" [x] Sent Fanout: '" + message + "'");
}

/**
* 发送主题消息
*/
public void sendTopicMessage(String routingKey, String message) {
rabbitTemplate.convertAndSend(
RabbitMQConfig.TOPIC_EXCHANGE,
routingKey,
message
);
System.out.println(" [x] Sent Topic: '" + routingKey + "':'" + message + "'");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package com.example.consumer;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class MessageConsumer {

/**
* 监听直连队列
*/
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)
public void receiveDirectMessage(String message, Channel channel, Message amqpMessage) throws IOException {
try {
System.out.println(" [x] Received Direct: '" + message + "'");

// 处理业务逻辑
processMessage(message);

// 手动 ACK
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.err.println(" [x] Error processing message: " + e.getMessage());
// 拒绝消息重新入队
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
}
}

/**
* 监听扇出队列1
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE1)
public void receiveFanoutMessage1(String message) {
System.out.println(" [x] Received Fanout Queue1: '" + message + "'");
}

/**
* 监听扇出队列2
*/
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE2)
public void receiveFanoutMessage2(String message) {
System.out.println(" [x] Received Fanout Queue2: '" + message + "'");
}

/**
* 监听主题队列1
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE1)
public void receiveTopicMessage1(String message) {
System.out.println(" [x] Received Topic Queue1: '" + message + "'");
}

/**
* 监听主题队列2
*/
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE2)
public void receiveTopicMessage2(String message) {
System.out.println(" [x] Received Topic Queue2: '" + message + "'");
}

private void processMessage(String message) {
// 模拟业务处理
System.out.println(" Processing: " + message);
}
}

Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package com.example.controller;

import com.example.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/messages")
public class MessageController {

@Autowired
private MessageProducer messageProducer;

@PostMapping("/direct")
public String sendDirect(@RequestParam String message) {
messageProducer.sendDirectMessage(message);
return "Direct message sent";
}

@PostMapping("/fanout")
public String sendFanout(@RequestParam String message) {
messageProducer.sendFanoutMessage(message);
return "Fanout message sent";
}

@PostMapping("/topic")
public String sendTopic(@RequestParam String routingKey, @RequestParam String message) {
messageProducer.sendTopicMessage(routingKey, message);
return "Topic message sent";
}
}

高级特性

消息确认机制

生产者确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 开启发布者确认
spring.rabbitmq.publisher-confirm-type=correlated

// 配置回调
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息成功到达交换器");
} else {
System.err.println("消息未到达交换器: " + cause);
// 重试或记录日志
}
}
}

// 注册回调
@Configuration
public class RabbitMQConfig {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(new ConfirmCallback());
}
}

消费者确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 手动 ACK
@RabbitListener(queues = "my_queue")
public void handleMessage(String message, Channel channel, Message amqpMessage) throws IOException {
try {
// 处理消息
process(message);

// 确认消息
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息重新入队
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);

// 或者拒绝消息丢弃
// channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, false);
}
}

死信队列

配置死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
@Configuration
public class DeadLetterConfig {

public static final String DEAD_LETTER_EXCHANGE = "dlx_exchange";
public static final String DEAD_LETTER_QUEUE = "dlx_queue";
public static final String NORMAL_EXCHANGE = "normal_exchange";
public static final String NORMAL_QUEUE = "normal_queue";

/**
* 声明死信交换器
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}

/**
* 声明死信队列
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

/**
* 绑定死信队列
*/
@Bean
public Binding deadLetterBinding(DirectExchange deadLetterExchange, Queue deadLetterQueue) {
return BindingBuilder.bind(deadLetterQueue)
.to(deadLetterExchange)
.with("dead.letter");
}

/**
* 声明正常队列配置死信参数
*/
@Bean
public Queue normalQueue() {
return QueueBuilder.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey("dead.letter")
.ttl(60000) // 消息 TTL 60秒
.maxLength(100) // 最大长度
.build();
}

/**
* 声明正常交换器
*/
@Bean
public DirectExchange normalExchange() {
return new DirectExchange(NORMAL_EXCHANGE, true, false);
}

/**
* 绑定正常队列
*/
@Bean
public Binding normalBinding(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue)
.to(normalExchange)
.with("normal.routing.key");
}
}

死信队列触发条件

触发条件 说明
消息被拒绝 basicNack/basicReject 且 requeue=false
消息过期 超过 TTL 时间
队列达到最大长度 超过 x-max-length 限制

延迟队列

使用插件实现

1
2
3
4
5
6
7
8
9
10
11
# 下载延迟插件
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.12.0/rabbitmq_delayed_message_exchange-3.12.0.ez

# 复制到 plugins 目录
cp rabbitmq_delayed_message_exchange-3.12.0.ez /usr/lib/rabbitmq/plugins/

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 重启 RabbitMQ
systemctl restart rabbitmq-server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Configuration
public class DelayedQueueConfig {

public static final String DELAYED_EXCHANGE = "delayed_exchange";
public static final String DELAYED_QUEUE = "delayed_queue";

/**
* 声明延迟交换器
*/
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}

/**
* 声明延迟队列
*/
@Bean
public Queue delayedQueue() {
return QueueBuilder.durable(DELAYED_QUEUE).build();
}

/**
* 绑定延迟队列
*/
@Bean
public Binding delayedBinding(CustomExchange delayedExchange, Queue delayedQueue) {
return BindingBuilder.bind(delayedQueue)
.to(delayedExchange)
.with("delayed.routing.key")
.noargs();
}
}

// 发送延迟消息
@Component
public class DelayedProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDelayedMessage(String message, int delayMillis) {
MessagePostProcessor processor = msg -> {
msg.getMessageProperties().setDelay(delayMillis);
return msg;
};

rabbitTemplate.convertAndSend(
DelayedQueueConfig.DELAYED_EXCHANGE,
"delayed.routing.key",
message,
processor
);
}
}

消息优先级

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 声明优先级队列
@Bean
public Queue priorityQueue() {
return QueueBuilder.durable("priority_queue")
.maxPriority(10)
.build();
}

// 发送优先级消息
@Component
public class PriorityProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendPriorityMessage(String message, int priority) {
MessagePostProcessor processor = msg -> {
msg.getMessageProperties().setPriority(priority);
return msg;
};

rabbitTemplate.convertAndSend(
"priority_exchange",
"priority.routing.key",
message,
processor
);
}
}

集群与高可用

普通集群

1
2
3
4
5
6
7
8
9
10
11
12
13
# 在多台服务器上安装 RabbitMQ

# 同步 Erlang Cookie
# 确保所有节点的 ~/.erlang.cookie 文件内容相同

# 加入集群
# 在节点2上执行
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl start_app

# 查看集群状态
rabbitmqctl cluster_status

镜像队列

1
2
3
4
5
6
7
8
9
# 设置镜像队列策略
# 将所有队列镜像到所有节点
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

# 镜像到指定节点
rabbitmqctl set_policy ha-two "^" '{"ha-mode":"nodes","ha-params":["rabbit@node1","rabbit@node2"]}'

# 自动同步
rabbitmqctl set_policy ha-sync "^" '{"ha-sync-mode":"automatic"}'

Quorum 队列推荐

1
2
3
4
5
6
7
// 声明 Quorum 队列
@Bean
public Queue quorumQueue() {
return QueueBuilder.durable("quorum_queue")
.quorum()
.build();
}
特性 经典队列 镜像队列 Quorum 队列
高可用
数据一致性 - 最终一致 强一致
性能
推荐场景 非关键数据 兼容旧版本 关键数据推荐

最佳实践

命名规范

1
2
3
4
5
6
7
8
9
10
11
交换器命名
- 格式业务.类型.环境
- 示例order.direct.prodlog.fanout.dev

队列命名
- 格式业务.功能.环境
- 示例order.process.prodlog.error.dev

路由键命名
- 格式模块.操作.类型
- 示例order.create.infouser.update.error

性能优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 1. 使用连接池
// Spring Boot 自动管理连接无需额外配置

// 2. 批量发送
@Component
public class BatchProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void batchSend(List<String> messages) {
for (String message : messages) {
rabbitTemplate.convertAndSend("exchange", "routing.key", message);
}
}
}

// 3. 合理设置预取计数
spring.rabbitmq.listener.simple.prefetch=10

// 4. 避免大消息
// 单条消息建议不超过 1MB
// 大文件应存储在对象存储消息中只存引用

// 5. 使用持久化要权衡
// 持久化会降低性能根据业务需求选择

可靠性保障

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 1. 消息持久化
MessageProperties properties = new MessageProperties();
properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

// 2. 生产者确认
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (!ack) {
log.error("消息发送失败: {}", cause);
// 重试或补偿
}
});

// 3. 消费者手动 ACK
@RabbitListener(queues = "my_queue")
public void handleMessage(String message, Channel channel, Message amqpMessage) {
try {
process(message);
channel.basicAck(amqpMessage.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(amqpMessage.getMessageProperties().getDeliveryTag(), false, true);
}
}

// 4. 死信队列处理异常消息
// 配置死信队列捕获失败消息

// 5. 幂等性处理
@RabbitListener(queues = "my_queue")
public void handleMessage(Message message) {
String messageId = message.getMessageProperties().getMessageId();

// 检查是否已处理
if (isProcessed(messageId)) {
return;
}

// 处理消息
process(message);

// 标记为已处理
markAsProcessed(messageId);
}

监控建议

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 1. 监控队列长度
rabbitmqctl list_queues name messages

# 2. 监控消费者数量
rabbitmqctl list_consumers

# 3. 监控连接数
rabbitmqctl list_connections

# 4. 使用管理界面
# http://localhost:15672

# 5. 集成监控系统
# Prometheus + Grafana
# 使用 rabbitmq_exporter

常见问题

消息丢失问题

1
2
3
4
5
6
7
问题消息在传输过程中丢失

解决方案
1. 生产者开启确认机制
2. 消息和队列持久化
3. 消费者手动 ACK
4. 使用事务性能较差不推荐

消息重复消费

1
2
3
4
5
6
7
问题同一条消息被多次处理

解决方案
1. 实现幂等性
2. 使用唯一消息 ID
3. 数据库去重
4. Redis 记录已处理的消息 ID
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 幂等性示例
@Component
public class IdempotentConsumer {

@Autowired
private StringRedisTemplate redisTemplate;

@RabbitListener(queues = "my_queue")
public void handleMessage(Message message) {
String messageId = message.getMessageProperties().getMessageId();

// 使用 SETNX 实现分布式锁
Boolean locked = redisTemplate.opsForValue()
.setIfAbsent("msg:" + messageId, "1", 300, TimeUnit.SECONDS);

if (Boolean.FALSE.equals(locked)) {
log.warn("消息已处理: {}", messageId);
return;
}

try {
// 处理业务
process(message);
} finally {
// 不删除让 key 自然过期
}
}
}

消息积压问题

1
2
3
4
5
6
7
问题消费者处理速度慢导致消息积压

解决方案
1. 增加消费者数量
2. 优化消费者处理逻辑
3. 使用批量处理
4. 临时扩容消费者实例
1
2
3
4
5
6
7
# 增加并发消费者
spring:
rabbitmq:
listener:
simple:
concurrency: 10
max-concurrency: 50

报错处理

💗💗 RabbitMQ 报错Connection refused

1
2
3
4
5
6
7
8
9
10
11
12
13
错误信息
java.net.ConnectException: Connection refused

错误原因
1. RabbitMQ 服务未启动
2. 端口配置错误
3. 防火墙阻止连接

解决方案
1. 检查服务状态rabbitmqctl status
2. 确认端口默认 5672
3. 检查防火墙设置
4. 确认用户名密码正确

💗💗 RabbitMQ 报错PRECONDITION_FAILED

1
2
3
4
5
6
7
8
9
10
错误信息
PRECONDITION_FAILED - inequivalent arg 'durable' for queue

错误原因
队列已存在但属性不匹配

解决方案
1. 删除旧队列重新创建
2. 确保队列属性一致
3. 使用不同的队列名

💗💗 RabbitMQ 报错NOT_FOUND

1
2
3
4
5
6
7
8
9
10
错误信息
NOT_FOUND - no exchange 'xxx' in vhost '/'

错误原因
交换器不存在

解决方案
1. 检查交换器名称是否正确
2. 确认交换器已声明
3. 检查虚拟主机配置

学习资源

  • 视频
    • 尚硅谷 - 2021 最新 RabbitMQ 教程https://www.bilibili.com/video/BV1cb4y1o7zz
  • 文档
    • RabbitMQ 中文文档http://rabbitmq.mr-ping.com/
    • RabbitMQ 官方文档https://www.rabbitmq.com/documentation.html
    • RabbitMQ GitHubhttps://github.com/rabbitmq/rabbitmq-server
  • 书籍
    • RabbitMQ 实战指南朱忠华著
    • RabbitMQ in ActionAlvaro Videla 著
    • RabbitMQ 实战高效部署分布式消息队列
  • 教程
    • RabbitMQ 入门教程https://www.runoob.com/w3cnote/rabbitmq-tutorial.html
    • Spring AMQP 文档https://spring.io/projects/spring-amqp
  • 工具
    • RabbitMQ 在线模拟器http://tryrabbitmq.com/
    • RabbitMQ ManagementWeb 管理界面
    • rabbitmqadmin命令行管理工具
    • Apache JMeter压力测试工具
  • 社区
    • RabbitMQ 中文社区http://www.rabbitmq.org.cn/
    • Stack Overflow RabbitMQ 标签https://stackoverflow.com/questions/tagged/rabbitmq