RocketMQ 与 Spring Boot整合(一、3种发送消息的方式)

  • A+
所属分类:消息队列 软件开发

一、概述

RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

具有以下特点:

  1. 能够保证严格的消息顺序

  2. 提供丰富的消息拉取模式

  3. 高效的订阅者水平扩展能力

  4. 实时的消息订阅机制

  5. 亿级消息堆积能力

在本文中,提供更多的生产者 Producer 和消费者 Consumer 的使用示例。例如说:

  • Producer 三种发送消息的方式。

  • Producer 发送顺序消息,Consumer 顺序消费消息。

  • Producer 发送定时消息。

  • Producer 批量发送消息。

  • Producer 发送事务消息。

  • Consumer 广播集群消费消息。

二、快速入门

我们先来对 RocketMQ-Spring 做一个快速入门,实现 Producer 三种发送消息的方式的功能,同时创建一个 Consumer 消费消息。

考虑到一个应用既可以使用生产者 Producer ,又可以使用消费者 Consumer ,所以2角色都进行了配置。

2.1 引入依赖

在 [pom.xml] 文件中,引入相关依赖。

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.0.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rocketmq</artifactId>

    <dependencies>
        <!-- 实现对 RocketMQ 的自动化配置 -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.1.1</version>
        </dependency>

        <!-- 方便等会写单元测试 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies></project>

2.2 应用配置文件

在 resources 目录下,创建 application.yaml 配置文件。配置如下:

# rocketmq 配置项,对应 RocketMQProperties 配置类rocketmq:
  name-server: 101.133.227.13:9876 # RocketMQ Namesrv
  # Producer 配置项
  producer:
    group: erbadagang-producer-group # 生产者分组
    send-message-timeout: 3000 # 发送消息超时时间,单位:毫秒。默认为 3000 。
    compress-message-body-threshold: 4096 # 消息压缩阀值,当消息体的大小超过该阀值后,进行消息压缩。默认为 4 * 1024B
    max-message-size: 4194304 # 消息体的最大允许大小。。默认为 4 * 1024 * 1024B
    retry-times-when-send-failed: 2 # 同步发送消息时,失败重试次数。默认为 2 次。
    retry-times-when-send-async-failed: 2 # 异步发送消息时,失败重试次数。默认为 2 次。
    retry-next-server: false # 发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false
    access-key: # Access Key ,可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/acl/user_guide.md 文档
    secret-key: # Secret Key
    enable-msg-trace: true # 是否开启消息轨迹功能。默认为 true 开启。可阅读 https://github.com/apache/rocketmq/blob/master/docs/cn/msg_trace/user_guide.md 文档
    customized-trace-topic: RMQ_SYS_TRACE_TOPIC # 自定义消息轨迹的 Topic 。默认为 RMQ_SYS_TRACE_TOPIC 。
  # Consumer 配置项
  consumer:
    listeners: # 配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。
      erbadagang-consumer-group:
        topic1: false # 关闭 test-consumer-group 对 topic1 的监听消费

  • 在 rocketmq 配置项,设置 RocketMQ 的配置,对应 RocketMQProperties 配置类。

  • RocketMQ-Spring RocketMQAutoConfiguration 自动化配置类,实现 RocketMQ 的自动配置,创建相应的 Producer 和 Consumer 。

  • rocketmq.name-server 配置项,设置 RocketMQ Namesrv 地址。如果多个,使用逗号分隔。

  • rocketmq.producer 配置项,一看就知道是 RocketMQ Producer 所独有。

    • group 配置,生产者分组。

    • retry-next-server 配置,发送消息给 Broker 时,如果发送失败,是否重试另外一台 Broker 。默认为 false 。如果使用多 Broker 的情况下,需要设置 true ,这样才会在发送消息失败时,重试另外一台 Broker 。

    • 其它配置,一般默认即可。

  • rocketmq.consumer 配置项,一看就知道是 RocketMQ Consumer 所独有。

    • listener 配置,配置某个消费分组,是否监听指定 Topic 。结构为 Map<消费者分组, <Topic, Boolean>> 。默认情况下,不配置表示监听。一般情况下,只有我们在想不监听消费某个消费分组的某个 Topic 时,才需要配 listener 配置。

2.3 Demo01Message

