日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

消息隊(duì)列技術(shù)選型:5種主流消息隊(duì)列,哪個(gè)最香?

 昵稱10087950 2022-06-16 發(fā)布于江蘇

RabbitMQ、Kafka、RocketMQ和ActiveMQ,肝了我一個(gè)月,原理是什么,如何選型,本文會(huì)告訴你答案。

消息隊(duì)列中間件重要嗎?面試必問問題之一,你說重不重要。我有時(shí)會(huì)問同事,為啥你用RabbitMQ,不用Kafka,或者RocketMQ呢,他給我的回答“因?yàn)楣居玫木褪沁@個(gè),大家都這么用”,如果你去面試,直接就被Pass,今天這篇文章,告訴你如何回答。

這篇文章純理論,主要整理網(wǎng)絡(luò)資料,肝了我整整一個(gè)月!文章依然延續(xù)上幾篇的風(fēng)格,很長(zhǎng),長(zhǎng)到我只整理排版,手都整麻了。全文2.5萬字,建議先收藏,后續(xù)面試、或者技術(shù)選型,再拿出來喵喵,不BB,上思維導(dǎo)圖!

圖片

消息隊(duì)列

消息隊(duì)列模式

消息隊(duì)列目前主要2種模式,分別為“點(diǎn)對(duì)點(diǎn)模式”和“發(fā)布/訂閱模式”。

點(diǎn)對(duì)點(diǎn)模式

一個(gè)具體的消息只能由一個(gè)消費(fèi)者消費(fèi)。多個(gè)生產(chǎn)者可以向同一個(gè)消息隊(duì)列發(fā)送消息;但是,一個(gè)消息在被一個(gè)消息者處理的時(shí)候,這個(gè)消息在隊(duì)列上會(huì)被鎖住或者被移除并且其他消費(fèi)者無法處理該消息。需要額外注意的是,如果消費(fèi)者處理一個(gè)消息失敗了,消息系統(tǒng)一般會(huì)把這個(gè)消息放回隊(duì)列,這樣其他消費(fèi)者可以繼續(xù)處理。

圖片

發(fā)布/訂閱模式

單個(gè)消息可以被多個(gè)訂閱者并發(fā)的獲取和處理。一般來說,訂閱有兩種類型:

  • 臨時(shí)(ephemeral)訂閱,這種訂閱只有在消費(fèi)者啟動(dòng)并且運(yùn)行的時(shí)候才存在。一旦消費(fèi)者退出,相應(yīng)的訂閱以及尚未處理的消息就會(huì)丟失。
  • 持久(durable)訂閱,這種訂閱會(huì)一直存在,除非主動(dòng)去刪除。消費(fèi)者退出后,消息系統(tǒng)會(huì)繼續(xù)維護(hù)該訂閱,并且后續(xù)消息可以被繼續(xù)處理。
圖片

衡量標(biāo)準(zhǔn)

對(duì)消息隊(duì)列進(jìn)行技術(shù)選型時(shí),需要通過以下指標(biāo)衡量你所選擇的消息隊(duì)列,是否可以滿足你的需求:

  • 消息順序:發(fā)送到隊(duì)列的消息,消費(fèi)時(shí)是否可以保證消費(fèi)的順序,比如A先下單,B后下單,應(yīng)該是A先去扣庫(kù)存,B再去扣,順序不能反。
  • 消息路由:根據(jù)路由規(guī)則,只訂閱匹配路由規(guī)則的消息,比如有A/B兩者規(guī)則的消息,消費(fèi)者可以只訂閱A消息,B消息不會(huì)消費(fèi)。
  • 消息可靠性:是否會(huì)存在丟消息的情況,比如有A/B兩個(gè)消息,最后只有B消息能消費(fèi),A消息丟失。
  • 消息時(shí)序:主要包括“消息存活時(shí)間”和“延遲/預(yù)定的消息”,“消息存活時(shí)間”表示生產(chǎn)者可以對(duì)消息設(shè)置TTL,如果超過該TTL,消息會(huì)自動(dòng)消失;“延遲/預(yù)定的消息”指的是可以延遲或者預(yù)訂消費(fèi)消息,比如延時(shí)5分鐘,那么消息會(huì)5分鐘后才能讓消費(fèi)者消費(fèi),時(shí)間未到的話,是不能消費(fèi)的。
  • 消息留存:消息消費(fèi)成功后,是否還會(huì)繼續(xù)保留在消息隊(duì)列。
  • 容錯(cuò)性:當(dāng)一條消息消費(fèi)失敗后,是否有一些機(jī)制,保證這條消息是一種能成功,比如異步第三方退款消息,需要保證這條消息消費(fèi)掉,才能確定給用戶退款成功,所以必須保證這條消息消費(fèi)成功的準(zhǔn)確性。
  • 伸縮:當(dāng)消息隊(duì)列性能有問題,比如消費(fèi)太慢,是否可以快速支持庫(kù)容;當(dāng)消費(fèi)隊(duì)列過多,浪費(fèi)系統(tǒng)資源,是否可以支持縮容。
  • 吞吐量:支持的最高并發(fā)數(shù)。

消息隊(duì)列比較

下圖是從網(wǎng)上摘抄過來的,可以看到主流MQ的對(duì)比:

圖片

下面簡(jiǎn)單介紹常用的消息隊(duì)列:

  • Kafka:Apache Kafka它最初由LinkedIn公司基于獨(dú)特的設(shè)計(jì)實(shí)現(xiàn)為一個(gè)分布式的提交日志系統(tǒng)( a distributed commit log),之后成為Apache項(xiàng)目的一部分。號(hào)稱大數(shù)據(jù)的殺手锏,談到大數(shù)據(jù)領(lǐng)域內(nèi)的消息傳輸,則繞不開Kafka,這款為大數(shù)據(jù)而生的消息中間件,以其百萬級(jí)TPS的吞吐量名聲大噪,迅速成為大數(shù)據(jù)領(lǐng)域的寵兒,在數(shù)據(jù)采集、傳輸、存儲(chǔ)的過程中發(fā)揮著舉足輕重的作用。
  • RabbitMQ:RabbitMQ 2007年發(fā)布,是使用Erlang語言開發(fā)的開源消息隊(duì)列系統(tǒng),基于AMQP協(xié)議來實(shí)現(xiàn)。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。AMQP協(xié)議更多用在企業(yè)系統(tǒng)內(nèi),對(duì)數(shù)據(jù)一致性、穩(wěn)定性和可靠性要求很高的場(chǎng)景,對(duì)性能和吞吐量的要求還在其次。
  • RocketMQ:是阿里開源的消息中間件,它是純Java開發(fā),具有高吞吐量、高可用性、適合大規(guī)模分布式系統(tǒng)應(yīng)用的特點(diǎn)。RocketMQ思路起源于Kafka,但并不是Kafka的一個(gè)Copy,它對(duì)消息的可靠傳輸及事務(wù)性做了優(yōu)化,目前在阿里集團(tuán)被廣泛應(yīng)用于交易、充值、流計(jì)算、消息推送、日志流式處理、binglog分發(fā)等場(chǎng)景。
  • ActiveMQ:是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。官方社區(qū)現(xiàn)在對(duì)ActiveMQ 5.x維護(hù)越來越少,較少在大規(guī)模吞吐的場(chǎng)景中使用,所以該消息隊(duì)列也不是我們文章中重點(diǎn)討論的內(nèi)容。

優(yōu)缺點(diǎn)

Kafka

