Kafka 涉及的知識點如下圖所示,本文將逐一講解:

一、消息隊列
1. 消息隊列的介紹
消息(Message)是指在應用之間傳送的數(shù)據(jù),消息可以非常簡單,比如只包含文本字符串,也可以更復雜,可能包含嵌入對象。消息隊列(Message Queue)是一種應用間的通信方式,消息發(fā)送后可以立即返回,有消息系統(tǒng)來確保信息的可靠專遞,消息發(fā)布者只管把消息發(fā)布到MQ中而不管誰來取,消息使用者只管從MQ中取消息而不管誰發(fā)布的,這樣發(fā)布者和使用者都不用知道對方的存在。
2. 消息隊列的應用場景
消息隊列在實際應用中包括如下四個場景:
應用耦合:多應用間通過消息隊列對同一消息進行處理,避免調用接口失敗導致整個過程失??;
異步處理:多應用對消息隊列中同一消息進行處理,應用間并發(fā)處理消息,相比串行處理,減少處理時間;
限流削峰:廣泛應用于秒殺或搶購活動中,避免流量過大導致應用系統(tǒng)掛掉的情況;
消息驅動的系統(tǒng):系統(tǒng)分為消息隊列、消息生產者、消息消費者,生產者負責產生消息,消費者(可能有多個)負責對消息進行處理;
下面詳細介紹上述四個場景以及消息隊列如何在上述四個場景中使用:
具體場景:用戶為了使用某個應用,進行注冊,系統(tǒng)需要發(fā)送注冊郵件并驗證短信。對這兩個操作的處理方式有兩種:串行及并行。
串行方式:新注冊信息生成后,先發(fā)送注冊郵件,再發(fā)送驗證短信;
在這種方式下,需要最終發(fā)送驗證短信后再返回給客戶端。
并行處理:新注冊信息寫入后,由發(fā)短信和發(fā)郵件并行處理;
在這種方式下,發(fā)短信和發(fā)郵件 需處理完成后再返回給客戶端。假設以上三個子系統(tǒng)處理的時間均為50ms,且不考慮網絡延遲,則總的處理時間:
串行:50+50+50=150ms
并行:50+50 = 100ms

在寫入消息隊列后立即返回成功給客戶端,則總的響應時間依賴于寫入消息隊列的時間,而寫入消息隊列的時間本身是可以很快的,基本可以忽略不計,因此總的處理時間相比串行提高了2倍,相比并行提高了一倍;
具體場景:用戶使用QQ相冊上傳一張圖片,人臉識別系統(tǒng)會對該圖片進行人臉識別,一般的做法是,服務器接收到圖片后,圖片上傳系統(tǒng)立即調用人臉識別系統(tǒng),調用完成后再返回成功,如下圖所示:
該方法有如下缺點:
人臉識別系統(tǒng)被調失敗,導致圖片上傳失??;
延遲高,需要人臉識別系統(tǒng)處理完成后,再返回給客戶端,即使用戶并不需要立即知道結果;
圖片上傳系統(tǒng)與人臉識別系統(tǒng)之間互相調用,需要做耦合;
若使用消息隊列:

客戶端上傳圖片后,圖片上傳系統(tǒng)將圖片信息如uin、批次寫入消息隊列,直接返回成功;而人臉識別系統(tǒng)則定時從消息隊列中取數(shù)據(jù),完成對新增圖片的識別。
此時圖片上傳系統(tǒng)并不需要關心人臉識別系統(tǒng)是否對這些圖片信息的處理、以及何時對這些圖片信息進行處理。事實上,由于用戶并不需要立即知道人臉識別結果,人臉識別系統(tǒng)可以選擇不同的調度策略,按照閑時、忙時、正常時間,對隊列中的圖片信息進行處理。
具體場景:購物網站開展秒殺活動,一般由于瞬時訪問量過大,服務器接收過大,會導致流量暴增,相關系統(tǒng)無法處理請求甚至崩潰。而加入消息隊列后,系統(tǒng)可以從消息隊列中取數(shù)據(jù),相當于消息隊列做了一次緩沖。
該方法有如下優(yōu)點:
請求先入消息隊列,而不是由業(yè)務處理系統(tǒng)直接處理,做了一次緩沖,極大地減少了業(yè)務處理系統(tǒng)的壓力;
隊列長度可以做限制,事實上,秒殺時,后入隊列的用戶無法秒殺到商品,這些請求可以直接被拋棄,返回活動已結束或商品已售完信息;
4.消息驅動的系統(tǒng)
具體場景:用戶新上傳了一批照片,人臉識別系統(tǒng)需要對這個用戶的所有照片進行聚類,聚類完成后由對賬系統(tǒng)重新生成用戶的人臉索引(加快查詢)。這三個子系統(tǒng)間由消息隊列連接起來,前一個階段的處理結果放入隊列中,后一個階段從隊列中獲取消息繼續(xù)處理。
該方法有如下優(yōu)點:
避免了直接調用下一個系統(tǒng)導致當前系統(tǒng)失??;
每個子系統(tǒng)對于消息的處理方式可以更為靈活,可以選擇收到消息時就處理,可以選擇定時處理,也可以劃分時間段按不同處理速度處理;
3. 消息隊列的兩種模式
消息隊列包括兩種模式,點對點模式(point to point, queue)和發(fā)布/訂閱模式(publish/subscribe,topic)
1) 點對點模式
點對點模式下包括三個角色:
- 接收者(消費者)
消息發(fā)送者生產消息發(fā)送到queue中,然后消息接收者從queue中取出并且消費消息。消息被消費以后,queue中不再有存儲,所以消息接收者不可能消費到已經被消費的消息。
點對點模式特點:
- 每個消息只有一個接收者(Consumer)(即一旦被消費,消息就不再在消息隊列中);
- 發(fā)送者和接發(fā)收者間沒有依賴性,發(fā)送者發(fā)送消息之后,不管有沒有接收者在運行,都不會影響到發(fā)送者下次發(fā)送消息;
- 接收者在成功接收消息之后需向隊列應答成功,以便消息隊列刪除當前接收的消息;
2) 發(fā)布/訂閱模式
發(fā)布/訂閱模式下包括三個角色:
- 訂閱者(Subscriber)
發(fā)布者將消息發(fā)送到Topic,系統(tǒng)將這些消息傳遞給多個訂閱者。
發(fā)布/訂閱模式特點:
- 發(fā)布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創(chuàng)建一個訂閱者之后,才能消費發(fā)布者的消息。
- 為了消費消息,訂閱者需要提前訂閱該角色主題,并保持在線運行;
4. 常用的消息隊列介紹
1) RabbitMQ
RabbitMQ 2007年發(fā)布,是一個在AMQP(高級消息隊列協(xié)議)基礎上完成的,可復用的企業(yè)消息系統(tǒng),是當前最主流的消息中間件之一。
2) ActiveMQ
ActiveMQ是由Apache出品,ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實現(xiàn)。它非??焖?,支持多種語言的客戶端和協(xié)議,而且可以非常容易的嵌入到企業(yè)的應用環(huán)境中,并有許多高級功能。
3) RocketMQ
RocketMQ出自 阿里公司的開源產品,用 Java 語言實現(xiàn),在設計時參考了 Kafka,并做出了自己的一些改進,消息可靠性上比 Kafka 更好。RocketMQ在阿里集團被廣泛應用在訂單,交易,充值,流計算,消息推送,日志流式處理等。
4) Kafka
Apache Kafka是一個分布式消息發(fā)布訂閱系統(tǒng)。它最初由LinkedIn公司基于獨特的設計實現(xiàn)為一個分布式的提交日志系統(tǒng)( a distributed commit log),之后成為Apache項目的一部分。Kafka系統(tǒng)快速、可擴展并且可持久化。它的分區(qū)特性,可復制和可容錯都是其不錯的特性。
5. Pulsar
Apahce Pulasr是一個企業(yè)級的發(fā)布-訂閱消息系統(tǒng),最初是由雅虎開發(fā),是下一代云原生分布式消息流平臺,集消息、存儲、輕量化函數(shù)式計算為一體,采用計算與存儲分離架構設計,支持多租戶、持久化存儲、多機房跨區(qū)域數(shù)據(jù)復制,具有強一致性、高吞吐、低延時及高可擴展性等流數(shù)據(jù)存儲特性。
Pulsar 非常靈活:它既可以應用于像 Kafka 這樣的分布式日志應用場景,也可以應用于像 RabbitMQ 這樣的純消息傳遞系統(tǒng)場景。它支持多種類型的訂閱、多種交付保證、保留策略以及處理模式演變的方法,以及其他諸多特性。
1. Pulsar 的特性
內置多租戶:不同的團隊可以使用相同的集群并將其隔離,解決了許多管理難題。它支持隔離、身份驗證、授權和配額;
多層體系結構:Pulsar 將所有 topic 數(shù)據(jù)存儲在由 Apache BookKeeper 支持的專業(yè)數(shù)據(jù)層中。存儲和消息傳遞的分離解決了擴展、重新平衡和維護集群的許多問題。它還提高了可靠性,幾乎不可能丟失數(shù)據(jù)。另外,在讀取數(shù)據(jù)時可以直連 BookKeeper,且不影響實時攝取。例如,可以使用 Presto 對 topic 執(zhí)行 SQL 查詢,類似于 KSQL,但不會影響實時數(shù)據(jù)處理;
虛擬 topic:由于采用 n 層體系結構,因此對 topic 的數(shù)量沒有限制,topic 及其存儲是分離的。用戶還可以創(chuàng)建非持久性 topic;
N 層存儲:Kafka 的一個問題是,存儲費用可能變高。因此,它很少用于存儲'冷'數(shù)據(jù),并且消息經常被刪除,Apache Pulsar 可以借助分層存儲自動將舊數(shù)據(jù)卸載到 Amazon S3 或其他數(shù)據(jù)存儲系統(tǒng),并且仍然向客戶端展示透明視圖;Pulsar 客戶端可以從時間開始節(jié)點讀取,就像所有消息都存在于日志中一樣;
2. Pulsar 存儲架構
Pulsar 的多層架構影響了存儲數(shù)據(jù)的方式。Pulsar 將 topic 分區(qū)劃分為分片(segment),然后將這些分片存儲在 Apache BookKeeper 的存儲節(jié)點上,以提高性能、可伸縮性和可用性。

