《RocketMQ源码分析之普通消息发送.docx》由会员分享,可在线阅读,更多相关《RocketMQ源码分析之普通消息发送.docx(42页珍藏版)》请在第一文库网上搜索。
1、RocketMQ源码分析之普通消息发送前言本篇文章来分析producer发送消息的流程,包括Message对象属性详解、PrOdUCer启动流程、消息发送流程、ProdUCer获取路由信息、ProdUCer选择MessageQueue机制以及producer端消息发送重试机制O在分析producer获取路由信息时会详细分析TBW1O2的作用。一、MeSSage对象在RocketMQ的客户端将消息封装为Message对象,其主要属性及其说明具体如下:字段名说明topic必填,topic的名称f1ag选填,由应用来设置,ROCkeIMQ不做干预字段名说明propert选填,存储了MeSSage其余
2、各项参数,比如tag、key等关键的消息属性。RocketMQies预定义了一组内置属性,除了内置属性之外,还可以设置任意自定义属性。body必填,消息体transac事务消息相关tion1dkeys选填,代表消息的业务关键词tags选填,消息标签,方便服务器过滤使用,目前只支持每条消息设置一个tag二、DefaU1tMQProdUCer启动流程1. Defau1tMQProducer启动流程图Defau1tMQProducer的启动流程图如下图所示,其对应的代码是Defau1tMQProdcerImp1.java中的Start(fina1boo1eanStartFaCtOry)方法。在Def
3、au1tMQProducer的启动过程有两个重要部分需要了解:(1) MQC1ientInStanCe的启动过程(2)为什么要在topicPub1ishInfoTab1e中添加topic为TBW102的信息?这个问题将在producer获取路由信息部分详细解释。2. MQC1ientInstance启动MQC1ientInstance启动的过程如下,其对应的是MQC1ientInstance.java的StartO方法。ub1icvoidstart()throwsMQC1ientExceptionsynchronized(this)ISWitCh(this.SerViCeState)Ithis
4、.ServiceState=SerViCfif(nu11二二this.c1ientCongetNamesrvddrthis.mQC1ientAPmp1.,etchNameServerAddr();Ithis.StartSChedUTZTZZi757Ithis.PU11MeSSageSerVice.Start1IIhis.reba1anceService.Start()Ithis.defau1tMQProducer.getDefIaU1tMQPrOdUCer1mPI.().Start(faI1og,info(theC1ientfacto?Ithis.ServiceState=Servicecas
5、eSTART_FAI1ED:thrownewMQC1ientEXCePtiOn1(TheFaCtoryobject+this.getC1ient1d()+”hasbeenCaeatedbefore,andfai1ed,tt,nU其执行过程可以使用下图来描述:在MQC1ientInstance的启动过程中需要关注以下问题:(1)客户端采用的通信模型是ROCketMQ自定义的通信协议并在Netty的基础之上扩展的通信模块,其具体实现是NettyRemotingC1ient(2) Pu11MessageService与Reba1anceService都是与consumer相关,其中Reba1ance
6、Service负责consumer端负载均衡以及将重新分配的MessageQueue构建Pu11Request请求并将其放到Pu11MessageService服务中的Pu11RequestQueue队列中,Pu11MessageService负责consumer端消息拉取3. producerGroup限制条件总结(1) producerGroup不能为nu11(2) ProdUCerGroUP不能为DEFAU1T_PRODUCER、C1IENTNNER.PRODUCER(3) producerGroup的长度不能超过255(4) PrOdUCerGroUP可以包含外、字母大小写、数字、-、
7、三、消息发送流程在ROCketMQ中消息发送分为三种类型分别是同步、异步和单向,这三种发送方式的基本步骤是相同的,可以概括为以下步骤:(1)检查topic及发送的消息(2)获取topic路由信息(3)选择MeSSageQUeUe(4)按照发送消息的类型选择对应的方式发送消息根据发送过程中的第一步可以总结下topic以及消息的限制条件,具体如下:topic的限制条件:(1) topic不能为空(2) topic的名称可以包含乐字母大小写、数字、-、(3) topic名称长度不能超过127(4) topic名称不能是TBWIO2、SCHEDU1EjrOPIC_XXXX、BenchmarkTest.
8、RMe1SYS_TRANSj1A1FJTOPIC、RMC1SYSjrRACEjrOPIC、RMc1SYSJrRANSJ)P_HA1FJroP1C、TRANS_CHECK_MAX_TIME_TOPIC、SE1FJrESTJTOPIC、OFFSETMOVED_EVENT消息的限制条件:(1)消息不能为nu11(2)消息的body字段不能为nu11或者消息的body字段超度不能为0(3)消息的body字段长度不能超过HiaxMessageSize,默认是4M,客户端可以通过SetMaxMessageSize方法来进行修改四、producer获取路由信息在发送消息前producer会从namesrv来
9、获取目标topic的路由信息,其具体实现是在tryToFindTopicPub1ishInfo(fina1Stringtopic)方法中,具体如下:riva1。TopicPub1ishInfoIryToFindToPiCPUbIiShInfO(fina1S1ringtopic)TopicPub1ish1nfotopicPub1ish1nfo=this,topiePub1IshInfoTab1e.get(topic);this.topicPubIiSh1nfoTab1_e.PUt1fAbSent(topic,newTopicPub1ish1nIhis.HiQC1ientFactory.UPdat
10、eTOPiCRoUte1nfOFrOnINamCServer(IoPiC);|if(topicPub1ishInfo.isHaveTopicRouterInfo()topicPub1ishInfo.okreturntopicPub1ish1nfo;this.mQCIientFactory.UPdateTOPiCRoUteInfoFrOmNameSerVer(topic,tue,this.defaU1tMQProdUCer);topicPub1ishInfo二this.topicPub1ish1nfoTab1e.get(topic)returntopiCPUbIishInfo;上面方法中从nam
11、esrv更新topic路由信息的方法调用的是同一个函数UpdateTopicRouteInfoFromNameServer(fina1Stringtopic,boo1eanisDefau1t,Defau1tMQProducerdefau1tMQProducer),其区别是在调用时传递的参数是不一样的,在第一次调用是传递的参数是(topic,fa1se,nu11)第二次调用时传递的是(topic,true,this.defau1tMQProducer)接下来详细分析下该方法:1ProdUCer在第一次发送消息时如果topicPub1iShInfoTab1e中没有目标topic的信息则会执行get
12、TopicRoute1nfoFromNameServer(topic,1000*3)来获取目标topic的路由信息,请求类型是RequestCode.GET_ROUTEINFO_BY_TOPIC2 .如果步骤1中获取的topicRouteData不为空,会先从PrOCIUCer端的缓存topicRouteTab1e中获取目标topic旧的路由信息,然后执行topiCRouteDataIsChange方法来判断topic的路由信息是否发生变化,如果发生变化则更新brokerAddrTab1etopicPub1ish1nfoTab1e和topicRouteTab1e3 .tryToFindTopi
13、cPub1ishInfo方法中如果在更新topic路由信息后IopicPub1ishInfo中还没有目标topic的路由信息并且BiessageQueue1ist为空则在执行UPdateTOPiCRoUteInfOFromNanIeSerVer方法从namesrv获取路由信息时获取“TBW102topic的路由信息(前提是broker端autoCreateTopicEnab1e配置项的值为true),然后与本地缓存的路由信息进行对比是否发生变化,如果发生了变化则以该路由信息来更新brokerAddrTab1e、topicPub1ish1nfoTab1etopicRouteTab1e这里有一点需
14、要注意,在从namesrv获取“TBW102”的路由信息后会对其queueDatas进行遍历,然后以defau1tTopiCQueueNums和readQueueNums中较小的队列数更新其读写队列数pub1icboo1eanUPdateTOPiCRoUte1nfoFromNameServer(fina1Stringtopic,boo1eanisDefau11,ITOPiCRoUteDataIIopicRouteData二1his.mQC1ientAPmp1.getDcfau11TopicRouteIInfoFromNameServer(defau1tMQProducer.getCreateT
15、opicKeyOIif(topiCRoUteData!二n?TIfor(QueueDatadata:topicRouteData.getQueueDatas()IintqueueNums二Math,min(defau1tMQProducer.getDefau1tTopIiCQUeUeNUmS(),data.getReadQueueNums();Idata.SetReadQUeUeNUn1S(queueNums);/producerItopicRouteData-this.mQC1ientAPIImp1.getTopicRouteInfoFromif(topiCRoUteData!=nu1DTOPiCROUteDataO1d=this.topicRouteTab1e.get(topic)boo1eanchanged=tOpicRouteDataIsChange(o1d,topicRouteDateChanged二this.isNeedUpdateTopicRouteInfo(topic)1og,info(1heIoPiCrouteinfocha