⭐⭐⭐ Spring Boot 项目实战 ⭐⭐⭐ Spring Cloud 项目实战
《Dubbo 实现原理与源码解析 —— 精品合集》 《Netty 实现原理与源码解析 —— 精品合集》
《Spring 实现原理与源码解析 —— 精品合集》 《MyBatis 实现原理与源码解析 —— 精品合集》
《Spring MVC 实现原理与源码解析 —— 精品合集》 《数据库实体设计合集》
《Spring Boot 实现原理与源码解析 —— 精品合集》 《Java 面试题 + Java 学习指南》

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Alibaba/RocketMQ/ 「芋道源码」欢迎转载,保留摘要,谢谢!


🙂🙂🙂关注**微信公众号:【芋道源码】**有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言将得到认真回复。甚至不知道如何读源码也可以请教噢
  4. 新的源码解析文章实时收到通知。每周更新一篇左右
  5. 认真的源码交流微信群。

本文在提供完整代码示例,可见 https://github.com/YunaiV/SpringBoot-Labslabx-06-spring-cloud-stream-rocketmq 目录。

原创不易,给点个 Star 嘿,一起冲鸭!

1. 概述

本文我们来学习 Spring Cloud Alibaba 提供的 Spring Cloud Stream RocketMQ 组件,基于 Spring Cloud Stream 的编程模型,接入 RocketMQ 作为消息中间件,实现消息驱动的微服务。

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  • 能够保证严格的消息顺序
  • 提供丰富的消息拉取模式
  • 高效的订阅者水平扩展能力
  • 实时的消息订阅机制
  • 亿级消息堆积能力

在开始本文之前,胖友需要对 RocketMQ 进行简单的学习。可以阅读《RocketMQ 极简入门》文章,将第一二小节看完,在本机搭建一个 RocketMQ 服务。

2. Spring Cloud Stream 介绍

Spring Cloud Stream 是一个用于构建基于消息的微服务应用框架,使用 Spring Integration 与 Broker 进行连接。

友情提示:可能有胖友对 Broker 不太了解,我们来简单解释下。

一般来说,消息队列中间件都有一个 Broker Server(代理服务器),消息中转角色,负责存储消息、转发消息。

例如说在 RocketMQ 中,Broker 负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。另外,Broker 也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。

Spring Cloud Stream 提供了消息中间件的统一抽象,推出了 publish-subscribe、consumer groups、partition 这些统一的概念。

Spring Cloud Stream 内部有两个概念:BinderBinding

Binder,跟消息中间件集成的组件,用来创建对应的 Binding。各消息中间件都有自己的 Binder 具体实现。

public interface Binder<T, 
C extends ConsumerProperties, // 消费者配置
P extends ProducerProperties> { // 生产者配置

// 创建消费者的 Binding
Binding<T> bindConsumer(String name, String group, T inboundBindTarget, C consumerProperties);

// 创建生产者的 Binding
Binding<T> bindProducer(String name, T outboundBindTarget, P producerProperties);

}

Binding,包括 Input Binding 和 Output Binding。Binding 在消息中间件与应用程序提供的 Provider 和 Consumer 之间提供了一个桥梁,实现了开发者只需使用应用程序的 Provider 或 Consumer 生产或消费数据即可,屏蔽了开发者与底层消息中间件的接触。

最终整体交互如下图所示:Spring Cloud Stream Application

可能看完之后,胖友对 Spring Cloud Stream 还是有点懵逼,并且觉得概念怎么这么多呢?不要慌,我们先来快速入个门,会有更加具象的感受。

3. 快速入门

示例代码对应仓库:

本小节,我们一起来快速入门下,会创建 2 个项目,分别作为生产者和消费者。最终项目如下图所示:项目结构

3.1 搭建生产者

创建 labx-06-sca-stream-rocketmq-producer-demo 项目,作为生产者。

3.1.1 引入依赖

创建 pom.xml 文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-06</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-06-sca-stream-rocketmq-producer-demo</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
<spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>

</project>

通过引入 spring-cloud-starter-stream-rocketmq 依赖,引入并实现 RocketMQ 的自动配置。在该依赖中,已经帮我们自动引入 RocketMQ 的大量依赖,非常方便,如下图所示:

3.1.2 配置文件

创建 application.yaml 配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。

server:
port: 18080

spring.cloud.stream 为 Spring Cloud Stream 配置项,对应 BindingServiceProperties 类。配置的层级有点深,我们一层一层来看看。

spring.cloud.stream.bindings 为 Binding 配置项,对应 BindingProperties Map。其中,key 为 Binding 的名字。要注意,虽然说 Binding 分成 Input 和 Output 两种类型,但是在配置项中并不会体现出来,而是要在稍后搭配 @Input 还是 @Output 注解,才会有具体的区分。

这里,我们配置了一个名字为 demo01-output 的 Binding。从命名上,我们的意图是想作为 Output Binding,用于生产者发送消息。

  • destination:目的地。在 RocketMQ 中,使用 Topic 作为目的地。这里我们设置为 DEMO-TOPIC-01

    主题(Topic):表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是 RocketMQ 进行消息订阅的基本单位。

  • content-type:内容格式。这里使用 JSON 格式,因为稍后我们将发送消息的类型为 POJO,使用 JSON 进行序列化。

spring.cloud.stream.rocketmq 为 Spring Cloud Stream RocketMQ 配置项。

spring.cloud.stream.rocketmq.binder 为 RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类。

  • name-server:RocketMQ Namesrv 地址。

    名字服务(Name Server):名称服务充当路由消息的提供者。生产者或消费者能够通过名字服务查找各主题相应的 Broker IP 列表。多个 Namesrv 实例组成集群,但相互独立,没有信息交换。

spring.cloud.stream.rocketmq.bindings 为 RocketMQ 自定义 Binding 配置项,用于对通用的 spring.cloud.stream.bindings 配置项的增强,实现 RocketMQ Binding 独特的配置。该配置项对应 RocketMQBindingProperties Map,其中 key 为 Binding 的名字,需要对应上噢。

这里,我们对名字为 demo01-output 的 Binding 进行增强,进行 Producer 的配置。其中,producer 为 RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类。

  • group:生产者分组。

    生产者组(Producer Group):同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息且原始生产者在发送之后崩溃,则 Broker 服务器会联系同一生产者组的其他生产者实例以提交或回溯消费。

  • sync:是否同步发送消息,默认为 false 异步。一般业务场景下,使用同步发送消息较多,所以这里我们设置为 true 同步消息。

    使用 RocketMQ 发送三种类型的消息:同步消息(sync)、异步消息(async)和单向消息(oneway)。其中前两种消息是可靠的,因为会有发送是否成功的应答。

3.1.3 MySource

创建 MySource 接口,声明名字为 Output Binding。代码如下:

public interface MySource {

@Output("demo01-output")
MessageChannel demo01Output();

}

这里,我们通过 @Output 注解,声明了一个名字为 demo01-output 的 Output Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Output 注解的方法的返回结果为 MessageChannel 类型,可以使用它发送消息。MessageChannel 提供的发送消息的方法如下:

@FunctionalInterface
public interface MessageChannel {

long INDEFINITE_TIMEOUT = -1;

default boolean send(Message<?> message) {
return send(message, INDEFINITE_TIMEOUT);
}

boolean send(Message<?> message, long timeout);

}

那么,我们是否要实现 MySource 接口呢?答案是不需要,全部交给 Spring Cloud Stream 的 BindableProxyFactory 来解决。BindableProxyFactory 会通过动态代理,自动实现 MySource 接口。 而 @Output 注解的方法的返回值,BindableProxyFactory 会扫描带有 @Output 注解的方法,自动进行创建。