優(yōu)點(diǎn):

  • 高吞吐、低延遲:kakfa 最大的特點(diǎn)就是收發(fā)消息非常快,kafka 每秒可以處理幾十萬條消息,它的最低延遲只有幾毫秒;
  • 高伸縮性:每個(gè)主題(topic) 包含多個(gè)分區(qū)(partition),主題中的分區(qū)可以分布在不同的主機(jī)(broker)中;
  • 持久性、可靠性:Kafka 能夠允許數(shù)據(jù)的持久化存儲(chǔ),消息被持久化到磁盤,并支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失,Kafka 底層的數(shù)據(jù)存儲(chǔ)是基于 Zookeeper 存儲(chǔ)的,Zookeeper 我們知道它的數(shù)據(jù)能夠持久存儲(chǔ);
  • 容錯(cuò)性:非常高,kafka是分布式的,一個(gè)數(shù)據(jù)多個(gè)副本,某個(gè)節(jié)點(diǎn)宕機(jī),Kafka 集群能夠正常工作;
  • 消息有序:消費(fèi)者采用Pull方式獲取消息,消息有序,通過控制能夠保證所有消息被消費(fèi)且僅被消費(fèi)一次;
  • 有優(yōu)秀的第三方Kafka Web管理界面Kafka-Manager,在日志領(lǐng)域比較成熟,被多家公司和多個(gè)開源項(xiàng)目使用;
  • 功能支持:功能較為簡(jiǎn)單,主要支持簡(jiǎn)單的MQ功能,在大數(shù)據(jù)領(lǐng)域的實(shí)時(shí)計(jì)算以及日志采集被大規(guī)模使用。

缺點(diǎn):

  • Kafka單機(jī)超過64個(gè)隊(duì)列/分區(qū),Load會(huì)發(fā)生明顯的飆高現(xiàn)象,隊(duì)列越多,load越高,發(fā)送消息響應(yīng)時(shí)間變長(zhǎng);
  • 使用短輪詢方式,實(shí)時(shí)性取決于輪詢間隔時(shí)間;
  • 消費(fèi)失敗不支持重試;
  • 支持消息順序,但是一臺(tái)代理宕機(jī)后,就會(huì)產(chǎn)生消息亂序;
  • 社區(qū)更新較慢。

總結(jié):

  • Kafka主要特點(diǎn)是基于Pull的模式來處理消息消費(fèi),追求高吞吐量,一開始的目的就是用于日志收集和傳輸,適合產(chǎn)生大量數(shù)據(jù)的互聯(lián)網(wǎng)服務(wù)的數(shù)據(jù)收集業(yè)務(wù)。
  • 大型公司建議可以選用,如果有日志采集功能,肯定是首選kafka。

RabbitMQ

優(yōu)點(diǎn):

  • 異步消息傳遞:支持多種消息協(xié)議,消息隊(duì)列,傳送確認(rèn),靈活的路由到隊(duì)列,多種交換類型;
  • 支持幾乎所有最受歡迎的編程語言:Java,C,C ++,C#,Ruby,Perl,Python,PHP等等;
  • 可以部署為高可用性和吞吐量的集群;,跨多個(gè)可用區(qū)域和區(qū)域進(jìn)行聯(lián)合;
  • 可插入的身份驗(yàn)證,授權(quán),支持TLS和LDAP;
  • 提供了許多插件,來從多方面進(jìn)行擴(kuò)展,也可以編寫自己的插件;
  • 提供了一個(gè)易用的用戶界面,使得用戶可以監(jiān)控和管理消息Broker,社區(qū)活躍度高。

缺點(diǎn):

  • erlang開發(fā),很難去看懂源碼,基本職能依賴于開源社區(qū)的快速維護(hù)和修復(fù)bug,不利于做二次開發(fā)和維護(hù);
  • RabbitMQ確實(shí)吞吐量會(huì)低一些,這是因?yàn)樗龅膶?shí)現(xiàn)機(jī)制比較重;
  • 需要學(xué)習(xí)比較復(fù)雜的接口和協(xié)議,學(xué)習(xí)和維護(hù)成本較高。

總結(jié):

  • 結(jié)合erlang語言本身的并發(fā)優(yōu)勢(shì),性能較好,社區(qū)活躍度也比較高,但是不利于做二次開發(fā)和維護(hù)。不過RabbitMQ的社區(qū)十分活躍,可以解決開發(fā)過程中遇到的bug。
  • 如果你的數(shù)據(jù)量沒有那么大,小公司優(yōu)先選擇功能比較完備的RabbitMQ。

RocketMQ

優(yōu)點(diǎn):

  • 支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對(duì)點(diǎn)(P2P)消息模型;
  • 在一個(gè)隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞;
  • 支持拉(pull)和推(push)兩種消息模式;
  • 單一隊(duì)列百萬消息的堆積能力;
  • 支持多種消息協(xié)議,如 JMS、MQTT 等;
  • 可靠的FIFO和嚴(yán)格的有序消息傳遞在同一隊(duì)列中;
  • 靈活的分布式橫向擴(kuò)展部署架構(gòu),滿足至少一次消息傳遞語義;
  • 提供 docker 鏡像用于隔離測(cè)試和云集群部署;
  • 提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard。

缺點(diǎn):

  • 支持的客戶端語言不多,目前是java及c++,其中c++不成熟
  • 社區(qū)活躍度一般
  • 沒有在 mq 核心中去實(shí)現(xiàn)JMS等接口,有些系統(tǒng)要遷移需要修改大量代碼

總結(jié):

  • 天生為金融互聯(lián)網(wǎng)領(lǐng)域而生,對(duì)于可靠性要求很高的場(chǎng)景,尤其是電商里面的訂單扣款,以及業(yè)務(wù)削峰,在大量交易涌入時(shí),后端可能無法及時(shí)處理的情況。
  • RoketMQ在穩(wěn)定性上可能更值得信賴,這些業(yè)務(wù)場(chǎng)景在阿里雙11已經(jīng)經(jīng)歷了多次考驗(yàn),如果你的業(yè)務(wù)有上述并發(fā)場(chǎng)景,建議可以選擇RocketMQ。

ActiveMQ

優(yōu)點(diǎn)

  • 支持來自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各種跨語言客戶端和協(xié)議;
  • 完全支持JMS客戶端和Message Broker中的企業(yè)集成模式;
  • 支持許多高級(jí)功能,如消息組,虛擬目標(biāo),通配符和復(fù)合目標(biāo);
  • 完全支持JMS 1.1和J2EE 1.4,支持瞬態(tài),持久,事務(wù)和XA消息;
  • Spring支持,以便ActiveMQ可以輕松嵌入到Spring應(yīng)用程序中,并使用Spring的XML配置機(jī)制進(jìn)行配置;
  • 專為高性能集群,客戶端 - 服務(wù)器,基于對(duì)等的通信而設(shè)計(jì);
  • CXF和Axis支持,以便ActiveMQ可以輕松地放入這些Web服務(wù)堆棧中以提供可靠的消息傳遞;
  • 可以用作內(nèi)存JMS提供程序,非常適合單元測(cè)試JMS;
  • 支持可插拔傳輸協(xié)議,例如in-VM,TCP,SSL,NIO,UDP,多播,JGroups和JXTA傳輸;
  • 使用JDBC和高性能日志支持非常快速的持久性。

缺點(diǎn):

  • 官方社區(qū)現(xiàn)在對(duì)ActiveMQ 5.x維護(hù)越來越少,較少在大規(guī)模吞吐的場(chǎng)景中使用。

Kafka

圖片

Kafka 是由 Linkedin 公司開發(fā)的,它是一個(gè)分布式的,支持多分區(qū)、多副本,基于 Zookeeper 的分布式消息流平臺(tái),它同時(shí)也是一款開源的基于發(fā)布訂閱模式的消息引擎系統(tǒng)。

基本概念

  • 消息:Kafka 中的數(shù)據(jù)單元被稱為消息,也被稱為記錄,可以把它看作數(shù)據(jù)庫(kù)表中某一行的記錄。
  • 批次:為了提高效率, 消息會(huì)分批次寫入 Kafka,批次就代指的是一組消息。
  • 主題:消息的種類稱為 主題(Topic),可以說一個(gè)主題代表了一類消息,相當(dāng)于是對(duì)消息進(jìn)行分類。主題就像是數(shù)據(jù)庫(kù)中的表。
  • 分區(qū):主題可以被分為若干個(gè)分區(qū)(partition),同一個(gè)主題中的分區(qū)可以不在一個(gè)機(jī)器上,有可能會(huì)部署在多個(gè)機(jī)器上,由此來實(shí)現(xiàn) kafka 的伸縮性,單一主題中的分區(qū)有序,但是無法保證主題中所有的分區(qū)有序。