Pulsar 的無限分布式日志以分片為中心,借助擴展日志存儲(通過 Apache BookKeeper)實現(xiàn),內置分層存儲支持,因此分片可以均勻地分布在存儲節(jié)點上。
由于與任一給定 topic 相關的數(shù)據(jù)都不會與特定存儲節(jié)點進行捆綁,因此很容易替換存儲節(jié)點或縮擴容。另外,集群中最小或最慢的節(jié)點也不會成為存儲或帶寬的短板。
Pulsar 架構能實現(xiàn)分區(qū)管理,負載均衡,因此使用 Pulsar 能夠快速擴展并達到高可用。這兩點至關重要,所以 Pulsar 非常適合用來構建關鍵任務服務,如金融應用場景的計費平臺,電子商務和零售商的交易處理系統(tǒng),金融機構的實時風險控制系統(tǒng)等。
通過性能強大的 Netty 架構,數(shù)據(jù)從 producers 到 broker,再到 bookie 的轉移都是零拷貝,不會生成副本。這一特性對所有流應用場景都非常友好,因為數(shù)據(jù)直接通過網絡或磁盤進行傳輸,沒有任何性能損失。
3. Pulsar 消息消費
Pulsar 的消費模型采用了流拉取的方式。流拉取是長輪詢的改進版,不僅實現(xiàn)了單個調用和請求之間的零等待,還可以提供雙向消息流。通過流拉取模型,Pulsar 實現(xiàn)了端到端的低延遲,這種低延遲比所有現(xiàn)有的長輪詢消息系統(tǒng)(如 Kafka)都低。
6. Kafka與Pulsar對比
1. Pulsar 的主要優(yōu)勢:
更多功能:Pulsar Function、多租戶、Schema registry、n 層存儲、多種消費模式和持久性模式等;
更大的靈活性:3 種訂閱類型(獨占,共享和故障轉移),用戶可以在一個訂閱上管理多個 topic;
與 Presto 的 SQL 集成,可直接查詢存儲而不會影響 broker;
2. Pulsar 的劣勢
Pulsar 并不完美,Pulsar 也存在一些問題:
n 層體系結構導致需要更多組件:BookKeeper;
云中的支持較少,Confluent 具有托管云產品。
3. 什么時候應該考慮 Pulsar
同時需要像 RabbitMQ 這樣的隊列和 Kafka 這樣的流處理程序;
實現(xiàn)多租戶,并確保每個團隊的訪問權限;
需要長時間保留消息,并且不想將其卸載到另一個存儲中;
需要高性能,基準測試表明 Pulsar 提供了更低的延遲和更高的吞吐量;
總之,Pulsar還比較新,社區(qū)不完善,用的企業(yè)比較少,網上有價值的討論和問題的解決比較少,遠沒有Kafka生態(tài)系統(tǒng)龐大,且用戶量非常龐大,目前Kafka依舊是大數(shù)據(jù)領域消息隊列的王者!所以我們還是以Kafka為主!
7. 其他消息隊列與Kafka對比

二、Kafka基礎
1. kafka的基本介紹
官網:http://kafka./
kafka是最初由linkedin公司開發(fā)的,使用scala語言編寫,kafka是一個分布式,分區(qū)的,多副本的,多訂閱者的日志系統(tǒng)(分布式MQ系統(tǒng)),可以用于搜索日志,監(jiān)控日志,訪問日志等。
Kafka is a distributed, partitioned, replicated commit logservice。它提供了類似于JMS的特性,但是在設計實現(xiàn)上完全不同,此外它并不是JMS規(guī)范的實現(xiàn)。
kafka對消息保存時根據(jù)Topic進行歸類,發(fā)送消息者成為Producer, 消息接受者成為Consumer, 此外kafka集群有多個kafka實例組成,每個實例(server)成為broker。無論是kafka集群,還是producer和consumer都依賴于zookeeper來保證系統(tǒng)可用性集群保存一些meta信息。
2. kafka的好處
- 可擴展性:kafka消息傳遞系統(tǒng)輕松縮放,無需停機。
- 耐用性:kafka使用分布式提交日志,這意味著消息會盡可能快速的保存在磁盤上,因此它是持久的。
- 性能:kafka對于發(fā)布和定于消息都具有高吞吐量。即使存儲了許多TB的消息,他也爆出穩(wěn)定的性能。
- kafka非???/strong>:保證零停機和零數(shù)據(jù)丟失。
3. 分布式的發(fā)布與訂閱系統(tǒng)
apache kafka是一個分布式發(fā)布-訂閱消息系統(tǒng)和一個強大的隊列,可以處理大量的數(shù)據(jù),并使能夠將消息從一個端點傳遞到另一個端點,kafka適合離線和在線消息消費。
kafka消息保留在磁盤上,并在集群內復制以防止數(shù)據(jù)丟失。kafka構建在zookeeper同步服務之上。它與apache和spark非常好的集成,應用于實時流式數(shù)據(jù)分析。
4. kafka的主要應用場景
1. 指標分析
kafka 通常用于操作監(jiān)控數(shù)據(jù)。這設計聚合來自分布式應用程序的統(tǒng)計信息,以產生操作的數(shù)據(jù)集中反饋
2. 日志聚合解決方法
kafka可用于跨組織從多個服務器收集日志,并使他們以標準的格式提供給多個服務器。
3. 流式處理
流式處理框架(spark,storm,?ink)重主題中讀取數(shù)據(jù),對齊進行處理,并將處理后的數(shù)據(jù)寫入新的主題,供 用戶和應用程序使用,kafka的強耐久性在流處理的上下文中也非常的有用。
三、Kafka架構及組件
1. kafka架構