例如说,#demo01Output() 方法被自动创建返回结果为 DirectWithAttributesChannel,它是 MessageChannel 的子类。

友情提示:感兴趣的胖友,可以在 BindableProxyFactory 的 #afterPropertiesSet()#invoke(MethodInvocation invocation) 方法上,都打上一个断点,然后进行愉快的调试。

3.1.4 Demo01Message

创建 Demo01Message 类,示例 Message 消息。代码如下:

public class Demo01Message {

/**
* 编号
*/
private Integer id;

// ... 省略 setter/getter/toString 方法

}

3.1.5 Demo01Controller

创建 Demo01Controller 类,提供发送消息的 HTTP 接口。代码如下:

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

@Autowired
private MySource mySource; // <X>

@GetMapping("/send")
public boolean send() {
// <1> 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// <2> 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// <3> 发送消息
return mySource.demo01Output().send(springMessage);
}

}

  • <X> 处,使用 @Autowired 注解,注入 MySource Bean。
  • <1> 处,创建 Demo01Message 对象。
  • <2> 处,使用 MessageBuilder 创建 Spring Message 对象,并设置消息内容为 Demo01Message 对象。
  • <3> 处,通过 MySource 获得 MessageChannel 对象,然后发送消息。

3.1.6 ProducerApplication

创建 ProducerApplication 类,启动应用。代码如下:

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySource 接口。

3.2 搭建消费者

创建 labx-06-sca-stream-rocketmq-consumer-demo 项目,作为消费者。

3.2.1 引入依赖

创建 pom.xml 文件中,引入 Spring Cloud Alibaba RocketMQ 相关依赖。

友情提示:和「3.1.1 引入依赖」基本一样,点击 链接 查看。

3.2.2 配置文件

创建 application.yaml 配置文件,添加 Spring Cloud Alibaba RocketMQ 相关配置。

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

总体来说,和「3.1.2 配置文件」是比较接近的,所以我们只说差异点噢。

spring.cloud.stream.bindings 为 Binding 配置项。

这里,我们配置了一个名字为 demo01-input 的 Binding。从命名上,我们的意图是想作为 Input Binding,用于消费者消费消息。

  • group:消费者分组。

    消费者组(Consumer Group):同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的 Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。

spring.cloud.stream.rocketmq.bindings 为 RocketMQ 自定义 Binding 配置项。

这里,我们对名字为 demo01-input 的 Binding 进行增强,进行 Consumer 的配置。其中,consumer 为 RocketMQ Producer 配置项,对应 RocketMQConsumerProperties 类。

  • enabled:是否开启消费,默认为 true。在日常开发时,如果在本地环境不想消费,可以通过设置 enabledfalse 进行关闭。

  • broadcasting: 是否使用广播消费,默认为 false 使用集群消费。

    • 集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
    • 广播消费(Broadcasting):广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

这里一点要注意!!!艿艿加了三个感叹号,一定要理解集群消费和广播消费的差异。我们来举个例子,以有两个消费者分组 A 和 B 的场景举例子:

  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被两个消费者分组 "consumer_group_01""consumer_group_02" 都各自消费一次。
  • 假设每个消费者分组各启动一个实例,此时我们发送一条消息,该消息会被分组 A 的某个实例消费一次,被分组 B 的某个实例也消费一次

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

  • 积分模块:判断如果是手机注册,给用户增加 20 积分。
  • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
  • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
  • ... 等等

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

同时,相同消费者分组的多个实例,可以实现高可用,保证在一个实例意外挂掉的情况下,其它实例能够顶上。并且,多个实例都进行消费,能够提升消费速度

友情提示:如果还不理解的话,没有关系,我们下面会演示下我们上面举的例子。

3.2.3 MySink

创建 MySink 接口,声明名字为 Input Binding。代码如下:

public interface MySink {

String DEMO01_INPUT = "demo01-input";

@Input(DEMO01_INPUT)
SubscribableChannel demo01Input();

}

这里,我们通过 @Input 注解,声明了一个名字为 demo01-input 的 Input Binding。注意,这个名字要和我们配置文件中的 spring.cloud.stream.bindings 配置项对应上。

同时,@Input 注解的方法的返回结果为 SubscribableChannel 类型,可以使用它订阅消息来消费。MessageChannel 提供的订阅消息的方法如下:

public interface SubscribableChannel extends MessageChannel {

boolean subscribe(MessageHandler handler); // 订阅

boolean unsubscribe(MessageHandler handler); // 取消订阅

}

那么,我们是否要实现 MySink 接口呢?答案也是不需要,还是全部交给 Spring Cloud Stream 的 BindableProxyFactory 大兄弟来解决。BindableProxyFactory 会通过动态代理,自动实现 MySink 接口。 而 @Input 注解的方法的返回值,BindableProxyFactory 会扫描带有 @Input 注解的方法,自动进行创建。

例如说,#demo01Input() 方法被自动创建返回结果为 DirectWithAttributesChannel,它也是 SubscribableChannel 的子类。

友情提示:感兴趣的胖友,可以在 BindableProxyFactory 的 #afterPropertiesSet()#invoke(MethodInvocation invocation) 方法上,都打上一个断点,然后进行愉快的调试。

3.2.4 Demo01Message

创建 Demo01Message 类,示例 Message 消息。

友情提示:和「3.1.4 Demo01Message」基本一样,点击 链接 查看。

3.2.5 Demo01Consumer

创建 Demo01Consumer 类,消费消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

在方法上,添加 @StreamListener 注解,声明对应的 Input Binding。这里,我们使用 MySink.DEMO01_INPUT

又因为我们消费的消息是 POJO 类型,所以我们需要添加 @Payload 注解,声明需要进行反序列化成 POJO 对象。

3.2.6 ConsumerApplication

创建 ConsumerApplication 类,启动应用。代码如下:

@SpringBootApplication
@EnableBinding(MySink.class)
public class ConsumerApplication {

public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}

}

使用 @EnableBinding 注解,声明指定接口开启 Binding 功能,扫描其 @Input@Output 注解。这里,我们设置为 MySink 接口。

3.3 测试单集群多实例的场景

本小节,我们会在一个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。此时在 IDEA 控制台看到 RocketMQ 相关的日志如下:

2020-02-22 09:32:54.462  INFO 50472 --- [           main] s.b.r.c.RocketMQListenerBindingContainer : running container: RocketMQListenerBindingContainer{consumerGroup='demo01-consumer-group-DEMO-TOPIC-01', nameServer='[127.0.0.1:9876]', topic='DEMO-TOPIC-01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='null', messageModel=CLUSTERING}
2020-02-22 09:32:54.462 INFO 50472 --- [ main] .c.s.b.r.i.RocketMQInboundChannelAdapter : started com.alibaba.cloud.stream.binder.rocketmq.integration.RocketMQInboundChannelAdapter@1cd3b138

友情提示:因为 IDEA 默认同一个程序只允许启动 1 次,所以我们需要配置 DemoProviderApplication 为 Allow parallel run。如下图所示:Allow parallel run

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-02-22 09:39:29.073 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-1682643477}]
2020-02-22 09:41:32.754 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=1890257867}]

// ConsumerApplication 控制台 02
2020-02-22 09:41:32.264 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:80 消息内容:Demo01Message{id=1401668556}]

符合预期。从日志可以看出,每条消息仅被消费一次。

3.4 测试多集群多实例的场景