圖片
  • 生產(chǎn)者:向主題發(fā)布消息的客戶端應(yīng)用程序稱為生產(chǎn)者(Producer),生產(chǎn)者用于持續(xù)不斷的向某個(gè)主題發(fā)送消息。
  • 消費(fèi)者:訂閱主題消息的客戶端程序稱為消費(fèi)者(Consumer),消費(fèi)者用于處理生產(chǎn)者產(chǎn)生的消息。
  • 消費(fèi)者群組:生產(chǎn)者與消費(fèi)者的關(guān)系就如同餐廳中的廚師和顧客之間的關(guān)系一樣,一個(gè)廚師對(duì)應(yīng)多個(gè)顧客,也就是一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,消費(fèi)者群組(Consumer Group)指的就是由一個(gè)或多個(gè)消費(fèi)者組成的群體。
圖片
  • 偏移量:偏移量(Consumer Offset)是一種元數(shù)據(jù),它是一個(gè)不斷遞增的整數(shù)值,用來記錄消費(fèi)者發(fā)生重平衡時(shí)的位置,以便用來恢復(fù)數(shù)據(jù)。
  • broker: 一個(gè)獨(dú)立的 Kafka 服務(wù)器就被稱為 broker,broker 接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤保存。
  • broker 集群:broker 是集群 的組成部分,broker 集群由一個(gè)或多個(gè) broker 組成,每個(gè)集群都有一個(gè) broker 同時(shí)充當(dāng)了集群控制器的角色(自動(dòng)從集群的活躍成員中選舉出來)。
  • 副本:Kafka 中消息的備份又叫做 副本(Replica),副本的數(shù)量是可以配置的,Kafka 定義了兩類副本:領(lǐng)導(dǎo)者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對(duì)外提供服務(wù),后者只是被動(dòng)跟隨。
  • 重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程。Rebalance 是 Kafka 消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。

系統(tǒng)架構(gòu)

一個(gè)典型的 Kafka 集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費(fèi)消息。

圖片

生產(chǎn)者

數(shù)據(jù)執(zhí)行流程

在 Kafka 中,我們把產(chǎn)生消息的那一方稱為生產(chǎn)者,比如我們經(jīng)?;厝ヌ詫氋?gòu)物,你打開淘寶的那一刻,你的登陸信息,登陸次數(shù)都會(huì)作為消息傳輸?shù)?Kafka 后臺(tái),當(dāng)你瀏覽購(gòu)物的時(shí)候,你的瀏覽信息,你的搜索指數(shù),你的購(gòu)物愛好都會(huì)作為一個(gè)個(gè)消息傳遞給 Kafka 后臺(tái),然后淘寶會(huì)根據(jù)你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那么這些生產(chǎn)者產(chǎn)生的消息是怎么傳到 Kafka 應(yīng)用程序的呢?發(fā)送過程是怎么樣的呢?

盡管消息的產(chǎn)生非常簡(jiǎn)單,但是消息的發(fā)送過程還是比較復(fù)雜的,如圖:

圖片

我們從創(chuàng)建一個(gè)ProducerRecord 對(duì)象開始,ProducerRecord 是 Kafka 中的一個(gè)核心類,它代表了一組 Kafka 需要發(fā)送的 key/value 鍵值對(duì),它由記錄要發(fā)送到的主題名稱(Topic Name),可選的分區(qū)號(hào)(Partition Number)以及可選的鍵值對(duì)構(gòu)成。

在發(fā)送 ProducerRecord 時(shí),我們需要將鍵值對(duì)對(duì)象由序列化器轉(zhuǎn)換為字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡(luò)上傳輸。然后消息到達(dá)了分區(qū)器。如果發(fā)送過程中指定了有效的分區(qū)號(hào),那么在發(fā)送記錄時(shí)將使用該分區(qū)。如果發(fā)送過程中未指定分區(qū),則將使用key 的 hash 函數(shù)映射指定一個(gè)分區(qū)。如果發(fā)送的過程中既沒有分區(qū)號(hào)也沒有,則將以循環(huán)的方式分配一個(gè)分區(qū)。選好分區(qū)后,生產(chǎn)者就知道向哪個(gè)主題和分區(qū)發(fā)送數(shù)據(jù)了。ProducerRecord 還有關(guān)聯(lián)的時(shí)間戳,如果用戶沒有提供時(shí)間戳,那么生產(chǎn)者將會(huì)在記錄中使用當(dāng)前的時(shí)間作為時(shí)間戳。Kafka 最終使用的時(shí)間戳取決于 topic 主題配置的時(shí)間戳類型。然后,這條消息被存放在一個(gè)記錄批次里,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。由一個(gè)獨(dú)立的線程負(fù)責(zé)把它們發(fā)到 Kafka Broker 上。

Kafka Broker 在收到消息時(shí)會(huì)返回一個(gè)響應(yīng),如果寫入成功,會(huì)返回一個(gè) RecordMetaData 對(duì)象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量,上面兩種的時(shí)間戳類型也會(huì)返回給用戶。如果寫入失敗,會(huì)返回一個(gè)錯(cuò)誤。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,幾次之后如果還是失敗的話,就返回錯(cuò)誤消息。

上面寫的有點(diǎn)多,總結(jié)一下流程:創(chuàng)建對(duì)象(主題、分區(qū)、key/value)-> 序列化數(shù)據(jù) -> 到達(dá)分區(qū)(可自己指定,也可以通過key hash)-> 放入批次(相同主題和分區(qū)) ->  獨(dú)立線程發(fā)送 -> 返回主題/分區(qū)/分區(qū)偏移量/時(shí)間戳。

分區(qū)策略

Kafka 對(duì)于數(shù)據(jù)的讀寫是以分區(qū)為粒度的,分區(qū)可以分布在多個(gè)主機(jī)(Broker)中,這樣每個(gè)節(jié)點(diǎn)能夠?qū)崿F(xiàn)獨(dú)立的數(shù)據(jù)寫入和讀取,并且能夠通過增加新的節(jié)點(diǎn)來增加 Kafka 集群的吞吐量,通過分區(qū)部署在多個(gè) Broker 來實(shí)現(xiàn)負(fù)載均衡的效果,下面我們看看數(shù)據(jù)如何選擇分區(qū)。

方式1:順序輪詢

順序分配,消息是均勻的分配給每個(gè) partition,即每個(gè)分區(qū)存儲(chǔ)一次消息,見下圖。輪訓(xùn)策略是 Kafka Producer 提供的默認(rèn)策略,如果你不使用指定的輪訓(xùn)策略的話,Kafka 默認(rèn)會(huì)使用順序輪訓(xùn)策略的方式。

圖片

方式2:隨機(jī)輪詢

本質(zhì)上看隨機(jī)策略也是力求將數(shù)據(jù)均勻地打散到各個(gè)分區(qū),但從實(shí)際表現(xiàn)來看,它要遜于輪詢策略,所以如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好。事實(shí)上,隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。

圖片

方式3:key hash

這個(gè)策略也叫做 key-ordering 策略,Kafka 中每條消息都會(huì)有自己的key,一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略,如下圖所示

圖片

消費(fèi)者

消費(fèi)者群組

應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來自這些主題的消息,然后再把他們保存起來。應(yīng)用程序首先需要?jiǎng)?chuàng)建一個(gè) KafkaConsumer 對(duì)象,訂閱主題并開始接受消息,驗(yàn)證消息并保存結(jié)果。一段時(shí)間后,生產(chǎn)者往主題寫入的速度超過了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這時(shí)候該如何處理?如果只使用單個(gè)消費(fèi)者的話,應(yīng)用程序會(huì)跟不上消息生成的速度,就像多個(gè)生產(chǎn)者像相同的主題寫入消息一樣,這時(shí)候就需要多個(gè)消費(fèi)者共同參與消費(fèi)主題中的消息,對(duì)消息進(jìn)行分流處理。Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組中的消費(fèi)者訂閱的都是相同的主題,每個(gè)消費(fèi)者接收主題一部分分區(qū)的消息。下面是一個(gè) Kafka 分區(qū)消費(fèi)示意圖。

圖片

上圖中的主題 T1 有四個(gè)分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個(gè)消費(fèi)者群組1,消費(fèi)者群組中只有一個(gè)消費(fèi)者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個(gè)消費(fèi)者處理四個(gè)生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D

圖片

