- A+
一、概述
RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
具有以下特点:
-
能够保证严格的消息顺序
-
提供丰富的消息拉取模式
-
高效的订阅者水平扩展能力
-
实时的消息订阅机制
-
亿级消息堆积能力
在本文中,提供更多的生产者 Producer 和消费者 Consumer 的使用示例。例如说:
-
Producer 三种发送消息的方式。
-
Producer 发送顺序消息,Consumer 顺序消费消息。
-
Producer 发送定时消息。
-
Producer 批量发送消息。
-
Producer 发送事务消息。
-
Consumer 广播和集群消费消息。
二、快速入门
我们先来对 RocketMQ-Spring 做一个快速入门,实现 Producer 三种发送消息的方式的功能,同时创建一个 Consumer 消费消息。
考虑到一个应用既可以使用生产者 Producer ,又可以使用消费者 Consumer ,所以2角色都进行了配置。
2.1 引入依赖
在 [pom.xml
] 文件中,引入相关依赖。
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>rocketmq</artifactId> <dependencies> <!-- 实现对 RocketMQ 的自动化配置 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <!-- 方便等会写单元测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies></project>
2.2 应用配置文件
在 resources
目录下,创建 application.yaml
配置文件。配置如下:
# rocketmq 配置项,对应 RocketMQProperties 配置类rocketmq: name-server: 101.133.227.13:9876 # RocketMQ Namesrv # Producer 配置项 producer: group: erbadagang-producer-group # 生产者分组 send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。 compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。 retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。 retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档 secret-key: # Secret Key enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档 customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。 # Consumer 配置项 consumer: listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。 erbadagang-consumer-group: topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费
-
在
rocketmq
配置项,设置 RocketMQ 的配置,对应 RocketMQProperties 配置类。 -
RocketMQ-Spring RocketMQAutoConfiguration 自动化配置类,实现 RocketMQ 的自动配置,创建相应的 Producer 和 Consumer 。
-
rocketmq.name-server
配置项,设置 RocketMQ Namesrv 地址。如果多个,使用逗号分隔。 -
rocketmq.producer
配置项,一看就知道是 RocketMQ Producer 所独有。 -
group
配置,生产者分组。 -
retry-next-server
配置,发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为false
。如果使用多主 Broker 的情况下,需要设置true
,这样才会在发送消息失败时,重试另外一台 Broker 。 -
其它配置,一般默认即可。
-
rocketmq.consumer
配置项,一看就知道是 RocketMQ Consumer 所独有。 -
listener
配置,配置某个消费分组,是否监听指定 Topic 。结构为Map<消费者分组, <Topic, Boolean>>
。默认情况下,不配置表示监听。一般情况下,只有我们在想不监听消费某个消费分组的某个 Topic 时,才需要配listener
配置。
2.3 Demo01Message
创建 [Demo01Message 消息类,提供给当前示例使用。代码如下:
package com.ebadagang.springboot.rocketmq.message;/** * 示例 01 的 Message 消息 */public class Demo01Message { public static final String TOPIC = "DEMO_01"; /** * 编号 */ private Integer id; public Demo01Message setId(Integer id) { this.id = id; return this; } public Integer getId() { return id; } @Override public String toString() { return "Demo01Message{" + "id=" + id + '}'; }}
-
TOPIC 静态属性,我们设置该消息类对应 Topic 为 "DEMO_01" 。
2.4 Demo01Producer
它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种(同步、异步、oneway)发送消息的方式。代码如下:
package com.ebadagang.springboot.rocketmq.producer;import com.ebadagang.springboot.rocketmq.message.Demo01Message;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class Demo01Producer { @Autowired private RocketMQTemplate rocketMQTemplate; public SendResult syncSend(Integer id) { // 创建 Demo01Message 消息 Demo01Message message = new Demo01Message(); message.setId(id); // 同步发送消息 return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message); } public void asyncSend(Integer id, SendCallback callback) { // 创建 Demo01Message 消息 Demo01Message message = new Demo01Message(); message.setId(id); // 异步发送消息 rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback); } public void onewaySend(Integer id) { // 创建 Demo01Message 消息 Demo01Message message = new Demo01Message(); message.setId(id); // oneway 发送消息 rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message); }}
-
三个方法,对应三个 RocketMQ 发送消息的方式,分别调用 RocketMQTemplate 提供的
#syncSend(...)
和#asyncSend(...)
以及#sendOneWay(...)
方法。
我们来简单聊下 RocketMQTemplate 类,它继承 Spring Messaging 定义的 AbstractMessageSendingTemplate 抽象类,以达到融入 Spring Messaging 体系中。
在 RocketMQTemplate 中,会创建一个 RocketMQ DefaultMQProducer 生产者 producer
,所以 RocketMQTemplate 后续的各种发送消息的方法,都是使用它。当然,因为 RocketMQTemplate 的封装,所以我们可以像使用 Spring Messaging 一样的方式,进行消息的发送,而无需直接使用 RocketMQ 提供的 Producer 发送消息。
2.5 Demo01Consumer
实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
package com.ebadagang.springboot.rocketmq.consumer;import com.ebadagang.springboot.rocketmq.message.Demo01Message;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener( topic = Demo01Message.TOPIC, consumerGroup = "demo01-consumer-group-" + Demo01Message.TOPIC)public class Demo01Consumer implements RocketMQListener<Demo01Message> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void onMessage(Demo01Message message) { logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); }}
-
在类上,添加了
@RocketMQMessageListener
注解,声明消费的 Topic 是"DEMO_01"
,消费者分组是"demo01-consumer-group-DEMO_01"
。一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。这样做会有两个好处: -
每个消费者分组职责单一,只消费一个 Topic 。
-
每个消费者分组是独占一个线程池,这样能够保证多个 Topic 隔离在不同线程池,保证隔离性,从而避免一个 Topic 消费很慢,影响到另外的 Topic 的消费。
当然如果是同样消费注册topic的积分和会员子系统他们的消费者分组是不同的,来实现分别消费topic。
-
实现 RocketMQListener 接口,在
T
泛型里,设置消费的消息对应的类。此处,我们就设置了 Demo01Message 类。
2.6 Demo01AConsumer
实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:
package com.ebadagang.springboot.rocketmq.consumer;import com.ebadagang.springboot.rocketmq.message.Demo01Message;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener( topic = Demo01Message.TOPIC, consumerGroup = "demo01-A-consumer-group-" + Demo01Message.TOPIC)public class Demo01AConsumer implements RocketMQListener<MessageExt> { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void onMessage(MessageExt message) { logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); }}
-
整体和上面 [Demo01Consumer]是一致的,主要有两个差异点,也是为什么我们又额外创建了这个消费者的原因。
差异一,在类上,添加了 @RocketMQMessageListener
注解,声明消费的 Topic 还是 "DEMO_01"
,消费者分组修改成了 "demo01-A-consumer-group-DEMO_01"
。这样,我们就可以测试 RocketMQ 集群消费的特性。
集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。
-
也就是说,如果我们发送一条 Topic 为
"DEMO_01"
的消息,可以分别被"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都消费一次。 -
但是,如果我们启动两个该示例的实例,则消费者分组
"demo01-A-consumer-group-DEMO_01"
和"demo01-consumer-group-DEMO_01"
都会有多个 Consumer 实例。此时,我们再发送一条 Topic 为"DEMO_01"
的消息,只会被"demo01-A-consumer-group-DEMO_01"
的一个 Consumer 消费一次,也同样只会被"demo01-consumer-group-DEMO_01"
的一个 Consumer 消费一次。
好好理解上述的两段话,非常重要。
通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER"
的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:
-
积分模块:判断如果是手机注册,给用户增加 20 积分。
-
优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。
-
站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。
-
… 等等
这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。
差异二,实现 RocketMQListener 接口,在 T
泛型里,设置消费的消息对应的类不是 Demo01Message 类,而是 RocketMQ 内置的 MessageExt 类。通过 MessageExt 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(body
)就需要自己去反序列化。当然,一般情况下,我们不会使用 MessageExt 类。
2.7 测试
创建 [Demo01ProducerTest]测试类,编写三个单元测试方法,调用 Demo01Producer 三种发送消息的方式。代码如下:
package com.ebadagang.springboot.rocketmq.producer;import com.ebadagang.springboot.rocketmq.Application;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.junit.Test;import org.junit.runner.RunWith;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CountDownLatch;@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class Demo01ProducerTest { private Logger logger = LoggerFactory.getLogger(getClass()); @Autowired private Demo01Producer producer; @Test public void testSyncSend() throws InterruptedException { int id = (int) (System.currentTimeMillis() / 1000); SendResult result = producer.syncSend(id); logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result); // 阻塞等待,保证消费 new CountDownLatch(1).await(); } @Test public void testASyncSend() throws InterruptedException { int id = (int) (System.currentTimeMillis() / 1000); producer.asyncSend(id, new SendCallback() { @Override public void onSuccess(SendResult result) { logger.info("[testASyncSend][发送编号:[{}] 发送成功,结果为:[{}]]", id, result); } @Override public void onException(Throwable e) { logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e); } }); // 阻塞等待,保证消费 new CountDownLatch(1).await(); } @Test public void testOnewaySend() throws InterruptedException { int id = (int) (System.currentTimeMillis() / 1000); producer.onewaySend(id); logger.info("[testOnewaySend][发送编号:[{}] 发送完成]", id); // 阻塞等待,保证消费 new CountDownLatch(1).await(); }}
2.7.1 测试#testSyncSend()
启动执行#testSyncSend()
方法,测试同步发送消息,生产者会报这样的错误 :
RemotingTooMuchRequestException: sendDefaultImpl call timeout
其实是连不上远程mq,解决方法:
在conf/broker.conf 中 加入 两行配置
namesrvAddr = 你的公网IP:9876 brokerIP1=你的公网IP
重新启动 broker,启动broker的指令要修改下, 要将这个配置文件指定加载:nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &
正常情况我们看到控制台输出consumer的注册信息:
running container: DefaultRocketMQListenerContainer{consumerGroup='demo01-consumer-group-DEMO_01', nameServer='101.133.227.13:9876', topic='DEMO_01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
信息生产和消费信息:
2020-08-04 15:48:43.059 INFO 14056 --- [ main] c.e.s.r.producer.Demo01ProducerTest : [testSyncSend][发送编号:[1596527322] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, offsetMsgId=6585E30D00002A9F0000000000031954, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=0]]] 2020-08-04 15:48:43.605 INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer : [onMessage][线程编号:168 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=0, sysFlag=0, bornTimestamp=1596527322965, bornHost=/202.99.106.26:38252, storeTimestamp=1596527322642, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000031954, commitLogOffset=203092, bodyCRC=68977144, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1596527323605, id=8007a731-58b8-bee7-c3e5-755cf737b5bd, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596527322658}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 55, 51, 50, 50, 125], transactionId='null'}]] 2020-08-04 15:49:04.155 INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer : [onMessage][线程编号:172 消息内容:Demo01Message{id=1596527322}]
上面信息看到queueId=3
,可以到控制台去验证一下。
查看具体的存储queue
通过日志我们可以看到,我们发送的消息,分别被 Demo01AConsumer 和 Demo01Consumer 两个消费者(消费者分组)都消费了一次。
同时,两个消费者在不同的线程池中,消费了这条消息。虽然说,我们看到两条日志里,我们都看到了线程名为 "MessageThread_1" ,但是线程编号分别是 168 和 172 。 因为,每个 RocketMQ Consumer 的消费线程池创建的线程都是以 "MessageThread_" 开头,同时这里相同的线程名结果不同的线程编号,很容易判断出时候用了两个不同的消费线程池。
2.7.3 测试#testASyncSend()方法
注意,不要关闭上一个 #testSyncSend() 单元测试方法,Springboot每次启动的时候会向nameServer注册consumer,启动2个相当于2个consumer集群消费同样topic。这里我们要模拟每个消费者集群,都有多个 Consumer 节点。
控制台输出如下:
2020-08-04 16:06:04.245 INFO 14916 --- [ublicExecutor_1] c.e.s.r.producer.Demo01ProducerTest : [testASyncSend][发送编号:[1596528363] 发送成功,结果为:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, offsetMsgId=6585E30D00002A9F000000000003220A, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=1]]] 2020-08-04 16:06:04.291 INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer : [onMessage][线程编号:166 消息内容:Demo01Message{id=1596528363}] 2020-08-04 16:06:04.463 INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer : [onMessage][线程编号:175 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=1, sysFlag=0, bornTimestamp=1596528364166, bornHost=/202.99.106.26:38723, storeTimestamp=1596528363846, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F000000000003220A, commitLogOffset=205322, bodyCRC=408850356, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1596528364463, id=ee6e1a9c-4a78-e840-1820-616637e5a9b3, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596528364026}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 56, 51, 54, 51, 125], transactionId='null'}]]
和 #testSyncSend() 方法执行的结果,是一致的。此时,我们打开 #testSyncSend() 方法所在的控制台,不会看到有新的消息消费日志。说明,符合集群消费的机制:集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。而广播模式是每个 Consumer 实例都消费消息。
不过如上的日志,也可能出现在 #testSyncSend() 方法所在的控制台,而不在 #testASyncSend() 方法所在的控制台,但只有一个地方消费。
2.7.4 #testOnewaySend()方法
执行后控制台输出发送消息,日志输出发送编号:[1596528903]。
2020-08-04 16:15:03.479 INFO 9364 --- [ main] c.e.s.r.producer.Demo01ProducerTest : [testOnewaySend][发送编号:[1596528903] 发送完成]
消费消息在第一个控制台输出:
被集群中的其他消费者消费
三、@RocketMQMessageListener
在 [ Demo01Consumer] 中,我们已经使用了 @RocketMQMessageListener
注解,设置每个 RocketMQ 消费者 Consumer 的消息监听器的配置。
@RocketMQMessageListener
注解的常用属性如下:
/** * Consumer 所属消费者分组 * * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve * load balance. It's required and needs to be globally unique. * * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion. */String consumerGroup();/** * 消费的 Topic * * Topic name. */String topic();/** * 选择器类型。默认基于 Message 的 Tag 选择。 * * Control how to selector message. * * @see SelectorType */SelectorType selectorType() default SelectorType.TAG;/** * 选择器的表达式。 * 设置为 * 时,表示全部。 * * 如果使用 SelectorType.TAG 类型,则设置消费 Message 的具体 Tag 。 * 如果使用 SelectorType.SQL92 类型,可见 https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/ 文档 * * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92} */String selectorExpression() default "*";/** * 消费模式。可选择并发消费,还是顺序消费。 * * Control consume mode, you can choice receive message concurrently or orderly. */ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;/** * 消息模型。可选择是集群消费,还是广播消费。 * * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice. */MessageModel messageModel() default MessageModel.CLUSTERING;/** * 消费的线程池的最大线程数 * * Max consumer thread number. */int consumeThreadMax() default 64;/** * 消费单条消息的超时时间 * * Max consumer timeout, default 30s. */long consumeTimeout() default 30000L;
@RocketMQMessageListener 注解的不常用属性如下:
// 默认从配置文件读取的占位符String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";/** * The property of "access-key". */ String accessKey() default ACCESS_KEY_PLACEHOLDER; /** * The property of "secret-key". */String secretKey() default SECRET_KEY_PLACEHOLDER;/** * Switch flag instance for message trace. */boolean enableMsgTrace() default true;/** * The name value of message trace topic.If you don't config,you can use the default trace topic name. */String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;/** * Consumer 连接的 RocketMQ Namesrv 地址。默认情况下,使用 `rocketmq.name-server` 配置项即可。 * * 如果一个项目中,Consumer 需要使用不同的 RocketMQ Namesrv ,则需要配置该属性。 * * The property of "name-server". */String nameServer() default NAME_SERVER_PLACEHOLDER;/** * 访问通道。目前有 LOCAL 和 CLOUD 两种通道。 * * LOCAL ,指的是本地部署的 RocketMQ 开源项目。 * CLOUD ,指的是阿里云的 ONS 服务。具体可见 https://help.aliyun.com/document_detail/128585.html 文档。 * * The property of "access-channel". */String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
四、 @ExtRocketMQTemplateConfiguration
RocketMQ-Spring 考虑到开发者可能需要连接多个不同的 RocketMQ 集群,所以提供了 @ExtRocketMQTemplateConfiguration
注解,实现配置连接不同 RocketMQ 集群的 Producer 的 RocketMQTemplate Bean 对象。
@ExtRocketMQTemplateConfiguration
注解的具体属性,和我们在 [ 应用配置文件」]的 rocketmq.producer
配置项是一致的,就不重复赘述啦。
@ExtRocketMQTemplateConfiguration
注解的简单使用示例,代码如下:
@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer:demo.rocketmq.name-server}")public class ExtRocketMQTemplate extends RocketMQTemplate {}
在类上,添加 @ExtRocketMQTemplateConfiguration 注解,并设置连接的 RocketMQ Namesrv 地址。
同时,需要继承 RocketMQTemplate 类,从而使我们可以直接使用 @Autowire 或 @Resource 注解,注入 RocketMQTemplate Bean 属性。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码