允許應用程序發(fā)布記錄流至一個或者多個kafka的主題(topics)。
允許應用程序訂閱一個或者多個主題,并處理這些主題接收到的記錄流。
3. StreamsAPI
允許應用程序充當流處理器(stream processor),從一個或者多個主題獲取輸入流,并生產一個輸出流到一個或 者多個主題,能夠有效的變化輸入流為輸出流。
允許構建和運行可重用的生產者或者消費者,能夠把kafka主題連接到現(xiàn)有的應用程序或數(shù)據(jù)系統(tǒng)。例如:一個連接到關系數(shù)據(jù)庫的連接器可能會獲取每個表的變化。
Kafka 架構注:在Kafka 2.8.0 版本,移除了對Zookeeper的依賴,通過KRaft進行自己的集群管理,使用Kafka內部的Quorum控制器來取代ZooKeeper,因此用戶第一次可在完全不需要ZooKeeper的情況下執(zhí)行Kafka,這不只節(jié)省運算資源,并且也使得Kafka效能更好,還可支持規(guī)模更大的集群。
過去Apache ZooKeeper是Kafka這類分布式系統(tǒng)的關鍵,ZooKeeper扮演協(xié)調代理的角色,所有代理服務器啟動時,都會連接到Zookeeper進行注冊,當代理狀態(tài)發(fā)生變化時,Zookeeper也會儲存這些數(shù)據(jù),在過去,ZooKeeper是一個強大的工具,但是畢竟ZooKeeper是一個獨立的軟件,使得Kafka整個系統(tǒng)變得復雜,因此官方決定使用內部Quorum控制器來取代ZooKeeper。
這項工作從去年4月開始,而現(xiàn)在這項工作取得部分成果,用戶將可以在2.8版本,在沒有ZooKeeper的情況下執(zhí)行Kafka,官方稱這項功能為Kafka Raft元數(shù)據(jù)模式(KRaft)。
在KRaft模式,過去由Kafka控制器和ZooKeeper所操作的元數(shù)據(jù),將合并到這個新的Quorum控制器,并且在Kafka集群內部執(zhí)行,當然,如果使用者有特殊使用情境,Quorum控制器也可以在專用的硬件上執(zhí)行。
說完在新版本中移除zookeeper這個事,接著聊kafka的其他功能:
kafka支持消息持久化,消費端是主動拉取數(shù)據(jù),消費狀態(tài)和訂閱關系由客戶端負責維護,消息消費完后,不會立即刪除,會保留歷史消息。因此支持多訂閱時,消息只會存儲一份就可以。
- broker:kafka集群中包含一個或者多個服務實例(節(jié)點),這種服務實例被稱為broker(一個broker就是一個節(jié)點/一個服務器);
- topic:每條發(fā)布到kafka集群的消息都屬于某個類別,這個類別就叫做topic;
- partition:partition是一個物理上的概念,每個topic包含一個或者多個partition;
- segment:一個partition當中存在多個segment文件段,每個segment分為兩部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查詢, .log 文件當中數(shù)據(jù)的偏移量位置;
- producer:消息的生產者,負責發(fā)布消息到 kafka 的 broker 中;
- consumer:消息的消費者,向 kafka 的 broker 中讀取消息的客戶端;
- consumer group:消費者組,每一個 consumer 屬于一個特定的 consumer group(可以為每個consumer指定 groupName);
- .index:存放.log文件的索引數(shù)據(jù)。
2. Kafka 主要組件
1. producer(生產者)
producer主要是用于生產消息,是kafka當中的消息生產者,生產的消息通過topic進行歸類,保存到kafka的broker里面去。
2. topic(主題)
- topic特指kafka處理的消息源(feeds of messages)的不同分類;
- topic是一種分類或者發(fā)布的一些列記錄的名義上的名字。kafka主題始終是支持多用戶訂閱的;也就是說,一 個主題可以有零個,一個或者多個消費者訂閱寫入的數(shù)據(jù);
- 生產者和消費者消費數(shù)據(jù)一般以主題為單位。更細粒度可以到分區(qū)級別。
3. partition(分區(qū))
kafka當中,topic是消息的歸類,一個topic可以有多個分區(qū)(partition),每個分區(qū)保存部分topic的數(shù)據(jù),所有的partition當中的數(shù)據(jù)全部合并起來,就是一個topic當中的所有的數(shù)據(jù)。
一個broker服務下,可以創(chuàng)建多個分區(qū),broker數(shù)與分區(qū)數(shù)沒有關系;
在kafka中,每一個分區(qū)會有一個編號:編號從0開始。
每一個分區(qū)內的數(shù)據(jù)是有序的,但全局的數(shù)據(jù)不能保證是有序的。(有序是指生產什么樣順序,消費時也是什么樣的順序)
4. consumer(消費者)
consumer是kafka當中的消費者,主要用于消費kafka當中的數(shù)據(jù),消費者一定是歸屬于某個消費組中的。
5. consumer group(消費者組)
消費者組由一個或者多個消費者組成,同一個組中的消費者對于同一條消息只消費一次。
每個消費者都屬于某個消費者組,如果不指定,那么所有的消費者都屬于默認的組。
每個消費者組都有一個ID,即group ID。組內的所有消費者協(xié)調在一起來消費一個訂閱主題( topic)的所有分區(qū)(partition)。
當然,每個分區(qū)只能由同一個消費組內的一個消費者(consumer)來消費,可以由不同的消費組來消費。
partition數(shù)量決定了每個consumer group中并發(fā)消費者的最大數(shù)量。如下圖:
示例 1如上面左圖所示,如果只有兩個分區(qū),即使一個組內的消費者有4個,也會有兩個空閑的。
如上面右圖所示,有4個分區(qū),每個消費者消費一個分區(qū),并發(fā)量達到最大4。
在來看如下一幅圖:
示例 2如上圖所示,不同的消費者組消費同一個topic,這個topic有4個分區(qū),分布在兩個節(jié)點上。左邊的消費組1有兩個消費者,每個消費者就要消費兩個分區(qū)才能把消息完整的消費完,右邊的消費組2有四個消費者,每個消費者消費一個分區(qū)即可。
總結下kafka中分區(qū)與消費組的關系:
消費組:由一個或者多個消費者組成,同一個組中的消費者對于同一條消息只消費一次。某一個主題下的分區(qū)數(shù),對于消費該主題的同一個消費組下的消費者數(shù)量,應該小于等于該主題下的分區(qū)數(shù)。
如:某一個主題有4個分區(qū),那么消費組中的消費者應該小于等于4,而且最好與分區(qū)數(shù)成整數(shù)倍 124 這樣。同一個分區(qū)下的數(shù)據(jù),在同一時刻,不能同一個消費組的不同消費者消費。
總結:分區(qū)數(shù)越多,同一時間可以有越多的消費者來進行消費,消費數(shù)據(jù)的速度就會越快,提高消費的性能。
6. partition replicas(分區(qū)副本)
kafka 中的分區(qū)副本如下圖所示:
kafka 分區(qū)副本副本數(shù)(replication-factor):控制消息保存在幾個broker(服務器)上,一般情況下副本數(shù)等于broker的個數(shù)。
一個broker服務下,不可以創(chuàng)建多個副本因子。創(chuàng)建主題時,副本因子應該小于等于可用的broker數(shù)。
副本因子操作以分區(qū)為單位的。每個分區(qū)都有各自的主副本和從副本;
主副本叫做leader,從副本叫做 follower(在有多個副本的情況下,kafka會為同一個分區(qū)下的所有分區(qū),設定角色關系:一個leader和N個 follower),處于同步狀態(tài)的副本叫做in-sync-replicas(ISR);
follower通過拉的方式從leader同步數(shù)據(jù)。消費者和生產者都是從leader讀寫數(shù)據(jù),不與follower交互。
副本因子的作用:讓kafka讀取數(shù)據(jù)和寫入數(shù)據(jù)時的可靠性。
副本因子是包含本身,同一個副本因子不能放在同一個broker中。
如果某一個分區(qū)有三個副本因子,就算其中一個掛掉,那么只會剩下的兩個中,選擇一個leader,但不會在其他的broker中,另啟動一個副本(因為在另一臺啟動的話,存在數(shù)據(jù)傳遞,只要在機器之間有數(shù)據(jù)傳遞,就會長時間占用網絡IO,kafka是一個高吞吐量的消息系統(tǒng),這個情況不允許發(fā)生)所以不會在另一個broker中啟動。
如果所有的副本都掛了,生產者如果生產數(shù)據(jù)到指定分區(qū)的話,將寫入不成功。
lsr表示:當前可用的副本。
7. segment文件
一個partition當中由多個segment文件組成,每個segment文件,包含兩部分,一個是 .log 文件,另外一個是 .index 文件,其中 .log 文件包含了我們發(fā)送的數(shù)據(jù)存儲,.index 文件,記錄的是我們.log文件的數(shù)據(jù)索引值,以便于我們加快數(shù)據(jù)的查詢速度。
索引文件與數(shù)據(jù)文件的關系
既然它們是一一對應成對出現(xiàn),必然有關系。索引文件中元數(shù)據(jù)指向對應數(shù)據(jù)文件中message的物理偏移地址。
比如索引文件中 3,497 代表:數(shù)據(jù)文件中的第三個message,它的偏移地址為497。
再來看數(shù)據(jù)文件中,Message 368772表示:在全局partiton中是第368772個message。
注:segment index file 采取稀疏索引存儲方式,減少索引文件大小,通過mmap(內存映射)可以直接內存操作,稀疏索引為數(shù)據(jù)文件的每個對應message設置一個元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲空間,但查找起來需要消耗更多的時間。
.index 與 .log 對應關系如下:
.index 與 .log上圖左半部分是索引文件,里面存儲的是一對一對的key-value,其中key是消息在數(shù)據(jù)文件(對應的log文件)中的編號,比如“1,3,6,8……”,分別表示在log文件中的第1條消息、第3條消息、第6條消息、第8條消息……
那么為什么在index文件中這些編號不是連續(xù)的呢?這是因為index文件中并沒有為數(shù)據(jù)文件中的每條消息都建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。
這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內存中。但缺點是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。
value代表的是在全局partiton中的第幾個消息。
以索引文件中元數(shù)據(jù) 3,497 為例,其中3代表在右邊log數(shù)據(jù)文件中從上到下第3個消息,497表示該消息的物理偏移地址(位置)為497(也表示在全局partiton表示第497個消息-順序寫入特性)。
log日志目錄及組成kafka在我們指定的log.dir目錄下,會創(chuàng)建一些文件夾;名字是 (主題名字-分區(qū)名) 所組成的文件夾。在(主題名字-分區(qū)名)的目錄下,會有兩個文件存在,如下所示:
#索引文件
00000000000000000000.index
#日志內容
00000000000000000000.log
在目錄下的文件,會根據(jù)log日志的大小進行切分,.log文件的大小為1G的時候,就會進行切分文件;如下:
-rw-r--r--. 1 root root 389k 1月 17 18:03 00000000000000000000.index
-rw-r--r--. 1 root root 1.0G 1月 17 18:03 00000000000000000000.log
-rw-r--r--. 1 root root 10M 1月 17 18:03 00000000000000077894.index
-rw-r--r--. 1 root root 127M 1月 17 18:03 00000000000000077894.log
在kafka的設計中,將offset值作為了文件名的一部分。
segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個全局partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字就用0填充。
通過索引信息可以快速定位到message。通過index元數(shù)據(jù)全部映射到內存,可以避免segment File的IO磁盤操作;
通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。
稀疏索引:為了數(shù)據(jù)創(chuàng)建索引,但范圍并不是為每一條創(chuàng)建,而是為某一個區(qū)間創(chuàng)建;好處:就是可以減少索引值的數(shù)量。不好的地方:找到索引區(qū)間之后,要得進行第二次處理。
8. message的物理結構
生產者發(fā)送到kafka的每條消息,都被kafka包裝成了一個message
message的物理結構如下圖所示:
.index 與 .log所以生產者發(fā)送給kafka的消息并不是直接存儲起來,而是經過kafka的包裝,每條消息都是上圖這個結構,只有最后一個字段才是真正生產者發(fā)送的消息數(shù)據(jù)。
四、Kafka集群操作
1. 創(chuàng)建topic
創(chuàng)建一個名字為test的主題, 有三個分區(qū),有兩個副本:
bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 3 --topic test
2. 查看主題命令
查看kafka當中存在的主題:
bin/kafka-topics.sh --list --zookeeper node01:2181,node02:2181,node03:2181
3. 生產者生產數(shù)據(jù)
模擬生產者來生產數(shù)據(jù):
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
4. 消費者消費數(shù)據(jù)
執(zhí)行以下命令來模擬消費者進行消費數(shù)據(jù):
bin/kafka-console-consumer.sh --from-beginning --topic test --zookeeper node01:2181,node02:2181,node03:2181
5. 運行describe topics命令
執(zhí)行以下命令運行describe查看topic的相關信息:
bin/kafka-topics.sh --describe --zookeeper node01:2181 --topic test
結果說明:
這是輸出的解釋。第一行給出了所有分區(qū)的摘要,每個附加行提供有關一個分區(qū)的信息。由于我們只有一個分 區(qū)用于此主題,因此只有一行。
“l(fā)eader”是負責給定分區(qū)的所有讀取和寫入的節(jié)點。每個節(jié)點將成為隨機選擇的分區(qū)部分的領導者。(因為在kafka中 如果有多個副本的話,就會存在leader和follower的關系,表示當前這個副本為leader所在的broker是哪一個)
“replicas”是復制此分區(qū)日志的節(jié)點列表,無論它們是否為領導者,或者即使它們當前處于活動狀態(tài)。(所有副本列表0,1,2)
“isr”是“同步”復制品的集合。這是副本列表的子集,該列表當前處于活躍狀態(tài)并且已經被領導者捕獲。(可用的列表數(shù))
6. 增加topic分區(qū)數(shù)
執(zhí)行以下命令可以增加topic分區(qū)數(shù):
bin/kafka-topics.sh --zookeeper zkhost:port --alter --topic topicName --partitions 8
7. 增加配置
動態(tài)修改kakfa的配置:
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --config flush.messages=1
8. 刪除配置
動態(tài)刪除kafka集群配置:
bin/kafka-topics.sh --zookeeper node01:2181 --alter --topic test --delete-config flush.messages
9. 刪除topic
目前刪除topic在默認情況下知識打上一個刪除的標記,在重新啟動kafka后才刪除。
如果需要立即刪除,則需要在server.properties中配置:
delete.topic.enable=true
然后執(zhí)行以下命令進行刪除topic:
kafka-topics.sh --zookeeper zkhost:port --delete --topic topicName
五、Kafka的JavaAPI操作
1. 生產者代碼
使用生產者,生產數(shù)據(jù):
/**
* 訂單的生產者代碼,
*/
public class OrderProducer {
public static void main(String[] args) throws InterruptedException {
/* 1、連接集群,通過配置文件的方式
* 2、發(fā)送數(shù)據(jù)-topic:order,value
*/
Properties props = new Properties(); props.put('bootstrap.servers', 'node01:9092'); props.put('acks', 'all');
props.put('retries', 0);
props.put('batch.size', 16384);
props.put('linger.ms', 1);
props.put('buffer.memory', 33554432);
props.put('key.serializer',
'org.apache.kafka.common.serialization.StringSerializer');
props.put('value.serializer',
'org.apache.kafka.common.serialization.StringSerializer');
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>
(props);
for (int i = 0; i < 1000; i++) {
// 發(fā)送數(shù)據(jù) ,需要一個producerRecord對象,最少參數(shù) String topic, V value kafkaProducer.send(new ProducerRecord<String, String>('order', '訂單信
息!'+i));
Thread.sleep(100);
}
}
}
kafka當中的數(shù)據(jù)分區(qū):
kafka生產者發(fā)送的消息,都是保存在broker當中,我們可以自定義分區(qū)規(guī)則,決定消息發(fā)送到哪個partition里面去進行保存查看ProducerRecord這個類的源碼,就可以看到kafka的各種不同分區(qū)策略
kafka當中支持以下四種數(shù)據(jù)的分區(qū)方式:
//第一種分區(qū)策略,如果既沒有指定分區(qū)號,也沒有指定數(shù)據(jù)key,那么就會使用輪詢的方式將數(shù)據(jù)均勻的發(fā)送到不同的分區(qū)里面去
//ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>('mypartition', 'mymessage' + i);
//kafkaProducer.send(producerRecord1);
//第二種分區(qū)策略 如果沒有指定分區(qū)號,指定了數(shù)據(jù)key,通過key.hashCode % numPartitions來計算數(shù)據(jù)究竟會保存在哪一個分區(qū)里面
//注意:如果數(shù)據(jù)key,沒有變化 key.hashCode % numPartitions = 固定值 所有的數(shù)據(jù)都會寫入到某一個分區(qū)里面去
//ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>('mypartition', 'mykey', 'mymessage' + i);
//kafkaProducer.send(producerRecord2);
//第三種分區(qū)策略:如果指定了分區(qū)號,那么就會將數(shù)據(jù)直接寫入到對應的分區(qū)里面去
// ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>('mypartition', 0, 'mykey', 'mymessage' + i);
// kafkaProducer.send(producerRecord3);
//第四種分區(qū)策略:自定義分區(qū)策略。如果不自定義分區(qū)規(guī)則,那么會將數(shù)據(jù)使用輪詢的方式均勻的發(fā)送到各個分區(qū)里面去
kafkaProducer.send(new ProducerRecord<String, String>('mypartition','mymessage'+i));
自定義分區(qū)策略:
public class KafkaCustomPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object arg1, byte[] keyBytes, Object arg3, byte[] arg4, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int partitionNum = partitions.size();
Random random = new Random();
int partition = random.nextInt(partitionNum);
return partition;
}
@Override
public void close() {
}
}
主代碼中添加配置:
@Test
public void kafkaProducer() throws Exception {
//1、準備配置文件
Properties props = new Properties();
props.put('bootstrap.servers', 'node01:9092,node02:9092,node03:9092');
props.put('acks', 'all');
props.put('retries', 0);
props.put('batch.size', 16384);
props.put('linger.ms', 1);
props.put('buffer.memory', 33554432);
props.put('partitioner.class', 'cn.itcast.kafka.partitioner.KafkaCustomPartitioner');
props.put('key.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
props.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
//2、創(chuàng)建KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
for (int i=0;i<100;i++){
//3、發(fā)送數(shù)據(jù)
kafkaProducer.send(new ProducerRecord<String, String>('testpart','0','value'+i));
}
kafkaProducer.close();
}
2. 消費者代碼
消費必要條件:
消費者要從kafka Cluster進行消費數(shù)據(jù),必要條件有以下四個:
地址:bootstrap.servers=node01:9092
序列化:key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=org.apache.kafka.common.serialization.StringSerializer
主題(topic):需要制定具體的某個topic(order)即可。
1) 自動提交offset
消費完成之后,自動提交offset:
/**
* 消費訂單數(shù)據(jù)--- javaben.tojson
*/
public class OrderConsumer {
public static void main(String[] args) {
// 1\連接集群
Properties props = new Properties(); props.put('bootstrap.servers', 'hadoop-01:9092'); props.put('group.id', 'test');
//以下兩行代碼 ---消費者自動提交offset值
props.put('enable.auto.commit', 'true');
props.put('auto.commit.interval.ms', '1000');
props.put('key.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>
(props);
// 2、發(fā)送數(shù)據(jù) 發(fā)送數(shù)據(jù)需要,訂閱下要消費的topic。order kafkaConsumer.subscribe(Arrays.asList('order'));
while (true) {
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(100);// jdk queue offer插入、poll獲取元素。blockingqueue put插入原生, take獲取元素
for (ConsumerRecord<String, String> record : consumerRecords) { System.out.println('消費的數(shù)據(jù)為:' + record.value());
}
}
}
}
2) 手動提交offset
如果Consumer在獲取數(shù)據(jù)后,需要加入處理,數(shù)據(jù)完畢后才確認offset,需要程序來控制offset的確認。
關閉自動提交確認選項:props.put('enable.auto.commit', 'false');
手動提交offset值:kafkaConsumer.commitSync();
完整代碼如下:
Properties props = new Properties();
props.put('bootstrap.servers', 'localhost:9092');
props.put('group.id', 'test');
//關閉自動提交確認選項
props.put('enable.auto.commit', 'false');
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList('test'));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
// 手動提交offset值
consumer.commitSync();
buffer.clear();
}
}
3) 消費完每個分區(qū)之后手動提交offset
上面的示例使用commitSync將所有已接收的記錄標記為已提交。在某些情況下,可能希望通過明確指定偏移量來更好地控制已提交的記錄。在下面的示例中,我們在完成處理每個分區(qū)中的記錄后提交偏移量:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) { System.out.println(record.offset() + ': ' + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() -1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally { consumer.close();}
注意事項:
提交的偏移量應始終是應用程序將讀取的下一條消息的偏移量。因此,在調用commitSync(偏移量)時,應該在最后處理的消息的偏移量中添加一個。
4) 指定分區(qū)數(shù)據(jù)進行消費
如果進程正在維護與該分區(qū)關聯(lián)的某種本地狀態(tài)(如本地磁盤上的鍵值存儲),那么它應該只獲取它在磁盤上維護的分區(qū)的記錄。
如果進程本身具有高可用性,并且如果失敗則將重新啟動(可能使用YARN,Mesos或AWS工具等集群管理框 架,或作為流處理框架的一部分)。在這種情況下,Kafka不需要檢測故障并重新分配分區(qū),因為消耗過程將在另一臺機器上重新啟動。
Properties props = new Properties(); props.put('bootstrap.servers', 'localhost:9092'); props.put('group.id', 'test');
props.put('enable.auto.commit', 'true');
props.put('auto.commit.interval.ms', '1000');
props.put('key.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
props.put('value.deserializer',
'org.apache.kafka.common.serialization.StringDeserializer');
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//consumer.subscribe(Arrays.asList('foo', 'bar'));
//手動指定消費指定分區(qū)的數(shù)據(jù)---start
String topic = 'foo';
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1); consumer.assign(Arrays.asList(partition0, partition1));
//手動指定消費指定分區(qū)的數(shù)據(jù)---end
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf('offset = %d, key = %s, value = %s%n', record.offset(), record.key(), record.value());
}
注意事項:
要使用此模式,只需使用要使用的分區(qū)的完整列表調用assign(Collection),而不是使用subscribe訂閱主題。
5) 重復消費與數(shù)據(jù)丟失
說明:
已經消費的數(shù)據(jù)對于kafka來說,會將消費組里面的o?set值進行修改,那什么時候進行修改了?是在數(shù)據(jù)消費 完成之后,比如在控制臺打印完后自動提交;
提交過程:是通過kafka將o?set進行移動到下個message所處的o?set的位置。
拿到數(shù)據(jù)后,存儲到hbase中或者mysql中,如果hbase或者mysql在這個時候連接不上,就會拋出異常,如果在處理數(shù)據(jù)的時候已經進行了提交,那么kafka傷的o?set值已經進行了修改了,但是hbase或者mysql中沒有數(shù)據(jù),這個時候就會出現(xiàn)數(shù)據(jù)丟失。
- 什么時候提交o?set值?在Consumer將數(shù)據(jù)處理完成之后,再來進行o?set的修改提交。默認情況下o?set是 自動提交,需要修改為手動提交o?set值。
如果在處理代碼中正常處理了,但是在提交o?set請求的時候,沒有連接到kafka或者出現(xiàn)了故障,那么該次修 改o?set的請求是失敗的,那么下次在進行讀取同一個分區(qū)中的數(shù)據(jù)時,會從已經處理掉的o?set值再進行處理一 次,那么在hbase中或者mysql中就會產生兩條一樣的數(shù)據(jù),也就是數(shù)據(jù)重復。
6) consumer消費者消費數(shù)據(jù)流程
流程描述:
Consumer連接指定的Topic partition所在leader broker,采用pull方式從kafkalogs中獲取消息。對于不同的消費模式,會將offset保存在不同的地方官網關于high level API 以及l(fā)ow level API的簡介:http://kafka./0100/documentation.html#impl_consumer
高階API(High Level API):
kafka消費者高階API簡單;隱藏Consumer與Broker細節(jié);相關信息保存在zookeeper中:
/* create a connection to the cluster */
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
This method is used to get a list of KafkaStreams, which are iterators over
MessageAndMetadata objects from which you can obtain messages and their
associated metadata (currently only topic).
Input: a map of <topic, #streams>
Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
You can also obtain a list of KafkaStreams, that iterate over messages
from topics that match a TopicFilter. (A TopicFilter encapsulates a
whitelist or a blacklist which is a standard Java regex.)
*/
public List<KafkaStream> createMessageStreamsByFilter( TopicFilter topicFilter, int numStreams);
/* Commit the offsets of all messages consumed so far. */ public commitOffsets()
/* Shut down the connector */ public shutdown()
}
說明:大部分的操作都已經封裝好了,比如:當前消費到哪個位置下了,但是不夠靈活(工作過程推薦使用)
低級API(Low Level API):
kafka消費者低級API非常靈活;需要自己負責維護連接Controller Broker。保存offset,Consumer Partition對應關系:
class SimpleConsumer {
/* Send fetch request to a broker and get back a set of messages. */
public ByteBufferMessageSet fetch(FetchRequest request);
/* Send a list of fetch requests to a broker and get back a response set. */ public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
Get a list of valid offsets (up to maxSize) before the given time.
The result is a list of offsets, in descending order.
@param time: time in millisecs,
if set to OffsetRequest$.MODULE$.LATEST_TIME(), get from the latest
offset available. if set to OffsetRequest$.MODULE$.EARLIEST_TIME(), get from the earliest
available. public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
* offset
*/
說明:沒有進行包裝,所有的操作有用戶決定,如自己的保存某一個分區(qū)下的記錄,你當前消費到哪個位置。
3. kafka Streams API開發(fā)
需求:使用StreamAPI獲取test這個topic當中的數(shù)據(jù),然后將數(shù)據(jù)全部轉為大寫,寫入到test2這個topic當中去。
第一步:創(chuàng)建一個topic
node01服務器使用以下命令來常見一個 topic 名稱為test2:
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
第二步:開發(fā)StreamAPI
public class StreamAPI {
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, 'wordcount-application');
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 'node01:9092');
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
KStreamBuilder builder = new KStreamBuilder();
builder.stream('test').mapValues(line -> line.toString().toUpperCase()).to('test2');
KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}
執(zhí)行上述代碼,監(jiān)聽獲取 test中的數(shù)據(jù),然后轉成大寫,將結果寫入 test2。
第三步:生產數(shù)據(jù)
node01執(zhí)行以下命令,向test這個topic當中生產數(shù)據(jù):
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
第四步:消費數(shù)據(jù)
node02執(zhí)行一下命令消費test2這個topic當中的數(shù)據(jù):
bin/kafka-console-consumer.sh --from-beginning --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
六、Kafka中的數(shù)據(jù)不丟失機制
1. 生產者生產數(shù)據(jù)不丟失
發(fā)送消息方式
生產者發(fā)送給kafka數(shù)據(jù),可以采用同步方式或異步方式
同步方式:
發(fā)送一批數(shù)據(jù)給kafka后,等待kafka返回結果:
- 生產者等待10s,如果broker沒有給出ack響應,就認為失敗。
異步方式:
發(fā)送一批數(shù)據(jù)給kafka,只是提供一個回調函數(shù):
- 先將數(shù)據(jù)保存在生產者端的buffer中。buffer大小是2萬條 。
- 滿足數(shù)據(jù)閾值或者數(shù)量閾值其中的一個條件就可以發(fā)送數(shù)據(jù)。
- 發(fā)送一批數(shù)據(jù)的大小是500條。
注:如果broker遲遲不給ack,而buffer又滿了,開發(fā)者可以設置是否直接清空buffer中的數(shù)據(jù)。
ack機制(確認機制)
生產者數(shù)據(jù)發(fā)送出去,需要服務端返回一個確認碼,即ack響應碼;ack的響應有三個狀態(tài)值0,1,-1
0:生產者只負責發(fā)送數(shù)據(jù),不關心數(shù)據(jù)是否丟失,丟失的數(shù)據(jù),需要再次發(fā)送
1:partition的leader收到數(shù)據(jù),不管follow是否同步完數(shù)據(jù),響應的狀態(tài)碼為1
-1:所有的從節(jié)點都收到數(shù)據(jù),響應的狀態(tài)碼為-1
如果broker端一直不返回ack狀態(tài),producer永遠不知道是否成功;producer可以設置一個超時時間10s,超過時間認為失敗。
2. broker中數(shù)據(jù)不丟失
在broker中,保證數(shù)據(jù)不丟失主要是通過副本因子(冗余),防止數(shù)據(jù)丟失。
3. 消費者消費數(shù)據(jù)不丟失
在消費者消費數(shù)據(jù)的時候,只要每個消費者記錄好offset值即可,就能保證數(shù)據(jù)不丟失。也就是需要我們自己維護偏移量(offset),可保存在 Redis 中。
七、Kafka配置文件說明
Server.properties配置文件說明:
#broker的全局唯一編號,不能重復
broker.id=0
#用來監(jiān)聽鏈接的端口,producer或consumer將在此端口建立連接
port=9092
#處理網絡請求的線程數(shù)量
num.network.threads=3
#用來處理磁盤IO的線程數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接受套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運行日志存放的路徑
log.dirs=/export/data/kafka/
#topic在當前broker上的分片個數(shù)
num.partitions=2
#用來恢復和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時間,超時將被刪除
log.retention.hours=168
#滾動生成新的segment文件的最大時間
log.roll.hours=1
#日志文件中每個segment的大小,默認為1G
log.segment.bytes=1073741824
#周期性檢查文件大小的時間
log.retention.check.interval.ms=300000
#日志清理是否打開
log.cleaner.enable=true
#broker需要使用zookeeper保存meta數(shù)據(jù)
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
#zookeeper鏈接超時時間
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的條數(shù)達到閾值,將觸發(fā)flush到磁盤
log.flush.interval.messages=10000
#消息buffer的時間,達到閾值,將觸發(fā)flush到磁盤
log.flush.interval.ms=3000
#刪除topic需要server.properties中設置delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true
#此處的host.name為本機IP(重要),如果不改,則客戶端會拋出:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=kafka01
advertised.host.name=192.168.140.128
producer生產者配置文件說明
#指定kafka節(jié)點列表,用于獲取metadata,不必全部指定
metadata.broker.list=node01:9092,node02:9092,node03:9092
# 指定分區(qū)處理類。默認kafka.producer.DefaultPartitioner,表通過key哈希到對應分區(qū)
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否壓縮,默認0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮后消息中會有頭來指明消息壓縮類型,故在消費者端消息解壓是透明的無需指定。
compression.codec=none
# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder
# 如果要壓縮消息,這里指定哪些topic要壓縮消息,默認empty,表示不壓縮。
#compressed.topics=
# 設置發(fā)送數(shù)據(jù)是否需要服務端的反饋,有三個值0,1,-1
# 0: producer不會等待broker發(fā)送ack
# 1: 當leader接收到消息之后發(fā)送ack
# -1: 當所有的follower都同步消息成功后發(fā)送ack.
request.required.acks=0
# 在向producer發(fā)送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer發(fā)送一個error ACK.意味著上一次消息因為某種原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
# 同步還是異步發(fā)送消息,默認“sync”表同步,'async'表異步。異步可以提高發(fā)送吞吐量,
也意味著消息將會在本地buffer中,并適時批量發(fā)送,但是也可能導致丟失未發(fā)送過去的消息
producer.type=sync
# 在async模式下,當message被緩存的時間超過此值后,將會批量發(fā)送給broker,默認為5000ms
# 此值和batch.num.messages協(xié)同工作.
queue.buffering.max.ms = 5000
# 在async模式下,producer端允許buffer的最大消息量
# 無論如何,producer都無法盡快的將消息發(fā)送給broker,從而導致消息在producer端大量沉積
# 此時,如果消息的條數(shù)達到閥值,將會導致producer端阻塞或者消息被拋棄,默認為10000
queue.buffering.max.messages=20000
# 如果是異步,指定每次批量發(fā)送數(shù)據(jù)量,默認為200
batch.num.messages=500
# 當消息在producer端沉積的條數(shù)達到'queue.buffering.max.meesages'后
# 阻塞一定時間后,隊列仍然沒有enqueue(producer仍然沒有發(fā)送出任何消息)
# 此時producer可以繼續(xù)阻塞或者將消息拋棄,此timeout值用于控制'阻塞'的時間
# -1: 無阻塞超時限制,消息不會被拋棄
# 0:立即清空隊列,消息被拋棄
queue.enqueue.timeout.ms=-1
# 當producer接收到error ACK,或者沒有接收到ACK時,允許消息重發(fā)的次數(shù)
# 因為broker并沒有完整的機制來避免消息重復,所以當網絡異常時(比如ACK丟失)
# 有可能導致broker接收到重復的消息,默認值為3.
message.send.max.retries=3
# producer刷新topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況
# 因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即刷新
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此參數(shù)來配置額外的刷新機制,默認值600000
topic.metadata.refresh.interval.ms=60000
consumer消費者配置詳細說明:
# zookeeper連接服務器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper的session過期時間,默認5000ms,用于檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000
#當消費者掛掉,其他消費者要等該指定時間才能檢查到并且觸發(fā)重新負載均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消費者更新offset到zookeeper中。注意offset更新時基于time而不是每次獲得的消息。一旦在更新zookeeper發(fā)生異常并重啟,將可能拿到已拿到過的消息
zookeeper.sync.time.ms=2000
#指定消費
group.id=itcast
# 當consumer消費一定量的消息之后,將會自動向zookeeper提交offset信息
# 注意offset信息并不是每消費一次消息就向zk提交一次,而是現(xiàn)在本地保存(內存),并定期提交,默認為true
auto.commit.enable=true
# 自動更新時間。默認60 * 1000
auto.commit.interval.ms=1000
# 當前consumer的標識,可以設定,也可以有系統(tǒng)生成,主要用來跟蹤消息消費情況,便于觀察
conusmer.id=xxx
# 消費者客戶端編號,用于區(qū)分不同客戶端,默認客戶端程序自動產生
client.id=xxxx
# 最大取多少塊緩存到消費者(默認10)
queued.max.message.chunks=50
# 當有新的consumer加入到group時,將會reblance,此后將會有partitions的消費端遷移到新 的consumer上,如果一個consumer獲得了某個partition的消費權限,那么它將會向zk注冊 'Partition Owner registry'節(jié)點信息,但是有可能此時舊的consumer尚沒有釋放此節(jié)點, 此值用于控制,注冊節(jié)點的重試次數(shù).
rebalance.max.retries=5
# 獲取消息的最大尺寸,broker不會像consumer輸出大于此值的消息chunk 每次feth將得到多條消息,此值為總大小,提升此值,將會消耗更多的consumer端內存
fetch.min.bytes=6553600
# 當消息的尺寸不足時,server阻塞的時間,如果超時,消息將立即發(fā)送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出范圍。那么就給個初始的offset。有smallest、largest、anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。默認largest
auto.offset.reset=smallest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder
八、CAP理論
1. 分布式系統(tǒng)當中的CAP理論
分布式系統(tǒng)(distributed system)正變得越來越重要,大型網站幾乎都是分布式的。
分布式系統(tǒng)的最大難點,就是各個節(jié)點的狀態(tài)如何同步。
為了解決各個節(jié)點之間的狀態(tài)同步問題,在1998年,由加州大學的計算機科學家 Eric Brewer 提出分布式系統(tǒng)的三個指標,分別是:
Partition tolerance:分區(qū)容錯性
Eric Brewer 說,這三個指標不可能同時做到。最多只能同時滿足其中兩個條件,這個結論就叫做 CAP 定理。
CAP理論是指:分布式系統(tǒng)中,一致性、可用性和分區(qū)容忍性最多只能同時滿足兩個。
一致性:Consistency
- 通過某個節(jié)點的寫操作結果對后面通過其它節(jié)點的讀操作可見
- 如果更新數(shù)據(jù)后,并發(fā)訪問情況下后續(xù)讀操作可立即感知該更新,稱為強一致性
- 如果允許之后部分或者全部感知不到該更新,稱為弱一致性
- 若在之后的一段時間(通常該時間不固定)后,一定可以感知到該更新,稱為最終一致性
可用性:Availability
- 任何一個沒有發(fā)生故障的節(jié)點必須在有限的時間內返回合理的結果
分區(qū)容錯性:Partition tolerance
- 部分節(jié)點宕機或者無法與其它節(jié)點通信時,各分區(qū)間還可保持分布式系統(tǒng)的功能
一般而言,都要求保證分區(qū)容忍性。所以在CAP理論下,更多的是需要在可用性和一致性之間做權衡。