创建 [Demo01Message 消息类,提供给当前示例使用。代码如下:

package com.ebadagang.springboot.rocketmq.message;/**
 * 示例 01 的 Message 消息
 */public class Demo01Message {

    public static final String TOPIC = "DEMO_01";

    /**
     * 编号
     */
    private Integer id;

    public Demo01Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo01Message{" +
                "id=" + id +
                '}';
    }}

  • TOPIC 静态属性,我们设置该消息类对应 Topic 为 "DEMO_01" 。

2.4 Demo01Producer

它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现三种(同步、异步、oneway)发送消息的方式。代码如下:

package com.ebadagang.springboot.rocketmq.producer;import com.ebadagang.springboot.rocketmq.message.Demo01Message;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.apache.rocketmq.spring.core.RocketMQTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class Demo01Producer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult syncSend(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 同步发送消息
        return rocketMQTemplate.syncSend(Demo01Message.TOPIC, message);
    }

    public void asyncSend(Integer id, SendCallback callback) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // 异步发送消息
        rocketMQTemplate.asyncSend(Demo01Message.TOPIC, message, callback);
    }

    public void onewaySend(Integer id) {
        // 创建 Demo01Message 消息
        Demo01Message message = new Demo01Message();
        message.setId(id);
        // oneway 发送消息
        rocketMQTemplate.sendOneWay(Demo01Message.TOPIC, message);
    }}

  • 三个方法,对应三个 RocketMQ 发送消息的方式,分别调用 RocketMQTemplate 提供的 #syncSend(...) 和 #asyncSend(...) 以及 #sendOneWay(...) 方法。

我们来简单聊下 RocketMQTemplate 类,它继承 Spring Messaging 定义的 AbstractMessageSendingTemplate 抽象类,以达到融入 Spring Messaging 体系中。

在 RocketMQTemplate 中,会创建一个 RocketMQ DefaultMQProducer 生产者 producer ,所以 RocketMQTemplate 后续的各种发送消息的方法,都是使用它。当然,因为 RocketMQTemplate 的封装,所以我们可以像使用 Spring Messaging 一样的方式,进行消息的发送,而无需直接使用 RocketMQ 提供的 Producer 发送消息。

2.5 Demo01Consumer

实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;import com.ebadagang.springboot.rocketmq.message.Demo01Message;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(
        topic = Demo01Message.TOPIC,
        consumerGroup = "demo01-consumer-group-" + Demo01Message.TOPIC)public class Demo01Consumer implements RocketMQListener<Demo01Message> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Demo01Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }}

  • 在类上,添加了 @RocketMQMessageListener 注解,声明消费的 Topic 是 "DEMO_01" ,消费者分组是 "demo01-consumer-group-DEMO_01" 。一般情况下,我们建议一个消费者分组,仅消费一个 Topic 。这样做会有两个好处:

    • 每个消费者分组职责单一,只消费一个 Topic 。

    • 每个消费者分组是独占一个线程池,这样能够保证多个 Topic 隔离在不同线程池,保证隔离性,从而避免一个 Topic 消费很慢,影响到另外的 Topic 的消费。

当然如果是同样消费注册topic的积分和会员子系统他们的消费者分组是不同的,来实现分别消费topic。

  • 实现 RocketMQListener 接口,在 T 泛型里,设置消费的消息对应的类。此处,我们就设置了 Demo01Message 类。

2.6 Demo01AConsumer

实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;import com.ebadagang.springboot.rocketmq.message.Demo01Message;import org.apache.rocketmq.common.message.MessageExt;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;@Component@RocketMQMessageListener(
        topic = Demo01Message.TOPIC,
        consumerGroup = "demo01-A-consumer-group-" + Demo01Message.TOPIC)public class Demo01AConsumer implements RocketMQListener<MessageExt> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(MessageExt message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }}

  • 整体和上面 [Demo01Consumer]是一致的,主要有两个差异点,也是为什么我们又额外创建了这个消费者的原因。

差异一,在类上,添加了 @RocketMQMessageListener 注解,声明消费的 Topic 还是 "DEMO_01" ,消费者分组修改成了 "demo01-A-consumer-group-DEMO_01" 。这样,我们就可以测试 RocketMQ 集群消费的特性。