本小节,我们会在二个消费者集群启动两个实例,测试在集群消费的情况下的表现。

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。

② 修改 labx-06-sca-stream-rocketmq-consumer-demo 项目的配置文件,修改 spring.cloud.stream.bindings.demo01-input.group 配置项,将消费者分组改成 X-demo01-consumer-group-DEMO-TOPIC-01

然后,执行 ConsumerApplication 两次,再启动两个消费者的实例,从而实现在消费者分组 X-demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。

③ 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-02-22 10:17:07.886 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-276398167}]
2020-02-22 10:17:08.237 INFO 50472 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-250975158}]

// 消费者分组 `demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-02-22 10:17:08.710 INFO 50534 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:80 消息内容:Demo01Message{id=412281482}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 01
2020-02-22 10:17:07.887 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:51 消息内容:Demo01Message{id=-276398167}]
2020-02-22 10:17:08.238 INFO 51092 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:51 消息内容:Demo01Message{id=-250975158}]

// 消费者分组 `X-demo01-consumer-group-DEMO-TOPIC-01` 的ConsumerApplication 控制台 02
2020-02-22 10:17:08.787 INFO 51096 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:77 消息内容:Demo01Message{id=412281482}]

符合预期。从日志可以看出,每条消息被每个消费者集群都进行了消费,且仅被消费一次。

3.5 小结

至此,我们已经完成了 Stream RocketMQ 的快速入门,是不是还是蛮简答的噢。现在胖友可以在回过头看看 Binder 和 Binding 的概念,是不是就清晰一些了。

4. 定时消息

示例代码对应仓库:

在 RocketMQ 中,提供定时消息的功能。

定时消息,是指消息发到 Broker 后,不能立刻被 Consumer 消费,要到特定的时间点或者等待特定的时间后才能被消费。

不过,RocketMQ 暂时不支持任意的时间精度的延迟,而是固化了 18 个延迟级别。如下表格:

延迟级别 时间 延迟级别 时间 延迟级别 时间
1 1s 7 3m 13 9m
2 5s 8 4m 14 10m
3 10s 9 5m 15 20m
4 30s 10 6m 16 30m
5 1m 11 7m 17 1h
6 2m 12 8m 18 2h

如果胖友想要任一时刻的定时消息,可以考虑借助 MySQL + Job 来实现。又或者考虑使用 DDMQ(滴滴打车基于 RocketMQ 和 Kafka 改造的开源消息队列)。

下面,我们来搭建一个 RocketMQ 定时消息的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,修改 labx-06-sca-stream-rocketmq-producer-demo 发送定时消息,继续使用 labx-06-sca-stream-rocketmq-consumer-demo 消费消息。

4.1 Demo01Controller

修改 Demo01Controller 类,增发送定时消息的 HTTP 接口。代码如下:

private Logger logger = LoggerFactory.getLogger(getClass());

@GetMapping("/send_delay")
public boolean sendDelay() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // <X> 设置延迟级别为 3,10 秒后消费。
.build();
// 发送消息
boolean sendResult = mySource.demo01Output().send(springMessage);
logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
return sendResult;
}

<X> 处,通过添加头 MessageConst.PROPERTY_DELAY_TIME_LEVEL,设置消息的延迟级别,从而发送定时消息。

4.2 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_delay 接口,发送延迟 10 秒的定时消息。IDEA 控制台输出日志如下:

// Producer 的控制台
2020-02-22 16:32:35.836 INFO 57143 --- [io-18080-exec-5] c.i.s.l.r.p.controller.Demo01Controller : [sendDelay][发送消息完成, 结果 = true]

// Consumer 的控制台
2020-02-22 16:32:45.841 INFO 57133 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:61 消息内容:Demo01Message{id=618574636}]

符合预期。在 Producer 发送的消息之后,Consumer 确实 10 秒后才消费消息。

5. 消费重试

示例代码对应仓库:

RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

死信队列用于处理无法被正常消费的消息。当一条消息初次消费失败,消息队列会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。

RocketMQ 将这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。在 RocketMQ 中,可以通过使用 console 控制台对死信队列中的消息进行重发来使得消费者实例再次进行消费。

每条消息的失败重试,是有一定的间隔时间。实际上,消费重试是基于「5. 定时消息」 来实现,第一次重试消费按照延迟级别为 3 开始。😈 所以,默认为 16 次重试消费,也非常好理解,毕竟延迟级别最高为 18 呀。

不过要注意,只有集群消费模式下,才有消息重试。

下面,我们来搭建一个 RocketMQ 消息重试的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,使用 labx-06-sca-stream-rocketmq-producer-demo 发送消息,从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-retry模拟消费失败后的重试

5.1 复制项目

「2. 快速入门」小节的 labx-06-sca-stream-rocketmq-consumer-demo,复制出 labx-06-sca-stream-rocketmq-consumer-retry

5.2 配置文件

修改 application.yml 配置文件,增加消费重试相关的两个配置项 delay-level-when-next-consumemax-attempts。最终配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
# Consumer 配置项,对应 ConsumerProperties 类
consumer:
max-attempts: 1
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
delay-level-when-next-consume: 0 # 异步消费消息模式下消费失败重试策略,默认为 0

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

① 对于 delay-level-when-next-consume 配置项,一共有三种选择:

  • -1:不重复,直接放入死信队列
  • 0:RocketMQ Broker 控制重试策略
  • 0:RocketMQ Consumer 控制重试策略

可能胖友对 Broker 和 Consumer 控制重试策略有点懵逼!?每天消息首次消费失败时,Consumer 会发回给 Broker,并告诉 Broker 按照什么延迟级别开始,不断重新投递给 Consumer 直到消费成功或者到达最大延迟级别。

举个例子,如果这里我们设置了 delay-level-when-next-consume 配置项为 18,则 2 小时后 Broker 会投递该消息给 Consumer 进行重新消费。

一般情况下,我们设置 delay-level-when-next-consume 配置项为 0 即可,使用 Broker 控制重试策略即可。默认配置下,Broker 会使用延迟级别从 3 开始,10 秒后 Broker 会投递该消息给 Consumer 进行重新消费。

② 对于 max-attempts 配置项,每次拉取到消息到本地时,如果消费重试,本地重试的最大总次数(包括第一次)。这个是 Spring Cloud Stream 提供的通用消费重试功能,是 Consumer 级别的,而 RocketMQ 提供的独有消费重试功能,是 Broker 级别的。

因为 Spring Cloud Stream 提供的重试间隔,是通过 sleep 实现,会占掉当前线程,影响 Consumer 的消费速度,所以这里并不推荐使用,因此设置 max-attempts 配置项为 1,禁用 Spring Cloud Stream 提供的重试功能,使用 RocketMQ 提供的重试功能

友情提示:如果胖友无法保证消费重试不会带来副作用,也就是说无法保证消费的幂等性,建议关闭消费重试功能,即设置 delay-level-when-next-consume 配置项为 -1,max-attempts 配置项为 1。

5.3 Demo01Consumer

修改 Demo01Consumer 类,在消费消息时抛出异常,从而模拟消费错误。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}

}

5.4 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:

// Demo01Consumer 第一次消费失败,抛出 RuntimeException 异常
2020-02-22 19:18:52.241 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:69 消息内容:Demo01Message{id=-604160799}]
2020-02-22 19:18:52.245 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略

// Demo01Consumer 第一次重试消费失败,抛出 RuntimeException 异常。间隔了 10 秒,对应延迟级别 3 。
2020-02-22 19:19:02.259 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:69 消息内容:Demo01Message{id=-604160799}]
2020-02-22 19:19:02.259 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略