2. Partition tolerance
先看 Partition tolerance,中文叫做'分區(qū)容錯'。
大多數(shù)分布式系統(tǒng)都分布在多個子網絡。每個子網絡就叫做一個區(qū)(partition)。分區(qū)容錯的意思是,區(qū)間通信可能失敗。比如,一臺服務器放在中國,另一臺服務器放在美國,這就是兩個區(qū),它們之間可能無法通信。

上圖中,G1 和 G2 是兩臺跨區(qū)的服務器。G1 向 G2 發(fā)送一條消息,G2 可能無法收到。系統(tǒng)設計的時候,必須考慮到這種情況。
一般來說,分區(qū)容錯無法避免,因此可以認為 CAP 的 P 總是存在的。即永遠可能存在分區(qū)容錯這個問題
3. Consistency
Consistency 中文叫做'一致性'。意思是,寫操作之后的讀操作,必須返回該值。舉例來說,某條記錄是 v0,用戶向 G1 發(fā)起一個寫操作,將其改為 v1。
接下來,用戶的讀操作就會得到 v1。這就叫一致性。
問題是,用戶有可能向 G2 發(fā)起讀操作,由于 G2 的值沒有發(fā)生變化,因此返回的是 v0。G1 和 G2 讀操作的結果不一致,這就不滿足一致性了。

