一. RocketMQ是什么

RocketMQ是一个统一消息引擎、轻量级数据处理平台。

RocketMQ是⼀款阿⾥巴巴开源的消息中间件,双十一承载了万亿级消息的流转,2016年11⽉,阿⾥巴巴向 Apache 软件基⾦会捐赠 RocketMQ,成为 Apache 孵化项⽬,2017 年 9 ⽉ ,Apache 宣布 RocketMQ孵化成为 Apache 顶级项⽬(TLP )成为国内⾸个互联⽹中间件在 Apache 上的顶级项⽬。

二. RocketMQ特征

  • 支持集群模型、负载均衡、水平扩展能力
  • 亿级别消息堆积能力

  • 采用零拷贝的原理,顺序写盘,随机读

  • 底层通信框架采用Netty NIO

  • NameServer代替Zookeeper,实现服务寻址和服务协调

  • 消息失败重试机制、消息可查询

  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展

  • 经过多次双十一的考验

三. RocketMQ安装

1.下载RocketMQ

https://rocketmq.apache.org/download

2. 下载后解压

  • Bin : 可执行文件目录

  • Conif:配置文件目录

  • Lib : 依赖库,一堆Jar包

3. 配置ROCKETMQ_HOME

例如:D:\installation\rocketmq-all-4.8.0-bin-release

welt3

四. 启动MQ

1. 启动NameServer

Cmd命令框执行进入至MQ文件夹\bin下,然后执行 start mqnamesrv.cmd,启动NameServer。

image-20221009204641999

2.启动Broker

进入至MQ文件夹\bin下,修改Bean目录下的 runbroker.cmd 中JVM占用内存大小

1
set "JAVA_OPT=%JAVA_OPT% -server -Xms1g -Xmx1g -Xmn512m"

CMD执行start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true ,启动Broker。

2. RocketMQ存储结构

RocketMQ安装好之后会在用户目录下产生一个store目录用来存储相关数据:C:\Users\coderyeah\store

五. RocketMQ插件

RocketMQ可视化管理插件下载地址(该项目是一个springboot工程):https://github.com/apache/rocketmq-externals

1. 修改配置

解压后,修改配置:src/main/resource/application.properties ,这里需要指向Name Server 的地址和端口 如下:

1
rocketmq.config.namesrvAddr=127.0.0.1:9876

2. 打包插件

回到安装目录(pom.xml所在目录),执行: mvn clean package -Dmaven.test.skip=true ,然后会在target目录生成打包后的jar文件

3. 启动插件

进入 target 目录,CMD执行 java -jar rocketmq-console-ng-1.0.0.jar , 访问 http://localhost:8080

六. 原理

RocketMQ开发官方文档:

https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

RocketMQ的集群架构如下:

image-20221009210056877

1. Producer

消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

2. Consumer

消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

3. Broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

4. NameServer

NameServer是一个Broker与Topic路由的注册中心支持Broker的动态注册与发现,主要包括两个功能

  • Broker管理

    NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。

  • 路由信息管理

    每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费

七. MQ实践

官方案例:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md

1. 导入依赖

注意和安装的MQ版本一致

1
2
3
4
5
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>

