我的 Kafka 旅程

2022-09-29 18:00:25

kafka採用Consumer消費者Pull主動拉取資料的方式,當Broker無資料時,消費者空轉。Kafka並不刪除已消費的訊息,各自獨立的消費者可消費同一個Broker分割區資料。

消費流程

1、消費者發起網路消費請求

# 每批次最小抓取設定(推薦1位元組)
fetch.min.bytes
# 每批次最大抓取大小設定(推薦500ms)
fetch.max.bytes
# 未達到大小的超時設定(推薦50M)
fetch.max.wait.ms

2、拉取資料到記憶體消費佇列中

# 單次拉取最大訊息條數設定(推薦500條)
max.poll.records

2.1、反序列化處理(對應了Producer端的序列化動作)

2.2、攔截器處理(如:彙總統計記錄)

3、資料的後續處理

儲存等的消費端動作。

 

offset

當一個消費者掛掉或重啟後,是否還記得消費到的位置了?offset解決了此問題。
對於每一個topic,都會維持一個分割區紀錄檔,分割區中的每一個記錄都會分配一個Id來表示順序,稱之為offset,offset用來唯一的標識分割區中每條記錄,並將每次的消費位置提交到topic中。消費者恢復啟動後接著按序消費資料。

自動提交

# 開啟自動提交
enable.auto.commit = true
# 每次提交間隔(推薦5秒)
auto.commit.interval.ms = 5000

手動提交

先關閉自動提交後,在Consumer使用者端的程式碼中,通過呼叫方法函數提交,通常的方法名:

# 同步提交,等提交完成才可下一次再消費
.CommitSync
# 非同步提交,可直接進行下一個消費,也有可能提交失敗
.CommitAync

指定消費

在Consumer使用者端的程式碼中,手動指定offset的位置進行消費,關聯到的方法函數名:

# 按指定時間得出offset值
.offsetsForTimes
# 按指定offset值繼續消費
.seek

初始策略

# earliest:	最早消費;無offset時,從頭開始消費。
# latest:	最新消費;無offset時,從最新的資料開始消費。
# none:	無offset時,引發異常。
auto.offset.reset = earliest | latest | none

消費現象

重複消費:offset未提交成功,下次消費還是舊的offset。

漏消費:offset提交成功,消費者端後續的資料處理未完成(建議下游步驟事務處理)。

 

消費者組

為了實現橫向擴充套件,應用程式需要建立一個消費者群組,然後往群組裡新增消費者來提高處理效率,群組裡的每個消費者只處理一部分訊息。

消費者組是邏輯上的一個消費者,是由一個或多個消費者範例組成,具有可延伸性和可容錯性,消費者組內的消費者共用一個GroupId組成;組內每個消費者負責消費不同分割區資料,並行消費資料;當組內一個消費者掛了之後,其它消費者要自動承擔它的消費任務 - 組內再平衡

 

觸發再平衡

消費成員與Broker分割區保持心跳連線,或者消費成員處理訊息時間過長,會被認為此消費者需要被移除,觸發組內消費成員任務再分配。以下設定任其一條件觸發再平衡:

# 心跳連線超時的 移除條件(建議45秒)
session.timeout.ms
# 訊息處理超時的 移除條件(建議5分鐘)
max.poll.interval.ms

再平衡策略

# 再平衡策略設定項(可多策略組合)
partition.assignment.strategy = Range | RoundRobin | Sticky | CooperativeSticky
  • Range:單個Topic內的重新平均分配
  • RoundRobin:所有Topic的全部消費者,一起重新分配
  • Sticky:一次小範圍重新分配;僅調整需要的,避免大規模重新分配
  • CooperativeSticky:可多次小範圍重新調整,直至最終效果

 

提升吞吐量

  • 增加分割區,增加消費者,兩者一一對應起來,並行消費
  • 調整一次最多拉取的訊息條數(500條)
  • 調整單次抓取的資料最大容量(50M)