// Demo01Consumer 第二次重试消费失败,抛出 RuntimeException 异常。间隔了 30 秒,对应延迟级别 4 。
2020-02-22 19:19:32.266 INFO 61116 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:69 消息内容:Demo01Message{id=-604160799}]
2020-02-22 19:19:32.266 ERROR 61116 --- [MessageThread_1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: // ... 省略

// ... 省略,后续还有重试

符合预期。从日志中,我们可以看到,消息因为消费失败后,又重试消费了多次。

6. 消费异常处理机制

示例代码对应仓库:

在 Spring Cloud Stream 中,提供了通用的消费异常处理机制,可以拦截到消费者消费消息时发生的异常,进行自定义的处理逻辑。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用「5. 消费重试」小节的项目,使用 labx-06-sca-stream-rocketmq-producer-demo 发送消息,从 labx-06-sca-stream-rocketmq-consumer-retry 复制出 labx-06-sca-stream-rocketmq-consumer-error-handler演示消费异常处理机制

6.1 复制项目

「5. 消费重试」小节的 labx-06-sca-stream-rocketmq-consumer-retry,复制出 labx-06-sca-stream-rocketmq-consumer-error-handler

6.2 Demo01Consumer

修改 Demo01Consumer 类,增加消费异常处理方法。完整代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT) // 对应 DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// <X> 注意,此处抛出一个 RuntimeException 异常,模拟消费失败
throw new RuntimeException("我就是故意抛出一个异常");
}

@ServiceActivator(inputChannel = "DEMO-TOPIC-01.demo01-consumer-group-DEMO-TOPIC-01.errors")
public void handleError(ErrorMessage errorMessage) {
logger.error("[handleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
logger.error("[handleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[handleError][headers:{}]", errorMessage.getHeaders());
}

@StreamListener(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME) // errorChannel
public void globalHandleError(ErrorMessage errorMessage) {
logger.error("[globalHandleError][payload:{}]", ExceptionUtils.getRootCauseMessage(errorMessage.getPayload()));
logger.error("[globalHandleError][originalMessage:{}]", errorMessage.getOriginalMessage());
logger.error("[globalHandleError][headers:{}]", errorMessage.getHeaders());
}

}

① 在 Spring Integration 的设定中,若 #onMessage(@Payload Demo01Message message) 方法消费消息发生异常时,会发送错误消息(ErrorMessage)到对应的错误 Channel(<destination>.<group>.errors中。同时,所有错误 Channel 都桥接到了 Spring Integration 定义的全局错误 Channel(errorChannel)

友情提示:先暂时记住 Spring Integration 这样的设定,艿艿也没去深究 T T,也是一脸懵逼。

因此,我们有两种方式来实现异常处理:

  • 局部的异常处理:通过订阅指定错误 Channel
  • 全局的异常处理:通过订阅全局错误 Channel

② 在 #handleError(ErrorMessage errorMessage) 方法上,我们声明了 @ServiceActivator 注解,订阅指定错误 Channel的错误消息,实现 #onMessage(@Payload Demo01Message message) 方法的局部异常处理。如下图所示:对应关系

③ 在 #globalHandleError(ErrorMessage errorMessage) 方法上,我们声明了 @StreamListener 注解,订阅全局错误 Channel的错误消息,实现全局异常处理。

④ 在全局局部异常处理都定义的情况下,错误消息仅会被符合条件局部错误异常处理。如果没有符合条件的,错误消息才会被全局异常处理。

6.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送一条消息。IDEA 控制台输出日志如下:

// onMessage 方法
2020-02-20 00:47:34.487 INFO 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:60 消息内容:Demo01Message{id=-317670393}]

// handleError 方法
2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][payload:RuntimeException: 我就是故意抛出一个异常]
2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][originalMessage:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=3, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_RECONSUME_TIMES=0, rocketmq_MESSAGE_ID=0A258102FE8918B4AAC2620411310017, rocketmq_SYS_FLAG=0, id=dc6dafb1-b303-7931-5977-45f319b935d9, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, rocketmq_BORN_TIMESTAMP=1582130833713, timestamp=1582130854444}]]
2020-02-20 00:47:34.496 ERROR 67767 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [handleError][headers:{id=cdf37b5d-878c-3d85-1f40-7711a3642a16, timestamp=1582130854489}]

😆 不过要注意,如果异常处理方法成功,没有重新抛出异常,会认定为该消息被消费成功,所以就不会进行消费重试。

7. 广播消费

示例代码对应仓库:

在上述的示例中,我们看到的都是使用集群消费,也是最常用的消费模式。而在一些场景下,我们需要使用广播消费

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,使用 labx-06-sca-stream-rocketmq-producer-demo 发送消息,从 labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-broadcasting演示广播消费

7.1 复制项目

「2. 快速入门」小节的 labx-06-sca-stream-rocketmq-consumer-demo,复制出 labx-06-sca-stream-rocketmq-consumer-broadcasting

7.2 配置文件

修改 application.yml 配置文件,设置 broadcasting 配置项为 true,开启广播消费的模式。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01-X # 消费者分组
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: true # 是否使用广播消费,默认为 false 使用集群消费

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

7.3 简单测试

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 demo01-consumer-group-DEMO-TOPIC-01 下有两个消费者实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-02-20 01:20:06.886 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-335590634}]
2020-02-20 01:20:18.368 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=283364059}]
2020-02-20 01:20:24.422 INFO 68510 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:78 消息内容:Demo01Message{id=-1253930234}]

// ConsumerApplication 控制台 02
2020-02-20 01:20:06.884 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:75 消息内容:Demo01Message{id=-335590634}]
2020-02-20 01:20:18.368 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:75 消息内容:Demo01Message{id=283364059}]
2020-02-20 01:20:24.422 INFO 68519 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:75 消息内容:Demo01Message{id=-1253930234}]

符合预期。从日志可以看出,每条消息仅被每个消费者消费了一次。

8. 顺序消息

示例代码对应仓库:

RocketMQ 提供了两种顺序级别:

  • 普通顺序消息:Producer 将相关联的消息发送到相同的消息队列。
  • 完全严格顺序:在【普通顺序消息】的基础上,Consumer 严格顺序消费。

官方文档是这么描述的:

消息有序,指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了三条消息分别是订单创建、订单付款、订单完成。消费时要按照这个顺序消费才能有意义,但是同时订单之间是可以并行消费的。RocketMQ 可以严格的保证消息有序。

顺序消息分为全局顺序消息与分区顺序消息,全局顺序是指某个 Topic 下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

  • 全局顺序:对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。适用场景:性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景
  • 分区顺序:对于指定的一个 Topic,所有消息根据 Sharding key 进行区块分区。 同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。适用场景:性能要求高,以 Sharding key 作为分区字段,在同一个区块中严格的按照 FIFO 原则进行消息发布和消费的场景。

注意,分区顺序就是普通顺序消息,全局顺序就是完全严格顺序。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:

8.1 搭建生产者

labx-06-sca-stream-rocketmq-producer-demo 复制出 labx-06-sca-stream-rocketmq-producer-orderly演示发送顺序消息

8.1.1 配置文件

修改 application.yml 配置文件,添加 partition-key-expression 配置项,设置 Producer 发送顺序消息的 Sharding key。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Producer 配置项,对应 ProducerProperties 类
producer:
partition-key-expression: payload['id'] # 分区 key 表达式。该表达式基于 Spring EL,从消息中获得分区 key。
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。

