- A+
图示
图示
RocketMQ集群部署模式及搭建
1.RocketMQ 中的高可用机制
图示
RocketMQ 分布式集群是通过 Master 和 Slave 的配合达到高可用性的。
Master和Slave 的区别:在 Broker 的配置文件中,参数 brokerId 的值为 0 表明这个Broker是Master,大于0表明这个 Broker是Slave,同时brokerRole参数也会说明这个Broker 是Master还是Slave。
Master角色的 Broker 支持读和写,Slave 角色的 Broker 仅支持读,也就是Producer只能和Master角色的Broker连接写入消息;Consumer可以连接Master 角色的 Broker,也可以连接Slave角色的Broker 来读取消息。
2.集群部署模式
(1)单master模式
也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用。
(2)多master模式
多个master节点组成集群,单个master节点宕机或者重启对应用没有影响。
优点:所有模式中性能最高(一个 Topic 的可以分布在不同的 master,进行横向拓展)
在多主多从的架构体系下,无论使用客户端还是管理界面创建主题,一个主题都会创建多份队列在多主中(默认是 4 个的话,双主就会有 8 个队列,每台主 4 个队列,所以双主可以提高性能,一个 Topic 的分布在不同的 master,方便进行横向拓展。
缺点:单个master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性就受到影响。
(3)多master多slave异步复制模式
而从节点(Slave)就是复制主节点的数据,对于生产者完全感知不到,对于消费者正常情况下也感知不到。(只有当 Master 不可用或者繁忙的时候, Consumer 会被自动切换到从 Slave 读。)
在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql的主备模式。
优点: 一般情况下都是 master 消费,在 master 宕机或超过负载时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多master一样。
缺点:使用异步复制的同步方式有可能会有消息丢失的问题。(Master 宕机后,生产者发送的消息没有消费完,同时到 Slave 节点的数据也没有同步完)
(4)多master多slave主从同步复制+异步刷盘(最优推荐)
优点:主从同步复制模式能保证数据不丢失。
缺点:发送单个消息响应时间会略长,性能相比异步复制低 10%左右。
对数据要求较高的场景,主从同步复制方式,保存数据热备份,通过异步刷盘方式,保证 rocketMQ 高吞吐量。
(5)Dlegder(不推荐)
在 RocketMQ4.5 版本之后推出了Dlegder模式,但是这种模式一直存在严重的 BUG,同时性能有可能有问题,包括升级到了 4.8 的版本后也一样,所以目前不讲这种模式。(类似于Zookeeper的集群选举模式)
3.刷盘与主从同步
生产时首先将消息写入到 MappedFile,内存映射文件,然后根据刷盘策略刷写到磁盘。
大致的步骤可以理解成使用 MMAP 中的 MappedByteBuffer 中实际用 flip()。
代码示例
RocketMQ的刷盘是把消息存储到磁盘上的,这样既能保证断电后恢复, 又可以让存储的消息量超出内存的限制。RocketMQ为了提高性能,会尽可能地保证磁盘的顺序写。消息在通过Producer写入RocketMQ 的时候,有两种写磁盘方式,同步刷盘和异步刷盘。
(1)同步刷盘
SYNC_FLUSH(同步刷盘):生产者发送的每一条消息都在保存到磁盘成功后才返回告诉生产者成功。这种方式不会存在消息丢失的问 题,但是有很大的磁盘 IO 开销,性能有一定影响。
(2)异步刷盘
ASYNC_FLUSH(异步刷盘):生产者发送的每一条消息并不是立即保存到磁盘,而是暂时缓存起来,然后就返回生产者成功。随后再异步的将缓存数据保存到磁盘,有两种情况:1是定期将缓存中更新的数据进行刷盘,2是当缓存中更新的数据条数达到某一设定值后进行刷盘。这种异步的方式会存在消息丢失(在还未来得及同步到磁盘的时候宕机),但是性能很好。默认是这种模式。
图示
4.8.0 版本中默认值下是异步刷盘,如下图:
源码图示
(3)主从同步复制
集群环境下需要部署多个Broker,Broker分为两种角色:一种是 master,即可以写也可以读,其brokerId=0,只能有一个;另外一种是slave,只允许读,其brokerId为非0。一个master与多个slave通过指定相同的brokerClusterName被归为一个 broker set(broker 集)。通常生产环境中,我们至少需要2个broker set。Slave是复制master的数据。一个Broker组有 Master和Slave,消息需要从Master复制到Slave上,有同步和异步两种复制方式。
主从同步复制方式(Sync Broker):生产者发送的每一条消息都至少同步复制到一个 slave 后才返回告诉生产者成功,即“同步双写”。
在同步复制方式下,如果 Master 出故障, Slave 上有全部的备份数据,容易恢复,但是同步复制会增大数据写入延迟,降低系统吞吐量。
(4)主从异步复制
主从异步复制方式(Async Broker):生产者发送的每一条消息只要写入master就返回告诉生产者成功。然后再“异步复制”到slave。
在异步复制方式下,系统拥有较低的延迟和较高的吞吐量,但是如果Master出了故障,有些数据因为没有被写入Slave,有可能会丢失;
同步复制和异步复制是通过Broker配置文件里的brokerRole参数进行设置的,这个参数可以被设置成 ASYNC_MASTER、 SYNC_MASTER、SLAVE 三个值中的一个。
4.配置参数及意义
brokerId=0 代表主
brokerId=1 代表从(大于 0 都代表从)
brokerRole=SYNC_MASTER 同步复制(主从)
brokerRole=ASYNC_MASTER 异步复制(主从)
flushDiskType=SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH 异步刷盘
5.搭建双主双从同步复制+异步刷盘
(1)NameServer集群
192.168.1.1
192.168.1.2
(2)Broker服务器
192.168.1.1 --MasterA
192.168.1.2 --MasterB
192.168.1.3 --SlaveA
192.168.1.4 --SlaveB
(3)配置文件
注意,因为RocketMQ使用外网地址,所以配置文件(MQ文件夹/conf/2m-2s-sync/)需要修改(同时修改nameserver地址为集群地址):
注意,如果机器内存不够,建议把启动时的堆内存改小,具体见《RocketMQ 的安装.docx》中 --- 3、RocketMQ 在 Linux 下的安装/注意事项
192.168.1.1 ------主 A
broker-a.properties 增加: brokerIP1=192.168.1.1
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
图示
192.168.1.2 ------主 B
broker-b.properties 增加: brokerIP1=192.168.1.2
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
图示
192.168.1.3 ------从 A
broker-a-s.properties 增加:brokerIP1=192.168.1.3
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
图示
192.168.1.4 ------从 B
broker-b-s.properties 增加:brokerIP1=192.168.1.4
namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
图示
不管是主还是从,如果属于一个集群,使用相同的 brokerClusterName 名称
图示
(4)启动步骤
启动 NameServer
(记得关闭防火墙或者要开通 9876 端口)
1.启动NameServer集群,这里使用192.168.1.1 和 192.168.1.2两台作为集群即可。
1)在机器 A,启动第 1 台 NameServer: 102 服务器进入至‘MQ 文件夹/bin’下:然后执行‘nohup sh mqnamesrv &’
查看日志的命令:tail -f ~/logs/rocketmqlogs/namesrv.log
图示
2) 在机器 B,启动第2台NameServer: 103 服务器进入至‘MQ文件夹/bin’下:然后执行‘nohup sh mqnamesrv &’
查看日志的命令:tail -f ~/logs/rocketmqlogs/namesrv.log
图示
启动 Broker
2.启动双主双从同步集群,顺序是先启动主,然后启动从。
3)启动主A: 102 服务器进入至‘MQ 文件夹/bin’下:执行以下命令(autoCreateTopicEnable=true 测试环境开启,生产环境建议关闭):
nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
图示
4)启动主 B: 103 服务器进入至‘MQ 文件夹\bin’下:执行以下命令:nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
5)启动从 A: 104 服务器进入至‘MQ 文件夹\bin’下:执行以下命令:nohup sh mqbroker -c ../conf/2m-2s-sync/broker-a-s.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
6)启动从 B: 105 服务器进入至‘MQ 文件夹\bin’下:执行以下命令:nohup sh mqbroker -c ../conf/2m-2s-sync/broker-b-s.properties autoCreateTopicEnable=true &
查看日志的命令:tail -f ~/logs/rocketmqlogs/broker.log
每台服务器查看日志:tail -f ~/logs/rocketmqlogs/broker.log
如果是要启动控制台,则需要重新打包:
进入‘\rocketmq-console\src\main\resources’文件夹,打开‘application.properties’进行配置。(多个 NameServer 使用;分隔)
rocketmq.config.namesrvAddr=192.168.1.1:9876;192.168.1.2:9876
进入‘\rocketmq-externals\rocketmq-console’文件夹,执行‘mvn clean package -Dmaven.test.skip=true’,编译生成。
在把编译后的 jar 包丢上服务器:
nohup java -jar rocketmq-console-ng-2.0.0.jar &
进入控制台 http://192.168.1.1:8089/#/cluster
集群搭建成功。
图示
图示
消息生产的高可用机制
图示
在创建Topic的时候,把Topic的多个Message Queue 创建在多个Broker组上(相同Broker名称,不同brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。
RocketMQ目前不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的 Broker,更改配置文件,用新的配置文件启动Broker。
1.高可用消息生产流程
图示
1)TopicA 创建在双主中,BrokerA 和 BrokerB 中,每一个 Broker 中有 4 个队列。
2)选择队列是,默认是使用轮训的方式,比如发送一条消息 A 时,选择 BrokerA 中的 Q4。
3)如果发送成功,消息 A 发结束。
4)如果消息发送失败,默认会采用重试机制。
retryTimesWhenSendFailed 同步模式下内部尝试发送消息的最大次数 默认值是 2。
retryTimesWhenSendAsyncFailed 异步模式下内部尝试发送消息的最大次数 默认值是 2。
代码示例
5)如果发生了消息发送失败,这里有一个规避策略(默认配置):
5.1)默认不启用 Broker 故障延迟机制(规避策略):如果是 BrokerA 宕机,上一次路由选择的是 BrokerA 中的 Q4,那么再次重发的队列选择是 BrokerA中的 Q1。但是这里的问题就是消息发送很大可能再次失败,引发再次重复失败,带来不必要的性能损耗。
注意,这里的规避仅仅只针对消息重试,例如,在一次消息发送过程中如果遇到消息发送失败,规避 broekr-a,但是在下一次消息发送时,即再次调用DefaultMQProducer的send方法发送消息时,还是会选择 broker-a 的消息进行发送,只有继续发送失败后,重试时再次规避 broker-a。
为什么会默认这么设计?
1) 某一时间段,从 NameServer 中读到的路由中包含了不可用的主机。
2)不正常的路由信息也只是一个短暂的时间而已。
生产者每隔 30s 更新一次路由信息,而 NameServer 认为 broker 不可用需要经过 120s。
图示
所以生产者要发送时认为 broker 不正常(从 NameServer 拿到)和实际 Broker 不正常有延迟。
5.2)启用 Broker 故障延迟机制:代码如下
代码示例
开启延迟规避机制,一旦消息发送失败(不是重试的)会将broker-a“悲观”地认为在接下来的一段时间内该 Broker 不可用,在为未来某一段时间内所有的客户端不会向该Broker发送消息。这个延迟时间就是通过notAvailableDuration、latencyMax共同计算的,就首先先计算本次消息发送失败所耗的时延,然后对应latencyMax中哪个区间,即计算在latencyMax的下标,然后返回 notAvailableDuration同一个下标对应的延迟值。
这个里面涉及到一个算法,源码部分进行详细讲解。
比如:在发送失败后,在接下来的固定时间(比如 5 分钟)内,发生错误的 BrokeA 中的队列将不再参加队列负载,发送时只选择BrokerB服务器上的队列。
如果所有的 Broker 都触发了故障规避,并且Broker只是那一瞬间压力大,那岂不是明明存在可用的 Broker,但经过你这样规避,反倒是没有 Broker可用来,那岂不是更糟糕了。所以 RocketMQ 默认不启用 Broker 故障延迟机制。
消息消费的高可用机制
1.主从的高可用原理
在 Consumer 的配置文件中,并不需要设置是从 Master 读还是从 Slave 读,当 Master 不可用或者繁忙的时候,Consumer 会被自动切换到从 Slave 读。 有了自动切换 Consumer 这种机制,当一个 Master 角色的机器出现故障后,Consumer 仍然可以从 Slave 读取消息,不影响Consumer 程序。这就达到了消费端的高可用性。
Master 不可用这个很容易理解,那什么是 Master 繁忙呢?
这个繁忙其实是 RocketMQ 服务器的内存不够导致的。
源码分析:org.apache.rocketmq.store. DefaultMessageStore#getMessage 方法
代码示例
当前需要拉取的消息已经超过常驻内存的大小,表示主服务器繁忙,此时才建议从从服务器拉取。
2.消息消费的重试
消费端如果发生消息失败,没有提交成功,消息默认情况下会进入重试队列中。
图示
注意重试队列的名字其实是跟消费群组有关,不是主题,因为一个主题可以有多个群组消费,所以要注意。
图示
(1)顺序消息的重试
对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ 会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生。
所以玩顺序消息时。consume消费消息失败时,不能返回reconsume——later,这样会导致乱序,应该返回suspend_current_queue_a_moment,意思是先等一会,一会儿再处理这批消息,而不是放到重试队列里。
(2)无须消息的重试
对于无序消息(普通、定时、延时、事务消息),当消费者消费消息失败时,您可以通过设置返回状态达到消息重试的结果。无序消息的重试只针对 集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息。
(3)重试次数
图示
如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。
注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。
(4)重试配置
集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
>返回 RECONSUME_LATER (推荐)
>返回 Null
>抛出异常
代码示例
代码示例
代码示例
集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 CONSUME_SUCCESS,此后这条消息将不会再重试。
(5)自定义消息最大重试次数
消息队列 RocketMQ允许Consumer启动的时候设置最大重试次数,重试时间间隔将按照如下策略:
>最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
>最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。
代码示例
消息最大重试次数的设置对相同Group ID下的所有Consumer实例有效。
如果只对相同Group ID下两个Consumer实例中的其中一个设置了MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
配置采用覆盖的方式生效,即最后启动的Consumer实例会覆盖之前的启动实例的配置。
3.死信队列
当一条消息初次消费失败,消息队列RocketMQ会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正 确地消费该消息,此时,消息队列RocketMQ不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列RocketMQ中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
(1)死信特性
死信消息具有以下特性:
>不会再被消费者正常消费。
>有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
>不会再被消费者正常消费。
>一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
>如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
>一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
(2)查看死信消息
在控制台查询出现死信队列的主题信息
图示
在消息界面根据主题查询死信消息
选择重新发送消息
一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息,因此,通常需要您对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。
RocketMQ中的负载均衡
1.Producer负载均衡
Producer 端,每个实例在发消息的时候,默认会轮询所有的 message queue 发送,以达到让消息平均落在不同的 queue 上。而由于 queue 可以散落在不同的 broker,所以消息就发送到不同的 broker 下,如下图:
图示
发布方会把第一条消息发送至 Queue 0,然后第二条消息发送至 Queue 1,以此类推。
2.Consumer负载均衡
(1)集群模式
在集群消费模式下,每条消息只需要投递到订阅这个 topic 的 Consumer Group 下的一个实例即可。RocketMQ 采用主动拉取的方式拉取并消费消息,在拉取的时候需要明确指定拉取哪一条 message queue。
而每当实例的数量有变更,都会触发一次所有实例的负载均衡,这时候会按照 queue 的数量和实例的数量平均分配 queue 给每个实例。
默认的分配算法是 AllocateMessageQueueAveragely。
还有另外一种平均的算法是 AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条 queue,只是以环状轮流分 queue 的形式 。
如下图:
图示
需要注意的是,集群模式下,queue 都是只允许分配只一个实例,这是由于如果多个实例同时消费一个 queue 的消息,由于拉取哪些消息是 consumer 主动控制的,那样会导致同一个消息在不同的实例下被消费多次,所以算法上都是一个 queue 只分给一个 consumer 实例,一个 consumer 实例可以允许同时分到不同的 queue。
通过增加 consumer 实例去分摊 queue 的消费,可以起到水平扩展的消费能力的作用。而有实例下线的时候,会重新触发负载均衡,这时候原来分配到的queue 将分配到其他实例上继续消费。
但是如果 consumer 实例的数量比 message queue 的总数量还多的话,多出来的 consumer 实例将无法分到 queue,也就无法消费到消息,也就无法起到分摊负载的作用了。所以需要控制让 queue 的总数量大于等于 consumer 的数量。
(2)广播模式
由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
在实现上,其中一个不同就是在 consumer 分配 queue 的时候,所有 consumer 都分到所有的 queue。
我是娆疆_蚩梦,让坚持成为一种习惯,感谢各位大佬的:点赞、收藏和评论,我们下期见!