RabbitMQ
# RabbitMQ
# 1. RabbitMQ简介
# 1.1 概述
RabbitMQ (opens new window)是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。 RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
主要特性
- 可伸缩性:集群服务
- 消息持久化:从内存持久化消息到硬盘,再从硬盘加载到内存
基于
AMQP
协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。官网教程 (opens new window)
# AMQP 协议
AMQP(advanced message queuing protocol)`在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题。顾名思义,AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。这使得实现了AMQP的provider天然性就是跨平台的。
2
# 1.2. 什么是消息中间件
消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ就是这样一款我们苦苦追寻的消息队列。RabbitMQ是一个开源的消息代理的队列服务器,用来通过普通协议在完全不同的应用之间共享数据。
# 2. RabbitMq安装
# 2.1 下载软件 (opens new window)
- erlang环境
- rabbitmq环境
# 2.2 安装环境,都点“下一步”就ok。安装完成后Windows系统控制台会出现如下图标。
# 2.3 启动管理工具
如下找到自己安装的目录,并复制全路径,这里我本地安装到了D盘下面
rabbitmq-plugins enable rabbitmq_management
# 2.4 访问
http://localhost:15672/#/ 登录guest/guest
# 3. 添加用户
# 3.1 添加用户
# 3.2 virtual hosts管理
- virtual hosts相当于mysql的db.
一般以"/"开头
- 对用户进行授权
- 用户的virtual hosts改为
# 4. RabbitMQ队列 (opens new window)
AMQP协议的回顾
- rabbitmq依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>
2
3
4
5
# 4.1. 简单队列(simple (opens new window))
- 模型
P:消息的生产者 C:消息的消费者 红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
# 4.1.1 获取MQ连接
/**
* @Author: CHGGX
* @Description: <h1> RabbitMQ连接工具类 </h1>
*/
public class ConnectionUtils {
/**
* 获取RabbitMQ的连接
*
* @return RabbitMQ连接
* @throws IOException 异常
* @throws TimeoutException 异常
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 1. 定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2.1 设置服务地址
factory.setHost("122.152.204.3");
// 2.2 设置amqp端口5672
factory.setPort(5672);
// 2.3 设置数据库 vhost
factory.setVirtualHost("/virtual_chggx");
// 2.4 设置用户名
factory.setUsername("chggx");
// 2.5 设置密码
factory.setPassword("Llxc326868");
// 3. 返回连接
return factory.newConnection();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 4.1.2 生产者发送消息到队列
/**
* @Author: CHGGX
* @Description: <h1> 生产者 </h1>
*/
public class SimpleProducer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 创建队列,并声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "Hello simple !";
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("send msg: " + msg);
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 4.1.3 消费者从队列获取消息
/**
* @Author: CHGGX
* @Description: <h1> 消费者 </h1>
*/
public class SimpleConsumer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "simple_queue";
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msgString = new String(delivery.getBody());
System.out.println("receive msg: " + msgString);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
注意: QueueingConsumer已过时.
# 4.1.4 消费者新的API
/**
* @Author: CHGGX
* @Description: <h1> 消费者 </h1>
*/
public class SimpleConsumer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "simple_queue";
/**
* 新的API接受消息
*/
@SuppressWarnings("deprecation")
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 新的API
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("receive new api msg:" + msg);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 4.1.5 简单队列的缺点
耦合性高,生产者一一对应消费者.
E:\我的博客系统\消息中间件\rabbitMq\image
# 4.2 工作队列(Work queues (opens new window))
- 模型
Simple队列是生产者与消费者一对一的关系。在实际开发中,我们经常遇到多个消费者,我们可以想象,生产者发送消息是毫不费力的,然而消费者获取消息通常还需要进行一系列的业务处理,这显然需要一段时间,这时候消息队列可能就会积压很多消息。
# 4.2.1 轮询分发(round-robin)
生产者
/**
* @Author: CHGGX
* @Description: <h1> 生产者 </h1>
*/
public class WorkProducer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 创建队列,并声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 发送消息
for (int i = 0; i < 50; i++) {
String msg = "hello work !" + i;
System.out.println("work msg:" + msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 每个20*ims发送一次
Thread.sleep(i * 20);
}
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
work msg:hello work !0
work msg:hello work !1
work msg:hello work !2
...
work msg:hello work !49
2
3
4
5
消费者
- 消费者01
/**
* @Author: CHGGX
* @Description: <h1> 消费者 </h1>
*/
public class WorkConsumer01 {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "work_queue";
/**
* 新的API接受消息
*/
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[01] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
}
}
};
boolean autoAck = true;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
[01] receive new api msg:hello work !0
done
[01] receive new api msg:hello work !2
done
[01] receive new api msg:hello work !4
done
[01] receive new api msg:hello work !6
done
....
[01] receive new api msg:hello work !48
done
2
3
4
5
6
7
8
9
10
11
- 消费者02
/**
* @Author: CHGGX
* @Description: <h1> 消费者 </h1>
*/
public class WorkConsumer02 {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "work_queue";
/**
* 新的API接受消息
*/
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[02] receive new api msg:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
}
}
};
boolean autoAck = true;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
[02] receive new api msg:hello work !1
done
[02] receive new api msg:hello work !3
done
[02] receive new api msg:hello work !5
done
[02] receive new api msg:hello work !7
done
...
[02] receive new api msg:hello work !49
done
2
3
4
5
6
7
8
9
10
11
现象: 消费者01和02处理的数据消息是一样的.消费者01偶数,02奇数.这种方式叫做轮询分发(round-robin).结果不管谁忙谁闲,分发的消息总是你一个我一个.
# 4.2.2 公平分发(fair dipatch)
生产者
/**
* @Author: CHGGX
* @Description: <h1> 公平分发生产者 </h1>
*/
public class WorkProducer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 创建队列,并声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// =========== 公平分发 ===========
// 每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个消息
// 限制发送给同一个消费者(不得超过一条消息)
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// =========== 公平分发 ===========
// 发送消息
for (int i = 0; i < 50; i++) {
String msg = "hello work !" + i;
System.out.println("work msg:" + msg);
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 每个20*ims发送一次
Thread.sleep(i * 20);
}
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
消费者
- 消费者01
/**
* @Author: CHGGX
* @Description: <h1> 公平分发消费者 </h1>
*/
public class WorkConsumer01 {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "work_queue";
/**
* 新的API接受消息
*/
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// =========== 公平分发 ===========
// 保证一次只分发一个
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[01] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// =========== 公平分发 ===========
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
- 消费者02
/**
* @Author: CHGGX
* @Description: <h1> 公平分发消费者 </h1>
*/
public class WorkConsumer02 {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "work_queue";
/**
* 新的API接受消息
*/
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// =========== 公平分发 ===========
// 保证一次只分发一个
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[02] receive new api msg:" + msg);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// =========== 公平分发 ===========
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
能者多劳
# 4.3 订阅模式(Publish/Subscribe (opens new window))
- 模型 (Fanout)
- 一个生产者,多个消费者
- 每个消费者都有自己的队列
- 生产者没有直接将消息发送到队列,而是发送到交换机(exchange)
- 每个队列都绑定到交换机上(exchange)
- 生产者发送的消息经过交换机到达队列就能实现一个消息被多个消费者消费
场景
注册 -> 邮件 -> 短信
生产者
/**
* @Author: CHGGX
* @Description: <h1> 发布订阅生产者 </h1>
*/
public class PsProducer {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "ps_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 声明交换机
/**
* String exchange: 交换机
* String type: 类型
*/
// fanout: 分发
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// 发送消息
String msg = "hello PS !";
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
System.out.println("PS send msg:" + msg);
// 关闭资源
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
生产者直接将消息发送到交换机
Rabbitmq的exchange中
**此时消息丢失!!!**①因为交换机(exchange)没有存储能力,在rabbitmq中只有队列(queue)有存储能力,②因为这个时候还没有队列绑定到这个交换机,所以数据丢失了.
消费者
- 消费者01
/**
* @Author: CHGGX
* @Description: <h1> </h1>
*/
public class PsConsumer01 {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "ps_exchange_fanout";
/**
* 消息队列
*/
private static final String QUEUE_NAME = "ps_queue_email";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定到交换机
/**
* String queue: 队列
* String exchange: 交换机
* String routingKey: 路由key
*/
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
// =========== 公平分发 ===========
// 保证一次只分发一个
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[01] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// =========== 公平分发 ===========
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
- 消费者02
/**
* @Author: CHGGX
* @Description: <h1> </h1>
*/
public class PsConsumer02 {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "ps_exchange_fanout";
/**
* 消息队列
*/
private static final String QUEUE_NAME = "ps_queue_sms";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");
// =========== 公平分发 ===========
// 保证一次只分发一个
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[02] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
// =========== 公平分发 ===========
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
交换机绑定的队列
# 4.4 路由模式(Routing (opens new window))
- 模型 (Direct)
生产者
/**
* @Author: CHGGX
* @Description: <h1> 路由模式生产者 </h1>
*/
public class RProducer {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "r_exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 声明交换机
/**
* String exchange: 交换机
* String type: 类型
*/
// direct:
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 发送信息
String msg = "hello routing !";
// routingKey: error/info/waring
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());
System.out.println("R send msg:" + msg);
// 关闭资源
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
消费者
- 消费者01
/**
* @Author: CHGGX
* @Description: <h1> 路由模式消费者 </h1>
*/
public class RConsumer01 {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "r_exchange_direct";
/**
* 消息队列
*/
private static final String QUEUE_NAME = "r_queue_email";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[01] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
- 消费者02
/**
* @Author: CHGGX
* @Description: <h1> 路由模式消费者 </h1>
*/
public class RConsumer02 {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "r_exchange_direct";
/**
* 消息队列
*/
private static final String QUEUE_NAME = "r_queue_sms";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"waring");
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[02] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# 4.5 主题模式(Topics (opens new window))
- 模型 (topic)
生产者
/**
* @Author: CHGGX
* @Description: <h1> 主题模式(通配符)生产者 </h1>
*/
public class TpProducer {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "tp_exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 声明交换机
/**
* String exchange: 交换机
* String type: 类型 (fanout/direct/topic)
*/
// topic: 通配符
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 发送信息
String msg = "hello topic !";
channel.basicPublish(EXCHANGE_NAME,"goods.add",null,msg.getBytes());
System.out.println("TP send msg:" + msg);
// 关闭资源
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
消费者
- 消费者01
/**
* @Author: CHGGX
* @Description: <h1> 主题模式(通配符)消费者 </h1>
*/
public class TpConsumer01 {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "tp_exchange_topic";
/**
* 消息队列
*/
private static final String QUEUE_NAME = "tp_queue_email";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[01] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
- 消费者02
/**
* @Author: CHGGX
* @Description: <h1> 主题模式(通配符)消费者 </h1>
*/
public class TpConsumer02 {
/**
* 交换机
*/
private static final String EXCHANGE_NAME = "tp_exchange_topic";
/**
* 消息队列
*/
private static final String QUEUE_NAME = "tp_queue_sms";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定到交换机
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");
channel.basicQos(1);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[02] receive new api msg:" + msg);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println(" done ");
// 手动回执消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 5. 消息应答与消息持久化
# 5.1 消息应答
- 消息应答默认打开
// 自动应答 从true --- false
boolean autoAck = false;
// 监听队列
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
2
3
4
# 5.1.1 自动确认模式
boolean autoAck = true
: 一旦rabbitmq将消息发送给消费者,就会从内存中删除
问题: 这种情况下,如果杀死正在执行的消费者,就会丢失正在处理的消息.
# 5.1.2 手动模式
boolean autoAck = false
: 如果有一个消费者挂掉,就交付给其他消费者.(rabbitmq支持消息应答,消费者发送一个消息应答,告诉rabbitmq这个消息我已经处理完成,你可以删除了,人后然后rabbitmq就删除内存中的消息.)
# 5.2 消息的持久化
如果rabbitmq挂了,我们的消息丢失.该怎么处理???
/**
* String queue: 队列名称
* boolean durable: 是否持久化
* boolean exclusive: 是否排他
* boolean autoDelete: 是否自动删除
* Map<String, Object> arguments: 队列的其他一些参数
*/
// 是否持久化
boolean durable = false;
// 队列声明
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
2
3
4
5
6
7
8
9
10
11
12
我们将程序中的
boolean durable = false
;改成true;尽管代码是正确的,他也不会运行成功!因为我们已经定义了一个队列(work_queue).这个队列是为持久化的,rabbitmq不准许重新定义(不同参数)一个已经存在的队列.[解决方法: 删除队列/重新定义一个队列]
# 6. Exchange(交换机 转发器)
一方面是接收生产者的消息,另一方面是向队列推送消息.
匿名转发: ""
# 6.1 Fanout Exchange(不处理路由键routekey)
# 6.2 Direct Exchange(处理路由键routekey)
# 6.3 Topic Exchange (将路由键和某模式匹配)
- 通配符 主题模式(Topics (opens new window))
- #: 匹配一个或多个
- *: 匹配一个
# 7. RabbitMq的消息确认机制(事务+confirm)
在rabbitmq中,我们可以通过持久化数据,解决rabbitmq服务器异常的数据丢失问题
- 问题
生产者将消息发送出去之后,消息到底有没有到大rabbitmq服务器,默认的情况是不知道的;
- 解决
两种方法是:
- AMQP实现的事务机制
- confirm模式
# 7.1 事务机制
txSelect txCommit txRollback
txSelect: 用于将当前channel设置成transation模式
txCommit : 用于提交事务
txRollback : 用于回滚事务
生产者
/**
* @Author: CHGGX
* @Description: <h1> 事务生产者 </h1>
*/
public class TxProducer {
/**
* 交换机
*/
private static final String QUEUE_NAME = "queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false,false,false,null);
// 发送信息
String msg = "hello tx message !";
try {
/**
* 事务机制:
* txSelect: 用于将当前channel设置成transation模式
* txCommit : 用于提交事务
* txRollback : 用于回滚事务
*/
channel.txSelect();
// 发送信息
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("tx send msg:" + msg);
// 提交事务
channel.txCommit();
} catch (IOException e) {
// 回滚事务
channel.txRollback();
e.printStackTrace();
System.out.println("tx send msg txRollback");
}
// 关闭资源
channel.close();
connection.close();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
消费者
/**
* @Author: CHGGX
* @Description: <h1> 事物消费者 </h1>
*/
public class TxConsumer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "queue_tx";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[tx] receive new api msg:" + msg);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
缺点: 降低消息吞吐量(该模式走的是通信,txSelect txCommit txRollback会造成大量请求发送大量服务器,降低吞吐量)
# 7.2 confirm模式
# 7.2.1 生产者端confirm模式实现的原理
通过生产者的确认模式我们是要保证消息准确达到Broker端,而与AMQP事务不同的是Confirm是针对一条消息的,而事务是可以针对多条消息的。
# 发送原理图
为了使用Confirm模式,client会发送confirm.select方法帧。通过是否设置了no-wait属性,来决定Broker端是否会以confirm.select-ok来进行应答。一旦在channel上使用confirm.select方法,channel就将处于Confirm模式。处于 transactional模式的channel不能再被设置成Confirm模式,反之亦然。
发布确认和事务两者不可同时引入,channel一旦设置为Confirm模式就不能为事务模式,为事务模式就不能为Confirm模式.
在生产者将信道设置成Confirm模式,一旦信道进入Confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(以confirm.select为基础从1开始计数),一旦消息被投递到所有匹配的队列之后,Broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,Broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外Broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理
Confirm模式最大的好处在于它是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条basic.nack来代替basic.ack的消息,在这个情形下,basic.nack中各域值的含义与basic.ack中相应各域含义是相同的,同时requeue域的值应该被忽略。通过nack一条或多条消息, Broker表明自身无法对相应消息完成处理,并拒绝为这些消息的处理负责。在这种情况下,client可以选择将消息re-publish。
在channel 被设置成Confirm模式之后,所有被publish的后续消息都将被Confirm(即 ack)或者被nack一次。但是没有对消息被Confirm的快慢做任何保证,并且同一条消息不会既被Confirm又被nack。
# 7.2.2 开启confirm模式
生产者通过调用channel的confirmSelect方法将channel设置为Confirm模式,如果没有设置no-wait标志的话,Broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为Confirm模式(从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的)。
channel.confirmSelect()
# 7.2.3 编程模式
对于固定消息体大小和线程数,如果消息持久化,生产者Confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.
消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:
- 普通Confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端Confirm。实际上是一种串行Confirm了,每publish一条消息之后就等待服务端Confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;
- 批量Confirm模式:批量Confirm模式,每发送一批消息之后,调用waitForConfirms()方法,等待服务端Confirm,这种批量确认的模式极大的提高了Confirm效率,但是如果一旦出现Confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;
- 异步Confirm模式:提供一个回调方法,服务端Confirm了一条或者多条消息后Client端会回调这个方法。
# 7.2.4 普通Confirm模式(单条)
生产者
/**
* @Author: CHGGX
* @Description: <h1> 普通Confirm模式(单条) </h1>
*/
public class PtProducer {
/**
* 队列
*/
private static final String QUEUE_NAME = "queue_confirm_pt";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// ==================confirm模式==================
// 生产者调用confirmSelect,将channel设置为confirm模式
channel.confirmSelect();
// 发送信息
String msg = "hello confirm message !";
// 发送信息
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
// 普通模式,调用waitForConfirms()方法
if (!channel.waitForConfirms()) {
System.out.println("msg send failed");
} else {
System.out.println("msg send ok");
}
// 关闭资源
channel.close();
connection.close();
// ==================confirm模式==================
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
消费者
/**
* @Author: CHGGX
* @Description: <h1> confirm消费者 </h1>
*/
public class TxConsumer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "queue_confirm_pt";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[confirm] receive new api msg:" + msg);
}
};
// 监听队列 autoAck: 自动问答
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 7.2.5 批量confirm模式
生产者
/**
* @Author: CHGGX
* @Description: <h1> 批量confirm模式(批量) </h1>
*/
public class PlProducer {
/**
* 队列
*/
private static final String QUEUE_NAME = "queue_confirm_pl";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// ==================confirm模式==================
// 生产者调用confirmSelect,将channel设置为confirm模式
channel.confirmSelect();
// 发送信息
String msg = "hello confirm message batch !";
// 批量发送信息
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
}
// 批量confirm模式,调用waitForConfirms()方法
if (!channel.waitForConfirms()) {
System.out.println("msg send failed");
} else {
System.out.println("msg send ok");
}
// 关闭资源
channel.close();
connection.close();
// ==================confirm模式==================
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
消费者
/**
* @Author: CHGGX
* @Description: <h1> confirm消费者 </h1>
*/
public class PlConsumer {
/**
* 消息队列
*/
private static final String QUEUE_NAME = "queue_confirm_pl";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 从连接中获取一个通道
final Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 新的API 定义一个消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 消息到达触发此方法
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("[confirm] receive new api msg:" + msg);
}
};
// 监听队列 autoAck: 自动问答
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 7.2.6 异步Confirm模式
Channel对象提供ConfirmListener()回调方法只包含deliverTag(当前Channel发出的序列号),我们需要自己为每一个Channel维护一个unconfirm的消息序列集合,没publish一条数据,集合就加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或者多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。
- 第一步:在 channel 上开启确认模式:channel.confirmSelect()
- 第二步:在 channel 上添加监听: channel.addConfirmListener(ConfirmListener listener);, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!
生产者
/**
* @Author: CHGGX
* @Description: <h1> 异步生产者 </h1>
*/
public class YbProducer {
/**
* 消息确认机制 confirm
*/
public static final String QUEUE_NAME = "queue_confirm_yb";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 获取一个通道
Channel channel = connection.createChannel();
// 声明一个队列
// QUEUE_NAME durable
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String msg = "Hello confirm Queue !";
//指定消息的投递模式 :confirm 确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("[Send] Msg " + i);
}
//添加一个确认监听, 这里就不关闭连接了,为了能保证能收到监听消息
channel.addConfirmListener(new ConfirmListener() {
// Ack成功的回调函数
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("Success Ack !");
System.out.println("multiple : " + multiple);
} else {
System.out.println("Success Ack !");
System.out.println("multiple : " + multiple);
}
}
//Ack失败的回调函数
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
if (multiple) {
System.out.println("Failed Ack !");
System.out.println("multiple : " + multiple);
} else {
System.out.println("Failed Ack !");
System.out.println("multiple : " + multiple);
}
}
});
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
消费者
/**
* @Author: CHGGX
* @Description: <h1> 异步消费者 </h1>
*/
public class YbConsumer {
public static final String QUEUE_NAME = "queue_confirm_yb";
public static void main(String[] args) throws IOException, TimeoutException {
// 获取连接
Connection connection = ConnectionUtils.getConnection();
// 创建通道
Channel channel = connection.createChannel();
// 队列声明
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body, "utf-8");
System.out.println("------[ confirm Receive] msg :" + msg);
}
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
以上全部为生产者方的confirm模式解决rabbbitmq丢失问题. 源码地址 (opens new window)
# 8. MQ的应用场景
# 8.1 异步处理:
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式
串行方式:
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
并行方式:
将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
消息队列:
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回.消息队列
: 引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
# 8.2 应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
这种做法有一个缺点:
当库存系统出现故障时,订单就会失败。 订单系统和库存系统高耦合. 引入消息队列
订单系统:
用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:
订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失.
# 8.3 流量削峰
场景:
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:
1.可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
2.可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)
1.用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
2.秒杀业务根据消息队列中的请求信息,再做后续处理.