SpringBoot2.x整合RabbitMQ
# SpringBoot整合RabbitMQ
# 1. 搭建环境
# 1.1 依赖
<!--RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5
2
3
4
5
# 1.2 配置文件(yml)
spring:
application:
# 项目名称
name: rabbitmq-springboot
# 配置RabbitMQ
rabbitmq:
host: 116.196.118.167 # 主机
port: 5672 # 端口号
username: chggx # 用户名
password: Llxc326868 # 密码
virtual-host: /virtual_chggx # 类似于数据库
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 1.3 模板
RabbitTemplate
用来简化操作 使用时候直接在项目中注入即可使用
只有消费者存在时,才创建队列,交换机
# 2. hello world模型 (简单队列 (opens new window))
# 生产者
/**
* @Author: CHGGX
* @Description: <h1> 简单队列生产者 </h1>
*/
@SpringBootTest(classes = RabbitmaSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class SpProducer {
/**
* 注入RabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 队列
*/
private static final String QUEUE_NAME = "sp_queue";
/**
* 简单队列生产者
*/
@Test
public void Send() {
// 发送的消息
String msg = "hello simple !";
rabbitTemplate.convertAndSend(QUEUE_NAME, msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 消费者
/**
* @Author: CHGGX
* @Description: <h1> 简单队列消费者 </h1>
*/
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "sp_queue"))
public class SpConsumer {
/**
* @param message 监听的消息
*/
@RabbitHandler
public void receive(String message) {
System.out.println("sp receive msg:" + message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitListener
: 作用在类上.方法上添加@RabbitHandler
# 3. work模型使用(工作队列 (opens new window))
# 3.1 轮询模式
- 你一个我一个,平均分发
# 生产者
/**
* @Author: CHGGX
* @Description: <h1> 工作队列生产者(轮询分发) </h1>
*/
@SpringBootTest(classes = RabbitmaSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class WorkProducer {
/**
* 注入RabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 队列
*/
private static final String QUEUE_NAME = "work_queue";
/**
* 简单队列生产者
*/
@Test
public void Send() {
for (int i = 0; i < 10; i++) {
// 发送的消息
String msg = "hello work robin !" + i;
rabbitTemplate.convertAndSend(QUEUE_NAME, msg);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 消费者
/**
* @Author: CHGGX
* @Description: <h1> 工作队列消费者(轮询分发) </h1>
*/
@Component
public class WorkConsumer {
/**
* 消费者
* @param message 监听的消息
*/
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public void receive01(String message) {
System.out.println("[1] work receive msg:" + message);
}
/**
* 消费者
* @param message 监听的消息
*/
@RabbitListener(queuesToDeclare = @Queue(value = "work_queue"))
public void receive02(String message) {
System.out.println("[2] work receive msg:" + message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@RabbitListener
: 作用在方法上
# 2. fair dipatch模式
- 能者多劳
# 4. Fanout 广播模型 (Publish/Subscribe (opens new window))
# 生产者
/**
* @Author: CHGGX
* @Description: <h1> 发布订阅队列生产者(广播模型 fanout分发) </h1>
*/
@SpringBootTest(classes = RabbitmaSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class PsProducer {
/**
* 注入RabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "ps_exchange";
/**
* 发布订阅队列生产者
*/
@Test
public void Send() {
// 发送的消息
String msg = "hello ps fanout !";
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "", msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 消费者
/**
* @Author: CHGGX
* @Description: <h1> 发布订阅队列消费者 </h1>
*/
@Component
public class PsConsumer {
/**
* 消费者
* bindings: 绑定交换机和队列
*
* @param message 监听的消息
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "ps_exchange", type = "fanout") // 要绑定的交换机名称,类型fanout分发
)
})
public void receive01(String message) {
System.out.println("[1] ps receive msg:" + message);
}
/**
* 消费者
* bindings: 绑定交换机和队列
*
* @param message 监听的消息
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "ps_exchange", type = "fanout") // 要绑定的交换机名称,类型fanout分发
)
})
public void receive02(String message) {
System.out.println("[2] ps receive msg:" + message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
# 5. Direct 路由模型 (Routing (opens new window))
# 生产者
/**
* @Author: CHGGX
* @Description: <h1> Route模式生产者(direct) </h1>
*/
@SpringBootTest(classes = RabbitmaSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RtProducer {
/**
* 注入RabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "rt_exchange";
/**
* 发布订阅队列生产者
*/
@Test
public void Send() {
// 发送的消息
String msg = "hello ps info route direct !";
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "waring", msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 消费者
/**
* @Author: CHGGX
* @Description: <h1> Route模式消费者(direct) </h1>
*/
@Component
public class RtConsumer {
/**
* 消费者
* bindings: 绑定交换机和队列
*
* @param message 监听的消息
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "rt_exchange", type = "direct"), // 要绑定的交换机名称,类型direct
key = {"waring", "error"}
)
})
public void receive01(String message) {
System.out.println("[1] ps receive msg:" + message);
}
/**
* 消费者
* bindings: 绑定交换机和队列
*
* @param message 监听的消息
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "rt_exchange", type = "direct"), // 要绑定的交换机名称,类型direct
key = {"info", "waring", "error"}
)
})
public void receive02(String message) {
System.out.println("[2] ps receive msg:" + message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
生产者的routeKey设置的key,当消费者中的key中不包含routekey的key时,消费者接受消息,反之不接受
# 6. Topic 主题模式 (Topics (opens new window))
通配符
*: 匹配单个
a.* a.save/a.b
#: 匹配多个
a.# a.b/a.b.c/a.b.c.d
# 生产者
/**
* @Author: CHGGX
* @Description: <h1> 订阅模式生产者(topic) </h1>
*/
@SpringBootTest(classes = RabbitmaSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TpProducer {
/**
* 注入RabbitTemplate
*/
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "tp_exchange";
/**
* 生产者队列生产者
*/
@Test
public void Send() {
// 发送的消息
String msg = "hello ps user.save topic !";
rabbitTemplate.convertAndSend(EXCHANGE_NAME, "user.save", msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 消费者
/**
* @Author: CHGGX
* @Description: <h1> 订阅模式消费者(topic) </h1>
*/
@Component
public class TpConsumer {
/**
* 消费者
* bindings: 绑定交换机和队列
*
* @param message 监听的消息
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "tp_exchange", type = "topic"), // 要绑定的交换机名称,类型topic
key = {"order.#", "produce.#","user.*"}
)
})
public void receive01(String message) {
System.out.println("[1] tp receive msg:" + message);
}
/**
* 消费者
* bindings: 绑定交换机和队列
*
* @param message 监听的消息
*/
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, // 创建临时队列
exchange = @Exchange(value = "tp_exchange", type = "topic"), // 要绑定的交换机名称,类型topic
key = {"user.save","user.*"}
)
})
public void receive02(String message) {
System.out.println("[2] tp receive msg:" + message);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
以上所有的队列基于注解开发
上次更新: 2023/03/27, 08:56:25