這樣一來,消費(fèi)者的消費(fèi)能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時(shí)候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費(fèi)者吃不消,那就繼續(xù)增加消費(fèi)者。

圖片

如上圖所示,每個(gè)分區(qū)所產(chǎn)生的消息能夠被每個(gè)消費(fèi)者群組中的消費(fèi)者消費(fèi),如果向消費(fèi)者群組中增加更多的消費(fèi)者,那么多余的消費(fèi)者將會(huì)閑置,如下圖所示。

圖片

向群組中增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式。總而言之,我們可以通過增加消費(fèi)組的消費(fèi)者來進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜淼南M(fèi)者是空閑的,沒有任何幫助。

Kafka 一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說,每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組 G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么就演變?yōu)橄聢D這樣。在這個(gè)場(chǎng)景中,消費(fèi)組 G1 和消費(fèi)組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。

圖片

總結(jié)起來就是如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。

消費(fèi)者重平衡

我們從上面的消費(fèi)者演變圖中可以知道這么一個(gè)過程:最初是一個(gè)消費(fèi)者訂閱一個(gè)主題并消費(fèi)其全部分區(qū)的消息,后來有一個(gè)消費(fèi)者加入群組,隨后又有更多的消費(fèi)者加入群組,而新加入的消費(fèi)者實(shí)例分?jǐn)偭俗畛跸M(fèi)者的部分消息,這種把分區(qū)的所有權(quán)通過一個(gè)消費(fèi)者轉(zhuǎn)到其他消費(fèi)者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示。

圖片

重平衡非常重要,它為消費(fèi)者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費(fèi)者或移除消費(fèi)者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費(fèi)者無法讀取消息,造成整個(gè)消費(fèi)者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個(gè)消費(fèi)者時(shí),消息當(dāng)前的讀取狀態(tài)會(huì)丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會(huì)拖慢應(yīng)用程序。

消費(fèi)者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護(hù)自己是消費(fèi)者組的一員并確認(rèn)其擁有的分區(qū)。對(duì)于不同不的消費(fèi)群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費(fèi)者定期發(fā)送心跳,就會(huì)認(rèn)為消費(fèi)者是存活的并處理其分區(qū)中的消息。當(dāng)消費(fèi)者檢索記錄或者提交它所消費(fèi)的記錄時(shí)就會(huì)發(fā)送心跳。如果過了一段時(shí)間 Kafka 停止發(fā)送心跳了,會(huì)話(Session)就會(huì)過期,組織協(xié)調(diào)者就會(huì)認(rèn)為這個(gè) Consumer 已經(jīng)死亡,就會(huì)觸發(fā)一次重平衡。如果消費(fèi)者宕機(jī)并且停止發(fā)送消息,組織協(xié)調(diào)者會(huì)等待幾秒鐘,確認(rèn)它死亡了才會(huì)觸發(fā)重平衡。在這段時(shí)間里,死亡的消費(fèi)者將不處理任何消息。在清理消費(fèi)者時(shí),消費(fèi)者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會(huì)觸發(fā)一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費(fèi)者群組帶來高可用性和伸縮性的同時(shí),還有有一些明顯的缺點(diǎn)(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。重平衡的過程對(duì)消費(fèi)者組有極大的影響。因?yàn)槊看沃仄胶膺^程中都會(huì)導(dǎo)致萬物靜止,參考 JVM 中的垃圾回收機(jī)制,也就是 Stop The World ,STW。也就是說,在重平衡期間,消費(fèi)者組中的消費(fèi)者實(shí)例都會(huì)停止消費(fèi),等待重平衡的完成。而且重平衡這個(gè)過程很慢...

特性分析

這里才是內(nèi)容的重點(diǎn),不僅需要知道Kafka的特性,還需要知道支持這些特性的原因:

  • 消息路由(不支持):Kafka在處理消息之前是不允許消費(fèi)者過濾一個(gè)主題中的消息。一個(gè)訂閱的消費(fèi)者在沒有異常情況下會(huì)接受一個(gè)分區(qū)中的所有消息。
  • 消息有序(支持):當(dāng)消費(fèi)消息時(shí),如果消費(fèi)失敗,消息不會(huì)被放回,所以整個(gè)消費(fèi)過程都是有序進(jìn)行;
  • 消息時(shí)序(不支持):消息直接發(fā)送,不會(huì)延遲發(fā)送,或者指定消息的TTL。
  • 容錯(cuò)處理(集群支持/消息不支持):集群容錯(cuò)能力高,因?yàn)槭欠植际讲渴?,但是消息容錯(cuò)處理弱,因?yàn)橄⑾M(fèi)失敗,需要程序員手動(dòng)處理,Kafka不支持消息重新進(jìn)行消費(fèi)。
  • 伸縮(非常好):通過擴(kuò)充分區(qū)和消費(fèi)者數(shù)量,實(shí)現(xiàn)分區(qū)擴(kuò)容,并提升消費(fèi)速度。
  • 持久化(非常好):數(shù)據(jù)存儲(chǔ)在磁盤,可以隨時(shí)訂閱消費(fèi),消費(fèi)完后,數(shù)據(jù)仍然保留。
  • 消息回溯(支持):因?yàn)橄⒅С殖志没?,就支持回溯,可以理解是附帶的功能?/section>
  • 高吞吐(非常好):因?yàn)镵afka內(nèi)部同一個(gè)主題包含多個(gè)分區(qū),所以實(shí)現(xiàn)分布式存儲(chǔ),然后消費(fèi)者數(shù)量可以擴(kuò)充到和分區(qū)數(shù)量一致,保證了Kafka的高吞吐。

RocketMQ

圖片

RocketMQ是一個(gè)純Java、分布式、隊(duì)列模型的開源消息中間件,前身是MetaQ,是阿里參考Kafka特點(diǎn)研發(fā)的一個(gè)隊(duì)列模型的消息中間件,后開源給apache基金會(huì)成為了apache的頂級(jí)開源項(xiàng)目,具有高性能、高可靠、高實(shí)時(shí)、分布式特點(diǎn)。

基本概念

先對(duì)常用的詞匯有個(gè)基本認(rèn)識(shí),相關(guān)詞匯后面會(huì)再詳細(xì)介紹:

  • NameServer:一個(gè)功能齊全的服務(wù)器,其角色類似Dubbo中的Zookeeper。
  • Producer:消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息。
  • Consumer:消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)。
  • Broker:消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)發(fā)消息。
  • Message:消息,一條消息必須有一個(gè)主題(Topic),主題可以看做是你的信件要郵寄的地址。(一條消息也可以擁有一個(gè)可選的標(biāo)簽(Tag)和額處的鍵值對(duì),它們可以用于設(shè)置一個(gè)業(yè)務(wù) Key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。)
  • Topic:主題,可以看做消息的規(guī)類,它是消息的第一級(jí)類型。(比如一個(gè)電商系統(tǒng)可以分為:交易消息、物流消息等,一條消息必須有一個(gè) Topic 。Topic 與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個(gè) Topic 可以有0個(gè)、1個(gè)、多個(gè)生產(chǎn)者向其發(fā)送消息,一個(gè)生產(chǎn)者也可以同時(shí)向不同的 Topic 發(fā)送消息。一個(gè) Topic 也可以被 0個(gè)、1個(gè)、多個(gè)消費(fèi)者訂閱。)
  • Tag:子主題,它是消息的第二級(jí)類型,用于為用戶提供額外的靈活性。(使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標(biāo)識(shí)。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag 。標(biāo)簽有助于保持您的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。)
  • Group:分組,一個(gè)組可以訂閱多個(gè)Topic。(分為ProducerGroup,ConsumerGroup,代表某一類的生產(chǎn)者和消費(fèi)者,一般來說同一個(gè)服務(wù)可以作為Group,同一個(gè)Group一般來說發(fā)送和消費(fèi)的消息都是一樣的。)
  • Producer Group:生產(chǎn)者組,代表某一類的生產(chǎn)者,比如我們有多個(gè)秒殺系統(tǒng)作為生產(chǎn)者,這多個(gè)合在一起就是一個(gè) Producer Group 生產(chǎn)者組,它們一般生產(chǎn)相同的消息。
  • Consumer Group:消費(fèi)者組,代表某一類的消費(fèi)者,比如我們有多個(gè)短信系統(tǒng)作為消費(fèi)者,這多個(gè)合在一起就是一個(gè) Consumer Group 消費(fèi)者組,它們一般消費(fèi)相同的消息。
  • Queue:隊(duì)列,在Kafka中叫Partition。(每個(gè)Queue內(nèi)部是有序的,在RocketMQ中分為讀和寫兩種隊(duì)列,一般來說讀寫隊(duì)列數(shù)量一致,如果不一致就會(huì)出現(xiàn)很多問題。)
  • Message Queue:消息隊(duì)列,主題被劃分為一個(gè)或多個(gè)子主題,即消息隊(duì)列。(一個(gè) Topic 下可以設(shè)置多個(gè)消息隊(duì)列,發(fā)送消息時(shí)執(zhí)行該消息的 Topic ,RocketMQ 會(huì)輪詢?cè)?Topic 下的所有隊(duì)列將消息發(fā)出去。消息的物理管理單位。一個(gè)Topic下可以有多個(gè)Queue,Queue的引入使得消息的存儲(chǔ)可以分布式集群化,具有了水平擴(kuò)展能力。)