server:
port: 18080

partition-key-expression 配置项,该表达式基于 Spring EL,从消息中获得 Sharding key。

这里,我们设置该配置项为 payload['id'],表示从 Spring Message 的 payload 的 id。稍后我们发送的消息的 payload 为 Demo01Message,那么 id 就是 Demo01Message.id

如果我们想从消息的 headers 中获得 Sharding key,可以设置为 headers['partitionKey']

② Spring Cloud Stream 使用 PartitionHandler 进行 Sharding key 的获得与计算,最终 Sharding key 的结果为 key.hashCode() % partitionCount

感兴趣的胖友,可以阅读 PartitionHandler 的 #determinePartition(Message<?> message) 方法。

在获取到 Sharding key 之后,Spring Cloud Alibaba Stream RocketMQ 提供的 PartitionMessageQueueSelector 选择消息发送的队列。

我们以发送一条 id 为 1 的 Demo01Message 消息为示例,最终会发送到对应 RocketMQ Topic 的队列为 1。计算过程如下:

// 第一步,PartitionHandler 使用 `partition-key-expression` 表达式,从 Message 中获得 Sharding key
key => 1

// 第二步,PartitionHandler 计算最终的 Sharding key
// 默认情况下,每个 RocketMQ Topic 的队列总数是 4。
key => key.hashCode() % partitionCount = 1.hashCode() % 4 = 1 % 4 = 1

// 第三步,PartitionMessageQueueSelector 获得对应 RocketMQ Topic 的队列
队列 => queues.get(key) = queues.get(1)

这样,我们就能保证相同 Sharding Key 的消息,发送到相同的对应 RocketMQ Topic 的队列中。当前,前提是该 Topic 的队列总数不能变噢,不然计算的 Sharding Key 会发生变化。

8.1.2 Demo01Controller

修改 Demo01Controller 类,增加发送 3 条顺序消息的 HTTP 接口。代码如下:

@GetMapping("/send_orderly")
public boolean sendOrderly() {
// 发送 3 条相同 id 的消息
int id = new Random().nextInt();
for (int i = 0; i < 3; i++) {
// 创建 Message
Demo01Message message = new Demo01Message().setId(id);
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

每次发送的 3 条消息使用相同的 id,配合上我们使用它作为 Sharding key,就可以发送对应 Topic 的相同队列中。

另外,整列发送的虽然是顺序消息,但是和发送普通消息的代码是一模一样的。

8.2 搭建消费者

labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-broadcasting演示顺序消费消息

8.2.1 配置文件

修改 application.yml 配置文件,添加 orderly 配置项,设置 Consumer 顺序消费消息。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
orderly: true # 是否顺序消费,默认为 false 并发消费。

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

8.2.2 Demo01Consumer

修改 Demo01Consumer 类,在消费消息时,打印出消息所在队列编号线程编号,这样我们通过队列编号可以判断消息是否顺序发送,通过线程编号可以判断消息是否顺序消费。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@StreamListener(MySink.DEMO01_INPUT)
public void onMessage(Message<?> message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

8.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_orderly 接口,发送顺序消息。IDEA 控制台输出日志如下:

2020-02-20 21:26:52.044  INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:76 消息内容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212037, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD850006, rocketmq_SYS_FLAG=0, id=945725a1-abfb-218a-d480-b220adff9549, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212044}]]
2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:76 消息内容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212039, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD870007, rocketmq_SYS_FLAG=0, id=86a0e912-3cba-8b5b-3928-a7ef0ad80036, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]]
2020-02-20 21:26:52.046 INFO 74637 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:76 消息内容:GenericMessage [payload={"id":58569988}, headers={rocketmq_QUEUE_ID=0, rocketmq_RECONSUME_TIMES=0, scst_partition=0, rocketmq_BORN_TIMESTAMP=1582205212041, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, spring_json_header_types={"scst_partition":"java.lang.Integer"}, rocketmq_MESSAGE_ID=0A25810236DE18B4AAC26672FD890008, rocketmq_SYS_FLAG=0, id=b04416a3-60c2-bf42-a5a4-fe3c5079cc55, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582205212046}]]

id 为 58569988 的消息被发送到 RocketMQ 消息队列编号为 0,并且在线程编号为 76 的线程中消费。😈 胖友可以自己在多调用几次接口,继续尝试。

9. 消息过滤

示例代码对应仓库:

RocketMQ 提供了两种方式给 Consumer 进行消息的过滤:

  • 基于 Tag 过滤

    标签(Tag):为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化 RocketMQ 提供的查询系统。消费者可以根据 Tag 实现对不同子主题的不同消费逻辑,实现更好的扩展性。

  • 基于 SQL92 过滤

消息过滤目前是在 Broker 端实现的,优点是减少了 Broker 和 Consumer 之间的无用消息的网络传输,缺点是增加了 Broker 的负担、而且实现相对复杂。

一般情况下,我们使用 Tag 过滤较多,我们来搭建一个 RocketMQ 使用 Tag 进行消息过滤的示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:

先搭建消费者。

9.1 Demo01Controller

修改 Demo01Controller 类,增加发送 3 条带 Tag 的消息的 HTTP 接口。代码如下:

@GetMapping("/send_tag")
public boolean sendTag() {
for (String tag : new String[]{"yunai", "yutou", "tudou"}) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, tag) // <X> 设置 Tag
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

<X> 处,通过添加头 MessageConst.PROPERTY_TAGS,设置发送消息的 Tag

再搭建消费者。

9.2 复制项目

labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-filter使用 Tag 过滤消息来消费。

9.3 配置文件

修改 application.yml 配置文件,设置 tags 配置项为 yunai || yutou,只消费带有 Tag 为 yunaiyutou 的消息。完整配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: demo01-consumer-group-DEMO-TOPIC-01 # 消费者分组
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费
tags: yunai || yutou # 基于 Tag 订阅,多个 Tag 使用 || 分隔,默认为空
sql: # 基于 SQL 订阅,默认为空

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

如果胖友想要基于 SQL92 过滤消息,可以通过设置 sql 配置项。

9.4 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_tag 接口,发送带有 Tag 的消息。IDEA 控制台输出日志如下:

2020-02-20 22:41:57.639  INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer      : [onMessage][线程编号:76 消息内容:Demo01Message{id=687868446}]
2020-02-20 22:41:57.641 INFO 81013 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:76 消息内容:Demo01Message{id=1088622557}]

只消费了两条消息,目测 Tag 为 tudou 的消息已经被过滤了。要注意,被过滤掉的消息,后续是无法被消费掉了,效果和消费成功是一样的。

9.5 Demo01Consumer

咳咳咳:不知道如何取这标题,暂时用这个噶。

上面我们看到的是 RocketMQ 独有Broker级别的消息过滤机制,而 Spring Cloud Stream 提供了通用Consumer 级别的效率过滤器机制。我们只需要使用 @StreamListener 注解的 condition 属性,设置消息满足指定 Spring EL 表达式的情况下,才进行消费。

> /**
> * A condition that must be met by all items that are dispatched to this method.
> * @return a SpEL expression that must evaluate to a {@code boolean} value.
> */
> String condition() default "";
>

修改 Demo01Consumer 类,使用 @StreamListener 注解的 condition 属性来过滤消息。代码如下:

@Component
public class Demo01Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

// @StreamListener(MySink.DEMO01_INPUT)
// public void onMessage(@Payload Demo01Message message) {
// logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
// }

@StreamListener(value = MySink.DEMO01_INPUT, condition = "headers['rocketmq_TAGS'] == 'yunai'")
public void onMessage(@Payload Demo01Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}

这里我们设置消息的 Header 带有的 rocketmq_TAGS 值为 yunai 时,才进行消费。

9.6 再次测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_tag 接口,发送带有 Tag 的消息。IDEA 控制台输出日志如下:

// Tag 为 `yunai` 的消息被消费
2020-02-20 22:59:11.597 INFO 81438 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:64 消息内容:Demo01Message{id=124549390}]

// Tag 为 `yutou` 的消息被过滤
2020-02-20 22:59:11.599 WARN 81438 --- [MessageThread_1] .DispatchingStreamListenerMessageHandler : Cannot find a @StreamListener matching for message with id: 5edff575-b9a7-e011-154a-532077994685

只消费了一条消息,目测 Tag 为 tudou 的消息被 Broker 过滤,Tag 为 yutou 的消息被 Consumer 过滤。要注意,被过滤掉的消息,后续是无法被消费掉了,效果和消费成功是一样的。

10. 事务消息

示例代码对应仓库:

在分布式消息队列中,目前唯一提供完整的事务消息的,只有 RocketMQ 。关于这一点,还是可以鼓吹下的。

可能会有胖友怒喷艿艿,RabbitMQ 和 Kafka 也有事务消息啊,也支持发送事务消息的发送,以及后续的事务消息的 commit提交或 rollbackc 回滚。但是要考虑一个极端的情况,在本地数据库事务已经提交的时时候,如果因为网络原因,又或者崩溃等等意外,导致事务消息没有被 commit ,最终导致这条事务消息丢失,分布式事务出现问题。

相比来说,RocketMQ 提供事务回查机制,如果应用超过一定时长未 commit 或 rollback 这条事务消息,RocketMQ 会主动回查应用,询问这条事务消息是 commit 还是 rollback ,从而实现事务消息的状态最终能够被 commit 或是 rollback ,达到最终事务的一致性。

这也是为什么艿艿在上面专门加粗“完整的”三个字的原因。可能上述的描述,对于绝大多数没有了解过分布式事务的胖友,会比较陌生,所以推荐阅读如下两篇文章:

热心的艿艿:虽然说 RabbitMQ、Kafka 并未提供完整的事务消息,但是社区里,已经基于它们之上拓展,提供了事务回查的功能。例如说:Myth ,采用消息队列解决分布式事务的开源框架, 基于 Java 语言来开发(JDK1.8),支持 Dubbo,Spring Cloud,Motan 等 RPC 框架进行分布式事务。

下面,我们来搭建一个 RocketMQ 定时消息的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目,修改 labx-06-sca-stream-rocketmq-producer-transaction 发送事务消息,继续使用 labx-06-sca-stream-rocketmq-consumer-demo 消费消息。

10.1 复制项目

labx-06-sca-stream-rocketmq-producer-demo 复制出 labx-06-sca-stream-rocketmq-producer-transaction 来发送事务消息

10.2 配置文件

修改 application.yml 配置文件,添加 transactional 配置项为 true,设置 Producer 发送事务消息。完整配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
transactional: true # 是否发送事务消息,默认为 false。

server:
port: 18080

10.3 Demo01Controller

修改 Demo01Controller 类,增加发送事务消息的 HTTP 接口。代码如下:

@GetMapping("/send_transaction")
public boolean sendTransaction() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Args args = new Args().setArgs1(1).setArgs2("2");
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("args", JSON.toJSONString(args)) // <X>
.build();
// 发送消息
return mySource.demo01Output().send(springMessage);
}

public static class Args { // 这里作为示例,所以直接这么写了

private Integer args1;
private String args2;
// ... 省略 setter、getter、toString 方法
}

因为 Spring Cloud Stream 在设计时,并没有考虑事务消息,所以我们只好在 <X> 处,通过 Header 传递参数。

又因为 Header 后续会被转换成 String 类型,导致我们无法获得正确的真实的原始参数,所以这里我们先使用 JSON 将 args 参数序列化成字符串,这样后续我们可以使用 JSON 反序列化回来。

10.4 TransactionListenerImpl

创建 TransactionListenerImpl 类,实现 MQ 事务的监听。代码如下:

@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 从消息 Header 中解析到 args 参数,并使用 JSON 反序列化
Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),
Demo01Controller.Args.class);
// ... local transaction process, return rollback, commit or unknown
logger.info("[executeLocalTransaction][执行本地事务,消息:{} args:{}]", msg, args);
return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return rollback, commit or unknown
logger.info("[checkLocalTransaction][回查消息:{}]", msg);
return RocketMQLocalTransactionState.COMMIT;
}

}

① 在类上,添加 @RocketMQTransactionListener 注解,声明监听器的是生产者分组是 "test" 的 Producer 发送的事务消息。因为 RocketMQ 是回查(请求)指定指定生产分组下的 Producer,从而获得事务消息的状态,所以一定要正确设置。

② 实现 RocketMQLocalTransactionListener 接口,实现执行本地事务和检查本地事务的方法。

③ 实现 #executeLocalTransaction(...) 方法,实现执行本地事务。

  • 注意,这是一个模板方法。在调用这个方法之前,Spring Cloud Alibaba Stream RocketMQ 已经使用 Producer 发送了一条事务消息。然后根据该方法执行的返回的 RocketMQLocalTransactionState 结果,提交还是回滚该事务消息。

    友情提示:感兴趣的胖友,可以看看 DefaultMQProducerImpl 的 #sendMessageInTransaction(...) 的源码,整个模板方法是怎么执行的。

  • 😈 这里,我们为了模拟 RocketMQ 回查 Producer 来获得事务消息的状态,所以返回了 RocketMQLocalTransactionState.UNKNOWN 未知状态。

④ 实现 #checkLocalTransaction(...) 方法,检查本地事务。

  • 在事务消息长事件未被提交或回滚时,RocketMQ 会回查事务消息对应的生产者分组下的 Producer ,获得事务消息的状态。此时,该方法就会被调用。
  • 😈 这里,我们直接返回 RocketMQLocalTransactionState.COMMIT 提交状态。

一般来说,有两种方式实现本地事务回查时,返回事务消息的状态。

第一种,通过 msg 消息,获得某个业务上的标识或者编号,然后去数据库中查询业务记录,从而判断该事务消息的状态是提交还是回滚。

第二种,记录 msg 的事务编号,与事务状态到数据库中。

  • 第一步,在 #executeLocalTransaction(...) 方法中,先存储一条 idmsg 的事务编号,状态为 RocketMQLocalTransactionState.UNKNOWN 的记录。
  • 第二步,调用带有事务的业务 Service 的方法。在该 Service 方法中,在逻辑都执行成功的情况下,更新 idmsg 的事务编号,状态变更为 RocketMQLocalTransactionState.COMMIT 。这样,我们就可以伴随这个事务的提交,更新 idmsg 的事务编号的记录的状为 RocketMQLocalTransactionState.COMMIT ,美滋滋。。
  • 第三步,要以 try-catch 的方式,调用业务 Service 的方法。如此,如果发生异常,回滚事务的时候,可以在 catch 中,更新 idmsg 的事务编号的记录的状态为 RocketMQLocalTransactionState.ROLLBACK 。😭 极端情况下,可能更新失败,则打印 error 日志,告警知道,人工介入。
  • 如此三步之后,我们在 #executeLocalTransaction(...) 方法中,就可以通过查找数据库,idmsg 的事务编号的记录的状态,然后返回。

相比来说,艿艿倾向第二种,实现更加简单通用,对于业务开发者,更加友好。和有几个朋友沟通了下,他们也是采用第二种。

10.5 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send_transaction 接口,发送事务消息。IDEA 控制台输出日志如下:

// ProduerApplication 控制台
// ### TransactionListenerImpl 执行 executeLocalTransaction 方法,先执行本地事务的逻辑
2020-02-21 00:14:08.773 INFO 83052 --- [io-18080-exec-1] c.i.s.l.r.p.l.TransactionListenerImpl : [executeLocalTransaction][执行本地事务,消息:GenericMessage [payload=byte[17], headers={args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, id=d8604733-9083-5d19-15b4-bda0c549e9d1, contentType=application/json, timestamp=1582215248772}] args:Args{args1=1, args2='2'}]
// ### Producer 发送事务消息成功,但是因为 executeLocalTransaction 方法返回的是 UNKOWN 状态,所以事务消息并未提交或者回滚
// ### RocketMQ Broker 在发送事务消息 30 秒后,发现事务消息还未提交或是回滚,所以回查 Producer 。此时,checkLocalTransaction 方法返回 COMMIT ,所以该事务消息被提交
2020-02-21 00:14:48.685 INFO 83052 --- [pool-1-thread-1] c.i.s.l.r.p.l.TransactionListenerImpl : [checkLocalTransaction][回查消息:GenericMessage [payload=byte[17], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_BORN_TIMESTAMP=1582215248763, args={"args1":1,"args2":"2"}, rocketmq_TOPIC=DEMO-TOPIC-01, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=0A25810200002A9F000000000002868F, rocketmq_TRANSACTION_ID=0A258102446C18B4AAC2670C237B0000, rocketmq_SYS_FLAG=0, id=62383992-5015-f957-41e7-75ec5ace4496, CLUSTER=DefaultCluster, rocketmq_BORN_HOST=10.37.129.2, contentType=application/json, timestamp=1582215288685}]]

// ConsumerApplication 控制台
// ### 事务消息被提交,所以该消息被 Consumer 消费
2020-02-21 00:14:48.756 INFO 83058 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:79 消息内容:Demo01Message{id=1950986029}]

整个的执行过程,看看艿艿在日志上添加的说明。

11. 监控端点

示例代码对应仓库:

Spring Cloud Stream 的 endpoint 模块,基于 Spring Boot Actuator,提供了自定义监控端点 bindingschannels,用于获取 Spring Cloud Stream 的 Binding 和 Channel 信息。

同时,Spring Cloud Alibaba Stream RocketMQ 拓展了 Spring Boot Actuator 内置的 health 端点,通过自定义的 RocketMQBinderHealthIndicator,获取 RocketMQ 客户端的健康状态。

友情提示:对 Spring Boot Actuator 不了解的胖友,可以后续阅读《芋道 Spring Boot 监控端点 Actuator 入门》文章。

我们来搭建一个 Stream RocketMQ 监控端点的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:

11.1 搭建生产者

labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-actuator,查看生产者的监控端点结果。

11.1.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

11.1.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

management:
endpoints:
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
endpoint:
# Health 端点配置项,对应 HealthProperties 配置类
health:
enabled: true # 是否开启。默认为 true 开启。
show-details: ALWAYS # 何时显示完整的健康信息。默认为 NEVER 都不展示。可选 WHEN_AUTHORIZED 当经过授权的用户;可选 ALWAYS 总是展示。

每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

11.1.3 简单测试

① 使用 ProducerApplication 启动生产者。

② 访问应用的 bindings 监控端点 http://127.0.0.1:18080/actuator/bindings,返回结果如下图: 监控端点

③ 访问应用的 channels 监控端点 http://127.0.0.1:18080/actuator/channels,返回结果如下图: 监控端点

④ 访问应用的 health 监控端点 http://127.0.0.1:18080/actuator/health,返回结果如下图: 监控端点

11.2 搭建消费者

labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-filter,查看消费者的监控端点结果。

11.2.1 引入依赖

pom.xml 文件中,额外引入 Spring Boot Actuator 相关依赖。代码如下:

<!-- 实现对 Actuator 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

11.2.2 配置文件

修改 application.yaml 配置文件,额外增加 Spring Boot Actuator 配置项。配置如下:

management:
endpoints:
web:
exposure:
include: '*' # 需要开放的端点。默认值只打开 health 和 info 两个端点。通过设置 * ,可以开放所有端点。
endpoint:
# Health 端点配置项,对应 HealthProperties 配置类
health:
enabled: true # 是否开启。默认为 true 开启。
show-details: ALWAYS # 何时显示完整的健康信息。默认为 NEVER 都不展示。可选 WHEN_AUTHORIZED 当经过授权的用户;可选 ALWAYS 总是展示。

每个配置项的作用,胖友看下艿艿添加的注释。如果还不理解的话,后续看下《芋道 Spring Boot 监控端点 Actuator 入门》文章。

112.3 简单测试

① 使用 ConsumerApplication 启动消费者,随机端口为 19541。

② 访问应用的 bindings 监控端点 http://127.0.0.1:19541/actuator/bindings,返回结果如下图: 监控端点

③ 访问应用的 channels 监控端点 http://127.0.0.1:19541/actuator/channels,返回结果如下图: 监控端点

④ 访问应用的 health 监控端点 http://127.0.0.1:19541/actuator/health,返回结果如下图: 监控端点

12. 更多的配置项信息

Spring Cloud Alibaba Stream RocketMQ 提供的配置项挺多的,我们参考文档将配置项一起梳理下。

RocketMQ Binder Properties

spring.cloud.stream.rocketmq.binder 为前缀。

配置项 说明 默认值
name-server RocketMQ NameServer 地址 127.0.0.1:9876
access-key 阿里云账号 AccessKey
secret-key 阿里云账号 SecretKey
enable-msg-trace 是否为 Producer 和 Consumer 开启消息轨迹功能 true
customized-trace-topic 消息轨迹开启后存储的 Topic 名称 RMQ_SYS_TRACE_TOPIC

RocketMQ Consumer Properties

spring.cloud.stream.rocketmq.bindings.<channelName>.consumer. 为前缀。

配置项 说明 默认值
enable 是否启用 Consumer true
tags Consumer 基于 TAGS 订阅,多个 tag 以 || 分割
sql Consumer 基于 SQL 订阅
broadcasting 是Consumer 是否是广播消费模式。如果想让所有的订阅者都能接收到消息,可以使用广播模式 false
orderly Consumer 是否同步消费消息模式 false
delayLevelWhenNextConsume 异步消费消息模式下消费失败重试策略:-1, 不重复,直接放入死信队列;0, Broker 控制重试策略;>0, Client 控制重试策略 0
suspendCurrentQueueTimeMillis 同步消费消息模式下消费失败后再次消费的时间间隔 1000

RocketMQ Provider Properties

配置项 说明 默认值
enable 是否启用 Producer true
group Producer 分组
maxMessageSize 消息发送的最大字节数 8249344
transactional 是否发送事务消息 false
sync 是否使用同步得方式发送消息 false
vipChannelEnabled 是否在 Vip Channel 上发送消息 true
sendMessageTimeout 发送消息的超时时间(毫秒) 3000
compressMessageBodyThreshold 消息体压缩阀值(当消息体超过 4k 的时候会被压缩) 4096
retryTimesWhenSendFailed 在同步发送消息的模式下,消息发送失败的重试次数 2
retryTimesWhenSendAsyncFailed 在异步发送消息的模式下,消息发送失败的重试次数 2
retryNextServer 消息发送失败的情况下是否重试其它的 Broker false

