RabbitMQ、Kafka、RocketMQ和ActiveMQ,肝了我一個(gè)月,原理是什么,如何選型,本文會(huì)告訴你答案。
消息隊(duì)列中間件重要嗎?面試必問問題之一,你說重不重要。我有時(shí)會(huì)問同事,為啥你用RabbitMQ,不用Kafka,或者RocketMQ呢,他給我的回答“因?yàn)楣居玫木褪沁@個(gè),大家都這么用”,如果你去面試,直接就被Pass,今天這篇文章,告訴你如何回答。
這篇文章純理論,主要整理網(wǎng)絡(luò)資料,肝了我整整一個(gè)月!文章依然延續(xù)上幾篇的風(fēng)格,很長(zhǎng),長(zhǎng)到我只整理排版,手都整麻了。全文2.5萬字,建議先收藏,后續(xù)面試、或者技術(shù)選型,再拿出來喵喵,不BB,上思維導(dǎo)圖!
消息隊(duì)列 消息隊(duì)列模式 消息隊(duì)列目前主要2種模式,分別為“點(diǎn)對(duì)點(diǎn)模式”和“發(fā)布/訂閱模式”。
點(diǎn)對(duì)點(diǎn)模式 一個(gè)具體的消息只能由一個(gè)消費(fèi)者消費(fèi)。多個(gè)生產(chǎn)者可以向同一個(gè)消息隊(duì)列發(fā)送消息;但是,一個(gè)消息在被一個(gè)消息者處理的時(shí)候,這個(gè)消息在隊(duì)列上會(huì)被鎖住或者被移除并且其他消費(fèi)者無法處理該消息。需要額外注意的是,如果消費(fèi)者處理一個(gè)消息失敗了,消息系統(tǒng)一般會(huì)把這個(gè)消息放回隊(duì)列,這樣其他消費(fèi)者可以繼續(xù)處理。
發(fā)布/訂閱模式 單個(gè)消息可以被多個(gè)訂閱者并發(fā)的獲取和處理。一般來說,訂閱有兩種類型:
臨時(shí)(ephemeral)訂閱,這種訂閱只有在消費(fèi)者啟動(dòng)并且運(yùn)行的時(shí)候才存在。一旦消費(fèi)者退出,相應(yīng)的訂閱以及尚未處理的消息就會(huì)丟失。 持久(durable)訂閱,這種訂閱會(huì)一直存在,除非主動(dòng)去刪除。消費(fèi)者退出后,消息系統(tǒng)會(huì)繼續(xù)維護(hù)該訂閱,并且后續(xù)消息可以被繼續(xù)處理。 衡量標(biāo)準(zhǔn) 對(duì)消息隊(duì)列進(jìn)行技術(shù)選型時(shí),需要通過以下指標(biāo)衡量你所選擇的消息隊(duì)列,是否可以滿足你的需求:
消息順序:發(fā)送到隊(duì)列的消息,消費(fèi)時(shí)是否可以保證消費(fèi)的順序,比如A先下單,B后下單,應(yīng)該是A先去扣庫(kù)存,B再去扣,順序不能反。 消息路由:根據(jù)路由規(guī)則,只訂閱匹配路由規(guī)則的消息,比如有A/B兩者規(guī)則的消息,消費(fèi)者可以只訂閱A消息,B消息不會(huì)消費(fèi)。 消息可靠性:是否會(huì)存在丟消息的情況,比如有A/B兩個(gè)消息,最后只有B消息能消費(fèi),A消息丟失。 消息時(shí)序:主要包括“消息存活時(shí)間”和“延遲/預(yù)定的消息”,“消息存活時(shí)間”表示生產(chǎn)者可以對(duì)消息設(shè)置TTL,如果超過該TTL,消息會(huì)自動(dòng)消失;“延遲/預(yù)定的消息”指的是可以延遲或者預(yù)訂消費(fèi)消息,比如延時(shí)5分鐘,那么消息會(huì)5分鐘后才能讓消費(fèi)者消費(fèi),時(shí)間未到的話,是不能消費(fèi)的。 消息留存:消息消費(fèi)成功后,是否還會(huì)繼續(xù)保留在消息隊(duì)列。 容錯(cuò)性:當(dāng)一條消息消費(fèi)失敗后,是否有一些機(jī)制,保證這條消息是一種能成功,比如異步第三方退款消息,需要保證這條消息消費(fèi)掉,才能確定給用戶退款成功,所以必須保證這條消息消費(fèi)成功的準(zhǔn)確性。 伸縮:當(dāng)消息隊(duì)列性能有問題,比如消費(fèi)太慢,是否可以快速支持庫(kù)容;當(dāng)消費(fèi)隊(duì)列過多,浪費(fèi)系統(tǒng)資源,是否可以支持縮容。 消息隊(duì)列比較 下圖是從網(wǎng)上摘抄過來的,可以看到主流MQ的對(duì)比:
下面簡(jiǎn)單介紹常用的消息隊(duì)列:
Kafka:Apache Kafka它最初由LinkedIn公司基于獨(dú)特的設(shè)計(jì)實(shí)現(xiàn)為一個(gè)分布式的提交日志系統(tǒng)( a distributed commit log),之后成為Apache項(xiàng)目的一部分。號(hào)稱大數(shù)據(jù)的殺手锏,談到大數(shù)據(jù)領(lǐng)域內(nèi)的消息傳輸,則繞不開Kafka,這款為大數(shù)據(jù)而生的消息中間件,以其百萬級(jí)TPS的吞吐量名聲大噪,迅速成為大數(shù)據(jù)領(lǐng)域的寵兒,在數(shù)據(jù)采集、傳輸、存儲(chǔ)的過程中發(fā)揮著舉足輕重的作用。 RabbitMQ:RabbitMQ 2007年發(fā)布,是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。 RocketMQ:是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka,但并不是Kafka的一個(gè)Copy,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計(jì)算、消息推送、日志流式處理、binglog分發(fā)等場(chǎng)景。 ActiveMQ:是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。官方社區(qū)現(xiàn)在對(duì)ActiveMQ 5.x維護(hù)越來越少,較少在大規(guī)模吞吐的場(chǎng)景中使用,所以該消息隊(duì)列也不是我們文章中重點(diǎn)討論的內(nèi)容。 優(yōu)缺點(diǎn) Kafka 優(yōu)點(diǎn):
高吞吐、低延遲:kakfa 最大的特點(diǎn)就是收發(fā)消息非常快,kafka 每秒可以處理幾十萬條消息,它的最低延遲只有幾毫秒; 高伸縮性:每個(gè)主題(topic) 包含多個(gè)分區(qū)(partition),主題中的分區(qū)可以分布在不同的主機(jī)(broker)中; 持久性、可靠性:Kafka 能夠允許數(shù)據(jù)的持久化存儲(chǔ),消息被持久化到磁盤,并支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失,Kafka 底層的數(shù)據(jù)存儲(chǔ)是基于 Zookeeper 存儲(chǔ)的,Zookeeper 我們知道它的數(shù)據(jù)能夠持久存儲(chǔ); 容錯(cuò)性:非常高,kafka是分布式的,一個(gè)數(shù)據(jù)多個(gè)副本,某個(gè)節(jié)點(diǎn)宕機(jī),Kafka 集群能夠正常工作; 消息有序:消費(fèi)者采用Pull方式獲取消息,消息有序,通過控制能夠保證所有消息被消費(fèi)且僅被消費(fèi)一次; 有優(yōu)秀的第三方Kafka Web管理界面Kafka-Manager,在日志領(lǐng)域比較成熟,被多家公司和多個(gè)開源項(xiàng)目使用; 功能支持:功能較為簡(jiǎn)單,主要支持簡(jiǎn)單的MQ功能,在大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算以及日志采集被大規(guī)模使用。 缺點(diǎn):
Kafka單機(jī)超過64個(gè)隊(duì)列/分區(qū),Load會(huì)發(fā)生明顯的飆高現(xiàn)象,隊(duì)列越多,load越高,發(fā)送消息響應(yīng)時(shí)間變長(zhǎng); 使用短輪詢方式,實(shí)時(shí)性取決于輪詢間隔時(shí)間; 支持消息順序,但是一臺(tái)代理宕機(jī)后,就會(huì)產(chǎn)生消息亂序; 總結(jié):
Kafka主要特點(diǎn)是基于Pull的模式來處理消息消費(fèi),追求高吞吐量,一開始的目的就是用于日志收集和傳輸,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。 大型公司建議可以選用,如果有日志采集功能,肯定是首選kafka。 RabbitMQ 優(yōu)點(diǎn):
異步消息傳遞:支持多種消息協(xié)議,消息隊(duì)列,傳送確認(rèn),靈活的路由到隊(duì)列,多種交換類型; 支持幾乎所有最受歡迎的編程語言:Java,C,C ++,C#,Ruby,Perl,Python,PHP等等; 可以部署為高可用性和吞吐量的集群;,跨多個(gè)可用區(qū)域和區(qū)域進(jìn)行聯(lián)合; 可插入的身份驗(yàn)證,授權(quán),支持TLS和LDAP; 提供了許多插件,來從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件; 提供了一個(gè)易用的用戶界面,使得用戶可以監(jiān)控和管理消息Broker,社區(qū)活躍度高。 缺點(diǎn):
erlang開發(fā),很難去看懂源碼,基本職能依賴于開源社區(qū)的快速維護(hù)和修復(fù)bug,不利于做二次開發(fā)和維護(hù); RabbitMQ確實(shí)吞吐量會(huì)低一些,這是因?yàn)樗龅膶?shí)現(xiàn)機(jī)制比較重; 需要學(xué)習(xí)比較復(fù)雜的接口和協(xié)議,學(xué)習(xí)和維護(hù)成本較高。 總結(jié):
結(jié)合erlang語言本身的并發(fā)優(yōu)勢(shì),性能較好,社區(qū)活躍度也比較高,但是不利于做二次開發(fā)和維護(hù)。不過RabbitMQ的社區(qū)十分活躍,可以解決開發(fā)過程中遇到的bug。 如果你的數(shù)據(jù)量沒有那么大,小公司優(yōu)先選擇功能比較完備的RabbitMQ。 RocketMQ 優(yōu)點(diǎn):
支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對(duì)點(diǎn)(P2P)消息模型; 在一個(gè)隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞; 支持多種消息協(xié)議,如 JMS、MQTT 等; 可靠的FIFO和嚴(yán)格的有序消息傳遞在同一隊(duì)列中; 靈活的分布式橫向擴(kuò)展部署架構(gòu),滿足至少一次消息傳遞語義; 提供 docker 鏡像用于隔離測(cè)試和云集群部署; 提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard。 缺點(diǎn):
支持的客戶端語言不多,目前是java及c++,其中c++不成熟 沒有在 mq 核心中去實(shí)現(xiàn)JMS等接口,有些系統(tǒng)要遷移需要修改大量代碼 總結(jié):
天生為金融互聯(lián)網(wǎng)領(lǐng)域而生,對(duì)于可靠性要求很高的場(chǎng)景,尤其是電商里面的訂單扣款,以及業(yè)務(wù)削峰,在大量交易涌入時(shí),后端可能無法及時(shí)處理的情況。 RoketMQ在穩(wěn)定性上可能更值得信賴,這些業(yè)務(wù)場(chǎng)景在阿里雙11已經(jīng)經(jīng)歷了多次考驗(yàn),如果你的業(yè)務(wù)有上述并發(fā)場(chǎng)景,建議可以選擇RocketMQ。 ActiveMQ 優(yōu)點(diǎn)
支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協(xié)議; 完全支持JMS客戶端和Message Broker中的企業(yè)集成模式; 支持許多高級(jí)功能,如消息組,虛擬目標(biāo),通配符和復(fù)合目標(biāo); 完全支持JMS 1.1和J2EE 1.4,支持瞬態(tài),持久,事務(wù)和XA消息; Spring支持,以便ActiveMQ可以輕松嵌入到Spring應(yīng)用程序中,并使用Spring的XML配置機(jī)制進(jìn)行配置; 專為高性能集群,客戶端 - 服務(wù)器,基于對(duì)等的通信而設(shè)計(jì); CXF和Axis支持,以便ActiveMQ可以輕松地放入這些Web服務(wù)堆棧中以提供可靠的消息傳遞; 可以用作內(nèi)存JMS提供程序,非常適合單元測(cè)試JMS; 支持可插拔傳輸協(xié)議,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA傳輸; 缺點(diǎn):
官方社區(qū)現(xiàn)在對(duì)ActiveMQ 5.x維護(hù)越來越少,較少在大規(guī)模吞吐的場(chǎng)景中使用。 Kafka Kafka 是由 Linkedin 公司開發(fā)的,它是一個(gè)分布式的,支持多分區(qū)、多副本,基于 Zookeeper 的分布式消息流平臺(tái),它同時(shí)也是一款開源的基于發(fā)布訂閱模式的消息引擎系統(tǒng)。
基本概念 消息:Kafka 中的數(shù)據(jù)單元被稱為消息,也被稱為記錄,可以把它看作數(shù)據(jù)庫(kù)表中某一行的記錄。 批次:為了提高效率, 消息會(huì)分批次寫入 Kafka,批次就代指的是一組消息。 主題:消息的種類稱為 主題(Topic),可以說一個(gè)主題代表了一類消息,相當(dāng)于是對(duì)消息進(jìn)行分類。主題就像是數(shù)據(jù)庫(kù)中的表。 分區(qū):主題可以被分為若干個(gè)分區(qū)(partition),同一個(gè)主題中的分區(qū)可以不在一個(gè)機(jī)器上,有可能會(huì)部署在多個(gè)機(jī)器上,由此來實(shí)現(xiàn) kafka 的伸縮性,單一主題中的分區(qū)有序,但是無法保證主題中所有的分區(qū)有序。 生產(chǎn)者:向主題發(fā)布消息的客戶端應(yīng)用程序稱為生產(chǎn)者(Producer),生產(chǎn)者用于持續(xù)不斷的向某個(gè)主題發(fā)送消息。 消費(fèi)者:訂閱主題消息的客戶端程序稱為消費(fèi)者(Consumer),消費(fèi)者用于處理生產(chǎn)者產(chǎn)生的消息。 消費(fèi)者群組:生產(chǎn)者與消費(fèi)者的關(guān)系就如同餐廳中的廚師和顧客之間的關(guān)系一樣,一個(gè)廚師對(duì)應(yīng)多個(gè)顧客,也就是一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,消費(fèi)者群組(Consumer Group)指的就是由一個(gè)或多個(gè)消費(fèi)者組成的群體。 偏移量:偏移量(Consumer Offset)是一種元數(shù)據(jù),它是一個(gè)不斷遞增的整數(shù)值,用來記錄消費(fèi)者發(fā)生重平衡時(shí)的位置,以便用來恢復(fù)數(shù)據(jù)。 broker: 一個(gè)獨(dú)立的 Kafka 服務(wù)器就被稱為 broker,broker 接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤保存。 broker 集群:broker 是集群 的組成部分,broker 集群由一個(gè)或多個(gè) broker 組成,每個(gè)集群都有一個(gè) broker 同時(shí)充當(dāng)了集群控制器的角色(自動(dòng)從集群的活躍成員中選舉出來)。 副本:Kafka 中消息的備份又叫做 副本(Replica),副本的數(shù)量是可以配置的,Kafka 定義了兩類副本:領(lǐng)導(dǎo)者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對(duì)外提供服務(wù),后者只是被動(dòng)跟隨。 重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程。Rebalance 是 Kafka 消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。 系統(tǒng)架構(gòu) 一個(gè)典型的 Kafka 集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費(fèi)消息。
生產(chǎn)者 數(shù)據(jù)執(zhí)行流程 在 Kafka 中,我們把產(chǎn)生消息的那一方稱為生產(chǎn)者,比如我們經(jīng)?;厝ヌ詫氋?gòu)物,你打開淘寶的那一刻,你的登陸信息,登陸次數(shù)都會(huì)作為消息傳輸?shù)?Kafka 后臺(tái),當(dāng)你瀏覽購(gòu)物的時(shí)候,你的瀏覽信息,你的搜索指數(shù),你的購(gòu)物愛好都會(huì)作為一個(gè)個(gè)消息傳遞給 Kafka 后臺(tái),然后淘寶會(huì)根據(jù)你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那么這些生產(chǎn)者產(chǎn)生的消息是怎么傳到 Kafka 應(yīng)用程序的呢?發(fā)送過程是怎么樣的呢?
盡管消息的產(chǎn)生非常簡(jiǎn)單,但是消息的發(fā)送過程還是比較復(fù)雜的,如圖:
我們從創(chuàng)建一個(gè)ProducerRecord 對(duì)象開始,ProducerRecord 是 Kafka 中的一個(gè)核心類,它代表了一組 Kafka 需要發(fā)送的 key/value 鍵值對(duì),它由記錄要發(fā)送到的主題名稱(Topic Name),可選的分區(qū)號(hào)(Partition Number)以及可選的鍵值對(duì)構(gòu)成。
在發(fā)送 ProducerRecord 時(shí),我們需要將鍵值對(duì)對(duì)象由序列化器轉(zhuǎn)換為字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡(luò)上傳輸。然后消息到達(dá)了分區(qū)器。如果發(fā)送過程中指定了有效的分區(qū)號(hào),那么在發(fā)送記錄時(shí)將使用該分區(qū)。如果發(fā)送過程中未指定分區(qū),則將使用key 的 hash 函數(shù)映射指定一個(gè)分區(qū)。如果發(fā)送的過程中既沒有分區(qū)號(hào)也沒有,則將以循環(huán)的方式分配一個(gè)分區(qū)。選好分區(qū)后,生產(chǎn)者就知道向哪個(gè)主題和分區(qū)發(fā)送數(shù)據(jù)了。ProducerRecord 還有關(guān)聯(lián)的時(shí)間戳,如果用戶沒有提供時(shí)間戳,那么生產(chǎn)者將會(huì)在記錄中使用當(dāng)前的時(shí)間作為時(shí)間戳。Kafka 最終使用的時(shí)間戳取決于 topic 主題配置的時(shí)間戳類型。然后,這條消息被存放在一個(gè)記錄批次里,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。由一個(gè)獨(dú)立的線程負(fù)責(zé)把它們發(fā)到 Kafka Broker 上。
Kafka Broker 在收到消息時(shí)會(huì)返回一個(gè)響應(yīng),如果寫入成功,會(huì)返回一個(gè) RecordMetaData 對(duì)象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量,上面兩種的時(shí)間戳類型也會(huì)返回給用戶。如果寫入失敗,會(huì)返回一個(gè)錯(cuò)誤。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,幾次之后如果還是失敗的話,就返回錯(cuò)誤消息。
上面寫的有點(diǎn)多,總結(jié)一下流程:創(chuàng)建對(duì)象(主題、分區(qū)、key/value)-> 序列化數(shù)據(jù) -> 到達(dá)分區(qū)(可自己指定,也可以通過key hash)-> 放入批次(相同主題和分區(qū)) -> 獨(dú)立線程發(fā)送 -> 返回主題/分區(qū)/分區(qū)偏移量/時(shí)間戳。
分區(qū)策略 Kafka 對(duì)于數(shù)據(jù)的讀寫是以分區(qū)為粒度的,分區(qū)可以分布在多個(gè)主機(jī)(Broker)中,這樣每個(gè)節(jié)點(diǎn)能夠?qū)崿F(xiàn)獨(dú)立的數(shù)據(jù)寫入和讀取,并且能夠通過增加新的節(jié)點(diǎn)來增加 Kafka 集群的吞吐量,通過分區(qū)部署在多個(gè) Broker 來實(shí)現(xiàn)負(fù)載均衡的效果,下面我們看看數(shù)據(jù)如何選擇分區(qū)。
方式1:順序輪詢 順序分配,消息是均勻的分配給每個(gè) partition,即每個(gè)分區(qū)存儲(chǔ)一次消息,見下圖。輪訓(xùn)策略是 Kafka Producer 提供的默認(rèn)策略,如果你不使用指定的輪訓(xùn)策略的話,Kafka 默認(rèn)會(huì)使用順序輪訓(xùn)策略的方式。
方式2:隨機(jī)輪詢 本質(zhì)上看隨機(jī)策略也是力求將數(shù)據(jù)均勻地打散到各個(gè)分區(qū),但從實(shí)際表現(xiàn)來看,它要遜于輪詢策略,所以如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好。事實(shí)上,隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。
方式3:key hash 這個(gè)策略也叫做 key-ordering 策略,Kafka 中每條消息都會(huì)有自己的key,一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略,如下圖所示
消費(fèi)者 消費(fèi)者群組 應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來自這些主題的消息,然后再把他們保存起來。應(yīng)用程序首先需要?jiǎng)?chuàng)建一個(gè) KafkaConsumer 對(duì)象,訂閱主題并開始接受消息,驗(yàn)證消息并保存結(jié)果。一段時(shí)間后,生產(chǎn)者往主題寫入的速度超過了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這時(shí)候該如何處理?如果只使用單個(gè)消費(fèi)者的話,應(yīng)用程序會(huì)跟不上消息生成的速度,就像多個(gè)生產(chǎn)者像相同的主題寫入消息一樣,這時(shí)候就需要多個(gè)消費(fèi)者共同參與消費(fèi)主題中的消息,對(duì)消息進(jìn)行分流處理。Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組中的消費(fèi)者訂閱的都是相同的主題,每個(gè)消費(fèi)者接收主題一部分分區(qū)的消息。下面是一個(gè) Kafka 分區(qū)消費(fèi)示意圖。
上圖中的主題 T1 有四個(gè)分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個(gè)消費(fèi)者群組1,消費(fèi)者群組中只有一個(gè)消費(fèi)者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個(gè)消費(fèi)者處理四個(gè)生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D
這樣一來,消費(fèi)者的消費(fèi)能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時(shí)候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費(fèi)者吃不消,那就繼續(xù)增加消費(fèi)者。
如上圖所示,每個(gè)分區(qū)所產(chǎn)生的消息能夠被每個(gè)消費(fèi)者群組中的消費(fèi)者消費(fèi),如果向消費(fèi)者群組中增加更多的消費(fèi)者,那么多余的消費(fèi)者將會(huì)閑置,如下圖所示。
向群組中增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。總而言之,我們可以通過增加消費(fèi)組的消費(fèi)者來進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜淼南M(fèi)者是空閑的,沒有任何幫助。
Kafka 一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說,每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組 G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么就演變?yōu)橄聢D這樣。在這個(gè)場(chǎng)景中,消費(fèi)組 G1 和消費(fèi)組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。
總結(jié)起來就是如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。
消費(fèi)者重平衡 我們從上面的消費(fèi)者演變圖中可以知道這么一個(gè)過程:最初是一個(gè)消費(fèi)者訂閱一個(gè)主題并消費(fèi)其全部分區(qū)的消息,后來有一個(gè)消費(fèi)者加入群組,隨后又有更多的消費(fèi)者加入群組,而新加入的消費(fèi)者實(shí)例分?jǐn)偭俗畛跸M(fèi)者的部分消息,這種把分區(qū)的所有權(quán)通過一個(gè)消費(fèi)者轉(zhuǎn)到其他消費(fèi)者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示。
重平衡非常重要,它為消費(fèi)者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費(fèi)者或移除消費(fèi)者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費(fèi)者無法讀取消息,造成整個(gè)消費(fèi)者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個(gè)消費(fèi)者時(shí),消息當(dāng)前的讀取狀態(tài)會(huì)丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會(huì)拖慢應(yīng)用程序。
消費(fèi)者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護(hù)自己是消費(fèi)者組的一員并確認(rèn)其擁有的分區(qū)。對(duì)于不同不的消費(fèi)群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費(fèi)者定期發(fā)送心跳,就會(huì)認(rèn)為消費(fèi)者是存活的并處理其分區(qū)中的消息。當(dāng)消費(fèi)者檢索記錄或者提交它所消費(fèi)的記錄時(shí)就會(huì)發(fā)送心跳。如果過了一段時(shí)間 Kafka 停止發(fā)送心跳了,會(huì)話(Session)就會(huì)過期,組織協(xié)調(diào)者就會(huì)認(rèn)為這個(gè) Consumer 已經(jīng)死亡,就會(huì)觸發(fā)一次重平衡。如果消費(fèi)者宕機(jī)并且停止發(fā)送消息,組織協(xié)調(diào)者會(huì)等待幾秒鐘,確認(rèn)它死亡了才會(huì)觸發(fā)重平衡。在這段時(shí)間里,死亡的消費(fèi)者將不處理任何消息。在清理消費(fèi)者時(shí),消費(fèi)者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會(huì)觸發(fā)一次重平衡,盡量降低處理停頓。
重平衡是一把雙刃劍,它為消費(fèi)者群組帶來高可用性和伸縮性的同時(shí),還有有一些明顯的缺點(diǎn)(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。重平衡的過程對(duì)消費(fèi)者組有極大的影響。因?yàn)槊看沃仄胶膺^程中都會(huì)導(dǎo)致萬物靜止,參考 JVM 中的垃圾回收機(jī)制,也就是 Stop The World ,STW。也就是說,在重平衡期間,消費(fèi)者組中的消費(fèi)者實(shí)例都會(huì)停止消費(fèi),等待重平衡的完成。而且重平衡這個(gè)過程很慢...
特性分析 這里才是內(nèi)容的重點(diǎn),不僅需要知道Kafka的特性,還需要知道支持這些特性的原因:
消息路由(不支持):Kafka在處理消息之前是不允許消費(fèi)者過濾一個(gè)主題中的消息。一個(gè)訂閱的消費(fèi)者在沒有異常情況下會(huì)接受一個(gè)分區(qū)中的所有消息。 消息有序(支持):當(dāng)消費(fèi)消息時(shí),如果消費(fèi)失敗,消息不會(huì)被放回,所以整個(gè)消費(fèi)過程都是有序進(jìn)行; 消息時(shí)序(不支持):消息直接發(fā)送,不會(huì)延遲發(fā)送,或者指定消息的TTL。 容錯(cuò)處理(集群支持/消息不支持):集群容錯(cuò)能力高,因?yàn)槭欠植际讲渴?,但是消息容錯(cuò)處理弱,因?yàn)橄⑾M(fèi)失敗,需要程序員手動(dòng)處理,Kafka不支持消息重新進(jìn)行消費(fèi)。 伸縮(非常好):通過擴(kuò)充分區(qū)和消費(fèi)者數(shù)量,實(shí)現(xiàn)分區(qū)擴(kuò)容,并提升消費(fèi)速度。 持久化(非常好):數(shù)據(jù)存儲(chǔ)在磁盤,可以隨時(shí)訂閱消費(fèi),消費(fèi)完后,數(shù)據(jù)仍然保留。 消息回溯(支持):因?yàn)橄⒅С殖志没?,就支持回溯,可以理解是附帶的功能?/section> 高吞吐(非常好):因?yàn)镵afka內(nèi)部同一個(gè)主題包含多個(gè)分區(qū),所以實(shí)現(xiàn)分布式存儲(chǔ),然后消費(fèi)者數(shù)量可以擴(kuò)充到和分區(qū)數(shù)量一致,保證了Kafka的高吞吐。 RocketMQ RocketMQ是一個(gè)純Java、分布式、隊(duì)列模型的開源消息中間件,前身是MetaQ,是阿里參考Kafka特點(diǎn)研發(fā)的一個(gè)隊(duì)列模型的消息中間件,后開源給apache基金會(huì)成為了apache的頂級(jí)開源項(xiàng)目,具有高性能、高可靠、高實(shí)時(shí)、分布式特點(diǎn)。
基本概念 先對(duì)常用的詞匯有個(gè)基本認(rèn)識(shí),相關(guān)詞匯后面會(huì)再詳細(xì)介紹:
NameServer:一個(gè)功能齊全的服務(wù)器,其角色類似Dubbo中的Zookeeper。 Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息。 Consumer:消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。 Broker:消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息。 Message:消息,一條消息必須有一個(gè)主題(Topic),主題可以看做是你的信件要郵寄的地址。(一條消息也可以擁有一個(gè)可選的標(biāo)簽(Tag)和額處的鍵值對(duì),它們可以用于設(shè)置一個(gè)業(yè)務(wù) Key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。) Topic:主題,可以看做消息的規(guī)類,它是消息的第一級(jí)類型。(比如一個(gè)電商系統(tǒng)可以分為:交易消息、物流消息等,一條消息必須有一個(gè) Topic 。Topic 與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個(gè) Topic 可以有0個(gè)、1個(gè)、多個(gè)生產(chǎn)者向其發(fā)送消息,一個(gè)生產(chǎn)者也可以同時(shí)向不同的 Topic 發(fā)送消息。一個(gè) Topic 也可以被 0個(gè)、1個(gè)、多個(gè)消費(fèi)者訂閱。) Tag:子主題,它是消息的第二級(jí)類型,用于為用戶提供額外的靈活性。(使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標(biāo)識(shí)。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag 。標(biāo)簽有助于保持您的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。) Group:分組,一個(gè)組可以訂閱多個(gè)Topic。(分為ProducerGroup,ConsumerGroup,代表某一類的生產(chǎn)者和消費(fèi)者,一般來說同一個(gè)服務(wù)可以作為Group,同一個(gè)Group一般來說發(fā)送和消費(fèi)的消息都是一樣的。) Producer Group:生產(chǎn)者組,代表某一類的生產(chǎn)者,比如我們有多個(gè)秒殺系統(tǒng)作為生產(chǎn)者,這多個(gè)合在一起就是一個(gè) Producer Group 生產(chǎn)者組,它們一般生產(chǎn)相同的消息。 Consumer Group:消費(fèi)者組,代表某一類的消費(fèi)者,比如我們有多個(gè)短信系統(tǒng)作為消費(fèi)者,這多個(gè)合在一起就是一個(gè) Consumer Group 消費(fèi)者組,它們一般消費(fèi)相同的消息。 Queue:隊(duì)列,在Kafka中叫Partition。(每個(gè)Queue內(nèi)部是有序的,在RocketMQ中分為讀和寫兩種隊(duì)列,一般來說讀寫隊(duì)列數(shù)量一致,如果不一致就會(huì)出現(xiàn)很多問題。) Message Queue:消息隊(duì)列,主題被劃分為一個(gè)或多個(gè)子主題,即消息隊(duì)列。(一個(gè) Topic 下可以設(shè)置多個(gè)消息隊(duì)列,發(fā)送消息時(shí)執(zhí)行該消息的 Topic ,RocketMQ 會(huì)輪詢?cè)?Topic 下的所有隊(duì)列將消息發(fā)出去。消息的物理管理單位。一個(gè)Topic下可以有多個(gè)Queue,Queue的引入使得消息的存儲(chǔ)可以分布式集群化,具有了水平擴(kuò)展能力。) 消息模型 RockerMQ 中的消息模型就是按照主題模型所實(shí)現(xiàn)的,在主題模型中,消息的生產(chǎn)者稱為發(fā)布者(Publisher),消息的消費(fèi)者稱為訂閱者(Subscriber),存放消息的容器稱為主題(Topic)。RocketMQ 中的主題模型到底是如何實(shí)現(xiàn)的呢?
我們可以看到在整個(gè)圖中有 Producer Group、Topic、Consumer Group 三個(gè)角色,你可以看到圖中生產(chǎn)者組中的生產(chǎn)者會(huì)向主題發(fā)送消息,而主題中存在多個(gè)隊(duì)列,生產(chǎn)者每次生產(chǎn)消息之后是指定主題中的某個(gè)隊(duì)列發(fā)送消息的。
每個(gè)主題中都有多個(gè)隊(duì)列(這里還不涉及到 Broker),集群消費(fèi)模式下,一個(gè)消費(fèi)者集群多臺(tái)機(jī)器共同消費(fèi)一個(gè) topic 的多個(gè)隊(duì)列,一個(gè)隊(duì)列只會(huì)被一個(gè)消費(fèi)者消費(fèi)。如果某個(gè)消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會(huì)接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。就像上圖中 Consumer1 和 Consumer2 分別對(duì)應(yīng)著兩個(gè)隊(duì)列,而 Consuer3 是沒有隊(duì)列對(duì)應(yīng)的,所以一般來講要控制消費(fèi)者組中的消費(fèi)者個(gè)數(shù)和主題中隊(duì)列個(gè)數(shù)相同。這個(gè)簡(jiǎn)直和kafak一毛一樣?。?/p>
當(dāng)然也可以消費(fèi)者個(gè)數(shù)小于隊(duì)列個(gè)數(shù),只不過不太建議。如下圖:
每個(gè)消費(fèi)組在每個(gè)隊(duì)列上維護(hù)一個(gè)消費(fèi)位置,為什么呢?因?yàn)槲覀儎倓偖嫷膬H僅是一個(gè)消費(fèi)者組,我們知道在發(fā)布訂閱模式中一般會(huì)涉及到多個(gè)消費(fèi)者組,而每個(gè)消費(fèi)者組在每個(gè)隊(duì)列中的消費(fèi)位置都是不同的。如果此時(shí)有多個(gè)消費(fèi)者組,那么消息被一個(gè)消費(fèi)者組消費(fèi)完之后是不會(huì)刪除的(因?yàn)槠渌M(fèi)者組也需要呀),它僅僅是為每個(gè)消費(fèi)者組維護(hù)一個(gè)消費(fèi)位移(offset),每次消費(fèi)者組消費(fèi)完會(huì)返回一個(gè)成功的響應(yīng),然后隊(duì)列再把維護(hù)的消費(fèi)位移加一,這樣就不會(huì)出現(xiàn)剛剛消費(fèi)過的消息再一次被消費(fèi)了。
可能你還有一個(gè)問題,為什么一個(gè)主題中需要維護(hù)多個(gè)隊(duì)列?答案是提高并發(fā)能力。的確,每個(gè)主題中只存在一個(gè)隊(duì)列也是可行的。你想一下,如果每個(gè)主題中只存在一個(gè)隊(duì)列,這個(gè)隊(duì)列中也維護(hù)著每個(gè)消費(fèi)者組的消費(fèi)位置,這樣也可以做到發(fā)布訂閱模式。如下圖:
但是,這樣我生產(chǎn)者是不是只能向一個(gè)隊(duì)列發(fā)送消息?又因?yàn)樾枰S護(hù)消費(fèi)位置所以一個(gè)隊(duì)列只能對(duì)應(yīng)一個(gè)消費(fèi)者組中的消費(fèi)者,這樣是不是其他的 Consumer 就沒有用武之地了?從這兩個(gè)角度來講,并發(fā)度一下子就小了很多。
所以總結(jié)來說,RocketMQ 通過使用在一個(gè) Topic 中配置多個(gè)隊(duì)列,并且每個(gè)隊(duì)列維護(hù)每個(gè)消費(fèi)者組的消費(fèi)位置,實(shí)現(xiàn)了主題模式/發(fā)布訂閱模式。
系統(tǒng)架構(gòu) 講完了消息模型,我們理解起 RocketMQ 的技術(shù)架構(gòu)起來就容易多了。RocketMQ 技術(shù)架構(gòu)中有四大角色 NameServer、Broker、Producer、Consumer。這4大角色,已經(jīng)在基本概念中簡(jiǎn)單解釋過,對(duì)于相關(guān)詞匯,這里再重點(diǎn)解釋一下。
Broker:主要負(fù)責(zé)消息的存儲(chǔ)、投遞和查詢以及服務(wù)高可用保證。說白了就是消息隊(duì)列服務(wù)器嘛,生產(chǎn)者生產(chǎn)消息到 Broker,消費(fèi)者從 Broker 拉取消息并消費(fèi)。這里,我還得普及一下關(guān)于 Broker、Topic 和隊(duì)列的關(guān)系。上面我講解了 Topic 和隊(duì)列的關(guān)系——一個(gè) Topic 中存在多個(gè)隊(duì)列,那么這個(gè) Topic 和隊(duì)列存放在哪呢?一個(gè) Topic 分布在多個(gè) Broker 上,一個(gè) Broker 可以配置多個(gè) Topic,它們是多對(duì)多的關(guān)系。如果某個(gè) Topic 消息量很大,應(yīng)該給它多配置幾個(gè)隊(duì)列,并且盡量多分布在不同 Broker 上,以減輕某個(gè) Broker 的壓力。Topic 消息量都比較均勻的情況下,如果某個(gè) broker 上的隊(duì)列越多,則該 broker 壓力越大。 NameServer:不知道你們有沒有接觸過 ZooKeeper 和 Spring Cloud 中的 Eureka,它其實(shí)也是一個(gè)注冊(cè)中心,主要提供兩個(gè)功能:Broker 管理和路由信息管理。說白了就是 Broker 會(huì)將自己的信息注冊(cè)到 NameServer 中,此時(shí) NameServer 就存放了很多 Broker 的信息(Broker的路由表),消費(fèi)者和生產(chǎn)者就從 NameServer 中獲取路由表然后照著路由表的信息和對(duì)應(yīng)的 Broker 進(jìn)行通信(生產(chǎn)者和消費(fèi)者定期會(huì)向 NameServer 去查詢相關(guān)的 Broker 的信息)。 Producer:消息發(fā)布的角色,支持分布式集群方式部署。 Consumer:消息消費(fèi)的角色,支持分布式集群方式部署。支持以 push 推,pull 拉兩種模式對(duì)消息進(jìn)行消費(fèi),同時(shí)也支持集群方式和廣播方式的消費(fèi),它提供實(shí)時(shí)消息訂閱機(jī)制。 聽完了上面的解釋你可能會(huì)覺得,這玩意好簡(jiǎn)單。不就是這樣的么?
嗯?你可能會(huì)發(fā)現(xiàn)一個(gè)問題,這老家伙 NameServer 干啥用的,這不多余嗎?直接 Producer、Consumer 和 Broker 直接進(jìn)行生產(chǎn)消息,消費(fèi)消息不就好了么?但是,我們上文提到過 Broker 是需要保證高可用的,如果整個(gè)系統(tǒng)僅僅靠著一個(gè) Broker 來維持的話,那么這個(gè) Broker 的壓力會(huì)不會(huì)很大?所以我們需要使用多個(gè) Broker 來保證負(fù)載均衡。如果說,我們的消費(fèi)者和生產(chǎn)者直接和多個(gè) Broker 相連,那么當(dāng) Broker 修改的時(shí)候必定會(huì)牽連著每個(gè)生產(chǎn)者和消費(fèi)者,這樣就會(huì)產(chǎn)生耦合問題,而 NameServer 注冊(cè)中心就是用來解決這個(gè)問題的。
當(dāng)然,RocketMQ 中的技術(shù)架構(gòu)肯定不止前面那么簡(jiǎn)單,因?yàn)樯厦鎴D中的四個(gè)角色都是需要做集群的。我給出一張官網(wǎng)的架構(gòu)圖,大家嘗試?yán)斫庖幌隆?/p>
其實(shí)和我們最開始畫的那張乞丐版的架構(gòu)圖也沒什么區(qū)別,主要是一些細(xì)節(jié)上的差別,聽我細(xì)細(xì)道來。
第一、我們的 Broker 做了集群并且還進(jìn)行了主從部署,由于消息分布在各個(gè) Broker 上,一旦某個(gè) Broker 宕機(jī),則該 Broker 上的消息讀寫都會(huì)受到影響。所以 RocketMQ 提供了 master/slave 的結(jié)構(gòu),salve 定時(shí)從 master 同步數(shù)據(jù)(同步刷盤或者異步刷盤),如果 master 宕機(jī),則 slave 提供消費(fèi)服務(wù),但是不能寫入消息(后面我還會(huì)提到)。 第二、為了保證 HA,我們的 NameServer 也做了集群部署,但是請(qǐng)注意它是去中心化的。也就意味著它沒有主節(jié)點(diǎn),你可以很明顯地看出 NameServer 的所有節(jié)點(diǎn)是沒有進(jìn)行 Info Replicate 的,在 RocketMQ 中是通過單個(gè) Broker 和所有 NameServer 保持長(zhǎng)連接,并且在每隔 30 秒 Broker 會(huì)向所有 Nameserver 發(fā)送心跳,心跳包含了自身的 Topic 配置信息,這個(gè)步驟就對(duì)應(yīng)這上面的 Routing Info。 第三、在生產(chǎn)者需要向 Broker 發(fā)送消息的時(shí)候,需要先從 NameServer 獲取關(guān)于 Broker 的路由信息,然后通過輪詢的方法去向每個(gè)隊(duì)列中生產(chǎn)數(shù)據(jù)以達(dá)到負(fù)載均衡的效果。 第四、消費(fèi)者通過 NameServer 獲取所有 Broker 的路由信息后,向 Broker 發(fā)送 Pull 請(qǐng)求來獲取消息數(shù)據(jù)。Consumer 可以以兩種模式啟動(dòng)—— 廣播(Broadcast)和集群(Cluster)。廣播模式下,一條消息會(huì)發(fā)送給同一個(gè)消費(fèi)組中的所有消費(fèi)者,集群模式下消息只會(huì)發(fā)送給一個(gè)消費(fèi)者。 高級(jí)特性&常見問題 順序消費(fèi) 在上面的技術(shù)架構(gòu)介紹中,我們已經(jīng)知道了 RocketMQ 在主題上是無序的、它只有在隊(duì)列層面才是保證有序的。這又扯到兩個(gè)概念——普通順序和嚴(yán)格順序。所謂普通順序是指消費(fèi)者通過同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無順序的。普通順序消息在 Broker 重啟情況下不會(huì)保證消息順序性(短暫時(shí)間)。
所謂嚴(yán)格順序是指消費(fèi)者收到的所有消息均是有順序的。嚴(yán)格順序消息即使在異常情況下也會(huì)保證消息的順序性。但是,嚴(yán)格順序看起來雖好,實(shí)現(xiàn)它可會(huì)付出巨大的代價(jià)。如果你使用嚴(yán)格順序模式,Broker 集群中只要有一臺(tái)機(jī)器不可用,則整個(gè)集群都不可用。你還用啥?現(xiàn)在主要場(chǎng)景也就在 binlog 同步。一般而言,我們的 MQ 都是能容忍短暫的亂序,所以推薦使用普通順序模式。(這個(gè)嚴(yán)格順序,感覺沒太懂,后面再查一下相關(guān)資料。。。)
那么,我們現(xiàn)在使用了普通順序模式,我們從上面學(xué)習(xí)知道了在 Producer 生產(chǎn)消息的時(shí)候會(huì)進(jìn)行輪詢(取決你的負(fù)載均衡策略)來向同一主題的不同消息隊(duì)列發(fā)送消息。那么如果此時(shí)我有幾個(gè)消息分別是同一個(gè)訂單的創(chuàng)建、支付、發(fā)貨,在輪詢的策略下這三個(gè)消息會(huì)被發(fā)送到不同隊(duì)列,因?yàn)樵诓煌年?duì)列此時(shí)就無法使用 RocketMQ 帶來的隊(duì)列有序特性來保證消息有序性了。
那么,怎么解決呢?其實(shí)很簡(jiǎn)單,我們需要處理的僅僅是將同一語義下的消息放入同一個(gè)隊(duì)列(比如這里是同一個(gè)訂單),那我們就可以使用 Hash 取模法來保證同一個(gè)訂單在同一個(gè)隊(duì)列中就行了。
重復(fù)消費(fèi) 就兩個(gè)字——冪等。在編程中一個(gè)冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。比如說,這個(gè)時(shí)候我們有一個(gè)訂單的處理積分的系統(tǒng),每當(dāng)來一個(gè)消息的時(shí)候它就負(fù)責(zé)為創(chuàng)建這個(gè)訂單的用戶的積分加上相應(yīng)的數(shù)值??墒怯幸淮?,消息隊(duì)列發(fā)送給訂單系統(tǒng) FrancisQ 的訂單信息,其要求是給 FrancisQ 的積分加上 500。但是積分系統(tǒng)在收到 FrancisQ 的訂單信息處理完成之后返回給消息隊(duì)列處理成功的信息的時(shí)候出現(xiàn)了網(wǎng)絡(luò)波動(dòng)(當(dāng)然還有很多種情況,比如 Broker 意外重啟等等),這條回應(yīng)沒有發(fā)送成功。
那么,消息隊(duì)列沒收到積分系統(tǒng)的回應(yīng)會(huì)不會(huì)嘗試重發(fā)這個(gè)消息?問題就來了,我再發(fā)這個(gè)消息,萬一它又給 FrancisQ 的賬戶加上 500 積分怎么辦呢?所以我們需要給我們的消費(fèi)者實(shí)現(xiàn)冪等,也就是對(duì)同一個(gè)消息的處理結(jié)果,執(zhí)行多少次都不變。
那么如何給業(yè)務(wù)實(shí)現(xiàn)冪等呢?這個(gè)還是需要結(jié)合具體的業(yè)務(wù)的。你可以使用寫入 Redis 來保證,因?yàn)?Redis 的 key 和 value 就是天然支持冪等的。當(dāng)然還有使用數(shù)據(jù)庫(kù)插入法,基于數(shù)據(jù)庫(kù)的唯一鍵來保證重復(fù)數(shù)據(jù)不會(huì)被插入多條。不過最主要的還是需要根據(jù)特定場(chǎng)景使用特定的解決方案,你要知道你的消息消費(fèi)是否是完全不可重復(fù)消費(fèi)還是可以忍受重復(fù)消費(fèi)的,然后再選擇強(qiáng)校驗(yàn)和弱校驗(yàn)的方式。畢竟在 CS 領(lǐng)域還是很少有技術(shù)銀彈的說法。
簡(jiǎn)單了來說,冪等的校驗(yàn),還是需要業(yè)務(wù)方來支持,因?yàn)槟憬鉀Q不了網(wǎng)絡(luò)抖動(dòng)問題哈~~
分布式事務(wù) 如何解釋分布式事務(wù)呢?事務(wù)大家都知道吧?要么都執(zhí)行要么都不執(zhí)行。在同一個(gè)系統(tǒng)中我們可以輕松地實(shí)現(xiàn)事務(wù),但是在分布式架構(gòu)中,我們有很多服務(wù)是部署在不同系統(tǒng)之間的,而不同服務(wù)之間又需要進(jìn)行調(diào)用。比如此時(shí)我下訂單然后增加積分,如果保證不了分布式事務(wù)的話,就會(huì)出現(xiàn)A系統(tǒng)下了訂單,但是B系統(tǒng)增加積分失敗或者A系統(tǒng)沒有下訂單,B系統(tǒng)卻增加了積分。前者對(duì)用戶不友好,后者對(duì)運(yùn)營(yíng)商不利,這是我們都不愿意見到的。那么,如何去解決這個(gè)問題呢?
如今比較常見的分布式事務(wù)實(shí)現(xiàn)有 2PC、TCC 和事務(wù)消息(half 半消息機(jī)制)。每一種實(shí)現(xiàn)都有其特定的使用場(chǎng)景,但是也有各自的問題,都不是完美的解決方案。在 RocketMQ 中使用的是事務(wù)消息加上事務(wù)反查機(jī)制來解決分布式事務(wù)問題的。
下面是上圖的執(zhí)行流程:
A服務(wù)先發(fā)送個(gè)Half Message給Brock端,消息中攜帶 B服務(wù) 即將要+100元的信息。 當(dāng)A服務(wù)知道Half Message發(fā)送成功后,那么開始第3步執(zhí)行本地事務(wù)。 執(zhí)行本地事務(wù)(會(huì)有三種情況1、執(zhí)行成功。2、執(zhí)行失敗。3、網(wǎng)絡(luò)等原因?qū)е聸]有響應(yīng)) 如果本地事務(wù)成功,那么Product像Brock服務(wù)器發(fā)送Commit,這樣B服務(wù)就可以消費(fèi)該message。 如果本地事務(wù)失敗,那么Product像Brock服務(wù)器發(fā)送Rollback,那么就會(huì)直接刪除上面這條半消息。 如果因?yàn)榫W(wǎng)絡(luò)等原因遲遲沒有返回失敗還是成功,那么會(huì)執(zhí)行RocketMQ的回調(diào)接口,來進(jìn)行事務(wù)的回查。 消息堆積 消息中間件的主要功能是異步解耦,還有個(gè)重要功能是擋住前端的數(shù)據(jù)洪峰,保證后端系統(tǒng)的穩(wěn)定性,這就要求消息中間件具有一定的消息堆積能力,消息堆積分以下兩種情況:
消息堆積在內(nèi)存Buffer,一旦超過內(nèi)存Buffer,可以根據(jù)一定的丟棄策略來丟棄消息,如CORBA Notification規(guī)范中描述。適合能容忍丟棄消息的業(yè)務(wù),這種情況消息的堆積能力主要在于內(nèi)存Buffer大小,而且消息堆積后,性能下降不會(huì)太大,因?yàn)閮?nèi)存中數(shù)據(jù)多少對(duì)于對(duì)外提供的訪問能力影響有限。 消息堆積到持久化存儲(chǔ)系統(tǒng)中,例如DB,KV存儲(chǔ),文件記錄形式。當(dāng)消息不能在內(nèi)存Cache命中時(shí),要不可避免的訪問磁盤,會(huì)產(chǎn)生大量讀IO,讀IO的吞吐量直接決定了消息堆積后的訪問能力。 評(píng)估消息堆積能力主要有以下四點(diǎn):
消息能堆積多少條,多少字節(jié)?即消息的堆積容量。 消息堆積后,發(fā)消息的吞吐量大小,是否會(huì)受堆積影響? 消息堆積后,正常消費(fèi)的Consumer是否會(huì)受影響? 消息堆積后,訪問堆積在磁盤的消息時(shí),吞吐量有多大? 簡(jiǎn)單來說,RocketMQ支持大量消息堆積,消息會(huì)存在內(nèi)存,超出內(nèi)存的消息會(huì)持久化到磁盤中。
定時(shí)消息 定時(shí)消息是指消息發(fā)到Broker后,不能立刻被Consumer消費(fèi),要到特定的時(shí)間點(diǎn)或者等待特定的時(shí)間后才能被消費(fèi)。如果要支持任意的時(shí)間精度,在Broker層面,必須要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的產(chǎn)生巨大性能開銷。RocketMQ支持定時(shí)消息,但是不支持任意時(shí)間精度,支持特定的level,例如定時(shí)5s,10s,1m等。
簡(jiǎn)單來說,RocketMQ支持定時(shí)消息,但是只支持固定時(shí)間,不支持任意精度時(shí)間。
回溯消費(fèi) 同步刷盤和異步刷盤 上面我講了那么多的 RocketMQ 的架構(gòu)和設(shè)計(jì)原理,你有沒有好奇,在 Topic 中的隊(duì)列是以什么樣的形式存在的?隊(duì)列中的消息又是如何進(jìn)行存儲(chǔ)持久化的呢?我在上文中提到的同步刷盤和異步刷盤又是什么呢?它們會(huì)給持久化帶來什么樣的影響呢?下面我將給你們一一解釋。
如上圖所示,在同步刷盤中需要等待一個(gè)刷盤成功的 ACK,同步刷盤對(duì) MQ 消息可靠性來說是一種不錯(cuò)的保障,但是性能上會(huì)有較大影響,一般地適用于金融等特定業(yè)務(wù)場(chǎng)景。而異步刷盤往往是開啟一個(gè)線程去異步地執(zhí)行刷盤操作。消息刷盤采用后臺(tái)異步線程提交的方式進(jìn)行,降低了讀寫延遲,提高了 MQ 的性能和吞吐量,一般適用于如發(fā)驗(yàn)證碼等對(duì)于消息保證要求不太高的業(yè)務(wù)場(chǎng)景。一般地,異步刷盤只有在 Broker 意外宕機(jī)的時(shí)候會(huì)丟失部分?jǐn)?shù)據(jù),你可以設(shè)置 Broker 的參數(shù) FlushDiskType 來調(diào)整你的刷盤策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。
簡(jiǎn)單來說,同步刷盤是刷盤后請(qǐng)求再返回,異步刷盤是直接返回請(qǐng)求,再去慢慢刷盤,可能會(huì)導(dǎo)致數(shù)據(jù)丟失。
同步復(fù)制和異步復(fù)制 上面的同步刷盤和異步刷盤是在單個(gè)結(jié)點(diǎn)層面的,而同步復(fù)制和異步復(fù)制主要是指的 Borker 主從模式下,主節(jié)點(diǎn)返回消息給客戶端的時(shí)候是否需要同步從節(jié)點(diǎn)。
同步復(fù)制:也叫 “同步雙寫”,也就是說,只有消息同步雙寫到主從結(jié)點(diǎn)上時(shí)才返回寫入成功。 異步復(fù)制:消息寫入主節(jié)點(diǎn)之后就直接返回寫入成功。異步復(fù)制會(huì)不會(huì)也像異步刷盤那樣影響消息的可靠性呢?答案是不會(huì)的,因?yàn)閮烧呔褪遣煌母拍?,?duì)于消息可靠性是通過不同的刷盤策略保證的,而像異步同步復(fù)制策略僅僅是影響到了可用性。為什么呢?其主要原因是 RocketMQ 是不支持自動(dòng)主從切換的,當(dāng)主節(jié)點(diǎn)掛掉之后,生產(chǎn)者就不能再給這個(gè)主節(jié)點(diǎn)生產(chǎn)消息了。比如這個(gè)時(shí)候采用異步復(fù)制的方式,在主節(jié)點(diǎn)還未發(fā)送完需要同步的消息的時(shí)候主節(jié)點(diǎn)掛掉了,這個(gè)時(shí)候從節(jié)點(diǎn)就少了一部分消息。但是此時(shí)生產(chǎn)者無法再給主節(jié)點(diǎn)生產(chǎn)消息了,消費(fèi)者可以自動(dòng)切換到從節(jié)點(diǎn)進(jìn)行消費(fèi)(僅僅是消費(fèi)),所以在主節(jié)點(diǎn)掛掉的時(shí)間只會(huì)產(chǎn)生主從結(jié)點(diǎn)短暫的消息不一致的情況,降低了可用性,而當(dāng)主節(jié)點(diǎn)重啟之后,從節(jié)點(diǎn)那部分未來得及復(fù)制的消息還會(huì)繼續(xù)復(fù)制。 擴(kuò)展知識(shí)1:在單主從架構(gòu)中,如果一個(gè)主節(jié)點(diǎn)掛掉了,那么也就意味著整個(gè)系統(tǒng)不能再生產(chǎn)了。那么這個(gè)可用性的問題能否解決呢?一個(gè)主從不行那就多個(gè)主從的唄,別忘了在我們最初的架構(gòu)圖中,每個(gè) Topic 是分布在不同 Broker 中的。但是這種復(fù)制方式同樣也會(huì)帶來一個(gè)問題,那就是無法保證嚴(yán)格順序。在上文中我們提到了如何保證的消息順序性是通過將一個(gè)語義的消息發(fā)送在同一個(gè)隊(duì)列中,使用 Topic 下的隊(duì)列來保證順序性的。如果此時(shí)我們主節(jié)點(diǎn) A 負(fù)責(zé)的是訂單 A 的一系列語義消息,然后它掛了,這樣其他節(jié)點(diǎn)是無法代替主節(jié)點(diǎn)A的,如果我們?nèi)我夤?jié)點(diǎn)都可以存入任何消息,那就沒有順序性可言了。(這點(diǎn)我并不認(rèn)同,我理解主從的對(duì)列信息應(yīng)該是一樣的,我從主節(jié)點(diǎn)讀到哪里,如果主節(jié)點(diǎn)掛掉,應(yīng)該是可以到從結(jié)點(diǎn)去讀取的,如果不能這樣,搞個(gè)主從就完全沒有意義了。因?yàn)橹鲝牡男畔⑹且粯拥模瑢?duì)隊(duì)列的順序是內(nèi)有影響的,我不可能把不同的信息,搞兩個(gè)隊(duì)列,分別放到主從機(jī)器。)
擴(kuò)展知識(shí)2:在 RocketMQ 中采用了 Dledger 解決主從數(shù)據(jù)同步問題。他要求在寫入消息的時(shí)候,要求至少消息復(fù)制到半數(shù)以上的節(jié)點(diǎn)之后,才給客?端返回寫?成功,并且它是?持通過選舉來動(dòng)態(tài)切換主節(jié)點(diǎn)的。這里我就不展開說明了,讀者可以自己去了解。也不是說 Dledger 是個(gè)完美的方案,至少在 Dledger 選舉過程中是無法提供服務(wù)的,而且他必須要使用三個(gè)節(jié)點(diǎn)或以上,如果多數(shù)節(jié)點(diǎn)同時(shí)掛掉他也是無法保證可用性的,而且要求消息復(fù)制板書以上節(jié)點(diǎn)的效率和直接異步復(fù)制還是有一定的差距的。
這個(gè)機(jī)制,感覺就像大眾化的版本,基本思路都一樣,為了保證數(shù)據(jù)可用性,我還是推薦同步復(fù)制,當(dāng)大多數(shù)節(jié)點(diǎn)復(fù)制成功,就認(rèn)為復(fù)制完畢,和ETCD的Raft協(xié)議的日志同步原理一樣。
容錯(cuò)機(jī)制 在實(shí)際使用RocketMQ的時(shí)候我們并不能保證每次發(fā)送的消息都剛好能被消費(fèi)者一次性正常消費(fèi)成功,可能會(huì)存在需要多次消費(fèi)才能成功或者一直消費(fèi)失敗的情況,那作為發(fā)送者該做如何處理呢?
RocketMQ提供了ack機(jī)制,以保證消息能夠被正常消費(fèi)。發(fā)送者為了保證消息肯定消費(fèi)成功,只有使用方明確表示消費(fèi)成功,RocketMQ才會(huì)認(rèn)為消息消費(fèi)成功。中途斷電,拋出異常等都不會(huì)認(rèn)為成功——即都會(huì)重新投遞。當(dāng)然,如果消費(fèi)者不告知發(fā)送者我這邊消費(fèi)信息異常,那么發(fā)送者是不會(huì)知道的,所以消費(fèi)者在設(shè)置監(jiān)聽的時(shí)候需要給個(gè)回調(diào)。
為了保證消息是肯定被至少消費(fèi)成功一次,RocketMQ會(huì)把這批消息重發(fā)回Broker(topic不是原topic而是這個(gè)消費(fèi)租的RETRY topic),在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后,再次投遞到這個(gè)ConsumerGroup。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默認(rèn)16次),就會(huì)投遞到DLQ死信隊(duì)列。應(yīng)用可以監(jiān)控死信隊(duì)列來做人工干預(yù)。
簡(jiǎn)單來說,通過ACK保證消息一定能正常消費(fèi),對(duì)于異常消息,會(huì)重新放回Broker,但是這樣就會(huì)打亂消息的順序,所以容錯(cuò)機(jī)制和消息嚴(yán)格順序,魚和熊掌不可兼得。
特性分析 這里才是內(nèi)容的重點(diǎn),不僅需要知道RocketMQ的特性,還需要知道支持這些特性的原因:
消息路由(不支持):RocketMQ在處理消息之前是不允許消費(fèi)者過濾一個(gè)主題中的消息。一個(gè)訂閱的消費(fèi)者在沒有異常情況下會(huì)接受一個(gè)隊(duì)列中的所有消息; 消息有序(部分支持):需要將同一類的消息hash到同一個(gè)隊(duì)列Queue中,才能支持消息的順序,如果同一類消息散落到不同的Queue中,就不能支持消息的順序,如果設(shè)定消息一定要正常消費(fèi),那么就不能保證消息順序。 消息時(shí)序(可以支持):可以發(fā)送定時(shí)消息,但是只能制定系統(tǒng)定義好的時(shí)間,不支持自定義時(shí)間; 容錯(cuò)處理(支持):通過ACK機(jī)制,保證消息一定能正常消費(fèi),這個(gè)和RabbitMQ很像; 伸縮(支持):整體架構(gòu)其實(shí)和kafaka很像,可以擴(kuò)容broker和內(nèi)部隊(duì)列數(shù),或者增加消費(fèi)組中的消費(fèi)組數(shù)量,提高消費(fèi)能力。 持久化(支持):消息可以持久化到磁盤中,所以支持消息的回溯,和kafaka很像。 消息回溯(支持):因?yàn)橄⒅С殖志没?,就支持回溯,可以理解是附帶的功能?/section> 高吞吐(非常好):借鑒kafaka的設(shè)計(jì),不會(huì)出現(xiàn)rabbitMQ的單Master抗壓力問題,可以從多個(gè)borker寫入和消費(fèi)消息。 RabbitMQ 我們也不能天天去背八股,還需要實(shí)踐,RabbitMQ的實(shí)操實(shí)例,直接看這篇《入門RabbitMQ,這一篇絕對(duì)夠!》