消息模型

RockerMQ 中的消息模型就是按照主題模型所實(shí)現(xiàn)的,在主題模型中,消息的生產(chǎn)者稱為發(fā)布者(Publisher),消息的消費(fèi)者稱為訂閱者(Subscriber),存放消息的容器稱為主題(Topic)。RocketMQ 中的主題模型到底是如何實(shí)現(xiàn)的呢?

圖片

我們可以看到在整個(gè)圖中有 Producer Group、Topic、Consumer Group 三個(gè)角色,你可以看到圖中生產(chǎn)者組中的生產(chǎn)者會(huì)向主題發(fā)送消息,而主題中存在多個(gè)隊(duì)列,生產(chǎn)者每次生產(chǎn)消息之后是指定主題中的某個(gè)隊(duì)列發(fā)送消息的。

每個(gè)主題中都有多個(gè)隊(duì)列(這里還不涉及到 Broker),集群消費(fèi)模式下,一個(gè)消費(fèi)者集群多臺(tái)機(jī)器共同消費(fèi)一個(gè) topic 的多個(gè)隊(duì)列,一個(gè)隊(duì)列只會(huì)被一個(gè)消費(fèi)者消費(fèi)。如果某個(gè)消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會(huì)接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。就像上圖中 Consumer1 和 Consumer2 分別對(duì)應(yīng)著兩個(gè)隊(duì)列,而 Consuer3 是沒有隊(duì)列對(duì)應(yīng)的,所以一般來講要控制消費(fèi)者組中的消費(fèi)者個(gè)數(shù)和主題中隊(duì)列個(gè)數(shù)相同。這個(gè)簡(jiǎn)直和kafak一毛一樣?。?/p>

當(dāng)然也可以消費(fèi)者個(gè)數(shù)小于隊(duì)列個(gè)數(shù),只不過不太建議。如下圖:

圖片

每個(gè)消費(fèi)組在每個(gè)隊(duì)列上維護(hù)一個(gè)消費(fèi)位置,為什么呢?因?yàn)槲覀儎倓偖嫷膬H僅是一個(gè)消費(fèi)者組,我們知道在發(fā)布訂閱模式中一般會(huì)涉及到多個(gè)消費(fèi)者組,而每個(gè)消費(fèi)者組在每個(gè)隊(duì)列中的消費(fèi)位置都是不同的。如果此時(shí)有多個(gè)消費(fèi)者組,那么消息被一個(gè)消費(fèi)者組消費(fèi)完之后是不會(huì)刪除的(因?yàn)槠渌M(fèi)者組也需要呀),它僅僅是為每個(gè)消費(fèi)者組維護(hù)一個(gè)消費(fèi)位移(offset),每次消費(fèi)者組消費(fèi)完會(huì)返回一個(gè)成功的響應(yīng),然后隊(duì)列再把維護(hù)的消費(fèi)位移加一,這樣就不會(huì)出現(xiàn)剛剛消費(fèi)過的消息再一次被消費(fèi)了。

圖片

可能你還有一個(gè)問題,為什么一個(gè)主題中需要維護(hù)多個(gè)隊(duì)列?答案是提高并發(fā)能力。的確,每個(gè)主題中只存在一個(gè)隊(duì)列也是可行的。你想一下,如果每個(gè)主題中只存在一個(gè)隊(duì)列,這個(gè)隊(duì)列中也維護(hù)著每個(gè)消費(fèi)者組的消費(fèi)位置,這樣也可以做到發(fā)布訂閱模式。如下圖:

圖片但是,這樣我生產(chǎn)者是不是只能向一個(gè)隊(duì)列發(fā)送消息?又因?yàn)樾枰S護(hù)消費(fèi)位置所以一個(gè)隊(duì)列只能對(duì)應(yīng)一個(gè)消費(fèi)者組中的消費(fèi)者,這樣是不是其他的 Consumer 就沒有用武之地了?從這兩個(gè)角度來講,并發(fā)度一下子就小了很多。

所以總結(jié)來說,RocketMQ 通過使用在一個(gè) Topic 中配置多個(gè)隊(duì)列,并且每個(gè)隊(duì)列維護(hù)每個(gè)消費(fèi)者組的消費(fèi)位置,實(shí)現(xiàn)了主題模式/發(fā)布訂閱模式。

系統(tǒng)架構(gòu)

講完了消息模型,我們理解起 RocketMQ 的技術(shù)架構(gòu)起來就容易多了。RocketMQ 技術(shù)架構(gòu)中有四大角色 NameServer、Broker、Producer、Consumer。這4大角色,已經(jīng)在基本概念中簡(jiǎn)單解釋過,對(duì)于相關(guān)詞匯,這里再重點(diǎn)解釋一下。

  • Broker:主要負(fù)責(zé)消息的存儲(chǔ)、投遞和查詢以及服務(wù)高可用保證。說白了就是消息隊(duì)列服務(wù)器嘛,生產(chǎn)者生產(chǎn)消息到 Broker,消費(fèi)者從 Broker 拉取消息并消費(fèi)。這里,我還得普及一下關(guān)于 Broker、Topic 和隊(duì)列的關(guān)系。上面我講解了 Topic 和隊(duì)列的關(guān)系——一個(gè) Topic 中存在多個(gè)隊(duì)列,那么這個(gè) Topic 和隊(duì)列存放在哪呢?一個(gè) Topic 分布在多個(gè) Broker 上,一個(gè) Broker 可以配置多個(gè) Topic,它們是多對(duì)多的關(guān)系。如果某個(gè) Topic 消息量很大,應(yīng)該給它多配置幾個(gè)隊(duì)列,并且盡量多分布在不同 Broker 上,以減輕某個(gè) Broker 的壓力。Topic 消息量都比較均勻的情況下,如果某個(gè) broker 上的隊(duì)列越多,則該 broker 壓力越大。
  • NameServer:不知道你們有沒有接觸過 ZooKeeper 和 Spring Cloud 中的 Eureka,它其實(shí)也是一個(gè)注冊(cè)中心,主要提供兩個(gè)功能:Broker 管理和路由信息管理。說白了就是 Broker 會(huì)將自己的信息注冊(cè)到 NameServer 中,此時(shí) NameServer 就存放了很多 Broker 的信息(Broker的路由表),消費(fèi)者和生產(chǎn)者就從 NameServer 中獲取路由表然后照著路由表的信息和對(duì)應(yīng)的 Broker 進(jìn)行通信(生產(chǎn)者和消費(fèi)者定期會(huì)向 NameServer 去查詢相關(guān)的 Broker 的信息)。
  • Producer:消息發(fā)布的角色,支持分布式集群方式部署。
  • Consumer:消息消費(fèi)的角色,支持分布式集群方式部署。支持以 push 推,pull 拉兩種模式對(duì)消息進(jìn)行消費(fèi),同時(shí)也支持集群方式和廣播方式的消費(fèi),它提供實(shí)時(shí)消息訂閱機(jī)制。
圖片

聽完了上面的解釋你可能會(huì)覺得,這玩意好簡(jiǎn)單。不就是這樣的么?

