kafka
叢集是由多個broker
節點組成,這裡麵包含了許多的知識點,以下的這些問題你都知道嗎?
topic
的分割區leader
是怎麼選舉的嗎?zookeeper
中儲存了kafka
的什麼資訊嗎?起到什麼做呢?kafka
訊息檔案是怎麼儲存的嗎?kafka
中leader
節點或者follower
節點發生故障,訊息會丟失嗎?如何保證訊息的一致性和可靠性呢?如果你對這些問題比較模糊的話,那麼很有必要看看本文,去了解以下kafka
的核心設計,本文主要基於kafka
3.x版本講解。
kafka叢集是由多個kafka broker
通過連同一個zookeeper
組成,那麼他們是如何協同工作對外提供服務的呢?zookeeper
中又儲存了什麼資訊呢?
kafka broker
啟動後,會在zookeeper
的/brokers/ids
路徑下注冊。broker
會被選舉為控制器(Kafka Controller
)。選舉規則也很簡單,誰先註冊到zookeeper
中的/controller
節點,誰就是控制器。Controller
主要負責管理整個叢集中所有分割區和副本的狀態。Kafka Controller
會進行Leader
選擇,比如上圖中針對TopicA
中的0號分割區,選擇broker0
作為Leader
, 然後會將選擇的節點資訊註冊到zookeeper
的/brokers/topics
路徑下,記錄誰是Leader
,有哪些伺服器可用。Leader
的topic
分割區提供對外的讀寫服務。為什麼只有Leader
節點提供讀寫服務,而不是設計成主從方式,Follower
提供讀服務呢?那麼問題來了,kafka
中topic
分割區是如何選擇leader
的呢?為了更好的闡述,我們先來理解下面3個概念。
ISR
:表示和 Leader
保持同步的 Follower
集合。如果 Follower
長時間未向 Leader
傳送通訊請求或同步資料,則該 Follower
將被踢出 ISR
。該時間閾值由 replica.lag.time.max.ms
引數設定,預設 30s
。Leader
發生故障之後,就會從 ISR
中選舉新的Leader
。OSR
:表示 Follower
與 Leader
副本同步時,延遲過多的副本。AR
: 指的是分割區中的所有副本,所以AR = ISR + OSR
。Kafka Controller
選舉Leader
的規則:在isr
佇列中存活為前提,按照AR
中排在前面的優先。例如ar[1,0,2]
, isr [1,0,2]
,那麼leader
就會按照1,0,2的順序輪詢。而AR中的這個順序kafka
會進行打散,分攤kafka broker
的壓力。
當執行中的控制器突然宕機或意外終止時,Kafka
通過監聽zookeeper
能夠快速地感知到,並立即啟用備用控制器來代替之前失敗的控制器。這個過程就被稱為 Failover
,該過程是自動完成的,無需你手動干預。
開始的時候,Broker 0
是控制器。當 Broker 0
宕機後,ZooKeeper
通過`` Watch 機制感知到並刪除了 /controller
臨時節點。之後,所有存活的 Broker
開始競選新的控制器身份。Broker 3
最終贏得了選舉,成功地在 ZooKeeper
上重建了 /controller
節點。之後,Broker 3
會從 ZooKeeper
中讀取叢集後設資料資訊,並初始化到自己的快取中,後面就有Broker 3
來接管選擇Leader
的功能了。
如果topic
分割區的leader
和follower
發生了故障,那麼對於資料的一致性和可靠性會有什麼樣的影響呢?
LEO(Log End Offset)
:每個副本的最後一個offset
,LEO
就是最新的offset
+ 1。HW(High Watermark)
:水位線,所有副本中最小的LEO
,消費者只能看到這個水位線左邊的訊息,從而保證資料的一致性。上圖所示,如果follower
發生故障怎麼辦?
Follower
發生故障後會被臨時踢出ISR
佇列。Leader
和Follower
繼續接收資料。Follower
恢復後,Follower
會讀取本地磁碟記錄的上次的HW
,並將log
檔案高於HW
的部分擷取掉,從HW
開始向Leader
進行同步。Follower
的LEO
大於等於該Partition
的HW
,即Follower
追上Leader
之後,就可以重新加入ISR了。如果leader
發生故障怎麼辦?
Leader
發生故障之後,會從ISR
中選出一個新的Leader
Follower
會先將各自的log
檔案高於HW
的部分截掉,然後從新的Leader
同步資料。所以為了讓kafka broker
保證訊息的可靠性和一致性,我們要做如下的設定:
producer
的設定acks=all
或者-1。leader
在返回確認或錯誤響應之前,會等待所有副本收到悄息,需要配合min.insync.replicas
設定使用。這樣就意味著leader
和follower
的LEO
對齊。topic
的設定replication.factor>=3
副本大於3個,並且 min.insync.replicas>=2
表示至少兩個副本應答。broker
設定unclean.leader.election.enable=false
,預設也是false,表示不對落後leader
很多的follower
也就是非ISR
佇列中的副本選擇為Leader
, 這樣可以避免資料丟失和資料 不一致,但是可用性會降低。正常情況下,Kafka
本身會自動把Leader Partition
均勻分散在各個機器上,來保證每臺機器的讀寫吞吐量都是均勻的。但是如果某些broker
宕機,會導致Leader Partition過於集中在其他少部分幾臺broker
上,這會導致少數幾臺broker
的讀寫請求壓力過高,其他宕機的broker重啟之後都是follower partition
,讀寫請求很低,造成叢集負載不均衡。那麼該如何負載平衡呢?
通過broker
設定設定自動負載均衡。
auto.leader.rebalance.enable
:預設是 true
。 自動 Leader Partition
平衡。生產環境中,leader
重選舉的代價比較大,可能會帶來效能影響,建議設定為 false 關閉。leader.imbalance.per.broker.percentage
:預設是 10%
。每個 broker
允許的不平衡的 leader
的比率。如果每個 broker
超過了這個值,控制器會觸發 leader
的平衡。leader.imbalance.check.interval.seconds
:預設值 300
秒。檢查 leader
負載是否平衡的間隔時間。topic
進行負載均衡./bin/kafka-preferred-replica-election.sh --zookeeper hadoop16:2181,hadoop17:2181,hadoop18:2181/kafka08
topic
負載均衡cat topicPartitionList.json
{
"partitions":
[
{"topic":"test.example","partition": "0"}
]
}
./bin/kafka-preferred-replica-election.sh --zookeeper hadoop16:2181,hadoop17:2181,hadoop18:2181/kafka08 --path-to-json-file topicPartitionList.json
kafka訊息最終會儲存到磁碟檔案中,那麼是如何儲存的呢?清理策略是什麼呢?
一個topic
分為多個partition
,每個partition對應於一個log
檔案,為防止log檔案過大導致資料定位效率低下,Kafka採取了分片和索引機制,每個partition
分為多個segment
。每個segment
包括:「.index
」檔案、「.log
」檔案和.timeindex
等檔案,Producer
生產的資料會被不斷追加到該log檔案末端。
上圖中t1即為一個topic
的名稱,而「t1-0/t1-1」則表明這個目錄是t1這個topic
的哪個partition
。
kafka中的索引檔案以稀疏索引(sparseindex
)的方式構造訊息的索引,如下圖所示:
1.根據目標offset
定位segment
檔案
2.找到小於等於目標offset
的最大offset
對應的索引項
3.定位到log
檔案
4.向下遍歷找到目標Record
注意:index為稀疏索引,大約每往log
檔案寫入4kb
資料,會往index
檔案寫入一條索引。通過引數log.index.interval.bytes
控制,預設4kb
。
那kafka中磁碟檔案儲存多久呢?
kafka 中預設的紀錄檔儲存時間為 7 天,可以通過調整如下引數修改儲存時間。
log.retention.hours
,最低優先順序小時,預設 7 天。log.retention.minutes
,分鐘。log.retention.ms
,最高優先順序毫秒。log.retention.check.interval.ms
,負責設定檢查週期,預設 5 分鐘。前面講解了kafka broker
中的核心機制,我們再來看下重要的設定引數。
首先來說下kafka伺服器端設定屬性Update Mode
的作用:
read-only
。被標記為read-only
的引數和原來的引數行為一樣,只有重啟 Broker
,才能令修改生效。per-broker
。被標記為 per-broker 的引數屬於動態引數,修改它之後,無需重啟就會在對應的 broker
上生效。cluster-wide
。被標記為 cluster-wide
的引數也屬於動態引數,修改它之後,會在整個叢集範圍內生效,也就是說,對所有 broker
都生效。也可以為具體的 broker
修改cluster-wide
引數。Broker重要引數
引數名稱 | 描述 |
---|---|
replica.lag.time.max.ms | ISR 中,如果 Follower 長時間未向 Leader 傳送通訊請求或同步資料,則該 Follower 將被踢出 ISR。該時間閾值,預設 30s。 |
auto.leader.rebalance.enable | 預設是 true。 自動 Leader Partition 平衡。 |
leader.imbalance.per.broker.percentage | 預設是 10%。每個 broker 允許的不平衡的 leader的比率。如果每個 broker 超過了這個值,控制器會觸發 leader 的平衡。 |
leader.imbalance.check.interval.seconds | 預設值 300 秒。檢查 leader 負載是否平衡的間隔時間。 |
log.segment.bytes | Kafka 中 log 紀錄檔是分成一塊塊儲存的,此設定是指 log 紀錄檔劃分 成塊的大小,預設值 1G。 |
log.index.interval.bytes | 預設 4kb,kafka 裡面每當寫入了 4kb 大小的紀錄檔(.log),然後就往 index 檔案裡面記錄一個索引。 |
log.retention.hours | Kafka 中資料儲存的時間,預設 7 天。 |
log.retention.minutes | Kafka 中資料儲存的時間,分鐘級別,預設關閉。 |
log.retention.ms | Kafka 中資料儲存的時間,毫秒級別,預設關閉。 |
log.retention.check.interval.ms | 檢查資料是否儲存超時的間隔,預設是 5 分鐘。 |
log.retention.bytes | 預設等於-1,表示無窮大。超過設定的所有紀錄檔總大小,刪除最早的 segment。 |
log.cleanup.policy | 預設是 delete,表示所有資料啟用刪除策略;如果設定值為 compact,表示所有資料啟用壓縮策略。 |
num.io.threads | 預設是 8。負責寫磁碟的執行緒數。整個引數值要佔總核數的 50%。 |
num.replica.fetchers | 副本拉取執行緒數,這個引數佔總核數的 50%的 1/3 |
num.network.threads | 預設是 3。資料傳輸執行緒數,這個引數佔總核數的50%的 2/3 。 |
log.flush.interval.messages | 強制頁快取刷寫到磁碟的條數,預設是 long 的最大值,9223372036854775807。一般不建議修改,交給系統自己管理。 |
log.flush.interval.ms | 每隔多久,刷資料到磁碟,預設是 null。一般不建議修改,交給系統自己管理。 |
Kafka
叢集的分割區多副本架構是 Kafka
可靠性保證的核心,把訊息寫入多個副本可以使 Kafka
在發生崩潰時仍能保證訊息的永續性。本文圍繞這樣的核心架構講解了其中的一些核心機制,包括Leader的選舉、訊息的儲存機制等等。
歡迎關注個人公眾號【JAVA旭陽】交流學習
本文來自部落格園,作者:JAVA旭陽,轉載請註明原文連結:https://www.cnblogs.com/alvinscript/p/17442726.html