SpringCloud.H-Stream消息驱动
# SpringCloud.H-Stream消息驱动
笔记日期:2020.5.20
SpringCloud Stream
# 1. 概述
# 1. 简介 官网 (opens new window)
- 如果系统里同时存在多种MQ,可以使用使用Cloud Stream,只需要和Stream交互就可以进行管理。
- 一句话,==屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型==
# 2. 中文手册:
官方定义SpringCloud Stream是一个构建消息驱动微服务的框架
应用程序通过inputs 或者 outputs 来与SpringCloud Stream中binder对象交互。通过我们配置来binding(绑定),而SpringCloud Stream的==binder对象==负责与消息中间件交互,所以,我们只需要搞清楚如何与SpringCloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息时间驱动 SpringCloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
==目前只支持RabbitMQ、Kafka==
# 2.设计思想及理念
# 1 MQ
- 生产者/消费者之间靠==消息==媒介传递信息 ------ Message
- 消息必须走特定的==通道== ------ 消息通道 MessageChannel
- 消息通道里的消息如何被消费呢?谁负责收发==处理== ----- 消息通道 MessageChannel 的自己扣 SubScribableChannel,有 MessageChannel 消息处理器所订阅.
比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,Kafka有Topic和Partition分区
这些消息中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列中的一种,后面的业务需求,我们想往另一种消息队列进行迁移,这时候无疑就是灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候SpringCloud Stream给我们提供了一种解耦合的方式
# 2. 如何实现?
在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美的实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。
==通过定义绑定器Binde作为中间层,实现了应用程序与消息中间件细节之间的隔离。==
==Binder:INPUT对应于消费者,OUTPUT对应于生产者==
==Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播(在RabbitMQ就是Exchange,在Kafka是Topic)==
# 3. 流程
- Binder:很方便的连接中间件,屏蔽差异
- Channel:通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置
- Source和Sink:简单的可以理解为参照对象是SpringCloud Stream自身,从Stream发布消息就是输出,接收消息就是输入
编码API和常用注解
# 4. 案例 参考 (opens new window)
# 1. 生产者
- 依赖
<!--stream-rabbit-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2
3
4
5
- 配置文件
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: #在此配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding的整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关环境配置
spring:
rabbitmq:
host: 116.196.118.167
port: 5672
username: guest
password: guest
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的Exchange名称定义
content-type: application/json #设置消息类型,本次为json,文本则设置“text/plain”
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client: #客户端进行eureka注册的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka/
instance:
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔(默认是30秒)
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔(默认是90秒)
instance-id: send-8801.com #在信息列表时显示主机名称
prefer-ip-address: true #访问的路径变为IP地址
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- 业务(发送消息)
public interface IMessageProvider {
/**
* 发送消息
* @return message
*/
public String send();
}
2
3
4
5
6
7
//这不是传统的service,这是和rabbitmq打交道的,不需要加注解@Service
//这里不掉dao,去掉消息中间件的service
//信道channel和exchange绑定在一起
@Slf4j
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
/**
* 消息发送管道
*/
@Resource
private MessageChannel output;
/**
* 发送消息
*
* @return message
*/
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
log.info("******serial: " + serial);
return null;
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- controller
@Slf4j
@RestController
public class SendMessageController {
@Resource
private IMessageProvider iMessageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage() {
log.info("*******sendMessage: " + new Date());
return iMessageProvider.send();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
- 测试
http://localhost:8801/sendMessage
# 2. 消费者
- 配置
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此处配置要绑定的rabbitMQ的服务信息
defaultRabbit: # 表示定义的名称,用于binding的整合
type: rabbit # 消息中间件类型
environment: # 设置rabbitMQ的相关环境配置
spring:
rabbitmq:
host: 116.196.118.167
port: 5672
username: guest
password: guest
bindings: # 服务的整合处理
input: # 这个名字是一个通道的名称
destination: studyExchange # 表示要使用的exchange名称定义
content-type: application/json # 设置消息类型,本次为json,文本则设为text/plain
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 设置心跳的间隔时间,默认30
lease-expiration-duration-in-seconds: 5 # 超过5秒间隔,默认90
instance-id: receive-8802.com #主机名
prefer-ip-address: true # 显示ip
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
- controller(接受消息)
@Slf4j
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
/**
* 监听生产者
*/
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
log.info("消费者1号,------->接收到的消息: " + message.getPayload() + "\t port: " + serverPort);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- 测试
http://localhost:8801/sendMessage 3条
# 5. 分组消费与持久化
# 1. 分组消费
# 1. 问题
比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果==一个订单同时被两个服务获取到==,那么就会造成数据错误,我们得避免这种情况,这时我们就可以使用==Stream中的消息分组==来解决。
- 注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。
- ==不同组是可以全面消费的(重复消费),同一组内会发送竞争关系,只有其中一个可以消费。==
# 2. 分组
# 1. 原理
微服务应用放置于同一个group中,就能保证消息只会被其中一个应用消费一次.==不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费.==
# 2. 解决
8802/8803都变成==不同组==
8802: group: chggxA # 分组解决重复消费
8803: group: chggxB # 分组解决重复消费
- ==相同组==
- 8803
- 8802
结果
生产者
消费者 8802
消费者 8803
# 3. 总结
==同一个组会竞争资源,轮询。不同组会重复消费。==
# 2. 持久化
关于自定义分组
如果8802去掉分组,而8803不去掉,当8802/8803都关闭服务,8801这时候发送消息,8802再启动的时候不会重新获得未曾获得的消息并消费,而8803重启后会获得8801之前发送的消息并消费。
所以==group分组属性在消息重复消费和消息持久化消费 避免消息丢失是非常重要的属性==
就是默认的分组不会保留未曾获得的消息,自定义的分组会保留