圖片

嗯?你可能會(huì)發(fā)現(xiàn)一個(gè)問題,這老家伙 NameServer 干啥用的,這不多余嗎?直接 Producer、Consumer 和 Broker 直接進(jìn)行生產(chǎn)消息,消費(fèi)消息不就好了么?但是,我們上文提到過 Broker 是需要保證高可用的,如果整個(gè)系統(tǒng)僅僅靠著一個(gè) Broker 來維持的話,那么這個(gè) Broker 的壓力會(huì)不會(huì)很大?所以我們需要使用多個(gè) Broker 來保證負(fù)載均衡。如果說,我們的消費(fèi)者和生產(chǎn)者直接和多個(gè) Broker 相連,那么當(dāng) Broker 修改的時(shí)候必定會(huì)牽連著每個(gè)生產(chǎn)者和消費(fèi)者,這樣就會(huì)產(chǎn)生耦合問題,而 NameServer 注冊(cè)中心就是用來解決這個(gè)問題的。

當(dāng)然,RocketMQ 中的技術(shù)架構(gòu)肯定不止前面那么簡(jiǎn)單,因?yàn)樯厦鎴D中的四個(gè)角色都是需要做集群的。我給出一張官網(wǎng)的架構(gòu)圖,大家嘗試?yán)斫庖幌隆?/p>

圖片

其實(shí)和我們最開始畫的那張乞丐版的架構(gòu)圖也沒什么區(qū)別,主要是一些細(xì)節(jié)上的差別,聽我細(xì)細(xì)道來。

  • 第一、我們的 Broker 做了集群并且還進(jìn)行了主從部署,由于消息分布在各個(gè) Broker 上,一旦某個(gè) Broker 宕機(jī),則該 Broker 上的消息讀寫都會(huì)受到影響。所以 RocketMQ 提供了 master/slave 的結(jié)構(gòu),salve 定時(shí)從 master 同步數(shù)據(jù)(同步刷盤或者異步刷盤),如果 master 宕機(jī),則 slave 提供消費(fèi)服務(wù),但是不能寫入消息(后面我還會(huì)提到)。
  • 第二、為了保證 HA,我們的 NameServer 也做了集群部署,但是請(qǐng)注意它是去中心化的。也就意味著它沒有主節(jié)點(diǎn),你可以很明顯地看出 NameServer 的所有節(jié)點(diǎn)是沒有進(jìn)行 Info Replicate 的,在 RocketMQ 中是通過單個(gè) Broker 和所有 NameServer 保持長(zhǎng)連接,并且在每隔 30 秒 Broker 會(huì)向所有 Nameserver 發(fā)送心跳,心跳包含了自身的 Topic 配置信息,這個(gè)步驟就對(duì)應(yīng)這上面的 Routing Info。
  • 第三、在生產(chǎn)者需要向 Broker 發(fā)送消息的時(shí)候,需要先從 NameServer 獲取關(guān)于 Broker 的路由信息,然后通過輪詢的方法去向每個(gè)隊(duì)列中生產(chǎn)數(shù)據(jù)以達(dá)到負(fù)載均衡的效果。
  • 第四、消費(fèi)者通過 NameServer 獲取所有 Broker 的路由信息后,向 Broker 發(fā)送 Pull 請(qǐng)求來獲取消息數(shù)據(jù)。Consumer 可以以兩種模式啟動(dòng)—— 廣播(Broadcast)和集群(Cluster)。廣播模式下,一條消息會(huì)發(fā)送給同一個(gè)消費(fèi)組中的所有消費(fèi)者,集群模式下消息只會(huì)發(fā)送給一個(gè)消費(fèi)者。

高級(jí)特性&常見問題

順序消費(fèi)

在上面的技術(shù)架構(gòu)介紹中,我們已經(jīng)知道了 RocketMQ 在主題上是無序的、它只有在隊(duì)列層面才是保證有序的。這又扯到兩個(gè)概念——普通順序和嚴(yán)格順序。所謂普通順序是指消費(fèi)者通過同一個(gè)消費(fèi)隊(duì)列收到的消息是有順序的,不同消息隊(duì)列收到的消息則可能是無順序的。普通順序消息在 Broker 重啟情況下不會(huì)保證消息順序性(短暫時(shí)間)。

所謂嚴(yán)格順序是指消費(fèi)者收到的所有消息均是有順序的。嚴(yán)格順序消息即使在異常情況下也會(huì)保證消息的順序性。但是,嚴(yán)格順序看起來雖好,實(shí)現(xiàn)它可會(huì)付出巨大的代價(jià)。如果你使用嚴(yán)格順序模式,Broker 集群中只要有一臺(tái)機(jī)器不可用,則整個(gè)集群都不可用。你還用啥?現(xiàn)在主要場(chǎng)景也就在 binlog 同步。一般而言,我們的 MQ 都是能容忍短暫的亂序,所以推薦使用普通順序模式。(這個(gè)嚴(yán)格順序,感覺沒太懂,后面再查一下相關(guān)資料。。。)

那么,我們現(xiàn)在使用了普通順序模式,我們從上面學(xué)習(xí)知道了在 Producer 生產(chǎn)消息的時(shí)候會(huì)進(jìn)行輪詢(取決你的負(fù)載均衡策略)來向同一主題的不同消息隊(duì)列發(fā)送消息。那么如果此時(shí)我有幾個(gè)消息分別是同一個(gè)訂單的創(chuàng)建、支付、發(fā)貨,在輪詢的策略下這三個(gè)消息會(huì)被發(fā)送到不同隊(duì)列,因?yàn)樵诓煌年?duì)列此時(shí)就無法使用 RocketMQ 帶來的隊(duì)列有序特性來保證消息有序性了。

圖片

那么,怎么解決呢?其實(shí)很簡(jiǎn)單,我們需要處理的僅僅是將同一語義下的消息放入同一個(gè)隊(duì)列(比如這里是同一個(gè)訂單),那我們就可以使用 Hash 取模法來保證同一個(gè)訂單在同一個(gè)隊(duì)列中就行了。

重復(fù)消費(fèi)

就兩個(gè)字——冪等。在編程中一個(gè)冪等操作的特點(diǎn)是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。比如說,這個(gè)時(shí)候我們有一個(gè)訂單的處理積分的系統(tǒng),每當(dāng)來一個(gè)消息的時(shí)候它就負(fù)責(zé)為創(chuàng)建這個(gè)訂單的用戶的積分加上相應(yīng)的數(shù)值??墒怯幸淮?,消息隊(duì)列發(fā)送給訂單系統(tǒng) FrancisQ 的訂單信息,其要求是給 FrancisQ 的積分加上 500。但是積分系統(tǒng)在收到 FrancisQ 的訂單信息處理完成之后返回給消息隊(duì)列處理成功的信息的時(shí)候出現(xiàn)了網(wǎng)絡(luò)波動(dòng)(當(dāng)然還有很多種情況,比如 Broker 意外重啟等等),這條回應(yīng)沒有發(fā)送成功。

那么,消息隊(duì)列沒收到積分系統(tǒng)的回應(yīng)會(huì)不會(huì)嘗試重發(fā)這個(gè)消息?問題就來了,我再發(fā)這個(gè)消息,萬一它又給 FrancisQ 的賬戶加上 500 積分怎么辦呢?所以我們需要給我們的消費(fèi)者實(shí)現(xiàn)冪等,也就是對(duì)同一個(gè)消息的處理結(jié)果,執(zhí)行多少次都不變。

那么如何給業(yè)務(wù)實(shí)現(xiàn)冪等呢?這個(gè)還是需要結(jié)合具體的業(yè)務(wù)的。你可以使用寫入 Redis 來保證,因?yàn)?Redis 的 key 和 value 就是天然支持冪等的。當(dāng)然還有使用數(shù)據(jù)庫(kù)插入法,基于數(shù)據(jù)庫(kù)的唯一鍵來保證重復(fù)數(shù)據(jù)不會(huì)被插入多條。不過最主要的還是需要根據(jù)特定場(chǎng)景使用特定的解決方案,你要知道你的消息消費(fèi)是否是完全不可重復(fù)消費(fèi)還是可以忍受重復(fù)消費(fèi)的,然后再選擇強(qiáng)校驗(yàn)和弱校驗(yàn)的方式。畢竟在 CS 領(lǐng)域還是很少有技術(shù)銀彈的說法。

簡(jiǎn)單了來說,冪等的校驗(yàn),還是需要業(yè)務(wù)方來支持,因?yàn)槟憬鉀Q不了網(wǎng)絡(luò)抖動(dòng)問題哈~~

分布式事務(wù)

如何解釋分布式事務(wù)呢?事務(wù)大家都知道吧?要么都執(zhí)行要么都不執(zhí)行。在同一個(gè)系統(tǒng)中我們可以輕松地實(shí)現(xiàn)事務(wù),但是在分布式架構(gòu)中,我們有很多服務(wù)是部署在不同系統(tǒng)之間的,而不同服務(wù)之間又需要進(jìn)行調(diào)用。比如此時(shí)我下訂單然后增加積分,如果保證不了分布式事務(wù)的話,就會(huì)出現(xiàn)A系統(tǒng)下了訂單,但是B系統(tǒng)增加積分失敗或者A系統(tǒng)沒有下訂單,B系統(tǒng)卻增加了積分。前者對(duì)用戶不友好,后者對(duì)運(yùn)營(yíng)商不利,這是我們都不愿意見到的。那么,如何去解決這個(gè)問題呢?

如今比較常見的分布式事務(wù)實(shí)現(xiàn)有 2PC、TCC 和事務(wù)消息(half 半消息機(jī)制)。每一種實(shí)現(xiàn)都有其特定的使用場(chǎng)景,但是也有各自的問題,都不是完美的解決方案。在 RocketMQ 中使用的是事務(wù)消息加上事務(wù)反查機(jī)制來解決分布式事務(wù)問題的。

圖片下面是上圖的執(zhí)行流程:

  1. A服務(wù)先發(fā)送個(gè)Half Message給Brock端,消息中攜帶 B服務(wù) 即將要+100元的信息。
  2. 當(dāng)A服務(wù)知道Half Message發(fā)送成功后,那么開始第3步執(zhí)行本地事務(wù)。
  3. 執(zhí)行本地事務(wù)(會(huì)有三種情況1、執(zhí)行成功。2、執(zhí)行失敗。3、網(wǎng)絡(luò)等原因?qū)е聸]有響應(yīng))
  4. 如果本地事務(wù)成功,那么Product像Brock服務(wù)器發(fā)送Commit,這樣B服務(wù)就可以消費(fèi)該message。
  5. 如果本地事務(wù)失敗,那么Product像Brock服務(wù)器發(fā)送Rollback,那么就會(huì)直接刪除上面這條半消息。
  6. 如果因?yàn)榫W(wǎng)絡(luò)等原因遲遲沒有返回失敗還是成功,那么會(huì)執(zhí)行RocketMQ的回調(diào)接口,來進(jìn)行事務(wù)的回查。

消息堆積

消息中間件的主要功能是異步解耦,還有個(gè)重要功能是擋住前端的數(shù)據(jù)洪峰,保證后端系統(tǒng)的穩(wěn)定性,這就要求消息中間件具有一定的消息堆積能力,消息堆積分以下兩種情況:

  • 消息堆積在內(nèi)存Buffer,一旦超過內(nèi)存Buffer,可以根據(jù)一定的丟棄策略來丟棄消息,如CORBA Notification規(guī)范中描述。適合能容忍丟棄消息的業(yè)務(wù),這種情況消息的堆積能力主要在于內(nèi)存Buffer大小,而且消息堆積后,性能下降不會(huì)太大,因?yàn)閮?nèi)存中數(shù)據(jù)多少對(duì)于對(duì)外提供的訪問能力影響有限。
  • 消息堆積到持久化存儲(chǔ)系統(tǒng)中,例如DB,KV存儲(chǔ),文件記錄形式。當(dāng)消息不能在內(nèi)存Cache命中時(shí),要不可避免的訪問磁盤,會(huì)產(chǎn)生大量讀IO,讀IO的吞吐量直接決定了消息堆積后的訪問能力。

評(píng)估消息堆積能力主要有以下四點(diǎn):

  • 消息能堆積多少條,多少字節(jié)?即消息的堆積容量。
  • 消息堆積后,發(fā)消息的吞吐量大小,是否會(huì)受堆積影響?
  • 消息堆積后,正常消費(fèi)的Consumer是否會(huì)受影響?
  • 消息堆積后,訪問堆積在磁盤的消息時(shí),吞吐量有多大?

簡(jiǎn)單來說,RocketMQ支持大量消息堆積,消息會(huì)存在內(nèi)存,超出內(nèi)存的消息會(huì)持久化到磁盤中。

定時(shí)消息

定時(shí)消息是指消息發(fā)到Broker后,不能立刻被Consumer消費(fèi),要到特定的時(shí)間點(diǎn)或者等待特定的時(shí)間后才能被消費(fèi)。如果要支持任意的時(shí)間精度,在Broker層面,必須要做消息排序,如果再涉及到持久化,那么消息排序要不可避免的產(chǎn)生巨大性能開銷。RocketMQ支持定時(shí)消息,但是不支持任意時(shí)間精度,支持特定的level,例如定時(shí)5s,10s,1m等。

簡(jiǎn)單來說,RocketMQ支持定時(shí)消息,但是只支持固定時(shí)間,不支持任意精度時(shí)間。

回溯消費(fèi)

同步刷盤和異步刷盤

上面我講了那么多的 RocketMQ 的架構(gòu)和設(shè)計(jì)原理,你有沒有好奇,在 Topic 中的隊(duì)列是以什么樣的形式存在的?隊(duì)列中的消息又是如何進(jìn)行存儲(chǔ)持久化的呢?我在上文中提到的同步刷盤和異步刷盤又是什么呢?它們會(huì)給持久化帶來什么樣的影響呢?下面我將給你們一一解釋。

圖片

如上圖所示,在同步刷盤中需要等待一個(gè)刷盤成功的 ACK,同步刷盤對(duì) MQ 消息可靠性來說是一種不錯(cuò)的保障,但是性能上會(huì)有較大影響,一般地適用于金融等特定業(yè)務(wù)場(chǎng)景。而異步刷盤往往是開啟一個(gè)線程去異步地執(zhí)行刷盤操作。消息刷盤采用后臺(tái)異步線程提交的方式進(jìn)行,降低了讀寫延遲,提高了 MQ 的性能和吞吐量,一般適用于如發(fā)驗(yàn)證碼等對(duì)于消息保證要求不太高的業(yè)務(wù)場(chǎng)景。一般地,異步刷盤只有在 Broker 意外宕機(jī)的時(shí)候會(huì)丟失部分?jǐn)?shù)據(jù),你可以設(shè)置 Broker 的參數(shù) FlushDiskType 來調(diào)整你的刷盤策略(ASYNC_FLUSH 或者 SYNC_FLUSH)。

簡(jiǎn)單來說,同步刷盤是刷盤后請(qǐng)求再返回,異步刷盤是直接返回請(qǐng)求,再去慢慢刷盤,可能會(huì)導(dǎo)致數(shù)據(jù)丟失。

同步復(fù)制和異步復(fù)制

