滴滴二面:Kafka是如何讀寫副本訊息的?

2022-01-10 09:00:01

無論是讀取副本還是寫入副本,都是通過底層的Partition物件完成的,而這些分割區物件全部儲存在上節課所學的allPartitions欄位中。可以說,理解這些欄位的用途,是後續我們探索副本管理器類功能的重要前提。

現在,我們就來學習下副本讀寫功能。整個Kafka的同步機制,本質上就是副本讀取+副本寫入,搞懂了這兩個功能,你就知道了Follower副本是如何同步Leader副本資料的。

appendRecords-副本寫入

向副本底層紀錄檔寫入訊息的邏輯就實現在ReplicaManager#appendRecords。

Kafka需副本寫入的場景:

  1. 生產者向Leader副本寫入訊息

  2. Follower副本拉取訊息後寫入副本

    僅該場景呼叫Partition物件的方法,其餘3個都是呼叫appendRecords完成

  3. 消費者組寫入組資訊

  4. 事務管理器寫入事務資訊(包括事務標記、事務後設資料等)

appendRecords方法將給定的一組分割區的訊息寫入對應Leader副本,並根據PRODUCE請求中acks的設定,有選擇地等待其他副本寫入完成。然後,呼叫指定回撥邏輯。

appendRecords向副本紀錄檔寫入訊息的過程:

執行流程

可見,appendRecords:

  • 實現訊息寫入的方法是appendToLocalLog
  • 判斷是否需要等待其他副本寫入的方法delayedProduceRequestRequired

appendToLocalLog寫入副本本地紀錄檔

利用Partition#appendRecordsToLeader寫入訊息集合,就是利用appendAsLeader方法寫入本地紀錄檔的。

delayedProduceRequestRequired

判斷訊息集合被寫入到紀錄檔之後,是否需要等待其它副本也寫入成功:

private def delayedProduceRequestRequired(
  requiredAcks: Short,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 && entriesPerPartition.nonEmpty && 
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}

若等待其他副本的寫入,須同時滿足:

  1. requiredAcks==-1
  2. 依然有資料尚未寫完
  3. 至少有一個分割區的訊息,已成功被寫入本地紀錄檔

2和3可結合來看。若所有分割區的資料寫入都不成功,則可能出現嚴重錯誤,此時應不再等待,而是直接返回錯誤給傳送方。

而有部分分割區成功寫入,部分分割區寫入失敗,則可能偶發的瞬時錯誤導致。此時,不妨將本次寫入請求放入Purgatory,給個重試機會。

副本讀取:fetchMessages

ReplicaManager#fetchMessages負責讀取副本資料。無論:

  • Java消費者API
  • Follower副本

拉取訊息的主途徑都是向Broker發FETCH請求,Broker端接收到該請求後,呼叫fetchMessages從底層的Leader副本取出訊息。

fetchMessages也可能會延時處理FETCH請求,因Broker端必須要累積足夠多資料後,才會返回Response給請求傳送方。

整個方法分為:

讀取本地紀錄檔

首先判斷,讀取訊息的請求方,就能確定可讀取的範圍了。

fetchIsolation,讀取隔離級別:

  • 對Follower副本,它能讀取到Leader副本LEO值以下的所有訊息
  • 普通Consumer,只能「看到」Leader副本高水位值以下的訊息

確定可讀取範圍後,呼叫readFromLog讀取本地紀錄檔上的訊息資料,並將結果賦給logReadResults變數。readFromLog呼叫readFromLocalLog,在待讀取分割區上依次呼叫其紀錄檔物件的read方法執行實際的訊息讀取。

根據讀取結果確定Response

根據上一步讀取結果建立對應Response:

根據上一步得到的讀取結果,統計可讀取的總位元組數,然後判斷此時是否能夠立即返回Reponse。

副本管理器讀寫副本的兩個方法appendRecords和fetchMessages本質上在底層分別呼叫Log的append和read方法,以實現本地紀錄檔的讀寫操作。完成讀寫操作後,這兩個方法還定義了延時處理的條件。一旦滿足延時處理條件,就交給對應Purgatory處理。

從這倆方法可見單個元件融合一起的趨勢。雖然我們學習單個原始碼檔案的順序是自上而下,但串聯Kafka主要元件功能的路徑卻是自下而上。

如副本寫入操作,紀錄檔物件append方法被上一層的Partition物件中的方法呼叫,而後者又進一步被副本管理器中的方法呼叫。我們按自上而下閱讀了副本管理器、紀錄檔物件等單個元件的程式碼,瞭解了各自的獨立功能。

現在開始慢慢地把它們融合一起,構建Kafka操作分割區副本紀錄檔物件的完整呼叫路徑。同時採用這兩種方式來閱讀原始碼,就能更高效弄懂Kafka原理。

總結

Kafka副本狀態機類ReplicaManager讀寫副本的核心方法:

  • appendRecords:向副本寫入訊息,利用Log#append方法和Purgatory機制實現Follower副本向Leader副本獲取訊息後的資料同步操作
  • fetchMessages:從副本讀取訊息,為普通Consumer和Follower副本所使用。當它們向Broker傳送FETCH請求時,Broker上的副本管理器呼叫該方法從本地紀錄檔中獲取指定訊息