為了讓 G2 也能變?yōu)?v1,就要在 G1 寫操作的時候,讓 G1 向 G2 發(fā)送一條消息,要求 G2 也改成 v1。

這樣的話,用戶向 G2 發(fā)起讀操作,也能得到 v1。

4. Availability
Availability 中文叫做'可用性',意思是只要收到用戶的請求,服務器就必須給出回應。用戶可以選擇向 G1 或 G2 發(fā)起讀操作。不管是哪臺服務器,只要收到請求,就必須告訴用戶,到底是 v0 還是 v1,否則就不滿足可用性。
九、Kafka中的CAP機制
kafka是一個分布式的消息隊列系統(tǒng),既然是一個分布式的系統(tǒng),那么就一定滿足CAP定律,那么在kafka當中是如何遵循CAP定律的呢?kafka滿足CAP定律當中的哪兩個呢?
kafka滿足的是CAP定律當中的CA,其中Partition tolerance通過的是一定的機制盡量的保證分區(qū)容錯性。
其中C表示的是數(shù)據(jù)一致性。A表示數(shù)據(jù)可用性。
kafka首先將數(shù)據(jù)寫入到不同的分區(qū)里面去,每個分區(qū)又可能有好多個副本,數(shù)據(jù)首先寫入到leader分區(qū)里面去,讀寫的操作都是與leader分區(qū)進行通信,保證了數(shù)據(jù)的一致性原則,也就是滿足了Consistency原則。
然后kafka通過分區(qū)副本機制,來保證了kafka當中數(shù)據(jù)的可用性。但是也存在另外一個問題,就是副本分區(qū)當中的數(shù)據(jù)與leader當中的數(shù)據(jù)存在差別的問題如何解決,這個就是Partition tolerance的問題。
kafka為了解決Partition tolerance的問題,使用了ISR的同步策略,來盡最大可能減少Partition tolerance的問題。
每個leader會維護一個ISR(a set of in-sync replicas,基本同步)列表。
ISR列表主要的作用就是決定哪些副本分區(qū)是可用的,也就是說可以將leader分區(qū)里面的數(shù)據(jù)同步到副本分區(qū)里面去,決定一個副本分區(qū)是否可用的條件有兩個:
replica.lag.time.max.ms=10000 副本分區(qū)與主分區(qū)心跳時間延遲
replica.lag.max.messages=4000 副本分區(qū)與主分區(qū)消息同步最大差
produce 請求被認為完成時的確認值:request.required.acks=0
。
- ack=0:producer不等待broker同步完成的確認,繼續(xù)發(fā)送下一條(批)信息。
- ack=1(默認):producer要等待leader成功收到數(shù)據(jù)并得到確認,才發(fā)送下一條message。
- ack=-1:producer得到follwer確認,才發(fā)送下一條數(shù)據(jù)。
十、Kafka監(jiān)控及運維
在開發(fā)工作中,消費在Kafka集群中消息,數(shù)據(jù)變化是我們關注的問題,當業(yè)務前提不復雜時,我們可以使用Kafka 命令提供帶有Zookeeper客戶端工具的工具,可以輕松完成我們的工作。隨著業(yè)務的復雜性,增加Group和 Topic,那么我們使用Kafka提供命令工具,已經感到無能為力,那么Kafka監(jiān)控系統(tǒng)目前尤為重要,我們需要觀察 消費者應用的細節(jié)。
1. kafka-eagle概述
為了簡化開發(fā)者和服務工程師維護Kafka集群的工作有一個監(jiān)控管理工具,叫做 Kafka-eagle。這個管理工具可以很容易地發(fā)現(xiàn)分布在集群中的哪些topic分布不均勻,或者是分區(qū)在整個集群分布不均勻的的情況。它支持管理多個集群、選擇副本、副本重新分配以及創(chuàng)建Topic。同時,這個管理工具也是一個非常好的可以快速瀏覽這個集群的工具,
2. 環(huán)境和安裝
1. 環(huán)境要求
需要安裝jdk,啟動zk以及kafka的服務
2. 安裝步驟
kafka-eagle官網:http://download./
我們可以從官網上面直接下載最細的安裝包即可kafka-eagle-bin-1.3.2.tar.gz這個版本即可
代碼托管地址:
https://github.com/smartloli/kafka-eagle/releases
這里我們選擇將kafak-eagle安裝在第三臺。
直接將kafka-eagle安裝包上傳到node03服務器的/export/softwares路徑下,然后進行解壓node03服務器執(zhí)行一下命令進行解壓。
kafka-eagle需要使用一個數(shù)據(jù)庫來保存一些元數(shù)據(jù)信息,我們這里直接使用msyql數(shù)據(jù)庫來保存即可,在node03服務器執(zhí)行以下命令創(chuàng)建一個mysql數(shù)據(jù)庫即可。
進入mysql客戶端:
create database eagle;
修改kafak-eagle配置文件
執(zhí)行以下命令修改kafak-eagle配置文件:
vim system-config.properties
修改為如下:
kafka.eagle.zk.cluster.alias=cluster1,cluster2
cluster1.zk.list=node01:2181,node02:2181,node03:2181
cluster2.zk.list=node01:2181,node02:2181,node03:2181
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://node03:3306/eagle
kafka.eagle.username=root
kafka.eagle.password=123456
配置環(huán)境變量
kafka-eagle必須配置環(huán)境變量,node03服務器執(zhí)行以下命令來進行配置環(huán)境變量: vim /etc/profile
:
export KE_HOME=/opt//kafka-eagle-bin-1.3.2/kafka-eagle-web-1.3.2
export PATH=:$KE_HOME/bin:$PATH
修改立即生效,執(zhí)行: source /etc/profile
執(zhí)行以下界面啟動kafka-eagle:
cd kafka-eagle-web-1.3.2/bin
chmod u+x ke.sh
./ke.sh start
主界面
訪問kafka-eagle
http://node03:8048/ke/account/signin?/ke/
用戶名:admin
密碼:123456

