kafka是目前企業中很常用的訊息佇列產品,可以用於削峰、解耦、非同步通訊。特別是在巨量資料領域中應用尤為廣泛,主要得益於它的高吞吐量、低延遲,在我們公司的解決方案中也有用到。既然kafka在企業中如此重要,那麼本文就通過幾張圖帶大家全面認識一下kafka,現在我們不妨帶入kafka設計者的角度去思考該如何設計,它的架構是怎麼樣的、都有哪些元件組成、如何進行擴充套件等等。
現在假如有100T大小的訊息要傳送到kafka中,資料量非常大,一臺機器儲存不下,面對這種情況,你該如何設計呢?
很簡單,分而治之,一臺不夠,那就多臺,這就形成了一個kafka叢集。如下圖所示,一個broker就是一個kafka節點,100T資料就有3個節點分擔,每個節點約33T,這樣就能解決問題了,還能提高吞吐量。
topicA
。topic
可以分佈到多個 broker
(即伺服器)上,一個 topic
可以分為多個 partition
,每個 partition
是一個有序的佇列。比如上圖中的topicA被分成了3個partition
。broker
中,萬一這個broker
宕機了怎麼辦?為了實現高可用,一個 topic
的每個分割區都有若干個副本,一個 Leader
和若干個Follower
。比如上圖中的虛線連線的就是它的副本。Leader
。Leader
中同步資料,保持和Leader
資料的同步。Leader
發生故障時,某個 Follower
會成為新的 Leader
。Kafka broker
發訊息的使用者端,後面詳細講解。Kafka broker
取訊息的使用者端,多個Consumer
會組成一個消費者組,後面詳細講解。Kafka
2.8.0版本以後也支援非zk的方式,大大減少了和zk的互動。前面通過一張圖片講解了kafka整體的架構,那現在我們來看看kafka生產者傳送的整個過程,這裡面也是大有文章。
在訊息傳送的過程中,涉及到了兩個執行緒——main
執行緒和 Sender
執行緒。在 main
執行緒中建立了一個雙端佇列 RecordAccumulator
。main
執行緒將訊息傳送給 RecordAccumulator
,Sender
執行緒不斷從 RecordAccumulator
中拉取訊息傳送到 Kafka Broker
。
kafkaProducer
建立訊息,然後通過可能的攔截器、序列化器和分割區器的作用之後快取到訊息累加器(RecordAccumulator
, 也稱為訊息收集器)中。Sender
執行緒負責從 RecordAccumulator
獲取訊息並將其傳送到 Kafka
中。RecordAccumulator
主要用來快取訊息以便 Sender
執行緒可以批次傳送,進而減少網路傳輸的資源消耗以提升效能。RecordAccumulator
快取的大小可以通過生產者使用者端引數 buffer.memory
設定,預設值為 33554432B
,即 32M
。RecordAccumulator
的某個雙端佇列( Deque
)中,RecordAccumulator
內部為每個分割區都維護了一個雙端佇列,即 Deque<ProducerBatch>
, 訊息寫入快取時,追加到雙端佇列的尾部。Sender
讀取訊息時,從雙端佇列的頭部讀取。ProducerBatch
是指一個訊息批次;與此同時,會將較小的 ProducerBatch
湊成一個較大 ProducerBatch
,也可以減少網路請求的次數以提升整體的吞吐量。ProducerBatch
大小可以通過batch.size
控制,預設16kb
。Sender
執行緒會在有資料積累到batch.size
,預設16kb,或者如果資料遲遲未達到batch.size
,Sender
執行緒等待linger.ms
設定的時間到了之後就會獲取資料。linger.ms
單位ms
,預設值是0ms
,表示沒有延遲。Sender
從 RecordAccumulator
獲取快取的訊息之後,會將資料封裝成網路請求<Node,Request>
的形式,這樣就可以將 Request
請求發往各個 Node
了。sender
執行緒發往 Kafka
之前還會儲存到 InFlightRequests
中,它的主要作用是快取了已經發出去但還沒有收到伺服器端響應的請求。InFlightRequests
預設每個分割區下最多快取5個請求,可以通過設定引數為max.in.flight.request.per. connection
修改。Request
通過通道Selector
傳送到kafka
節點。acks
.Leader
收到資料後應答。Request
請求接受到kafka的響應結果,如果成功的話,從InFlightRequests
清除請求,否則的話需要進行重發操作,可以通過設定項retries
決定,當訊息傳送出現錯誤的時候,系統會重發訊息。retries
表示重試次數。預設是 int 最大值,2147483647
。RecordAccumulator
中的資料。原來kafka生產者傳送經過了這麼多流程,我們現在來看看kafka消費者又是如何進行的呢?
Kafka 中的消費是基於拉取模式的。訊息的消費一般有兩種模式:推播模式和拉取模式。推模式是伺服器端主動將訊息推播給消費者,而拉模式是消費者主動向伺服器端發起請求來拉取訊息。
kafka是以消費者組進行消費的,一個消費者組,由多個consumer組成。形成一個消費者組的條件,是所有消費者的groupid相同。
那麼問題來了,kafka是如何指定消費者組的每個消費者消費哪個分割區?每次消費的數量是多少呢?
一、如何制定消費方案
coordinator
傳送JoinGroup
的請求。coordinator
主要是用來輔助實現消費者組的初始化和分割區的分配。coordinator
老大節點選擇 = groupid
的hashcode
值 % 50( __consumer_offsets
內建主題位移的分割區數量)例如: groupid
的hashcode值 為1,1% 50 = 1
,那麼__consumer_offsets
主題的1號分割區,在哪個broker
上,就選擇這個節點的coordinator
作為這個消費者組的老大。消費者組下的所有的消費者提交offset
的時候就往這個分割區去提交offset
。consumer
作為消費中的leader
,比如上圖中的ConsumerB
。leader
制定出消費方案,比如誰來消費哪個分割區等coordinator
coordinator
就把消費方 案下發給各個consumer
, 圖中只畫了一條線,實際上是有下發各個consumer
。注意,每個消費者都會和coordinator
保持心跳(預設3s),一旦超時(session.timeout.ms=45s
),該消費者會被移除,並觸發再平衡;或者消費者處理訊息的時間過長(max.poll.interval.ms
=5分鐘),也會觸發再平衡,也就是重新進行上面的流程。
二、消費者消費細節
現在已經初始化消費者組資訊,知道哪個消費者消費哪個分割區,接著我們來看看消費者細節。
ConsumerNetworkClient
, 傳送消費請求,可以進行如下設定:fetch.min.bytes
: 每批次最小抓取大小,預設1位元組fetch.max.bytes
: 每批次最大抓取大小,預設50Mfetch.max.wait.ms
:最大超時時間,預設500mscompletedFetches
佇列中max.poll.records
一次拉取資料返回訊息的最大條數,預設500條。我們都知道訊息傳送到kafka,最終是儲存到磁碟中的,我們看下kafka是如何儲存的。
一個topic
分為多個partition
,每個partition對應於一個log
檔案,為防止log檔案過大導致資料定位效率低下,Kafka採取了分片和索引機制,每個partition
分為多個segment
。每個segment
包括:「.index
」檔案、「.log
」檔案和.timeindex
等檔案,Producer
生產的資料會被不斷追加到該log檔案末端。
上圖中t1即為一個topic
的名稱,而「t1-0/t1-1」則表明這個目錄是t1這個topic
的哪個partition
。
kafka中的索引檔案以稀疏索引(sparseindex
)的方式構造訊息的索引,如下圖所示:
1.根據目標offset
定位segment
檔案
2.找到小於等於目標offset
的最大offset
對應的索引項
3.定位到log
檔案
4.向下遍歷找到目標Record
注意:index為稀疏索引,大約每往log
檔案寫入4kb
資料,會往index
檔案寫入一條索引。通過引數log.index.interval.bytes
控制,預設4kb
。
那kafka中磁碟檔案儲存多久呢?
kafka 中預設的紀錄檔儲存時間為 7 天,可以通過調整如下引數修改儲存時間。
log.retention.hours
,最低優先順序小時,預設 7 天。log.retention.minutes
,分鐘。log.retention.ms
,最高優先順序毫秒。log.retention.check.interval.ms
,負責設定檢查週期,預設 5 分鐘。其實kafka中的細節十分多,本文也只是對kafka的一些核心機制從理論層面做了一個總結,更多的細節還是需要自行去實踐,去學習。
歡迎關注個人公眾號【JAVA旭陽】交流學習
本文來自部落格園,作者:JAVA旭陽,轉載請註明原文連結:https://www.cnblogs.com/alvinscript/p/17407980.html