RocketMQ使用教程相關(guān)系列 目錄 架構(gòu)設(shè)計(jì)1 技術(shù)架構(gòu)RocketMQ架構(gòu)上主要分為四部分,如上圖所示:
2 部署架構(gòu)RocketMQ 網(wǎng)絡(luò)部署特點(diǎn)
結(jié)合部署架構(gòu)圖,描述集群工作流程:
設(shè)計(jì)原理1 消息存儲(chǔ)消息存儲(chǔ)是RocketMQ中最為復(fù)雜和最為重要的一部分,本節(jié)將分別從RocketMQ的消息存儲(chǔ)整體架構(gòu)、PageCache與Mmap內(nèi)存映射以及RocketMQ中兩種不同的刷盤(pán)方式三方面來(lái)分別展開(kāi)敘述。 1.1 消息存儲(chǔ)整體架構(gòu)消息存儲(chǔ)架構(gòu)圖中主要有下面三個(gè)跟消息存儲(chǔ)相關(guān)的文件構(gòu)成。 (1) CommitLog:消息主體以及元數(shù)據(jù)的存儲(chǔ)主體,存儲(chǔ)Producer端寫(xiě)入的消息主體內(nèi)容,消息內(nèi)容不是定長(zhǎng)的。單個(gè)文件大小默認(rèn)1G ,文件名長(zhǎng)度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€(gè)文件寫(xiě)滿了,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824,以此類(lèi)推。消息主要是順序?qū)懭肴罩疚募?#xff0c;當(dāng)文件滿了,寫(xiě)入下一個(gè)文件; (2) ConsumeQueue:消息消費(fèi)隊(duì)列,引入的目的主要是提高消息消費(fèi)的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費(fèi)是針對(duì)主題進(jìn)行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。Consumer即可根據(jù)ConsumeQueue來(lái)查找待消費(fèi)的消息。其中,ConsumeQueue(邏輯消費(fèi)隊(duì)列)作為消費(fèi)消息的索引,保存了指定Topic下的隊(duì)列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結(jié)構(gòu),具體存儲(chǔ)路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長(zhǎng)設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié),分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長(zhǎng)度、8字節(jié)tag hashcode,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問(wèn)每一個(gè)條目,每個(gè)ConsumeQueue文件大小約5.72M; (3) IndexFile:IndexFile(索引文件)提供了一種可以通過(guò)key或時(shí)間區(qū)間來(lái)查詢(xún)消息的方法。Index文件的存儲(chǔ)位置是:$HOME \store\index${fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,固定的單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實(shí)現(xiàn)為hash索引。 在上面的RocketMQ的消息存儲(chǔ)整體架構(gòu)圖中可以看出,RocketMQ采用的是混合型的存儲(chǔ)結(jié)構(gòu),即為Broker單個(gè)實(shí)例下所有的隊(duì)列共用一個(gè)日志數(shù)據(jù)文件(即為CommitLog)來(lái)存儲(chǔ)。RocketMQ的混合型存儲(chǔ)結(jié)構(gòu)(多個(gè)Topic的消息實(shí)體內(nèi)容都存儲(chǔ)于一個(gè)CommitLog中)針對(duì)Producer和Consumer分別采用了數(shù)據(jù)和索引部分相分離的存儲(chǔ)結(jié)構(gòu),Producer發(fā)送消息至Broker端,然后Broker端使用同步或者異步的方式對(duì)消息刷盤(pán)持久化,保存至CommitLog中。只要消息被刷盤(pán)持久化至磁盤(pán)文件CommitLog中,那么Producer發(fā)送的消息就不會(huì)丟失。正因?yàn)槿绱?#xff0c;Consumer也就肯定有機(jī)會(huì)去消費(fèi)這條消息。當(dāng)無(wú)法拉取到消息后,可以等下一次消息拉取,同時(shí)服務(wù)端也支持長(zhǎng)輪詢(xún)模式,如果一個(gè)消息拉取請(qǐng)求未拉取到消息,Broker允許等待30s的時(shí)間,只要這段時(shí)間內(nèi)有新消息到達(dá),將直接返回給消費(fèi)端。這里,RocketMQ的具體做法是,使用Broker端的后臺(tái)服務(wù)線程—ReputMessageService不停地分發(fā)請(qǐng)求并異步構(gòu)建ConsumeQueue(邏輯消費(fèi)隊(duì)列)和IndexFile(索引文件)數(shù)據(jù)。 1.2 頁(yè)緩存與內(nèi)存映射頁(yè)緩存(PageCache)是OS對(duì)文件的緩存,用于加速對(duì)文件的讀寫(xiě)。一般來(lái)說(shuō),程序?qū)ξ募M(jìn)行順序讀寫(xiě)的速度幾乎接近于內(nèi)存的讀寫(xiě)速度,主要原因就是由于OS使用PageCache機(jī)制對(duì)讀寫(xiě)訪問(wèn)操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache。對(duì)于數(shù)據(jù)的寫(xiě)入,OS會(huì)先寫(xiě)入至Cache內(nèi),隨后通過(guò)異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤(pán)至物理磁盤(pán)上。對(duì)于數(shù)據(jù)的讀取,如果一次讀取文件時(shí)出現(xiàn)未命中PageCache的情況,OS從物理磁盤(pán)上訪問(wèn)讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取。 在RocketMQ中,ConsumeQueue邏輯消費(fèi)隊(duì)列存儲(chǔ)的數(shù)據(jù)較少,并且是順序讀取,在page cache機(jī)制的預(yù)讀取作用下,Consume Queue文件的讀性能幾乎接近讀內(nèi)存,即使在有消息堆積情況下也不會(huì)影響性能。而對(duì)于CommitLog消息存儲(chǔ)的日志數(shù)據(jù)文件來(lái)說(shuō),讀取消息內(nèi)容時(shí)候會(huì)產(chǎn)生較多的隨機(jī)訪問(wèn)讀取,嚴(yán)重影響性能。如果選擇合適的系統(tǒng)IO調(diào)度算法,比如設(shè)置調(diào)度算法為“Deadline”(此時(shí)塊存儲(chǔ)采用SSD的話),隨機(jī)讀的性能也會(huì)有所提升。 另外,RocketMQ主要通過(guò)MappedByteBuffer對(duì)文件進(jìn)行讀寫(xiě)操作。其中,利用了NIO中的FileChannel模型將磁盤(pán)上的物理文件直接映射到用戶(hù)態(tài)的內(nèi)存地址中(這種Mmap的方式減少了傳統(tǒng)IO將磁盤(pán)文件數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間的緩沖區(qū)和用戶(hù)應(yīng)用程序地址空間的緩沖區(qū)之間來(lái)回進(jìn)行拷貝的性能開(kāi)銷(xiāo)),將對(duì)文件的操作轉(zhuǎn)化為直接對(duì)內(nèi)存地址進(jìn)行操作,從而極大地提高了文件的讀寫(xiě)效率(正因?yàn)樾枰褂脙?nèi)存映射機(jī)制,故RocketMQ的文件存儲(chǔ)都使用定長(zhǎng)結(jié)構(gòu)來(lái)存儲(chǔ),方便一次將整個(gè)文件映射至內(nèi)存)。 1.3 消息刷盤(pán)(1) 同步刷盤(pán):如上圖所示,只有在消息真正持久化至磁盤(pán)后RocketMQ的Broker端才會(huì)真正返回給Producer端一個(gè)成功的ACK響應(yīng)。同步刷盤(pán)對(duì)MQ消息可靠性來(lái)說(shuō)是一種不錯(cuò)的保障,但是性能上會(huì)有較大影響,一般適用于金融業(yè)務(wù)應(yīng)用該模式較多。 (2) 異步刷盤(pán):能夠充分利用OS的PageCache的優(yōu)勢(shì),只要消息寫(xiě)入PageCache即可將成功的ACK返回給Producer端。消息刷盤(pán)采用后臺(tái)異步線程提交的方式進(jìn)行,降低了讀寫(xiě)延遲,提高了MQ的性能和吞吐量。 2 通信機(jī)制RocketMQ消息隊(duì)列集群主要包括NameServe、Broker(Master/Slave)、Producer、Consumer4個(gè)角色,基本通訊流程如下: (1) Broker啟動(dòng)后需要完成一次將自己注冊(cè)至NameServer的操作;隨后每隔30s時(shí)間定時(shí)向NameServer上報(bào)Topic路由信息。 (2) 消息生產(chǎn)者Producer作為客戶(hù)端發(fā)送消息時(shí)候,需要根據(jù)消息的Topic從本地緩存的TopicPublishInfoTable獲取路由信息。如果沒(méi)有則更新路由信息會(huì)從NameServer上重新拉取,同時(shí)Producer會(huì)默認(rèn)每隔30s向NameServer拉取一次路由信息。 (3) 消息生產(chǎn)者Producer根據(jù)2)中獲取的路由信息選擇一個(gè)隊(duì)列(MessageQueue)進(jìn)行消息發(fā)送;Broker作為消息的接收者接收消息并落盤(pán)存儲(chǔ)。 (4) 消息消費(fèi)者Consumer根據(jù)2)中獲取的路由信息,并再完成客戶(hù)端的負(fù)載均衡后,選擇其中的某一個(gè)或者某幾個(gè)消息隊(duì)列來(lái)拉取消息并進(jìn)行消費(fèi)。 從上面1)~3)中可以看出在消息生產(chǎn)者, Broker和NameServer之間都會(huì)發(fā)生通信(這里只說(shuō)了MQ的部分通信),因此如何設(shè)計(jì)一個(gè)良好的網(wǎng)絡(luò)通信模塊在MQ中至關(guān)重要,它將決定RocketMQ集群整體的消息傳輸能力與最終的性能。 rocketmq-remoting 模塊是 RocketMQ消息隊(duì)列中負(fù)責(zé)網(wǎng)絡(luò)通信的模塊,它幾乎被其他所有需要網(wǎng)絡(luò)通信的模塊(諸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依賴(lài)和引用。為了實(shí)現(xiàn)客戶(hù)端與服務(wù)器之間高效的數(shù)據(jù)請(qǐng)求與接收,RocketMQ消息隊(duì)列自定義了通信協(xié)議并在Netty的基礎(chǔ)之上擴(kuò)展了通信模塊。 2.1 Remoting通信類(lèi)結(jié)構(gòu)2.2 協(xié)議設(shè)計(jì)與編解碼在Client和Server之間完成一次消息發(fā)送時(shí),需要對(duì)發(fā)送的消息進(jìn)行一個(gè)協(xié)議約定,因此就有必要自定義RocketMQ的消息協(xié)議。同時(shí),為了高效地在網(wǎng)絡(luò)中傳輸消息和對(duì)收到的消息讀取,就需要對(duì)消息進(jìn)行編解碼。在RocketMQ中,RemotingCommand這個(gè)類(lèi)在消息傳輸過(guò)程中對(duì)所有數(shù)據(jù)內(nèi)容的封裝,不但包含了所有的數(shù)據(jù)結(jié)構(gòu),還包含了編碼解碼操作。
可見(jiàn)傳輸內(nèi)容主要可以分為以下4部分: (1) 消息長(zhǎng)度:總長(zhǎng)度,四個(gè)字節(jié)存儲(chǔ),占用一個(gè)int類(lèi)型; (2) 序列化類(lèi)型&消息頭長(zhǎng)度:同樣占用一個(gè)int類(lèi)型,第一個(gè)字節(jié)表示序列化類(lèi)型,后面三個(gè)字節(jié)表示消息頭長(zhǎng)度; (3) 消息頭數(shù)據(jù):經(jīng)過(guò)序列化后的消息頭數(shù)據(jù); (4) 消息主體數(shù)據(jù):消息主體的二進(jìn)制字節(jié)數(shù)據(jù)內(nèi)容; 2.3 消息的通信方式和流程在RocketMQ消息隊(duì)列中支持通信的方式主要有同步(sync)、異步(async)、單向(oneway) 2.4 Reactor多線程設(shè)計(jì)RocketMQ的RPC通信采用Netty組件作為底層通信庫(kù),同樣也遵循了Reactor多線程模型,同時(shí)又在這之上做了一些擴(kuò)展和優(yōu)化。 上面的框圖中可以大致了解RocketMQ中NettyRemotingServer的Reactor 多線程模型。一個(gè) Reactor 主線程(eventLoopGroupBoss,即為上面的1)負(fù)責(zé)監(jiān)聽(tīng) TCP網(wǎng)絡(luò)連接請(qǐng)求,建立好連接,創(chuàng)建SocketChannel,并注冊(cè)到selector上。RocketMQ的源碼中會(huì)自動(dòng)根據(jù)OS的類(lèi)型選擇NIO和Epoll,也可以通過(guò)參數(shù)配置),然后監(jiān)聽(tīng)真正的網(wǎng)絡(luò)數(shù)據(jù)。拿到網(wǎng)絡(luò)數(shù)據(jù)后,再丟給Worker線程池(eventLoopGroupSelector,即為上面的“N”,源碼中默認(rèn)設(shè)置為3),在真正執(zhí)行業(yè)務(wù)邏輯之前需要進(jìn)行SSL驗(yàn)證、編解碼、空閑檢查、網(wǎng)絡(luò)連接管理,這些工作交給defaultEventExecutorGroup(即為上面的“M1”,源碼中默認(rèn)設(shè)置為8)去做。而處理業(yè)務(wù)操作放在業(yè)務(wù)線程池中執(zhí)行,根據(jù) RomotingCommand 的業(yè)務(wù)請(qǐng)求碼code去processorTable這個(gè)本地緩存變量中找到對(duì)應(yīng)的 processor,然后封裝成task任務(wù)后,提交給對(duì)應(yīng)的業(yè)務(wù)processor處理線程池來(lái)執(zhí)行(sendMessageExecutor,以發(fā)送消息為例,即為上面的 “M2”)。從入口到業(yè)務(wù)邏輯的幾個(gè)步驟中線程池一直再增加,這跟每一步邏輯復(fù)雜性相關(guān),越復(fù)雜,需要的并發(fā)通道越寬。
3 消息過(guò)濾RocketMQ分布式消息隊(duì)列的消息過(guò)濾方式有別于其它MQ中間件,是在Consumer端訂閱消息時(shí)再做消息過(guò)濾的。RocketMQ這么做是還是在于其Producer端寫(xiě)入消息和Consomer端訂閱消息采用分離存儲(chǔ)的機(jī)制來(lái)實(shí)現(xiàn)的,Consumer端訂閱消息是需要通過(guò)ConsumeQueue這個(gè)消息消費(fèi)的邏輯隊(duì)列拿到一個(gè)索引,然后再?gòu)腃ommitLog里面讀取真正的消息實(shí)體內(nèi)容,所以說(shuō)到底也是還繞不開(kāi)其存儲(chǔ)結(jié)構(gòu)。其ConsumeQueue的存儲(chǔ)結(jié)構(gòu)如下,可以看到其中有8個(gè)字節(jié)存儲(chǔ)的Message Tag的哈希值,基于Tag的消息過(guò)濾正式基于這個(gè)字段值的。 主要支持如下2種的過(guò)濾方式 (2) SQL92的過(guò)濾方式:這種方式的大致做法和上面的Tag過(guò)濾方式一樣,只是在Store層的具體過(guò)濾過(guò)程不太一樣,真正的 SQL expression 的構(gòu)建和執(zhí)行由rocketmq-filter模塊負(fù)責(zé)的。每次過(guò)濾都去執(zhí)行SQL表達(dá)式會(huì)影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執(zhí)行。SQL92的表達(dá)式上下文為消息的屬性。 4 負(fù)載均衡RocketMQ中的負(fù)載均衡都在Client端完成,具體來(lái)說(shuō)的話,主要可以分為Producer端發(fā)送消息時(shí)候的負(fù)載均衡和Consumer端訂閱消息的負(fù)載均衡。 4.1 Producer的負(fù)載均衡Producer端在發(fā)送消息的時(shí)候,會(huì)先根據(jù)Topic找到指定的TopicPublishInfo,在獲取了TopicPublishInfo路由信息后,RocketMQ的客戶(hù)端在默認(rèn)方式下selectOneMessageQueue()方法會(huì)從TopicPublishInfo中的messageQueueList中選擇一個(gè)隊(duì)列(MessageQueue)進(jìn)行發(fā)送消息。具體的容錯(cuò)策略均在MQFaultStrategy這個(gè)類(lèi)中定義。這里有一個(gè)sendLatencyFaultEnable開(kāi)關(guān)變量,如果開(kāi)啟,在隨機(jī)遞增取模的基礎(chǔ)上,再過(guò)濾掉not available的Broker代理。所謂的"latencyFaultTolerance",是指對(duì)之前失敗的,按一定的時(shí)間做退避。例如,如果上次請(qǐng)求的latency超過(guò)550Lms,就退避3000Lms;超過(guò)1000L,就退避60000L;如果關(guān)閉,采用隨機(jī)遞增取模的方式選擇一個(gè)隊(duì)列(MessageQueue)來(lái)發(fā)送消息,latencyFaultTolerance機(jī)制是實(shí)現(xiàn)消息發(fā)送高可用的核心關(guān)鍵所在。 4.2 Consumer的負(fù)載均衡在RocketMQ中,Consumer端的兩種消費(fèi)模式(Push/Pull)都是基于拉模式來(lái)獲取消息的,而在Push模式只是對(duì)pull模式的一種封裝,其本質(zhì)實(shí)現(xiàn)為消息拉取線程在從服務(wù)器拉取到一批消息后,然后提交到消息消費(fèi)線程池后,又“馬不停蹄”的繼續(xù)向服務(wù)器再次嘗試?yán)∠?。如果未拉取到消?#xff0c;則延遲一下又繼續(xù)拉取。在兩種基于拉模式的消費(fèi)方式(Push/Pull)中,均需要Consumer端在知道從Broker端的哪一個(gè)消息隊(duì)列—隊(duì)列中去獲取消息。因此,有必要在Consumer端來(lái)做負(fù)載均衡,即Broker端中多個(gè)MessageQueue分配給同一個(gè)ConsumerGroup中的哪些Consumer消費(fèi)。 1、Consumer端的心跳包發(fā)送 在Consumer啟動(dòng)后,它就會(huì)通過(guò)定時(shí)任務(wù)不斷地向RocketMQ集群中的所有Broker實(shí)例發(fā)送心跳包(其中包含了,消息消費(fèi)分組名稱(chēng)、訂閱關(guān)系集合、消息通信模式和客戶(hù)端id的值等信息)。Broker端在收到Consumer的心跳消息后,會(huì)將它維護(hù)在ConsumerManager的本地緩存變量—consumerTable,同時(shí)并將封裝后的客戶(hù)端網(wǎng)絡(luò)通道信息保存在本地緩存變量—channelInfoTable中,為之后做Consumer端的負(fù)載均衡提供可以依據(jù)的元數(shù)據(jù)信息。 2、Consumer端實(shí)現(xiàn)負(fù)載均衡的核心類(lèi)—RebalanceImpl 在Consumer實(shí)例的啟動(dòng)流程中的啟動(dòng)MQClientInstance實(shí)例部分,會(huì)完成負(fù)載均衡服務(wù)線程—RebalanceService的啟動(dòng)(每隔20s執(zhí)行一次)。通過(guò)查看源碼可以發(fā)現(xiàn),RebalanceService線程的run()方法最終調(diào)用的是RebalanceImpl類(lèi)的rebalanceByTopic()方法,該方法是實(shí)現(xiàn)Consumer端負(fù)載均衡的核心。這里,rebalanceByTopic()方法會(huì)根據(jù)消費(fèi)者通信類(lèi)型為“廣播模式”還是“集群模式”做不同的邏輯處理。這里主要來(lái)看下集群模式下的主要處理流程: (1) 從rebalanceImpl實(shí)例的本地緩存變量—topicSubscribeInfoTable中,獲取該Topic主題下的消息消費(fèi)隊(duì)列集合(mqSet); (2) 根據(jù)topic和consumerGroup為參數(shù)調(diào)用mQClientFactory.findConsumerIdList()方法向Broker端發(fā)送獲取該消費(fèi)組下消費(fèi)者Id列表的RPC通信請(qǐng)求(Broker端基于前面Consumer端上報(bào)的心跳包數(shù)據(jù)而構(gòu)建的consumerTable做出響應(yīng)返回,業(yè)務(wù)請(qǐng)求碼:GET_CONSUMER_LIST_BY_GROUP); (3) 先對(duì)Topic下的消息消費(fèi)隊(duì)列、消費(fèi)者Id排序,然后用消息隊(duì)列分配策略算法(默認(rèn)為:消息隊(duì)列的平均分配算法),計(jì)算出待拉取的消息隊(duì)列。這里的平均分配算法,類(lèi)似于分頁(yè)的算法,將所有MessageQueue排好序類(lèi)似于記錄,將所有消費(fèi)端Consumer排好序類(lèi)似頁(yè)數(shù),并求出每一頁(yè)需要包含的平均size和每個(gè)頁(yè)面記錄的范圍range,最后遍歷整個(gè)range而計(jì)算出當(dāng)前Consumer端應(yīng)該分配到的記錄(這里即為:MessageQueue)。 (4) 然后,調(diào)用updateProcessQueueTableInRebalance()方法,具體的做法是,先將分配到的消息隊(duì)列集合(mqSet)與processQueueTable做一個(gè)過(guò)濾比對(duì)。
最后,為過(guò)濾后的消息隊(duì)列集合(mqSet)中的每個(gè)MessageQueue創(chuàng)建一個(gè)ProcessQueue對(duì)象并存入RebalanceImpl的processQueueTable隊(duì)列中(其中調(diào)用RebalanceImpl實(shí)例的computePullFromWhere(MessageQueue mq)方法獲取該MessageQueue對(duì)象的下一個(gè)進(jìn)度消費(fèi)值offset,隨后填充至接下來(lái)要?jiǎng)?chuàng)建的pullRequest對(duì)象屬性中),并創(chuàng)建拉取請(qǐng)求對(duì)象—pullRequest添加到拉取列表—pullRequestList中,最后執(zhí)行dispatchPullRequest()方法,將Pull消息的請(qǐng)求對(duì)象PullRequest依次放入PullMessageService服務(wù)線程的阻塞隊(duì)列pullRequestQueue中,待該服務(wù)線程取出后向Broker端發(fā)起Pull消息的請(qǐng)求。其中,可以重點(diǎn)對(duì)比下,RebalancePushImpl和RebalancePullImpl兩個(gè)實(shí)現(xiàn)類(lèi)的dispatchPullRequest()方法不同,RebalancePullImpl類(lèi)里面的該方法為空,這樣子也就回答了上一篇中最后的那道思考題了。 消息消費(fèi)隊(duì)列在同一消費(fèi)組不同消費(fèi)者之間的負(fù)載均衡,其核心設(shè)計(jì)理念是在一個(gè)消息消費(fèi)隊(duì)列在同一時(shí)間只允許被同一消費(fèi)組內(nèi)的一個(gè)消費(fèi)者消費(fèi),一個(gè)消息消費(fèi)者能同時(shí)消費(fèi)多個(gè)消息隊(duì)列。 5 事務(wù)消息Apache RocketMQ在4.3.0版中已經(jīng)支持分布式事務(wù)消息,這里RocketMQ采用了2PC的思想來(lái)實(shí)現(xiàn)了提交事務(wù)消息,同時(shí)增加一個(gè)補(bǔ)償邏輯來(lái)處理二階段超時(shí)或者失敗的消息,如下圖所示。 5.1 RocketMQ事務(wù)消息流程概要上圖說(shuō)明了事務(wù)消息的大致方案,其中分為兩個(gè)流程:正常事務(wù)消息的發(fā)送及提交、事務(wù)消息的補(bǔ)償流程。 1.事務(wù)消息發(fā)送及提交: (1) 發(fā)送消息(half消息)。 (2) 服務(wù)端響應(yīng)消息寫(xiě)入結(jié)果。 (3) 根據(jù)發(fā)送結(jié)果執(zhí)行本地事務(wù)(如果寫(xiě)入失敗,此時(shí)half消息對(duì)業(yè)務(wù)不可見(jiàn),本地邏輯不執(zhí)行)。 (4) 根據(jù)本地事務(wù)狀態(tài)執(zhí)行Commit或者Rollback(Commit操作生成消息索引,消息對(duì)消費(fèi)者可見(jiàn)) 2.補(bǔ)償流程: (1) 對(duì)沒(méi)有Commit/Rollback的事務(wù)消息(pending狀態(tài)的消息),從服務(wù)端發(fā)起一次“回查” (2) Producer收到回查消息,檢查回查消息對(duì)應(yīng)的本地事務(wù)的狀態(tài) (3) 根據(jù)本地事務(wù)狀態(tài),重新Commit或者Rollback 其中,補(bǔ)償階段用于解決消息Commit或者Rollback發(fā)生超時(shí)或者失敗的情況。 5.2 RocketMQ事務(wù)消息設(shè)計(jì)1.事務(wù)消息在一階段對(duì)用戶(hù)不可見(jiàn) 在RocketMQ事務(wù)消息的主要流程中,一階段的消息如何對(duì)用戶(hù)不可見(jiàn)。其中,事務(wù)消息相對(duì)普通消息最大的特點(diǎn)就是一階段發(fā)送的消息對(duì)用戶(hù)是不可見(jiàn)的。那么,如何做到寫(xiě)入消息但是對(duì)用戶(hù)不可見(jiàn)呢?RocketMQ事務(wù)消息的做法是:如果消息是half消息,將備份原消息的主題與消息消費(fèi)隊(duì)列,然后改變主題為RMQ_SYS_TRANS_HALF_TOPIC。由于消費(fèi)組未訂閱該主題,故消費(fèi)端無(wú)法消費(fèi)half類(lèi)型的消息,然后RocketMQ會(huì)開(kāi)啟一個(gè)定時(shí)任務(wù),從Topic為RMQ_SYS_TRANS_HALF_TOPIC中拉取消息進(jìn)行消費(fèi),根據(jù)生產(chǎn)者組獲取一個(gè)服務(wù)提供者發(fā)送回查事務(wù)狀態(tài)請(qǐng)求,根據(jù)事務(wù)狀態(tài)來(lái)決定是提交或回滾消息。 在RocketMQ中,消息在服務(wù)端的存儲(chǔ)結(jié)構(gòu)如下,每條消息都會(huì)有對(duì)應(yīng)的索引信息,Consumer通過(guò)ConsumeQueue這個(gè)二級(jí)索引來(lái)讀取消息實(shí)體內(nèi)容,其流程如下: RocketMQ的具體實(shí)現(xiàn)策略是:寫(xiě)入的如果事務(wù)消息,對(duì)消息的Topic和Queue等屬性進(jìn)行替換,同時(shí)將原來(lái)的Topic和Queue信息存儲(chǔ)到消息的屬性中,正因?yàn)橄⒅黝}被替換,故消息并不會(huì)轉(zhuǎn)發(fā)到該原主題的消息消費(fèi)隊(duì)列,消費(fèi)者無(wú)法感知消息的存在,不會(huì)消費(fèi)。其實(shí)改變消息主題是RocketMQ的常用“套路”,回想一下延時(shí)消息的實(shí)現(xiàn)機(jī)制。 2.Commit和Rollback操作以及Op消息的引入 在完成一階段寫(xiě)入一條對(duì)用戶(hù)不可見(jiàn)的消息后,二階段如果是Commit操作,則需要讓消息對(duì)用戶(hù)可見(jiàn);如果是Rollback則需要撤銷(xiāo)一階段的消息。先說(shuō)Rollback的情況。對(duì)于Rollback,本身一階段的消息對(duì)用戶(hù)是不可見(jiàn)的,其實(shí)不需要真正撤銷(xiāo)消息(實(shí)際上RocketMQ也無(wú)法去真正的刪除一條消息,因?yàn)槭琼樞驅(qū)懳募?#xff09;。但是區(qū)別于這條消息沒(méi)有確定狀態(tài)(Pending狀態(tài),事務(wù)懸而未決),需要一個(gè)操作來(lái)標(biāo)識(shí)這條消息的最終狀態(tài)。RocketMQ事務(wù)消息方案中引入了Op消息的概念,用Op消息標(biāo)識(shí)事務(wù)消息已經(jīng)確定的狀態(tài)(Commit或者Rollback)。如果一條事務(wù)消息沒(méi)有對(duì)應(yīng)的Op消息,說(shuō)明這個(gè)事務(wù)的狀態(tài)還無(wú)法確定(可能是二階段失敗了)。引入Op消息后,事務(wù)消息無(wú)論是Commit或者Rollback都會(huì)記錄一個(gè)Op操作。Commit相對(duì)于Rollback只是在寫(xiě)入Op消息前創(chuàng)建Half消息的索引。 3.Op消息的存儲(chǔ)和對(duì)應(yīng)關(guān)系 RocketMQ將Op消息寫(xiě)入到全局一個(gè)特定的Topic中通過(guò)源碼中的方法—TransactionalMessageUtil.buildOpTopic();這個(gè)Topic是一個(gè)內(nèi)部的Topic(像Half消息的Topic一樣),不會(huì)被用戶(hù)消費(fèi)。Op消息的內(nèi)容為對(duì)應(yīng)的Half消息的存儲(chǔ)的Offset,這樣通過(guò)Op消息能索引到Half消息進(jìn)行后續(xù)的回查操作。 4.Half消息的索引構(gòu)建 在執(zhí)行二階段Commit操作時(shí),需要構(gòu)建出Half消息的索引。一階段的Half消息由于是寫(xiě)到一個(gè)特殊的Topic,所以二階段構(gòu)建索引時(shí)需要讀取出Half消息,并將Topic和Queue替換成真正的目標(biāo)的Topic和Queue,之后通過(guò)一次普通消息的寫(xiě)入操作來(lái)生成一條對(duì)用戶(hù)可見(jiàn)的消息。所以RocketMQ事務(wù)消息二階段其實(shí)是利用了一階段存儲(chǔ)的消息的內(nèi)容,在二階段時(shí)恢復(fù)出一條完整的普通消息,然后走一遍消息寫(xiě)入流程。 5.如何處理二階段失敗的消息? 如果在RocketMQ事務(wù)消息的二階段過(guò)程中失敗了,例如在做Commit操作時(shí),出現(xiàn)網(wǎng)絡(luò)問(wèn)題導(dǎo)致Commit失敗,那么需要通過(guò)一定的策略使這條消息最終被Commit。RocketMQ采用了一種補(bǔ)償機(jī)制,稱(chēng)為“回查”。Broker端對(duì)未確定狀態(tài)的消息發(fā)起回查,將消息發(fā)送到對(duì)應(yīng)的Producer端(同一個(gè)Group的Producer),由Producer根據(jù)消息來(lái)檢查本地事務(wù)的狀態(tài),進(jìn)而執(zhí)行Commit或者Rollback。Broker端通過(guò)對(duì)比Half消息和Op消息進(jìn)行事務(wù)消息的回查并且推進(jìn)CheckPoint(記錄那些事務(wù)消息的狀態(tài)是確定的)。 值得注意的是,rocketmq并不會(huì)無(wú)休止的的信息事務(wù)狀態(tài)回查,默認(rèn)回查15次,如果15次回查還是無(wú)法得知事務(wù)狀態(tài),rocketmq默認(rèn)回滾該消息。 6 消息查詢(xún)RocketMQ支持按照下面兩種維度(“按照Message Id查詢(xún)消息”、“按照Message Key查詢(xún)消息”)進(jìn)行消息查詢(xún)。 6.1 按照MessageId查詢(xún)消息RocketMQ中的MessageId的長(zhǎng)度總共有16字節(jié),其中包含了消息存儲(chǔ)主機(jī)地址(IP地址和端口),消息Commit Log offset?!鞍凑誐essageId查詢(xún)消息”在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個(gè)RPC請(qǐng)求后通過(guò)Remoting通信層發(fā)送(業(yè)務(wù)請(qǐng)求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過(guò)程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個(gè)完整的消息返回。 6.2 按照Message Key查詢(xún)消息“按照Message Key查詢(xún)消息”,主要是基于RocketMQ的IndexFile索引文件來(lái)實(shí)現(xiàn)的。RocketMQ的索引文件邏輯結(jié)構(gòu),類(lèi)似JDK中HashMap的實(shí)現(xiàn)。索引文件的具體結(jié)構(gòu)如下: IndexFile索引文件為用戶(hù)提供通過(guò)“按照Message Key查詢(xún)消息”的消息索引查詢(xún)服務(wù),IndexFile文件的存儲(chǔ)位置是:$HOME\store\index${fileName},文件名fileName是以創(chuàng)建時(shí)的時(shí)間戳命名的,文件大小是固定的,等于40+500W*4+2000W*20= 420000040個(gè)字節(jié)大小。如果消息的properties中設(shè)置了UNIQ_KEY這個(gè)屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來(lái)做寫(xiě)入操作。如果消息設(shè)置了KEYS屬性(多個(gè)KEY以空格分隔),也會(huì)用 topic + “#” + KEY 來(lái)做索引。 其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個(gè)字段,一共20 Byte。NextIndex offset 即前面讀出來(lái)的 slotValue,如果有 hash沖突,就可以用這個(gè)字段將所有沖突的索引用鏈表的方式串起來(lái)了。Timestamp記錄的是消息storeTimestamp之間的差,并不是一個(gè)絕對(duì)的時(shí)間。整個(gè)Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計(jì)信息,4*500W的 Slot Table并不保存真正的索引數(shù)據(jù),而是保存每個(gè)槽位對(duì)應(yīng)的單向鏈表的頭。20*2000W 是真正的索引數(shù)據(jù),即一個(gè) Index File 可以保存 2000W個(gè)索引。 “按照Message Key查詢(xún)消息”的方式,RocketMQ的具體做法是,主要通過(guò)Broker端的QueryMessageProcessor業(yè)務(wù)處理器來(lái)查詢(xún),讀取消息的過(guò)程就是用topic和key找到IndexFile索引文件中的一條記錄,根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實(shí)體內(nèi)容。 參考:https://github.com/apache/rocketmq/tree/master/docs/cn |
|
來(lái)自: 小虛竹 > 《待分類(lèi)》