RocketMQ源码分析之consumer并发消费信息分析.docx
《RocketMQ源码分析之consumer并发消费信息分析.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之consumer并发消费信息分析.docx(22页珍藏版)》请在第一文库网上搜索。
1、RocketMQ源码分析之consumer并发消费信息在RocketMQ源码分析之消息拉取流程文章最后留下了一个问题:consumer端在接收到消息后是如何消费信息呢?本篇文章就来回答这个问题。在RocketMQ中ConsumeMessageService是负责消息消费的,它其实是一个接口,实现该接口的是ConsumeMessageConcurrent1yService和ConsunieMessageOrder1yService,这两个服务分别对应一个消费模式,ConsumeMessageconcurrent1yService是并发消费,ConsumeMessageOrder1yService
2、是顺序消费。在RocketMQ源码分析之消息拉取流程中COnSUmer在收到broker返回的response后会执行回调pu11Ca11back,在回调函数中会将拉取到的消息放入ProcessQueue中,然后再将消息提交到ConsumeMessageQueue,我们就从这里开始分析,即submitConsumeRequest方法。IDefaUi,tMQPushConsumerImp1.this.ConSUmeMeSSageSerVice.SUbmitConSUmeReq1PU1IReSUIt.getMsgFound1ist(),PrOCeSSQUeUe,pu11Request.getMes
3、sageQueue(),SubmitConsumeRequest方法主要完成的提交信息消费请求,其实现逻辑如下:获取ConSUnIeBatChSize,它表示一次消费任务COnSUnIeReqUeSt中包含的消息条数,默认值是1。msgs,size()最大是32,如果msgs,size()小于ConsumeBatchSize则直接由msgs、processQueue和messageQueue构建ConsumeRequest,并将ConSUnIeReqUeSt提交到ConSUnIeEXeCUtOr(消费者线程池),如果在提交的过程中出现RejectedExecutionException异常则延
4、迟5秒再提交。如果InSgS.size()大于COnSU1neBatChSiZe则将msgs进行拆分,创建多个COnSUmeReqUeSt并进行提交,每个COnSUmeReqUeSt中包含ConsumeBatchSize条消息。IUb1icVOidSUbmitConsumeRequestfina11iStfina1ProCeSSQUeUePrOCeSSQUeUe,fina1MeSSageQUeUemessageQueue,fina1boo1eandiSpatchToConsume)tConsumeMessageBatchMac;八.if(msgs,size()二ConsumeBatchSize
5、)COnSUmeReqUeStCOnSUmeReqUeSt二newCOnSUmeReqUeSIhis.ConsumeExecu1or.SUbnIi1(COnSUnIeRCqUeSI);catch(RejectedExecutiOnExceptione)this,submiIConsumeRequest1ater(consumeRequeste1sefor(inttota1=0;tota1msgs,size();)1iStmsgThis=newArray1iStfor(inti=0;iConsumeBatchSize;i+,tota1+)if(tota1msgs,size。)msgThis.ad
6、d(msgs,get(tota1);e1sebreak;newConsuCOnSUn1eReqUeStCOnSUn1eReqUeStheRequest(msgThis,PrOCeSSQUeue,messageQueue);Ithis.consumeExecutor.SUbmit(COnSUmeRe1ICatCh(RejeCtedEXeCUtiOnEXCePtiOne),Ifor(;tota1msgs,size。;totaImsgThis.add(msgs,get(tota1);this.SUbmitCOnSUnIeReqUeSt1ater(COnSUm提交完ConSUmeReqUeSt后就是消
7、息消费了,具体是执行ConSUmeReqUeSt中的run方法,消费的具体逻辑是:1判断processQueue允许被消费,具体是检查其dropped属性,如果为true则表示不能被消费2 .初始化Message1istenerxConsumeconcurrent1yContext(consumer并发消费上下文)和Consumeconcurrent1yStatus(consumer并发消费状态,其状态分为两种:CONSUME_SUCCESS和RECONSUMEJATER)3 .执行KesetRetryAndNamespace方法,如果消息来自延迟队列则设置其topic为%RETRTOPIC%
- 配套讲稿:
如PPT文件的首页显示word图标,表示该PPT已包含配套word讲稿。双击word图标可打开word文档。
- 特殊限制:
部分文档作品中含有的国旗、国徽等图片,仅作为作品整体效果示例展示,禁止商用。设计者仅对作品中独创性部分享有著作权。
- 关 键 词:
- RocketMQ 源码 分析 consumer 并发 消费 信息