十一、Kafka大廠面試題
1. 為什么要使用 kafka?
緩沖和削峰:上游數(shù)據(jù)時有突發(fā)流量,下游可能扛不住,或者下游沒有足夠多的機器來保證冗余,kafka在中間可以起到一個緩沖的作用,把消息暫存在kafka中,下游服務就可以按照自己的節(jié)奏進行慢慢處理。
解耦和擴展性:項目開始的時候,并不能確定具體需求。消息隊列可以作為一個接口層,解耦重要的業(yè)務流程。只需要遵守約定,針對數(shù)據(jù)編程即可獲取擴展能力。
冗余:可以采用一對多的方式,一個生產者發(fā)布消息,可以被多個訂閱topic的服務消費到,供多個毫無關聯(lián)的業(yè)務使用。
健壯性:消息隊列可以堆積請求,所以消費端業(yè)務即使短時間死掉,也不會影響主要業(yè)務的正常進行。
異步通信:很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。
2. Kafka消費過的消息如何再消費?
kafka消費消息的offset是定義在zookeeper中的,如果想重復消費kafka的消息,可以在redis中自己記錄offset的checkpoint點(n個),當想重復消費消息時,通過讀取redis中的checkpoint點進行zookeeper的offset重設,這樣就可以達到重復消費消息的目的了
3. kafka的數(shù)據(jù)是放在磁盤上還是內存上,為什么速度會快?
kafka使用的是磁盤存儲。
速度快是因為:
- 順序寫入:因為硬盤是機械結構,每次讀寫都會尋址->寫入,其中尋址是一個“機械動作”,它是耗時的。所以硬盤 “討厭”隨機I/O, 喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
- Memory Mapped Files(內存映射文件):64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的Page來實現(xiàn)文件到物理內存的直接映射。完成映射之后你對物理內存的操作會被同步到硬盤上。
- Kafka高效文件存儲設計:Kafka把topic中一個parition大文件分成多個小文件段,通過多個小文件段,就容易定期清除或刪除已經消費完文件,減少磁盤占用。通過索引信息可以快速定位message和確定response的大小。通過index元數(shù)據(jù)全部映射到memory(內存映射文件),可以避免segment file的IO磁盤操作。通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。
注:
- Kafka解決查詢效率的手段之一是將數(shù)據(jù)文件分段,比如有100條Message,它們的offset是從0到99。假設將數(shù)據(jù)文件分成5段,第一段為0-19,第二段為20-39,以此類推,每段放在一個單獨的數(shù)據(jù)文件里面,數(shù)據(jù)文件以該段中 小的offset命名。這樣在查找指定offset的Message的時候,用二分查找就可以定位到該Message在哪個段中。
- 為數(shù)據(jù)文件建 索引數(shù)據(jù)文件分段 使得可以在一個較小的數(shù)據(jù)文件中查找對應offset的Message 了,但是這依然需要順序掃描才能找到對應offset的Message。為了進一步提高查找的效率,Kafka為每個分段后的數(shù)據(jù)文件建立了索引文件,文件名與數(shù)據(jù)文件的名字是一樣的,只是文件擴展名為.index。
4. Kafka數(shù)據(jù)怎么保障不丟失?
分三個點說,一個是生產者端,一個消費者端,一個broker端。
kafka的ack機制:在kafka發(fā)送數(shù)據(jù)的時候,每次發(fā)送消息都會有一個確認反饋機制,確保消息正常的能夠被收到,其中狀態(tài)有0,1,-1。
如果是同步模式:
ack設置為0,風險很大,一般不建議設置為0。即使設置為1,也會隨著leader宕機丟失數(shù)據(jù)。所以如果要嚴格保證生產端數(shù)據(jù)不丟失,可設置為-1。
如果是異步模式:
也會考慮ack的狀態(tài),除此之外,異步模式下的有個buffer,通過buffer來進行控制數(shù)據(jù)的發(fā)送,有兩個值來進行控制,時間閾值與消息的數(shù)量閾值,如果buffer滿了數(shù)據(jù)還沒有發(fā)送出去,有個選項是配置是否立即清空buffer??梢栽O置為-1,永久阻塞,也就數(shù)據(jù)不再生產。異步模式下,即使設置為-1。也可能因為程序員的不科學操作,操作數(shù)據(jù)丟失,比如kill -9,但這是特別的例外情況。
注:
ack=0:producer不等待broker同步完成的確認,繼續(xù)發(fā)送下一條(批)信息。
ack=1(默認):producer要等待leader成功收到數(shù)據(jù)并得到確認,才發(fā)送下一條message。
ack=-1:producer得到follwer確認,才發(fā)送下一條數(shù)據(jù)。
通過offset commit 來保證數(shù)據(jù)的不丟失,kafka自己記錄了每次消費的offset數(shù)值,下次繼續(xù)消費的時候,會接著上次的offset進行消費。
而offset的信息在kafka0.8版本之前保存在zookeeper中,在0.8版本之后保存到topic中,即使消費者在運行過程中掛掉了,再次啟動的時候會找到offset的值,找到之前消費消息的位置,接著消費,由于 offset 的信息寫入的時候并不是每條消息消費完成后都寫入的,所以這種情況有可能會造成重復消費,但是不會丟失消息。
唯一例外的情況是,我們在程序中給原本做不同功能的兩個consumer組設置KafkaSpoutConfig.bulider.setGroupid的時候設置成了一樣的groupid,這種情況會導致這兩個組共享同一份數(shù)據(jù),就會產生組A消費partition1,partition2中的消息,組B消費partition3的消息,這樣每個組消費的消息都會丟失,都是不完整的。為了保證每個組都獨享一份消息數(shù)據(jù),groupid一定不要重復才行。
- kafka集群中的broker的數(shù)據(jù)不丟失
每個broker中的partition我們一般都會設置有replication(副本)的個數(shù),生產者寫入的時候首先根據(jù)分發(fā)策略(有partition按partition,有key按key,都沒有輪詢)寫入到leader中,follower(副本)再跟leader同步數(shù)據(jù),這樣有了備份,也可以保證消息數(shù)據(jù)的不丟失。
5. 采集數(shù)據(jù)為什么選擇kafka?
采集層 主要可以使用Flume, Kafka等技術。
Flume:Flume 是管道流方式,提供了很多的默認實現(xiàn),讓用戶通過參數(shù)部署,及擴展API.
Kafka:Kafka是一個可持久化的分布式的消息隊列。Kafka 是一個非常通用的系統(tǒng)。你可以有許多生產者和很多的消費者共享多個主題Topics。
相比之下,Flume是一個專用工具被設計為旨在往HDFS,HBase發(fā)送數(shù)據(jù)。它對HDFS有特殊的優(yōu)化,并且集成了Hadoop的安全特性。
所以,Cloudera 建議如果數(shù)據(jù)被多個系統(tǒng)消費的話,使用kafka;如果數(shù)據(jù)被設計給Hadoop使用,使用Flume。
6. kafka 重啟是否會導致數(shù)據(jù)丟失?
- kafka是將數(shù)據(jù)寫到磁盤的,一般數(shù)據(jù)不會丟失。
- 但是在重啟kafka過程中,如果有消費者消費消息,那么kafka如果來不及提交offset,可能會造成數(shù)據(jù)的不準確(丟失或者重復消費)。
7. kafka 宕機了如何解決?
kafka 宕機了,首先我們考慮的問題應該是所提供的服務是否因為宕機的機器而受到影響,如果服務提供沒問題,如果實現(xiàn)做好了集群的容災機制,那么這塊就不用擔心了。
想要恢復集群的節(jié)點,主要的步驟就是通過日志分析來查看節(jié)點宕機的原因,從而解決,重新恢復節(jié)點。
8. 為什么Kafka不支持讀寫分離?
在 Kafka 中,生產者寫入消息、消費者讀取消息的操作都是與 leader 副本進行交互的,從 而實現(xiàn)的是一種主寫主讀的生產消費模型。Kafka 并不支持主寫從讀,因為主寫從讀有 2 個很明顯的缺點:
數(shù)據(jù)一致性問題:數(shù)據(jù)從主節(jié)點轉到從節(jié)點必然會有一個延時的時間窗口,這個時間 窗口會導致主從節(jié)點之間的數(shù)據(jù)不一致。某一時刻,在主節(jié)點和從節(jié)點中 A 數(shù)據(jù)的值都為 X, 之后將主節(jié)點中 A 的值修改為 Y,那么在這個變更通知到從節(jié)點之前,應用讀取從節(jié)點中的 A 數(shù)據(jù)的值并不為最新的 Y,由此便產生了數(shù)據(jù)不一致的問題。
延時問題:類似 Redis 這種組件,數(shù)據(jù)從寫入主節(jié)點到同步至從節(jié)點中的過程需要經歷 網絡→主節(jié)點內存→網絡→從節(jié)點內存 這幾個階段,整個過程會耗費一定的時間。而在 Kafka 中,主從同步會比 Redis 更加耗時,它需要經歷 網絡→主節(jié)點內存→主節(jié)點磁盤→網絡→從節(jié) 點內存→從節(jié)點磁盤 這幾個階段。對延時敏感的應用而言,主寫從讀的功能并不太適用。
而kafka的主寫主讀的優(yōu)點就很多了:
- 可以簡化代碼的實現(xiàn)邏輯,減少出錯的可能;
- 將負載粒度細化均攤,與主寫從讀相比,不僅負載效能更好,而且對用戶可控;
- 在副本穩(wěn)定的情況下,不會出現(xiàn)數(shù)據(jù)不一致的情況。
9. kafka數(shù)據(jù)分區(qū)和消費者的關系?
每個分區(qū)只能由同一個消費組內的一個消費者(consumer)來消費,可以由不同的消費組的消費者來消費,同組的消費者則起到并發(fā)的效果。
10. kafka的數(shù)據(jù)offset讀取流程
連接ZK集群,從ZK中拿到對應topic的partition信息和partition的Leader的相關信息
consumer將?自?己保存的offset發(fā)送給Leader
Leader根據(jù)offset等信息定位到segment(索引?文件和?日志?文件)
根據(jù)索引?文件中的內容,定位到?日志?文件中該偏移量量對應的開始位置讀取相應?長度的數(shù)據(jù)并返回給consumer
11. kafka內部如何保證順序,結合外部組件如何保證消費者的順序?
kafka只能保證partition內是有序的,但是partition間的有序是沒辦法的。愛奇藝的搜索架構,是從業(yè)務上把需要有序的打到同?個partition。
12. Kafka消息數(shù)據(jù)積壓,Kafka消費能力不足怎么處理?
如果是Kafka消費能力不足,則可以考慮增加Topic的分區(qū)數(shù),并且同時提升消費組的消費者數(shù)量,消費者數(shù)=分區(qū)數(shù)。(兩者缺一不可)
如果是下游的數(shù)據(jù)處理不及時:提高每批次拉取的數(shù)量。批次拉取數(shù)據(jù)過少(拉取數(shù)據(jù)/處理時間<生產速度),使處理的數(shù)據(jù)小于生產的數(shù)據(jù),也會造成數(shù)據(jù)積壓。
13. Kafka單條日志傳輸大小
kafka對于消息體的大小默認為單條最大值是1M但是在我們應用場景中, 常常會出現(xiàn)一條消息大于1M,如果不對kafka進行配置。則會出現(xiàn)生產者無法將消息推送到kafka或消費者無法去消費kafka里面的數(shù)據(jù), 這時我們就要對kafka進行以下配置:server.properties
replica.fetch.max.bytes: 1048576 broker可復制的消息的最大字節(jié)數(shù), 默認為1M
message.max.bytes: 1000012 kafka 會接收單個消息size的最大限制, 默認為1M左右
注意:message.max.bytes必須小于等于replica.fetch.max.bytes,否則就會導致replica之間數(shù)據(jù)同步失敗。