- A+
延时消息即消息发送后并不立即对消费者可见,而是在用户指定的时间投递给消费者。比如我们现在发送一条延时30秒的消息,消息发送后立即发送给服务器,但是服务器在30秒后才将该消息交给消费者。
RocketMQ通过配置的延迟级别延迟消息投递到消费者,其中不同的延迟级别对应不同的延迟时间,可配置,默认的延迟级别有18种,分别是1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,支持时间单位 s 秒 m分钟 h小时 d天。
源码 MessageStoreConfig.java 是定义如下:
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
可以在brocker配置 messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,自定义其时间级别。
1、代码验证
前提:先启动消费者等待消息的发送,先发送消息,消费者启动需要时间,影响测试结果。
1.1、生产者Producer
public class DelayProducer { public static void main(String[] args) throws MQClientException, InterruptedException { DefaultMQProducer producer = new DefaultMQProducer("producer_test"); producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); producer.start(); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); for (int i = 0; i < 10; i++) { try { //构建消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("延迟消息" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); //延时的级别为3 对应的时间为10s 就是发送后延时10S在把消息投递出去 msg.setDelayTimeLevel(3); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sd.format(new Date())+" == "+sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } producer.shutdown(); } }
查看结果:
1.2、消费者Consumer
public class DelayConsumer { public static void main(String[] args) { try { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setConsumerGroup("consumer_delay"); consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876"); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently(){ @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList, ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) { try { for(MessageExt msg : paramList){ String msgbody = new String(msg.getBody(), "utf-8"); SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss"); System.out.println("接收时间 : "+ sd.format(new Date()) +" == MessageBody: "+ msgbody);//输出消息内容 } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功 } }); consumer.start(); System.out.println("DelayConsumer===启动成功!"); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
查看结果:
2、内部机制分析
查看其消息投递的核心方法org.apache.rocketmq.store.CommitLog.putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) { //设置消息存储到文件中的时间 msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting // on the client) msg.setBodyCRC(UtilAll.crc32(msg.getBody())); // Back to Results AppendMessageResult result = null; StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); String topic = msg.getTopic(); int queueId = msg.getQueueId(); final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // Delay Delivery消息的延迟级别是否大于0 if (msg.getDelayTimeLevel() > 0) { //如果消息的延迟级别大于最大的延迟级别则置为最大延迟级别 if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) { msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()); } //将消息主题设置为SCHEDULE_TOPIC_XXXX topic = ScheduleMessageService.SCHEDULE_TOPIC; //将消息队列设置为延迟的消息队列的ID queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel()); // Backup real topic, queueId //消息的原有的主题和消息队列存入属性中 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic()); MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId())); msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties())); msg.setTopic(topic); msg.setQueueId(queueId); } } long eclipseTimeInLock = 0; MappedFile unlockMappedFile = null; //获取最后一个消息的映射文件,mappedFileQueue可看作是CommitLog文件夹下的一个个文件的映射 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); //写入消息之前先申请putMessageLock,也就是保证消息写入CommitLog文件中串行的 putMessageLock.lock(); //spin or ReentrantLock ,depending on store config try { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); this.beginTimeInLock = beginLockTimestamp; // Here settings are stored timestamp, in order to ensure an orderly // global //设置消息的存储时间 msg.setStoreTimestamp(beginLockTimestamp); //mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件 //mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件 if (null == mappedFile || mappedFile.isFull()) { //里面的参数0代表偏移量 mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise } //mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够 if (null == mappedFile) { log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null); } //mappedFile文件后面追加消息 result = mappedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK: break; case END_OF_FILE: unlockMappedFile = mappedFile; // Create a new file, re-write the message mappedFile = this.mappedFileQueue.getLastMappedFile(0); if (null == mappedFile) { // XXX: warn and notify me log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString()); beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result); } result = mappedFile.appendMessage(msg, this.appendMessageCallback); break; case MESSAGE_SIZE_EXCEEDED: case PROPERTIES_SIZE_EXCEEDED: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result); case UNKNOWN_ERROR: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); default: beginTimeInLock = 0; return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result); } eclipseTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp; beginTimeInLock = 0; } finally { //释放锁 putMessageLock.unlock(); } if (eclipseTimeInLock > 500) { log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", eclipseTimeInLock, msg.getBody().length, result); } if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) { this.defaultMessageStore.unlockMappedFile(unlockMappedFile); } PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result); // Statistics storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet(); storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes()); //消息刷盘 handleDiskFlush(result, putMessageResult, msg); //主从数据同步复制 handleHA(result, putMessageResult, msg); return putMessageResult; }
我们发现在通过putMessage 延迟消息就被放存放到了主题为 SCHEDULE_TOPIC_XXXX的commitlog中,消息的原有的主题和消息队列存入属性中,后面再通过定时的方式对这这些消息进行重新发送。
ScheduleMessageService.start()启动会为每一个延迟队列创建一个调度任务每一个调度任务对应SCHEDULE_TOPIC_XXXX主题下的一个消息消费队列。
public void start() { for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) { Integer level = entry.getKey(); Long timeDelay = entry.getValue(); Long offset = this.offsetTable.get(level); if (null == offset) { offset = 0L; } if (timeDelay != null) { this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME); } } this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { ScheduleMessageService.this.persist(); } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
定时任务的实现类DeliverDelayedMessageTimerTask,核心方法是executeOnTimeup
public void executeOnTimeup() { //根据延迟级别获取该延迟队列信息 ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel)); long failScheduleOffset = offset; //未找到说明目前没有该延迟级别的消息,忽略本次任务 if (cq != null) { //根据offset获取队列中获取当前队列中有效的消息, SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset); if (bufferCQ != null) { try { long nextOffset = offset; int i = 0; ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); //遍历ConsumeQueue,每一个ConsumeQueue条目是20个字节解析消息 for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { //物理偏移量 long offsetPy = bufferCQ.getByteBuffer().getLong(); //消息长度 int sizePy = bufferCQ.getByteBuffer().getInt(); //消息的tag的Hash值 long tagsCode = bufferCQ.getByteBuffer().getLong(); // if (cq.isExtAddr(tagsCode)) { if (cq.getExt(tagsCode, cqExtUnit)) { tagsCode = cqExtUnit.getTagsCode(); } else { //can't find ext content.So re compute tags code. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}", tagsCode, offsetPy, sizePy); long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy); tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime); } } long now = System.currentTimeMillis(); long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode); nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long countdown = deliverTimestamp - now; if (countdown <= 0) { //根据物理偏移量和消息的大小从Commitlog文件中查找消息 MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset( offsetPy, sizePy); if (msgExt != null) { try { MessageExtBrokerInner msgInner = this.messageTimeup(msgExt); //消息存储到Commitlog文件中,转发到主题对应的消息队列上,供消费者再次消费。 PutMessageResult putMessageResult = ScheduleMessageService.this.defaultMessageStore .putMessage(msgInner); if (putMessageResult != null && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) { continue; } else { // XXX: warn and notify me log.error( "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}", msgExt.getTopic(), msgExt.getMsgId()); ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), DELAY_FOR_A_PERIOD); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } catch (Exception e) { log.error( "ScheduleMessageService, messageTimeup execute error, drop it. msgExt=" + msgExt + ", nextOffset=" + nextOffset + ",offsetPy=" + offsetPy + ",sizePy=" + sizePy, e); } } } else { ScheduleMessageService.this.timer.schedule( new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset), countdown); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } } // end of for nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask( this.delayLevel, nextOffset), DELAY_FOR_A_WHILE); ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset); return; } finally { bufferCQ.release(); } }else { //未找到有效的消息,更新延迟队列定时拉取进度,并创建定时任务带下一次继续尝试 long cqMinOffset = cq.getMinOffsetInQueue(); if (offset < cqMinOffset) { failScheduleOffset = cqMinOffset; log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset=" + cqMinOffset + ", queueId=" + cq.getQueueId()); } } } //创建延迟任务 ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel, failScheduleOffset), DELAY_FOR_A_WHILE); }
图解:
1、消息生产者发送消息,如果发送的消息DelayTimeLevel大于0,则改变消息主题为SCHEDULE_TOPIC_XXXX,消息的队列为DelayTimeLevel-1
2、消息经由Commitlog转发到消息队列SCHEDULE_TOPIC_XXXX的消费队列1。
3、定时任务Timer每隔1秒根据上次拉取消息的偏移量从消费队列中取出所有消息。
4、根据消息的物理偏移量和消息大小从Commitlog中拉取消息。(PS:消息存储章节中会重点讲解)
5、根据消息的属性重新创建消息,并恢复原主题TopicTest、原消息队列ID,清除DelayTimeLevel属性存入Commitlog中。
6、记录原主题TopicTest的消息队列的消息偏移量,供消费者索引检索消息进行消费。