集群消费(Clustering):集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。

  • 也就是说,如果我们发送一条 Topic 为 "DEMO_01" 的消息,可以分别被 "demo01-A-consumer-group-DEMO_01"和 "demo01-consumer-group-DEMO_01" 都消费一次。

  • 但是,如果我们启动两个该示例的实例,则消费者分组 "demo01-A-consumer-group-DEMO_01" 和 "demo01-consumer-group-DEMO_01" 都会有多个 Consumer 实例。此时,我们再发送一条 Topic 为 "DEMO_01"的消息,只会被 "demo01-A-consumer-group-DEMO_01" 的一个 Consumer 消费一次,也同样只会被 "demo01-consumer-group-DEMO_01" 的一个 Consumer 消费一次。

好好理解上述的两段话,非常重要。

通过集群消费的机制,我们可以实现针对相同 Topic ,不同消费者分组实现各自的业务逻辑。例如说:用户注册成功时,发送一条 Topic 为 "USER_REGISTER" 的消息。然后,不同模块使用不同的消费者分组,订阅该 Topic ,实现各自的拓展逻辑:

  • 积分模块:判断如果是手机注册,给用户增加 20 积分。

  • 优惠劵模块:因为是新用户,所以发放新用户专享优惠劵。

  • 站内信模块:因为是新用户,所以发送新用户的欢迎语的站内信。

  • … 等等

这样,我们就可以将注册成功后的业务拓展逻辑,实现业务上的解耦,未来也更加容易拓展。同时,也提高了注册接口的性能,避免用户需要等待业务拓展逻辑执行完成后,才响应注册成功。

差异二,实现 RocketMQListener 接口,在 T 泛型里,设置消费的消息对应的类不是 Demo01Message 类,而是 RocketMQ 内置的 MessageExt 类。通过 MessageExt 类,我们可以获取到消费的消息的更多信息,例如说消息的所属队列、创建时间等等属性,不过消息的内容(body)就需要自己去反序列化。当然,一般情况下,我们不会使用 MessageExt 类。

2.7 测试

创建 [Demo01ProducerTest]测试类,编写三个单元测试方法,调用 Demo01Producer 三种发送消息的方式。代码如下:

package com.ebadagang.springboot.rocketmq.producer;import com.ebadagang.springboot.rocketmq.Application;import org.apache.rocketmq.client.producer.SendCallback;import org.apache.rocketmq.client.producer.SendResult;import org.junit.Test;import org.junit.runner.RunWith;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.concurrent.CountDownLatch;@RunWith(SpringRunner.class)@SpringBootTest(classes = Application.class)public class Demo01ProducerTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Demo01Producer producer;

    @Test
    public void testSyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.syncSend(id);
        logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void testASyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        producer.asyncSend(id, new SendCallback() {

            @Override
            public void onSuccess(SendResult result) {
                logger.info("[testASyncSend][发送编号:[{}] 发送成功,结果为:[{}]]", id, result);
            }

            @Override
            public void onException(Throwable e) {
                logger.info("[testASyncSend][发送编号:[{}] 发送异常]]", id, e);
            }

        });

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void testOnewaySend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        producer.onewaySend(id);
        logger.info("[testOnewaySend][发送编号:[{}] 发送完成]", id);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }}

2.7.1 测试#testSyncSend()

启动执行#testSyncSend()方法,测试同步发送消息,生产者会报这样的错误 :

RemotingTooMuchRequestException: sendDefaultImpl call timeout

其实是连不上远程mq,解决方法:
在conf/broker.conf 中 加入 两行配置

namesrvAddr = 你的公网IP:9876
brokerIP1=你的公网IP

重新启动 broker,启动broker的指令要修改下, 要将这个配置文件指定加载:
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf autoCreateTopicEnable=true &

正常情况我们看到控制台输出consumer的注册信息:

running container: DefaultRocketMQListenerContainer{consumerGroup='demo01-consumer-group-DEMO_01', nameServer='101.133.227.13:9876', topic='DEMO_01', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}

信息生产和消费信息:

2020-08-04 15:48:43.059  INFO 14056 --- [           main] c.e.s.r.producer.Demo01ProducerTest      : [testSyncSend][发送编号:[1596527322] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, offsetMsgId=6585E30D00002A9F0000000000031954, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=0]]]
2020-08-04 15:48:43.605  INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer  : [onMessage][线程编号:168 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=0, sysFlag=0, bornTimestamp=1596527322965, bornHost=/202.99.106.26:38252, storeTimestamp=1596527322642, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F0000000000031954, commitLogOffset=203092, bodyCRC=68977144, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1596527323605, id=8007a731-58b8-bee7-c3e5-755cf737b5bd, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A36E818B4AAC212D7A7160000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596527322658}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 55, 51, 50, 50, 125], transactionId='null'}]]
2020-08-04 15:49:04.155  INFO 14056 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer   : [onMessage][线程编号:172 消息内容:Demo01Message{id=1596527322}]

上面信息看到queueId=3,可以到控制台去验证一下。

查看具体的存储queue

通过日志我们可以看到,我们发送的消息,分别被 Demo01AConsumer 和 Demo01Consumer 两个消费者(消费者分组)都消费了一次。
同时,两个消费者在不同的线程池中,消费了这条消息。虽然说,我们看到两条日志里,我们都看到了线程名为 "MessageThread_1" ,但是线程编号分别是 168 和 172 。 因为,每个 RocketMQ Consumer 的消费线程池创建的线程都是以 "MessageThread_" 开头,同时这里相同的线程名结果不同的线程编号,很容易判断出时候用了两个不同的消费线程池。

2.7.3 测试#testASyncSend()方法

注意,不要关闭上一个 #testSyncSend() 单元测试方法,Springboot每次启动的时候会向nameServer注册consumer,启动2个相当于2个consumer集群消费同样topic。这里我们要模拟每个消费者集群,都有多个 Consumer 节点。
控制台输出如下:

2020-08-04 16:06:04.245  INFO 14916 --- [ublicExecutor_1] c.e.s.r.producer.Demo01ProducerTest      : [testASyncSend][发送编号:[1596528363] 发送成功,结果为:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, offsetMsgId=6585E30D00002A9F000000000003220A, messageQueue=MessageQueue [topic=DEMO_01, brokerName=broker-a, queueId=3], queueOffset=1]]]
2020-08-04 16:06:04.291  INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01Consumer   : [onMessage][线程编号:166 消息内容:Demo01Message{id=1596528363}]
2020-08-04 16:06:04.463  INFO 14916 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo01AConsumer  : [onMessage][线程编号:175 消息内容:MessageExt [brokerName=broker-a, queueId=3, storeSize=308, queueOffset=1, sysFlag=0, bornTimestamp=1596528364166, bornHost=/202.99.106.26:38723, storeTimestamp=1596528363846, storeHost=/101.133.227.13:10911, msgId=6585E30D00002A9F000000000003220A, commitLogOffset=205322, bodyCRC=408850356, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='DEMO_01', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1596528364463, id=ee6e1a9c-4a78-e840-1820-616637e5a9b3, UNIQ_KEY=240884E30114A66731EF8A0EAAFD768A3A4418B4AAC212E78A480000, CLUSTER=DefaultCluster, WAIT=false, contentType=application/json, timestamp=1596528364026}, body=[123, 34, 105, 100, 34, 58, 49, 53, 57, 54, 53, 50, 56, 51, 54, 51, 125], transactionId='null'}]]

和 #testSyncSend() 方法执行的结果,是一致的。此时,我们打开 #testSyncSend() 方法所在的控制台,不会看到有新的消息消费日志。说明,符合集群消费的机制:集群消费模式下,相同 Consumer Group 的每个 Consumer 实例平均分摊消息。而广播模式是每个 Consumer 实例都消费消息。
不过如上的日志,也可能出现在 #testSyncSend() 方法所在的控制台,而不在 #testASyncSend() 方法所在的控制台,但只有一个地方消费。

2.7.4 #testOnewaySend()方法

执行后控制台输出发送消息,日志输出发送编号:[1596528903]。

2020-08-04 16:15:03.479  INFO 9364 --- [           main] c.e.s.r.producer.Demo01ProducerTest      : [testOnewaySend][发送编号:[1596528903] 发送完成]

消费消息在第一个控制台输出:

被集群中的其他消费者消费

三、@RocketMQMessageListener

在 [ Demo01Consumer] 中,我们已经使用了 @RocketMQMessageListener 注解,设置每个 RocketMQ 消费者 Consumer 的消息监听器的配置。

@RocketMQMessageListener 注解的常用属性如下:

/**
 * Consumer 所属消费者分组
 *
 * Consumers of the same role is required to have exactly same subscriptions and consumerGroup to correctly achieve
 * load balance. It's required and needs to be globally unique.
 *
 * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
 */String consumerGroup();/**
 * 消费的 Topic
 *
 * Topic name.
 */String topic();/**
 * 选择器类型。默认基于 Message 的 Tag 选择。
 *
 * Control how to selector message.
 *
 * @see SelectorType
 */SelectorType selectorType() default SelectorType.TAG;/**
 * 选择器的表达式。
 * 设置为 * 时,表示全部。
 *
 * 如果使用 SelectorType.TAG 类型,则设置消费 Message 的具体 Tag 。
 * 如果使用 SelectorType.SQL92 类型,可见 https://rocketmq.apache.org/rocketmq/filter-messages-by-sql92-in-rocketmq/ 文档
 *
 * Control which message can be select. Grammar please see {@link SelectorType#TAG} and {@link SelectorType#SQL92}
 */String selectorExpression() default "*";/**
 * 消费模式。可选择并发消费,还是顺序消费。
 *
 * Control consume mode, you can choice receive message concurrently or orderly.
 */ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;/**
 * 消息模型。可选择是集群消费,还是广播消费。
 *
 * Control message mode, if you want all subscribers receive message all message, broadcasting is a good choice.
 */MessageModel messageModel() default MessageModel.CLUSTERING;/**
 * 消费的线程池的最大线程数
 *
 * Max consumer thread number.
 */int consumeThreadMax() default 64;/**
 * 消费单条消息的超时时间
 *
 * Max consumer timeout, default 30s.
 */long consumeTimeout() default 30000L;

@RocketMQMessageListener 注解的不常用属性如下:

// 默认从配置文件读取的占位符String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";/**
 * The property of "access-key".
 */
 String accessKey() default ACCESS_KEY_PLACEHOLDER;
 /**
 * The property of "secret-key".
 */String secretKey() default SECRET_KEY_PLACEHOLDER;/**
 * Switch flag instance for message trace.
 */boolean enableMsgTrace() default true;/**
 * The name value of message trace topic.If you don't config,you can use the default trace topic name.
 */String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;/**
 * Consumer 连接的 RocketMQ Namesrv 地址。默认情况下,使用 `rocketmq.name-server` 配置项即可。
 *
 * 如果一个项目中,Consumer 需要使用不同的 RocketMQ Namesrv ,则需要配置该属性。
 *
 * The property of "name-server".
 */String nameServer() default NAME_SERVER_PLACEHOLDER;/**
 * 访问通道。目前有 LOCAL 和 CLOUD 两种通道。
 *
 * LOCAL ,指的是本地部署的 RocketMQ 开源项目。
 * CLOUD ,指的是阿里云的 ONS 服务。具体可见 https://help.aliyun.com/document_detail/128585.html 文档。
 *
 * The property of "access-channel".
 */String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;

四、 @ExtRocketMQTemplateConfiguration

RocketMQ-Spring 考虑到开发者可能需要连接多个不同的 RocketMQ 集群,所以提供了 @ExtRocketMQTemplateConfiguration 注解,实现配置连接不同 RocketMQ 集群的 Producer 的 RocketMQTemplate Bean 对象。

@ExtRocketMQTemplateConfiguration 注解的具体属性,和我们在 [ 应用配置文件」]的 rocketmq.producer 配置项是一致的,就不重复赘述啦。

@ExtRocketMQTemplateConfiguration 注解的简单使用示例,代码如下:

@ExtRocketMQTemplateConfiguration(nameServer = "${demo.rocketmq.extNameServer:demo.rocketmq.name-server}")public class ExtRocketMQTemplate extends RocketMQTemplate {}

在类上,添加 @ExtRocketMQTemplateConfiguration 注解,并设置连接的 RocketMQ Namesrv 地址。
同时,需要继承 RocketMQTemplate 类,从而使我们可以直接使用 @Autowire 或 @Resource 注解,注入 RocketMQTemplate Bean 属性。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码

发表评论

:?: :razz: :sad: :evil: :!: :smile: :oops: :grin: :eek: :shock: :???: :cool: :lol: :mad: :twisted: :roll: :wink: :idea: :arrow: :neutral: :cry: :mrgreen: