1.什么是消息隊列
消息隊列允許應用間通過消息的發(fā)送與接收的方式進行通信,當消息接收方服務忙或不可用時,其提供了一個消息暫存的功能。 消息隊列中的幾個基本概念: Producer: 消息發(fā)送方,也叫做消息生產(chǎn)者 Consumer: 消息接收方,也叫做消息消費者 Broker: 消息投遞的代理 2.應用場景低耦合、可靠投遞、廣播、流量控制、最終一致性。 1)解耦(為面向服務的架構(SOA)提供基本的最終一致性實現(xiàn))基于消息的模型,關心的是“通知”,而非“處理”。 短信、郵件通知、緩存刷新等操作使用消息隊列進行通知。 系統(tǒng)性能評價標準: 非消息系統(tǒng)隊列系統(tǒng):系統(tǒng)中最慢的組件運行時間(短板效應) 消息隊列系統(tǒng):異步通知減少短板效應的影響 消息隊列和RPC的區(qū)別與比較: RPC: 異步調用,及時獲得調用結果,具有強一致性結果,關心業(yè)務調用處理結果。 消息隊列:兩次異步RPC調用,將調用內(nèi)容在隊列中進行轉儲,并選擇合適的時機進行投遞(錯峰流控) 2)廣播(發(fā)布/訂閱模型,基于消息格式進行編程)如果沒有消息隊列,每當一個新的業(yè)務方接入,我們都要聯(lián)調一次新接口。有了消息隊列,我們只需要關心消息是否送達了隊列,至于誰希望訂閱,是下游的事情,無疑極大地減少了開發(fā)和聯(lián)調的工作量。 3) 錯峰及流控不同服務間的處理能力不同。比如: WEB前端上千萬次/s的負載,數(shù)據(jù)庫tps跟不上。 沒有消息隊列時,通過滑動窗口、擁塞控制等一系列措施,可以做到服務調用間的流量控制,但是不具有通用性,且維護復雜。 通過控制消息隊列的分發(fā)速度可以實現(xiàn)通用的流量控制。利用中間系統(tǒng)轉儲兩個系統(tǒng)的通信內(nèi)容,并在下游系統(tǒng)有能力處理這些消息的時候,再處理這些消息,是一套相對較通用的方式。 總結: 1. 消息隊列適用于處理非強一致性事務要求,非延遲敏感的業(yè)務場景。RPC可滿足強一致性和延遲敏感要求; 2.可以用支持最終一致性的消息隊列來實現(xiàn)輕量級、非延遲敏感的分布式事務; 3.上下游業(yè)務系統(tǒng)處理能力存在較大差距,且不包含在業(yè)務主流程的功能,可以交給消息隊列; 4.有多個下游業(yè)務關心系統(tǒng)發(fā)出的通知消息時,基于標準的消息內(nèi)容格式,使用消息隊列可減少重復開發(fā)和聯(lián)調。 3. 消息隊列的基本特性數(shù)據(jù)流轉過程:producer發(fā)送給broker, broker發(fā)送給consumer, consumer回復消費確認,broker刪除消息 1)消息丟失(可靠性)會導致消息重復及延遲。 當有可能發(fā)生消息丟失風險的操作前,先將消息數(shù)據(jù)落地,然后發(fā)送。失敗或超時時,不斷輪詢所有待發(fā)送消息重新發(fā)送,保證最終一定送達。消息消費者收到消息后給服務端一個確認,當所有訂閱者都確認收到消息后,刪除消息。 消息重復和丟失必須得面對一個,只能在兩者間做平衡。 2)順序投遞1.允許消息丟失。 2.從發(fā)送方到服務方到接受者都是單點單線程。 3)重復投遞如何鑒別重復消息? 重復消息如何冪等處理? i) 版本號 在一個回話周期內(nèi)維護一個單調遞增的消息版本號,檢測到小于最近接收到的版本號時判別為 重復投遞的消息。 發(fā)送方必須攜帶消息版本號; 對于嚴格要求消息順序的業(yè)務,接收方必須存儲消息版本號。 ii)狀態(tài)機 主流消息隊列的設計范式里,在不丟消息的前提下,盡量減少重復消息,不保證消息的投遞順序。 4)push or pullpush 模式, 即 broker 主動推送消息給消費者 pull 模式, 即消費者主動從 broker 中拉取消息 4. rabbitmq(以下內(nèi)容均基于AMQP協(xié)議)1)基本特性* 可靠性 消息持久化、消息發(fā)送和投遞確認機制、集群高可用方案 * 靈活路由 消息通過exchange的方式路由到不同的queue中,提供了包括fanout, direct, topic等多種exchange實現(xiàn),并且支持通過編寫exchange插件的方式自實現(xiàn)路由方案 * 支持集群 同網(wǎng)段下的rabbitmq節(jié)點可以通過集群的方式,組成一個邏輯上的單一broker * Federation 通過Federation可以在跨網(wǎng)段節(jié)點間組件集群 * 高可用消息隊列 通過設置鏡像隊列的方式,消息可以在鏡像隊列間進行復制,使節(jié)點down機或硬件損壞的情況下保證隊列服務的高可用 * 多協(xié)議支持 包括AMQP, STOMP, MQTT, HTTP等多種消息交換協(xié)議 * 多客戶端支持 JAVA, .NET, Ruby, Python, PHP, Node, Go...... * 可視化管理界面 * 豐富的插件支持 tracing, managment-plugin, and you can also write your own. 2)基礎模型基礎組件:Publisher, Exchange, Queue, Consumer 基礎動作:Publish, Binding, Route, Consume i) Queue的基本屬性: Name Durable (the queue will survive a broker restart) Exclusive (used by only one connection and the queue will be deleted when that connection closes) Auto-delete (queue is deleted when last consumer unsubscribes) Arguments (some brokers use it to implement additional features like message TTL) ii) Exchange路由策略 direct(default): 路由到完全匹配的routing_key對應綁定的queue中 Fanout: 將消息路由轉發(fā)到所有綁定到該exchange上的queue Topic: 通過比對routing_key是否匹配來路由消息。 *(star) can substitute for exactly one word. #(hash) can substitute for zero or more words. Header: 忽略routingKey,通過匹配header中的鍵值對方式來路由,有all(全部匹配)和any(部分匹配)兩種模式。 iii) Binding 綁定動作是將發(fā)送到exchange上的消息路由到特定queue的規(guī)則的綁定。 3)消息投遞的可靠性保證 Consumet Acknowledgements & Publisher Confirms i) (Consumer) Delivery Acknowledgements 當broker投遞一條消息給consumer后,broker需要獲取客戶端是否正確處理了該條消息的狀態(tài),當所有消息訂閱者都成功確認了該條消息時,證明該條消息投遞成功,broker會從隊列里面刪除該條消息。 Delivery Identifiers: Delivery Tags 當一個consumer向服務器注冊了一個連接(建立起了一個channel),broker向consumer投遞消息時會攜帶一個作用域為channel,單調遞增,long類型的delivery tag,consumer通過收到的delivery tag作為標識,向broker發(fā)起消息確認流程。 (注:delivery tag是一個64位的long類型數(shù),其最大值為:9223372036854775807,channel作用域內(nèi)超過其最大值的可能性很?。?/p> Acknowledgements Models basic.ack消息被成功處理 basic.nack客戶端無法處理該消息,支持一次拒絕多條消息(multiple表示reject basic.reject客戶端無法處理該消息 Channel Prefetch Setting (QoS) Consumer可以同時接收多少條消息。 ex: tag=5,6,7,8都處于未確認狀態(tài),QoS=4,那么broker將不會再向該Consumer投遞任何消息,直到有一條消息被確認后。 ii) Publisher Confirms 保證消息被準確送達到broker的兩種方式: transaction & Publisher Acknowledgements transaction 當channel處于事務模式下時,publisher投遞消息至broker,broker的消息落地在同一個事務中,保證了消息從publisher到broker的確定性。 事務模式相較于普通異步模式,性能上有250倍的下降。 Publisher Acknowledgements 通過delivery tag來作為消息的唯一標識,來對publisher的消息做異步確認,對于basic.reject & nack的情況需要業(yè)務方自己對消息做暫存重試。 官方示例:http://hg./rabbitmq-java-client/file/default/test/src/com/rabbitmq/examples/ConfirmDontLoseMessages.java 消息在什么時候被確認 消息沒有找到對應的路由策略,broker會返回basic.nack表示broker不能正確處理該條消息; 對于持久化隊列的消息,當消息被成功寫入磁盤后,會返回basic.ack的確認消息,表示消息broker正常收到消息且數(shù)據(jù)已落地; 對于鏡像隊列,當所有鏡像結點的對應queue已經(jīng)成功接收到數(shù)據(jù)時,會返回broker.ack表示數(shù)據(jù)已成功同步; rabbitmq的異步publisher confirm策略會將一段時間間隔內(nèi)(a few hundred milliseconds)的所有message寫入到磁盤,所有異步confirm大概會有幾百毫秒的延遲,但是這樣卻帶來了吞吐量的極大提高。 4)消息投遞的順序當消息從單publisher產(chǎn)生,通過單個exchange路由到單個queue,并被單個consumer消費時保證消息是嚴格按照順序消費的。 當消息被多個consumer訂閱時,c1消費成功,c2消費失敗且進行了requeue操作,此時就打亂了原來的順序。 5)TTL(time to live)消息存活時間,當一個消息從入隊開始計時,過了設定的TTL時間后,就認為該消息為dead message. 6)DLX(Dead Letter Exchange)死信隊列。當發(fā)生如下情況時,消息會進入DLX: The message is rejected (basic.rejectorbasic.nack) withrequeue=false, The TTL for the message expires; or The queue length limit is exceeded. 總結rabbitmq保證如下特性: 消息不丟失 消息至少被成功消費一次 不保證消息不重復,需業(yè)務方自行進行去重邏輯(版本號&狀態(tài)機&msgid) 不保證完全按照順序消費 5)高可用方案cluster + ha policycluster機制多個全聯(lián)通節(jié)點之間元信息(exchange、queue、binding等)保持強一致,但是隊列消息只會存儲在其中一個節(jié)點。 優(yōu)點:提高吞吐量,部分解決擴展性問題。 缺點:不能提升數(shù)據(jù)可靠性和系統(tǒng)可用性。 ha policy機制在cluster機制基礎上可以指定集群內(nèi)任意數(shù)量隊列組成鏡像隊列,隊列消息會在多節(jié)點間復制。實現(xiàn)數(shù)據(jù)高可靠和系統(tǒng)高可用。 設置參數(shù):ha-mode和ha-params可以細粒度(哪些節(jié)點,哪些隊列)設置鏡像隊列。 設置參數(shù):ha-sync-mode=manual(默認)/automatic可以指定集群中新節(jié)點的數(shù)據(jù)同步策略。 home node:rabbitmq中每一個queue都有一個home node,稱之為queue master,所有對queue的操作都是首先通過queue master向其他節(jié)點進行復制的,該機制保證了隊列的FIFO特性。 我們可以通過設置queue的x-queue-master-locator設置queue master策略 Pick the node hosting the minimum number of masters:min-masters Pick the node the client that declares the queue is connected to:client-local Pick a random node:random 消息節(jié)點故障的幾種情況slave節(jié)點故障 slave節(jié)點故障時,集群會自動關將其剔除,不再將master的信息同步至該節(jié)點。 new node joining the cluster. 節(jié)點可在任意時間加入集群,初始化加入集群的slave隊列信息為空,但是可以接收從master新同步過來的消息。master隊列頭部消息不斷被消費,尾部不斷新增消息,當某一時刻,在slave加入之前的歷史消息都被消費完畢時,master隊列的size和slave的size相等,認為slave節(jié)點從master的數(shù)據(jù)同步完成。 新加入的slave不能為加入之前的數(shù)據(jù)增加額外的冗余和可用性。因為執(zhí)行明確的同步操作會使queue無響應,因此只允許非活躍的queues進行明確的同步操作而活躍的queues進行自然的同步操作是一種好的策略。 也可以在新的slave加入集群后,手動或自動觸發(fā)同步歷史數(shù)據(jù),但是在同步過程中,隊列queue將處于不可用狀態(tài)(類似于MySQL在更改表結構的時候的鎖表)。 rabbitmqctl sync_queuename 手動觸發(fā)歷史消息同步 rabbitmqctl cancel_sync_queuename 取消消息同步操作 rabbitmqctl list_queues name slave_pids synchronised_slave_pids 查看正在同步的隊列 通過設置隊列的ha-sync-mode屬性automatic-新slave加入自動同步,manual-通過增量的方式同步 Stopping nodes and synchronisation master節(jié)點故障,最先加入集群的slave將被提升為master,假設此時集群中存在一個已經(jīng)完全同步的節(jié)點,即最先加入集群的slave。 集群中的節(jié)點繼續(xù)故障,到最終只剩下唯一的一個節(jié)點,該節(jié)點為master,對于該節(jié)點上持久化隊列,會在其故障或者重啟前不斷持久化消息,當故障節(jié)點恢復后,重新加入節(jié)點,因為其無法得知其故障之前持久化的鏡像隊列的歷史消息是否被正確投遞,rabbitmq采取的策略是清空其持久化信息,作為一個空的新slave加入集群。 Stopping master nodes with only unsynchronised slaves 當master節(jié)點故障時,cluster中只有未同步完成的節(jié)點時,有以下兩種情況: 當在可控條件下故障:rabbitmqctl stop, 優(yōu)雅地關閉OS時,故障節(jié)點將不會轉移到鏡像隊列上,此時鏡像隊列處于不可用狀態(tài)。(defalut setting) 當不可控條件下故障,機器斷電損壞等,master將會轉移到未同步完成的節(jié)點上。 Loss of a master while all slaves are stopped 通常情況下,我們希望最后的節(jié)點在重啟后繼續(xù)接管master權限,因為該情況下,最后的master節(jié)點保存了最全的信息,而這些信息其他之前故障的slave節(jié)點并不能接收到。 然而,我們在Stopping nodes and synchronisation一節(jié)中知道,新啟動加入集群的slave會首先清空其持久化的消息信息,作為一個新節(jié)點加入集群,所以在這里我們需要先執(zhí)行rabbitmqctl forget_cluster_node,此時rabbitmq會嘗試尋找當前node上的所有queue的master節(jié)點機器,并在master機器再次重啟加入集群后重新提升其為master節(jié)點。 master節(jié)點故障時: 一個slave被提升為新的master。被提升的slave是“最舊”的slave。因為該slave與原master中內(nèi)容完全同步的幾率最大。然而,也有可能所有的salve都未與master完全同步,此時只有master中存在而slaves中不存在的message將丟失。 slave認為所有之前的consumers的連接突然斷開了。因此,它重新將已經(jīng)投遞但還未被確認的messages重新排隊。這些“未被確認的”message可能包含client已發(fā)出確認但確認在到達master前丟失了的情形,也包含client已發(fā)出確認且確認已到達master但在master廣播給slaves時丟失的情形。在上述任意一種情況下,新的master都必須為他認為未收到確認的message重新排隊。 之前請求原master的clients被取消。 由于重新排隊,從queue重新consume的clients需要知道此時可能接收到之前已經(jīng)被處理過的message。 x-cancel-on-ha-failover=true時,consuming過程將被取消,consumer cancellation notification會被發(fā)出,由業(yè)務方確認是否重新消費。 當master節(jié)點故障時,如果publisher使用的時noAck模式,消息可能會丟失,因為noAck模式下,消息一旦進入broker的處理流程,就代表消息已經(jīng)處理完成,不需要任何確認過程。此時master發(fā)生故障,將不會requeue該消息,消息有丟失的風險。 4.消息隊列的應用場景舉例可靠的延遲隊列 RPC: Our RPC will work like this: When the Client starts up, it creates an anonymous exclusive callback queue. For an RPC request, the Client sends a message with two properties:replyTo, which is set to the callback queue andcorrelationId, which is set to a unique value for every request. The request is sent to anrpc_queuequeue. The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from thereplyTofield. The client waits for data on the callback queue. When a message appears, it checks thecorrelationIdproperty. If it matches the value from the request it returns the response to the application.
|