消息隊(duì)列連環(huán)炮
消息隊(duì)列技術(shù)選型解決的問題:
不用 MQ 系統(tǒng)耦合場(chǎng)景A 系統(tǒng)產(chǎn)生了一個(gè)比較關(guān)鍵的數(shù)據(jù),很多系統(tǒng)需要 A 系統(tǒng)將數(shù)據(jù)發(fā)過來,強(qiáng)耦合(B,C,D,E 系統(tǒng)可能參數(shù)不一樣、一會(huì)需要一會(huì)不需要數(shù)據(jù),A 系統(tǒng)要不斷修改代碼維護(hù)) A 系統(tǒng)還要考慮 B、C、D、E 系統(tǒng)是否掛了,是否訪問超時(shí)?是否重試? 使用 MQ 系統(tǒng)解耦場(chǎng)景
總結(jié):通過一個(gè) MQ 的發(fā)布訂閱消息模型(Pub/Sub), 系統(tǒng) A 跟其他系統(tǒng)就徹底解耦了。 不用 MQ 同步高延遲請(qǐng)求場(chǎng)景一般互聯(lián)網(wǎng)類的企業(yè),對(duì)用戶的直接操作,一般要求每個(gè)請(qǐng)求都必須在 200ms以內(nèi),對(duì)用戶幾乎是無感知的。 使用 MQ 進(jìn)行異步化之后的接口性能優(yōu)化提高高延時(shí)接口 沒有用 MQ 時(shí)高峰期系統(tǒng)被打死的場(chǎng)景高峰期每秒 5000 個(gè)請(qǐng)求,每秒對(duì) MySQL 執(zhí)行 5000 條 SQL(一般MySQL每秒 2000 個(gè)請(qǐng)求差不多了),如果MySQL被打死,然后整個(gè)系統(tǒng)就崩潰,用戶就沒辦法使用系統(tǒng)了。但是高峰期過了之后,每秒鐘可能就 50 個(gè)請(qǐng)求,對(duì)整個(gè)系統(tǒng)沒有任何壓力。 使用 MQ 進(jìn)行削峰的場(chǎng)景5000 個(gè)請(qǐng)求寫入到 MQ 里面,系統(tǒng) A 每秒鐘最多只能處理 2000 個(gè)請(qǐng)求(MySQL 每秒鐘最多處理 2000 個(gè)請(qǐng)求),系統(tǒng) A 從 MQ 里慢慢拉取請(qǐng)求,每秒鐘拉取 2000 個(gè)請(qǐng)求。MQ,每秒鐘 5000 個(gè)請(qǐng)求進(jìn)來,結(jié)果只有 2000 個(gè)請(qǐng)求出去,結(jié)果導(dǎo)致在高峰期(21小時(shí)),可能有幾十萬甚至幾百萬的請(qǐng)求積壓在 MQ 中,這個(gè)是正常的,因?yàn)檫^了高峰期之后,每秒鐘就 50 個(gè)請(qǐng)求,但是系統(tǒng) A 還是會(huì)按照每秒 2000 個(gè)該請(qǐng)求的速度去處理。只要高峰期一過,系統(tǒng) A 就會(huì)快速的將積壓的消息給解決掉。
架構(gòu)中引入 MQ 后存在的問題
MQ 可能掛掉,導(dǎo)致整個(gè)系統(tǒng)崩潰
可能發(fā)重復(fù)消息,導(dǎo)致插入重復(fù)數(shù)據(jù);消息丟了;消息順序亂了;系統(tǒng) B,C,D 掛了,導(dǎo)致 MQ 消息積累,磁盤滿了;
本來應(yīng)該A,B,C,D 都執(zhí)行成功了再返回,結(jié)果A,B,C 執(zhí)行成功 D 失敗 Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么優(yōu)缺點(diǎn)建議:中小型公司 RabbitMQ 大公司:RocketMQ 大數(shù)據(jù)實(shí)時(shí)計(jì)算:Kafka 消息隊(duì)列高可用RabbtitMQ 高可用RabbitMQ有三種模式:?jiǎn)螜C(jī)模式 、普通集群模式、鏡像集群模式
demo級(jí)
隊(duì)列的元數(shù)據(jù)存在于多個(gè)實(shí)例中,但是消息不存在多個(gè)實(shí)例中,每次多臺(tái)機(jī)器上啟動(dòng)多個(gè) rabbitmq 實(shí)例,每個(gè)機(jī)器啟動(dòng)一個(gè)。
沒有高可用性可言
隊(duì)列的元數(shù)據(jù)和消息都會(huì)存在于多個(gè)實(shí)例中,每次寫消息到 queue的時(shí)候,都會(huì)自動(dòng)把消息到多個(gè)實(shí)例的 queue 里進(jìn)行消息同步。也就 是每個(gè)節(jié)點(diǎn)上都有這個(gè) queue 的一個(gè)完整鏡像(這個(gè) queue的全部數(shù)據(jù))。任何一個(gè)節(jié)點(diǎn)宕機(jī)了,其他節(jié)點(diǎn)還包含這個(gè) queue的完整數(shù)據(jù),其他 consumer 都可以到其他活著的節(jié)點(diǎn)上去消費(fèi)數(shù)據(jù)都是 OK 的。 缺點(diǎn):不是分布式的,如果這個(gè) queue的數(shù)據(jù)量很大,大到這個(gè)機(jī)器上的容量無法容納 。 開啟鏡像集群模式方法:管理控制臺(tái),Admin頁面下,新增一個(gè)鏡像集群模式的策略,指定的時(shí)候可以要求數(shù)據(jù)同步到所有節(jié)點(diǎn),也可以要求同步到指定數(shù)量的節(jié)點(diǎn),然后你再次創(chuàng)建 queue 的時(shí)候 ,應(yīng)用這個(gè)策略,就 會(huì)自動(dòng)將數(shù)據(jù)同步到其他的節(jié)點(diǎn)上去。
broker進(jìn)程就是kafka在每臺(tái)機(jī)器上啟動(dòng)的自己的一個(gè)進(jìn)程。每臺(tái)機(jī)器+機(jī)器上的broker進(jìn)程,就可以認(rèn)為是 kafka集群中的一個(gè)節(jié)點(diǎn)。 你創(chuàng)建一個(gè) topic,這個(gè)topic可以劃分為多個(gè) partition,每個(gè) partition 可以存在于不同的 broker 上,每個(gè) partition就存放一部分?jǐn)?shù)據(jù)。 這就是天然的分布式消息隊(duì)列,也就是說一個(gè) topic的數(shù)據(jù),是分散放在 多個(gè)機(jī)器上的,每個(gè)機(jī)器就放一部分?jǐn)?shù)據(jù)。 分布式的真正含義是每個(gè)節(jié)點(diǎn)只放一部分?jǐn)?shù)據(jù),而不是完整數(shù)據(jù)(完整數(shù)據(jù)就是HA、集群機(jī)制)
每個(gè) partition的數(shù)據(jù)都會(huì)同步到其他機(jī)器上,形成自己的多個(gè) replica 副本。然后所有 replica 會(huì)選舉一個(gè) leader。那么生產(chǎn)者、消費(fèi)者都會(huì)和這個(gè) leader 打交道,然后其他 replica 就是 follow。寫的時(shí)候,leader 負(fù)責(zé)把數(shù)據(jù)同步到所有 follower上去,讀的時(shí)候就直接讀 leader 上的數(shù)據(jù)即可。 如果某個(gè) broker宕機(jī)了,剛好也是 partition的leader,那么此時(shí)會(huì)選舉一個(gè)新的 leader出來,大家繼續(xù)讀寫那個(gè)新的 leader即可,這個(gè)就 是所謂的高可用性。更多面試題:面試題內(nèi)容聚合 leader和follower的同步機(jī)制:寫數(shù)據(jù)的時(shí)候,生產(chǎn)者就寫 leader,然后 leader將數(shù)據(jù)落地寫本地磁盤,接著其他 follower 自己主動(dòng)從 leader來pull數(shù)據(jù)。一旦所有 follower同步好數(shù)據(jù)了,就會(huì)發(fā)送 ack給 leader,leader收到所有 follower的 ack之后,就會(huì)返回寫成功的消息給生產(chǎn)者。 消費(fèi)的時(shí)候,只會(huì)從 leader去讀,但是只有一個(gè)消息已經(jīng)被所有 follower都同步成功返回 ack的時(shí)候,這個(gè)消息才會(huì)被消費(fèi)者讀到。 消息隊(duì)列重復(fù)數(shù)據(jù)MQ 只能保證消息不丟,不能保證重復(fù)發(fā)送 Kafka 消費(fèi)端可能出現(xiàn)的重復(fù)消費(fèi)問題每條消息都有一個(gè) offset 代表 了這個(gè)消息的順序的序號(hào),按照數(shù)據(jù)進(jìn)入 kafka的順序,kafka會(huì)給每條數(shù)據(jù)分配一個(gè) offset,代表了這個(gè)是數(shù)據(jù)的序號(hào),消費(fèi)者從 kafka去消費(fèi)的時(shí)候,按照這個(gè)順序去消費(fèi),消費(fèi)者會(huì)去提交 offset,就是告訴 kafka已經(jīng)消費(fèi)到 offset=153這條數(shù)據(jù)了 ;zk里面就記錄了消費(fèi)者當(dāng)前消費(fèi)到了 offset =幾的那條消息;假如此時(shí)消費(fèi)者系統(tǒng)被重啟,重啟之后,消費(fèi)者會(huì)找kafka,讓kafka把上次我消費(fèi)到的那個(gè)地方后面的數(shù)據(jù)繼續(xù)給我傳遞過來。更多面試題:面試題內(nèi)容聚合
消費(fèi)者不是說消費(fèi)完一條數(shù)據(jù)就立馬提交 offset的,而是定時(shí)定期提交一次 offset。消費(fèi)者如果再準(zhǔn)備提交 offset,但是還沒提交 offset的時(shí)候,消費(fèi)者進(jìn)程重啟了,那么此時(shí)已經(jīng)消費(fèi)過的消息的 offset并沒有提交,kafka也就不知道你已經(jīng)消費(fèi)了 offset= 153那條數(shù)據(jù),這個(gè)時(shí)候kafka會(huì)給你發(fā)offset=152,153,154的數(shù)據(jù),此時(shí) offset = 152,153的消息重復(fù)消費(fèi)了 保證 MQ 重復(fù)消費(fèi)冪等性冪等:一個(gè)數(shù)據(jù)或者一個(gè)請(qǐng)求,給你重復(fù)來多次,你得確保對(duì)應(yīng)的數(shù)據(jù)是不會(huì)改變的,不能出錯(cuò)。
保證 MQ 消息不丟MQ 傳遞非常核心的消息,比如:廣告計(jì)費(fèi)系統(tǒng),用戶點(diǎn)擊一次廣告,扣費(fèi)一塊錢,如果扣費(fèi)的時(shí)候消息丟了,則會(huì)不斷少錢,積少成多,對(duì)公司是一個(gè)很大的損失。 RabbitMQ可能存在的數(shù)據(jù)丟失問題
問題 1解決方案: 事務(wù)機(jī)制:(一般不采用,同步的,生產(chǎn)者發(fā)送消息會(huì)同步阻塞卡住等待你是成功還是失敗。會(huì)導(dǎo)致生產(chǎn)者發(fā)送消息的吞吐量降下來) channel.txSelect confirm機(jī)制:(一般采用這種機(jī)制,異步的模式,不會(huì)阻塞,吞吐量會(huì)比較高)
問題 2 解決方案: 持久化到磁盤
缺點(diǎn):可能會(huì)有一點(diǎn)點(diǎn)丟失數(shù)據(jù)的可能,消息剛好寫到了 rabbitmq中,但是還沒來得及持久化到磁盤上,結(jié)果不巧, rabbitmq掛了,會(huì)導(dǎo)致內(nèi)存里的一點(diǎn)點(diǎn)數(shù)據(jù)會(huì)丟失。更多面試題:面試題內(nèi)容聚合 問題 3 解決方案: 原因:消費(fèi)者打開了 autoAck機(jī)制(消費(fèi)到一條消息,還在處理中,還沒處理完,此時(shí)消費(fèi)者自動(dòng) autoAck了,通知 rabbitmq說這條消息已經(jīng)消費(fèi)了,此時(shí)不巧,消費(fèi)者系統(tǒng)宕機(jī)了,那條消息丟失了,還沒處理完,而且 rabbitmq還以為這個(gè)消息已經(jīng)處理掉了) 解決方案:關(guān)閉 autoAck,自己處理完了一條消息后,再發(fā)送 ack給 rabbitmq,如果此時(shí)還沒處理完就宕機(jī)了,此時(shí)rabbitmq沒收到你發(fā)的ack消息,然后 rabbitmq 就會(huì)將這條消息重新分配給其他的消費(fèi)者去處理。 Kafka 可能存在的數(shù)據(jù)丟失問題消費(fèi)端弄丟數(shù)據(jù)原因:消費(fèi)者消費(fèi)到那條消息后,自動(dòng)提交了 offset,kafka以為你已經(jīng)消費(fèi)好了這條消息,結(jié)果消費(fèi)者掛了,這條消息就丟了。 例子:消費(fèi)者消費(fèi)到數(shù)據(jù)后寫到一個(gè)內(nèi)存 queue里緩存下,消息自動(dòng)提交 offset,重啟了系統(tǒng),結(jié)果會(huì)導(dǎo)致內(nèi)存 queue 里還沒來得及處理的數(shù)據(jù)丟失。 解決方法:kafka會(huì)自動(dòng)提交 offset,那么只要關(guān)閉自動(dòng)提交 offset,在處理完之后自己手動(dòng)提交,可以保證數(shù)據(jù)不會(huì)丟。但是此時(shí)確實(shí)還是會(huì)重復(fù)消費(fèi),比如剛好處理完,還沒提交 offset,結(jié)果自己掛了,此時(shí)肯定會(huì)重復(fù)消費(fèi)一次 ,做好冪等即可。 Kafka 丟掉消息原因:kafka 某個(gè) broker 宕機(jī),然后重新選舉 partition 的 leader時(shí),此時(shí)其他的 follower 剛好還有一些數(shù)據(jù)沒有同步,結(jié)果此時(shí) leader掛了,然后選舉某個(gè) follower成 leader之后,就丟掉了之前l(fā)eader里未同步的數(shù)據(jù)。更多面試題:面試題內(nèi)容聚合 例子:kafka的leader機(jī)器宕機(jī),將 follower 切換為 leader之后,發(fā)現(xiàn)數(shù)據(jù)丟了
按 2 的方案設(shè)置了 ack =all,一定不會(huì)丟。它會(huì)要求 leader 接收到消息,所有的 follower 都同步 到了消息之后,才認(rèn)為本次寫成功。如果沒滿足這個(gè)條件,生產(chǎn)者會(huì)無限次重試 。 消息隊(duì)列順序性背景:mysql binlog 同步的系統(tǒng),在mysql里增刪改一條數(shù)據(jù),對(duì)應(yīng)出來了增刪改 3 條binlog,接著這 3 條binlog發(fā)送到 MQ 里面,到消費(fèi)出來依次執(zhí)行,起碼是要保證順序的吧,不然順序變成了 刪除、修改、增加。日同步數(shù)據(jù)達(dá)到上億,mysql->mysql,比如大數(shù)據(jù) team,需要同步一個(gè)mysql庫(kù),來對(duì)公司的業(yè)務(wù)系統(tǒng)的數(shù)據(jù)做各種復(fù)雜的操作。 場(chǎng)景:
RabbitMQ 消息順序錯(cuò)亂RabbitMQ 如何保證消息順序性需要保證順序的數(shù)據(jù)放到同一個(gè)queue里 Kafka 消息順序錯(cuò)亂寫入一個(gè) partition中的數(shù)據(jù)一定是有順序的。 生產(chǎn)者在寫的時(shí)候,可以指定一個(gè) key,比如訂單id作為key,那么訂單相關(guān)的數(shù)據(jù),一定會(huì)被分發(fā)到一個(gè) partition中區(qū),此時(shí)這個(gè) partition中的數(shù)據(jù)一定是有順序的。Kafka 中一個(gè) partition 只能被一個(gè)消費(fèi)者消費(fèi)。消費(fèi)者從partition中取出數(shù)據(jù)的時(shí)候 ,一定是有順序的。 Kafka 保證消息順序性如果消費(fèi)者單線程消費(fèi)+處理,如果處理比較耗時(shí),處理一條消息是幾十ms,一秒鐘只能處理幾十條數(shù)據(jù),這個(gè)吞吐量太低了??隙ㄒ枚嗑€程去并發(fā)處理,壓測(cè)消費(fèi)者4 核 8G 單機(jī),32 條線程,最高每秒可以處理上千條消息 消息隊(duì)列延遲以及過期失效消費(fèi)端出了問題,不消費(fèi)了或者消費(fèi)極其慢。接著坑爹了,你的消息隊(duì)列集群的磁盤都快寫滿了 ,都沒人消費(fèi),怎么辦?積壓了幾個(gè)小時(shí),rabbitmq設(shè)置了消息過期時(shí)間后就沒了,怎么辦? 例如:
場(chǎng)景:幾千萬條數(shù)據(jù)再 MQ 里積壓了七八個(gè)小時(shí) 快速處理積壓的消息一個(gè)消費(fèi)者一秒是 1000 條,一秒 3 個(gè)消費(fèi)者是 3000 條,一分鐘是 18W 條,1000 多 W 條需要一個(gè)小時(shí)恢復(fù)。 步驟:
原來 3 個(gè)消費(fèi)者需要 1 個(gè)小時(shí)可以搞定,現(xiàn)在 30 個(gè)臨時(shí)消費(fèi)者需要 10 分鐘就可以搞定。 如果用的 rabbitmq,并且設(shè)置了過期時(shí)間,如果此消費(fèi)在 queue里積壓超過一定的時(shí)間會(huì)被 rabbitmq清理掉,數(shù)據(jù)直接搞丟。 如果消息積壓mq,長(zhǎng)時(shí)間沒被處理掉,導(dǎo)致mq快寫完滿了,你臨時(shí)寫一個(gè)程序,接入數(shù)據(jù)來消費(fèi),寫到一個(gè)臨時(shí)的mq里,再讓其他消費(fèi)者慢慢消費(fèi) 或者消費(fèi)一個(gè)丟棄一個(gè),都不要了,快速消費(fèi)掉所有的消息,然后晚上補(bǔ)數(shù)據(jù)。 如何設(shè)計(jì)消息隊(duì)列中間件架構(gòu)
|
|