2. 生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package io.coderyeah;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
* @author lqs
* @date 2022/10/9
*/
public class SyncProducer {// 同步消息

public static void main(String[] args) throws Exception {
// 实例化消息生产者Producer
final DefaultMQProducer producer = new DefaultMQProducer("hello-mq");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
final Message message = new Message("hello-rocketMq", "hello-mq", "哈哈哈哈啊哈".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
// message.setDelayTimeLevel(3);
// 发送消息到一个Broker
final SendResult result = producer.send(message);
// 发送单向消息,没有任何返回结果
// producer.sendOneway(message);
// 通过sendResult返回消息是否成功送达
// [sendStatus=SEND_OK, msgId=7F000001983C18B4AAC22CAD1C830000, offsetMsgId=AC100BEA00002A9F0000000000000173, messageQueue=MessageQueue [topic=topic-async, brokerName=LAPTOP-CRB8PEDK, queueId=1], queueOffset=0]
System.out.println("result:" + result);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
}

3. 消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package io.coderyeah;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;
import java.util.List;

/**
* @author lqs
* @date 2022/10/9
*/
public class Consumer {// 消费者

public static void main(String[] args) throws Exception {
// 实例化消费者
final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("topic-consumer");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("topic-async", "tag-async");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
try {
for (MessageExt ext : list) {
System.out.println(new String(ext.getBody(), StandardCharsets.UTF_8));
System.out.println(context);
}
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 启动消费者实例
consumer.start();
System.out.println("消费者启动成功");
}
}

4. RabbitMQ工作流程

  1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

  3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息

6. Producer 生产者

RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。

RocketMQ中的消息生产者都是以生产者组(Producer Group)的形式出现的。生产者组是同一类生产者的集合,这类Producer发送相同Topic类型的消息。一个生产者组可以同时发送多个主题的消息。

Producer会使用一定的算法选择把消息发送到哪个master的某个queue中。

7. Consumer 消费者

Consumer 支持两种消费形式:拉取式消费、推动式消费。(主动,被动),RocketMQ中的消息消费者都是以消费者组(Consumer Group)的形式出现的。消费者组是同一类消费者的集合,这类Consumer消费的是同一个Topic类型的消息,不同的 Consumer Group可以消费同一个Topic。

8. Topic 消息主题

Topic表示一类消息的集合,每个topic主题包含若干条message消息,每条message消息只能属于一个topic主题,Topic是RocketMQ进行消息订阅的基本单位。

9. Message

消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。

10. Tag 标签

为消息设置的标志,用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。Topic是消息的一级分类,Tag是消息的二级分类(如订单order下可有服务订单、收购订单、消费订单等)。

11. MessageQueue队列

一个Topic中可以包含多个Queue,一 个Topic的Queue也被称为一个Topic中消息的分区(Partition)。 在一个Consumer Group内,一个Queue最多只能分配给一个Consumer,一个Cosumer可以分配得到多个Queue。这样的分配规则,每个Queue只有一个消费者,可以避免消费过程中的多线程处理和资源锁定,有效提高各Consumer消费的并行度和处理效率。

八. 异步发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package io.coderyeah;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;

/**
* @author lqs
* @date 2022/10/9 16:01
*/
public class AsyncProducer {// 异步生产者

public static void main(String[] args) throws Exception {
final DefaultMQProducer producer = new DefaultMQProducer("AsyncProducer");
producer.setNamesrvAddr("localhost:9876");
// 启动producer实例
producer.start();
final Message message = new Message("topic-async", "tag-async", "生产者发送的异步消息".getBytes(StandardCharsets.UTF_8));
// SendCallback接收异步返回结果的回调
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送异步消息成功:" + sendResult);
}

@Override
public void onException(Throwable throwable) {
System.out.println("发送异步消息失败:" + throwable);
}
});
// producer.shutdown();
}
}

SendCallback 是消息发送结果回调。如果:sendResult.getSendStatus() == SendStatus.SEND_OK 表示成功

① 单项发送消息

这种方式指的是发送者发送消息后无需等待Broker的结果返回,Broker也不会返回结果,该方式性能最高,但是消息可靠性低。API : producer.sendOneway(message) 示例代码:

1
2
3
// ... 省略...
Message message = new Message("asyn-topic", "sms", "我是消息".getBytes(CharsetUtil.UTF_8));
producer.sendOneway(message);

sendOneway 单向发送是没有返回结果值的。

九. 延迟消息概述

我们通常使用定时任务比如Quartz来解决超时业务,比如:订单支付超时关单,VIP会员超时提醒。但是使用定时任务来处理这些业务场景在数据量大的时候并不是一个很好的选择,会造成大量的空扫描浪费性能。我们可以考虑使用延迟消息来解决。

延迟消息即:把消息写到Broker后需要延迟一定时间才能被消费 , 在RocketMQ中消息的延迟时间不能任意指定,而是由特定的等级(1 到 18)来指定,分别有:

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

可以通过修改配置来增加级别,比如在mq安装目录的 broker.conf 文件中增加

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 2d 这个时候总共就有19个level。

image-20221009213320025

延迟流程

RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。不过,在分发之前,系统会先判断消息中是否带有延时等级。若没有,则直接正常分发;如果有就走下面的流程:

  1. 修改消息Topic的名字为SCHEDULE_TOPIC_XXXX

  2. 根据延时等级,在consumequeue目录中SCHEDULE_TOPIC_XXXX主题下创建出相应的queueId

目录与consumequeue文件

  1. 修改消息索引单元,计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,投递时间 = 消息存储时间 + 延时等级时间 。下面是CosumeQueue单个存储单元组成结构如下

1634813716044

  • Commit Log Offset:记录在CommitLog中的位置。

  • Size:记录消息的大小

  • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。

  1. 将消息索引写入到SCHEDULE_TOPIC_XXXX主题下相应的consumequeue中

  2. Broker内部有⼀个延迟消息服务类ScheuleMessageService,根据延迟级别数,创建对应数量的定时器Timer,定时消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

  3. 在将消息到期后,队列的Level等级改为0,作为一条普通消息,投递到目标Topic。

只需要一处改动,发送者通过 message.setDelayTimeLevel(3); 设置延迟级别即可

1
2
3
4
5
final Message message = new Message("hello-rocketMq", "hello-mq", "哈哈".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息到一个Broker
final SendResult result = producer.send(message);

十. 事务消息

如果业务只涉及到一个数据库的写操作,我们只需要保证这一个事物的提交和回滚,这种事务管理叫传统事物或本地事务,如果业务涉及到多个数据库(多个服务)的写操作,我们需要保证多个数据库同时提交或回滚,这种夸多个数据库的事务操作叫分布式事务。

事务消息实战

  1. 编写本地事务检查监听TransactionListener ,一是执行本地事务逻辑,二是返回本地事务执行状态

  2. 发消息时生产者需要设置producer.setTransactionListener 事务监听

事务监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class MyTransactionCheckListener implements TransactionListener {

@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//执行业务,保存本地事务

//保存成功
return LocalTransactionState.COMMIT_MESSAGE ; //ROLLBACK_MESSAGE; //未知
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
//这里查询本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
}

消息生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TransationSender {

public static void main(String[] args) throws MQClientException {
//使用事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("tran-product-group");

producer.setNamesrvAddr("127.0.0.1:9876");

//线程池底层是使用新开线程去发布消息到MQ
ExecutorService excutorService = Executors.newFixedThreadPool(20);

producer.setExecutorService(excutorService);
//指定事务监听器
producer.setTransactionListener(new MyTransactionCheckListener());

//设置事务消息监听
producer.start();

for(int i = 0 ; i < 10 ; i++){
String orderId = UUID.randomUUID().toString();
String tags = "Tag";
Message message = new Message("topic-tran", "tag", orderId, ("下单:"+i).getBytes(CharsetUtil.UTF_8));
//发送事务消息
TransactionSendResult transactionSendResult = producer.sendMessageInTransaction(message, null);
System.out.println(transactionSendResult);
}
producer.shutdown();
}
}

消息消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class TransationConsumer {
public static void main(String[] args) throws MQClientException {
//创建消费者
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("trans-consumer-group");
//设置name server 地址
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");

//订阅
defaultMQPushConsumer.subscribe("topic-tran", "tag");

defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {

list.forEach(message->{
System.out.println(message+" ; "+new String(message.getBody(), CharsetUtil.UTF_8));
});

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

defaultMQPushConsumer.start();
}
}