上面的同步刷盤和異步刷盤是在單個(gè)結(jié)點(diǎn)層面的,而同步復(fù)制和異步復(fù)制主要是指的 Borker 主從模式下,主節(jié)點(diǎn)返回消息給客戶端的時(shí)候是否需要同步從節(jié)點(diǎn)。

  • 同步復(fù)制:也叫 “同步雙寫”,也就是說,只有消息同步雙寫到主從結(jié)點(diǎn)上時(shí)才返回寫入成功。
  • 異步復(fù)制:消息寫入主節(jié)點(diǎn)之后就直接返回寫入成功。異步復(fù)制會(huì)不會(huì)也像異步刷盤那樣影響消息的可靠性呢?答案是不會(huì)的,因?yàn)閮烧呔褪遣煌母拍?,?duì)于消息可靠性是通過不同的刷盤策略保證的,而像異步同步復(fù)制策略僅僅是影響到了可用性。為什么呢?其主要原因是 RocketMQ 是不支持自動(dòng)主從切換的,當(dāng)主節(jié)點(diǎn)掛掉之后,生產(chǎn)者就不能再給這個(gè)主節(jié)點(diǎn)生產(chǎn)消息了。比如這個(gè)時(shí)候采用異步復(fù)制的方式,在主節(jié)點(diǎn)還未發(fā)送完需要同步的消息的時(shí)候主節(jié)點(diǎn)掛掉了,這個(gè)時(shí)候從節(jié)點(diǎn)就少了一部分消息。但是此時(shí)生產(chǎn)者無法再給主節(jié)點(diǎn)生產(chǎn)消息了,消費(fèi)者可以自動(dòng)切換到從節(jié)點(diǎn)進(jìn)行消費(fèi)(僅僅是消費(fèi)),所以在主節(jié)點(diǎn)掛掉的時(shí)間只會(huì)產(chǎn)生主從結(jié)點(diǎn)短暫的消息不一致的情況,降低了可用性,而當(dāng)主節(jié)點(diǎn)重啟之后,從節(jié)點(diǎn)那部分未來得及復(fù)制的消息還會(huì)繼續(xù)復(fù)制。

擴(kuò)展知識(shí)1:在單主從架構(gòu)中,如果一個(gè)主節(jié)點(diǎn)掛掉了,那么也就意味著整個(gè)系統(tǒng)不能再生產(chǎn)了。那么這個(gè)可用性的問題能否解決呢?一個(gè)主從不行那就多個(gè)主從的唄,別忘了在我們最初的架構(gòu)圖中,每個(gè) Topic 是分布在不同 Broker 中的。但是這種復(fù)制方式同樣也會(huì)帶來一個(gè)問題,那就是無法保證嚴(yán)格順序。在上文中我們提到了如何保證的消息順序性是通過將一個(gè)語義的消息發(fā)送在同一個(gè)隊(duì)列中,使用 Topic 下的隊(duì)列來保證順序性的。如果此時(shí)我們主節(jié)點(diǎn) A 負(fù)責(zé)的是訂單 A 的一系列語義消息,然后它掛了,這樣其他節(jié)點(diǎn)是無法代替主節(jié)點(diǎn)A的,如果我們?nèi)我夤?jié)點(diǎn)都可以存入任何消息,那就沒有順序性可言了。(這點(diǎn)我并不認(rèn)同,我理解主從的對(duì)列信息應(yīng)該是一樣的,我從主節(jié)點(diǎn)讀到哪里,如果主節(jié)點(diǎn)掛掉,應(yīng)該是可以到從結(jié)點(diǎn)去讀取的,如果不能這樣,搞個(gè)主從就完全沒有意義了。因?yàn)橹鲝牡男畔⑹且粯拥模瑢?duì)隊(duì)列的順序是內(nèi)有影響的,我不可能把不同的信息,搞兩個(gè)隊(duì)列,分別放到主從機(jī)器。)

擴(kuò)展知識(shí)2:在 RocketMQ 中采用了 Dledger 解決主從數(shù)據(jù)同步問題。他要求在寫入消息的時(shí)候,要求至少消息復(fù)制到半數(shù)以上的節(jié)點(diǎn)之后,才給客?端返回寫?成功,并且它是?持通過選舉來動(dòng)態(tài)切換主節(jié)點(diǎn)的。這里我就不展開說明了,讀者可以自己去了解。也不是說 Dledger 是個(gè)完美的方案,至少在 Dledger 選舉過程中是無法提供服務(wù)的,而且他必須要使用三個(gè)節(jié)點(diǎn)或以上,如果多數(shù)節(jié)點(diǎn)同時(shí)掛掉他也是無法保證可用性的,而且要求消息復(fù)制板書以上節(jié)點(diǎn)的效率和直接異步復(fù)制還是有一定的差距的。

這個(gè)機(jī)制,感覺就像大眾化的版本,基本思路都一樣,為了保證數(shù)據(jù)可用性,我還是推薦同步復(fù)制,當(dāng)大多數(shù)節(jié)點(diǎn)復(fù)制成功,就認(rèn)為復(fù)制完畢,和ETCD的Raft協(xié)議的日志同步原理一樣。

容錯(cuò)機(jī)制

在實(shí)際使用RocketMQ的時(shí)候我們并不能保證每次發(fā)送的消息都剛好能被消費(fèi)者一次性正常消費(fèi)成功,可能會(huì)存在需要多次消費(fèi)才能成功或者一直消費(fèi)失敗的情況,那作為發(fā)送者該做如何處理呢?

RocketMQ提供了ack機(jī)制,以保證消息能夠被正常消費(fèi)。發(fā)送者為了保證消息肯定消費(fèi)成功,只有使用方明確表示消費(fèi)成功,RocketMQ才會(huì)認(rèn)為消息消費(fèi)成功。中途斷電,拋出異常等都不會(huì)認(rèn)為成功——即都會(huì)重新投遞。當(dāng)然,如果消費(fèi)者不告知發(fā)送者我這邊消費(fèi)信息異常,那么發(fā)送者是不會(huì)知道的,所以消費(fèi)者在設(shè)置監(jiān)聽的時(shí)候需要給個(gè)回調(diào)。

為了保證消息是肯定被至少消費(fèi)成功一次,RocketMQ會(huì)把這批消息重發(fā)回Broker(topic不是原topic而是這個(gè)消費(fèi)租的RETRY topic),在延遲的某個(gè)時(shí)間點(diǎn)(默認(rèn)是10秒,業(yè)務(wù)可設(shè)置)后,再次投遞到這個(gè)ConsumerGroup。而如果一直這樣重復(fù)消費(fèi)都持續(xù)失敗到一定次數(shù)(默認(rèn)16次),就會(huì)投遞到DLQ死信隊(duì)列。應(yīng)用可以監(jiān)控死信隊(duì)列來做人工干預(yù)。

簡(jiǎn)單來說,通過ACK保證消息一定能正常消費(fèi),對(duì)于異常消息,會(huì)重新放回Broker,但是這樣就會(huì)打亂消息的順序,所以容錯(cuò)機(jī)制和消息嚴(yán)格順序,魚和熊掌不可兼得。

特性分析

這里才是內(nèi)容的重點(diǎn),不僅需要知道RocketMQ的特性,還需要知道支持這些特性的原因:

  • 消息路由(不支持):RocketMQ在處理消息之前是不允許消費(fèi)者過濾一個(gè)主題中的消息。一個(gè)訂閱的消費(fèi)者在沒有異常情況下會(huì)接受一個(gè)隊(duì)列中的所有消息;
  • 消息有序(部分支持):需要將同一類的消息hash到同一個(gè)隊(duì)列Queue中,才能支持消息的順序,如果同一類消息散落到不同的Queue中,就不能支持消息的順序,如果設(shè)定消息一定要正常消費(fèi),那么就不能保證消息順序。
  • 消息時(shí)序(可以支持):可以發(fā)送定時(shí)消息,但是只能制定系統(tǒng)定義好的時(shí)間,不支持自定義時(shí)間;
  • 容錯(cuò)處理(支持):通過ACK機(jī)制,保證消息一定能正常消費(fèi),這個(gè)和RabbitMQ很像;
  • 伸縮(支持):整體架構(gòu)其實(shí)和kafaka很像,可以擴(kuò)容broker和內(nèi)部隊(duì)列數(shù),或者增加消費(fèi)組中的消費(fèi)組數(shù)量,提高消費(fèi)能力。
  • 持久化(支持):消息可以持久化到磁盤中,所以支持消息的回溯,和kafaka很像。
  • 消息回溯(支持):因?yàn)橄⒅С殖志没?,就支持回溯,可以理解是附帶的功能?/section>
  • 高吞吐(非常好):借鑒kafaka的設(shè)計(jì),不會(huì)出現(xiàn)rabbitMQ的單Master抗壓力問題,可以從多個(gè)borker寫入和消費(fèi)消息。

RabbitMQ

圖片

我們也不能天天去背八股,還需要實(shí)踐,RabbitMQ的實(shí)操實(shí)例,直接看這篇《入門RabbitMQ,這一篇絕對(duì)夠!》


    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多