kafka採用Consumer消費者Pull主動拉取資料的方式,當Broker無資料時,消費者空轉。Kafka並不刪除已消費的訊息,各自獨立的消費者可消費同一個Broker分割區資料。
# 每批次最小抓取設定(推薦1位元組)
fetch.min.bytes
# 每批次最大抓取大小設定(推薦500ms)
fetch.max.bytes
# 未達到大小的超時設定(推薦50M)
fetch.max.wait.ms
# 單次拉取最大訊息條數設定(推薦500條)
max.poll.records
2.1、反序列化處理(對應了Producer端的序列化動作)
2.2、攔截器處理(如:彙總統計記錄)
儲存等的消費端動作。
當一個消費者掛掉或重啟後,是否還記得消費到的位置了?offset解決了此問題。
對於每一個topic,都會維持一個分割區紀錄檔,分割區中的每一個記錄都會分配一個Id來表示順序,稱之為offset,offset用來唯一的標識分割區中每條記錄,並將每次的消費位置提交到topic中。消費者恢復啟動後接著按序消費資料。
# 開啟自動提交
enable.auto.commit = true
# 每次提交間隔(推薦5秒)
auto.commit.interval.ms = 5000
先關閉自動提交後,在Consumer使用者端的程式碼中,通過呼叫方法函數提交,通常的方法名:
# 同步提交,等提交完成才可下一次再消費
.CommitSync
# 非同步提交,可直接進行下一個消費,也有可能提交失敗
.CommitAync
在Consumer使用者端的程式碼中,手動指定offset的位置進行消費,關聯到的方法函數名:
# 按指定時間得出offset值
.offsetsForTimes
# 按指定offset值繼續消費
.seek
# earliest: 最早消費;無offset時,從頭開始消費。
# latest: 最新消費;無offset時,從最新的資料開始消費。
# none: 無offset時,引發異常。
auto.offset.reset = earliest | latest | none
重複消費:offset未提交成功,下次消費還是舊的offset。
漏消費:offset提交成功,消費者端後續的資料處理未完成(建議下游步驟事務處理)。
為了實現橫向擴充套件,應用程式需要建立一個消費者群組,然後往群組裡新增消費者來提高處理效率,群組裡的每個消費者只處理一部分訊息。
消費者組是邏輯上的一個消費者,是由一個或多個消費者範例組成,具有可延伸性和可容錯性,消費者組內的消費者共用一個GroupId組成;組內每個消費者負責消費不同分割區資料,並行消費資料;當組內一個消費者掛了之後,其它消費者要自動承擔它的消費任務 - 組內再平衡。
消費成員與Broker分割區保持心跳連線,或者消費成員處理訊息時間過長,會被認為此消費者需要被移除,觸發組內消費成員任務再分配。以下設定任其一條件觸發再平衡:
# 心跳連線超時的 移除條件(建議45秒)
session.timeout.ms
# 訊息處理超時的 移除條件(建議5分鐘)
max.poll.interval.ms
# 再平衡策略設定項(可多策略組合)
partition.assignment.strategy = Range | RoundRobin | Sticky | CooperativeSticky