13.接入阿里云的消息队列 RocketMQ

示例代码对应仓库:

在阿里云上,提供消息队列 RocketMQ 服务。那么,我们是否能够使用 Spring Cloud Alibaba Stream RocketMQ 实现阿里云 RocketMQ 的消息的发送与消费呢?

答案是可以。在 《阿里云 —— 消息队列 MQ —— 开源 Java SDK 接入说明》 中,提到目前开源的 Java SDK 可以接入阿里云 RocketMQ 服务。

如果您已使用开源 Java SDK 进行生产,只需参考方法,重新配置参数,即可实现无缝上云。

前提条件

  • 已在阿里云 MQ 控制台创建资源,包括 Topic、Group ID(GID)、接入点(Endpoint),以及 AccessKeyId 和 AccessKeySecret。
  • 已下载开源 RocketMQ 4.5.1 或以上版本,以支持连接阿里云 MQ。

我们来搭建一个 Stream RocketMQ 监控端点的使用示例。考虑方便,我们直接复用「2. 快速入门」小节的项目:

13.1 搭建生产者

labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作为生产者

修改 application.yaml 配置文件,添加 access-keysecret-key 配置项,设置访问阿里云 RocketMQ 的账号。完全配置如下:

spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: TOPIC_YUNAI_TEST # 目的地。这里使用 RocketMQ Topic <ALIYUN>
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>
access-key: ${ALIYUN_ACCESS_KEY} # 阿里云账号 AccessKey
secret-key: ${ALIYUN_SECRET_KEY} # 阿里云账号 SecretKey
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: GID_PRODUCER_GROUP_YUNAI_TEST # 生产者分组 <ALIYUN>
sync: true # 是否同步发送消息,默认为 false 异步。

server:
port: 18080

注意,<ALIYUN> 处的三个配置项,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Producer Group。

13.2 搭建消费者

labx-06-sca-stream-rocketmq-consumer-demo 复制出 labx-06-sca-stream-rocketmq-consumer-aliyun,接入阿里云 RocketMQ 作为消费者

修改 application.yaml 配置文件,添加 access-keysecret-key 配置项,设置访问阿里云 RocketMQ 的账号。完全配置如下:

spring:
application:
name: demo-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-input:
destination: TOPIC_YUNAI_TEST # 目的地。这里使用 RocketMQ Topic <ALIYUN>
content-type: application/json # 内容格式。这里使用 JSON
group: GID_PRODUCER_GROUP_YUNAI_TEST # 消费者分组 <ALIYUN>
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80 # RocketMQ Namesrv 地址 <ALIYUN>
access-key: ${ALIYUN_ACCESS_KEY} # 阿里云账号 AccessKey
secret-key: ${ALIYUN_SECRET_KEY} # 阿里云账号 SecretKey
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: false # 是否使用广播消费,默认为 false 使用集群消费

server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

注意,<ALIYUN> 处的三个配置项,也要修改成阿里云 RocketMQ 的 Namesrv、Topic、Consumer Group。

13.3 简单测试

① 执行 ConsumerApplication,启动消费者的实例。

② 执行 ProducerApplication,启动生产者的实例。

之后,请求 http://127.0.0.1:18080/demo01/send 接口,发送消息。IDEA 控制台输出日志如下:

// ConsumerApplication 控制台
2020-02-21 01:45:16.982 INFO 85901 --- [MessageThread_1] c.i.s.l.r.c.listener.Demo01Consumer : [onMessage][线程编号:89 消息内容:Demo01Message{id=-724066118}]

说明接入阿里云 RocketMQ 成功。

666. 彩蛋

至此,我们已经完成 Spring Cloud Alibaba RocketMQ 的学习。如下是 RocketMQ 相关的官方文档:

另外,想要在 Spring Boot 项目中使用 RocketMQ 作为消息队列的胖友,可以阅读《芋道 Spring Boot 消息队列 RocketMQ 入门》文章。

文章目录
  1. 1. 1. 概述
  2. 2. 2. Spring Cloud Stream 介绍
  3. 3. 3. 快速入门
    1. 3.1. 3.1 搭建生产者
      1. 3.1.1. 3.1.1 引入依赖
      2. 3.1.2. 3.1.2 配置文件
      3. 3.1.3. 3.1.3 MySource
      4. 3.1.4. 3.1.4 Demo01Message
      5. 3.1.5. 3.1.5 Demo01Controller
      6. 3.1.6. 3.1.6 ProducerApplication
    2. 3.2. 3.2 搭建消费者
      1. 3.2.1. 3.2.1 引入依赖
      2. 3.2.2. 3.2.2 配置文件
      3. 3.2.3. 3.2.3 MySink
      4. 3.2.4. 3.2.4 Demo01Message
      5. 3.2.5. 3.2.5 Demo01Consumer
      6. 3.2.6. 3.2.6 ConsumerApplication
    3. 3.3. 3.3 测试单集群多实例的场景
    4. 3.4. 3.4 测试多集群多实例的场景
    5. 3.5. 3.5 小结
  4. 4. 4. 定时消息
    1. 4.1. 4.1 Demo01Controller
    2. 4.2. 4.2 简单测试
  5. 5. 5. 消费重试
    1. 5.1. 5.1 复制项目
    2. 5.2. 5.2 配置文件
    3. 5.3. 5.3 Demo01Consumer
    4. 5.4. 5.4 简单测试
  6. 6. 6. 消费异常处理机制
    1. 6.1. 6.1 复制项目
    2. 6.2. 6.2 Demo01Consumer
    3. 6.3. 6.3 简单测试
  7. 7. 7. 广播消费
    1. 7.1. 7.1 复制项目
    2. 7.2. 7.2 配置文件
    3. 7.3. 7.3 简单测试
  8. 8. 8. 顺序消息
    1. 8.1. 8.1 搭建生产者
      1. 8.1.1. 8.1.1 配置文件
      2. 8.1.2. 8.1.2 Demo01Controller
    2. 8.2. 8.2 搭建消费者
      1. 8.2.1. 8.2.1 配置文件
      2. 8.2.2. 8.2.2 Demo01Consumer
    3. 8.3. 8.3 简单测试
  9. 9. 9. 消息过滤
    1. 9.1. 9.1 Demo01Controller
    2. 9.2. 9.2 复制项目
    3. 9.3. 9.3 配置文件
    4. 9.4. 9.4 简单测试
    5. 9.5. 9.5 Demo01Consumer
    6. 9.6. 9.6 再次测试
  10. 10. 10. 事务消息
    1. 10.1. 10.1 复制项目
    2. 10.2. 10.2 配置文件
    3. 10.3. 10.3 Demo01Controller
    4. 10.4. 10.4 TransactionListenerImpl
    5. 10.5. 10.5 简单测试
  11. 11. 11. 监控端点
    1. 11.1. 11.1 搭建生产者
      1. 11.1.1. 11.1.1 引入依赖
      2. 11.1.2. 11.1.2 配置文件
      3. 11.1.3. 11.1.3 简单测试
    2. 11.2. 11.2 搭建消费者
      1. 11.2.1. 11.2.1 引入依赖
      2. 11.2.2. 11.2.2 配置文件
      3. 11.2.3. 112.3 简单测试
  12. 12. 12. 更多的配置项信息
  13. 13. 13.接入阿里云的消息队列 RocketMQ
    1. 13.1. 13.1 搭建生产者
    2. 13.2. 13.2 搭建消费者
    3. 13.3. 13.3 简单测试
  14. 14. 666. 彩蛋