訊息的消費一般有兩種模式,推模式和拉模式。推模式是伺服器端主動將訊息推播給消費者,而拉模式是消費者主動向伺服器端發起請求來拉取訊息。kakfa採用的是拉模式,這樣可以很好的控制消費速率。那麼kafka消費的具體工作流程是什麼樣的呢?kafka的位移管理又是怎麼樣的呢?
kafka
是以消費者組進行消費,一個消費者組,由多個consumer
組成,他們和topic
的消費規則如下:
topic
的一個分割區只能被消費組中的一個消費者消費。通過這種分組、分割區的消費方式,可以提高消費者的吞吐量,同時也能夠實現訊息的釋出/訂閱模式和對等兩種模式。
消費者消費總體分為兩個步驟,第一步是制定消費的方案,就是這個組下哪個消費者消費哪個分割區,第二個是建立網路連線,獲取訊息資料。
一、制定消費方案
consumerA
,consumerB
, consumerC
向kafka
叢集中的協調器coordinator
傳送JoinGroup
的請求。coordinator
主要是用來輔助實現消費者組的初始化和分割區的分配。coordinator
老大節點選擇 = groupid
的hashcode
值 % 50( __consumer_offsets
內建主題位移的分割區數量)例如: groupid
的hashcode值 為1,1% 50 = 1
,那麼__consumer_offsets
主題的1號分割區,在哪個broker
上,就選擇這個節點的coordinator
作為這個消費者組的老大。消費者組下的所有的消費者提交offset
的時候就往這個分割區去提交offset
。consumer
作為消費中的leader
,比如上圖中的consumerB
。leader
制定出消費方案,比如誰來消費哪個分割區等,有Range
分割區策略、RoundRobin
分割區策略等。coordinator
coordinator
就把消費方案下發給各個consumer
, 圖中只畫了一條線,實際上是會下發到各個consumer
。二、消費者消費細節
現在已經初始化消費者組資訊,知道哪個消費者消費哪個分割區,接著我們來看看消費者細節。
ConsumerNetworkClient
, 傳送消費請求,可以進行如下設定:fetch.min.bytes
: 每批次最小抓取大小,預設1位元組fetch.max.bytes
: 每批次最大抓取大小,預設50M
fetch.max.wait.ms
:最大超時時間,預設500ms
kafka
叢集completedFetches
佇列中max.poll.records
一次拉取資料返回訊息的最大條數,預設500條。offset
,也就是這個消費者消費到什麼位置了,這樣下次重啟也可以繼續從這個位置開始消費,關於offset
的管理後面詳細介紹。前面簡單提到了消費者組初始化的時候會對分割區進行分配,那麼具體的分配策略是什麼呢,也就是哪個消費者消費哪個分割區資料?
kafka有四種主流的分割區分配策略: Range
、RoundRobin
、Sticky
、CooperativeSticky
。可以通過設定引數partition.assignment.strategy
,修改分割區的分配策略。預設策略是Range + CooperativeSticky
。Kafka可以同時使用多個分割區分配策略。
Range
分割區策略Range
分割區 是對每個 topic
而言的。對同一個 topic
裡面的分割區按照序號進行排序,並對消費者按照字母順序進行排序。partitions
數/consumer
數 來決定每個消費者應該消費幾個分割區。如果除不盡,那麼前面幾個消費者將會多消費 1 個分割區。如上圖所示:有 7 個分割區,3 個消費者,排序後的分割區將會是0,1,2,3,4,5,6;消費者排序完之後將會是C0,C1,C2。7/3 = 2 餘 1 ,除不盡,那麼 消費者 C0 便會多消費 1 個分割區。 8/3=2餘2,除不盡,那麼C0和C1分別多消費一個。
這種方式容易造成資料傾斜!如果有 N 多個 topic
,那麼針對每個 topic
,消費者 C0都將多消費 1 個分割區,topic
越多,C0消費的分割區會比其他消費者明顯多消費 N 個分割區。
RoundRobin
針對叢集中所有topic
而言,RoundRobin
輪詢分割區策略,是把所有的 partition
和所有的consumer
都列出來,然後按照 hashcode
進行排序,最後通過輪詢演演算法來分配 partition
給到各個消費者。
Sticky
是粘性的意思,它是從 0.11.x 版本開始引入這種分配策略,首先會盡量均衡的放置分割區到消費者上面,在出現同一消費者組內消費者出現問題的時候,在rebalance
會盡量保持原有分配的分割區不變化,這樣可以節省開銷。
Cooperative Sticky
和Sticky
類似,但是它會將原來的一次大規模rebalance
操作,拆分成了多次小規模的rebalance
,直至最終平衡完成,所以體驗上會更好。
關於什麼是rebalance
繼續往下看你就知道了。
上面也提到了rebalance
,也就是再均衡。當kafka發生下面的情況會進行在均衡,也就是重新給消費者分配分割區:
Group Coordinator
傳送心跳等情況時,GroupCoordinato
r 會認為消費者己下線。 Group Coorinator
節點發生了變更。 消費者需要儲存當前消費到分割區的什麼位置了,這樣哪怕消費者故障,重啟後也能繼續消費,這就是消費者的維護offset管理。
一、消費者位移offset儲存位置
消費者位移offset
儲存在哪呢?
kafka0.9
版本之前,consumer
預設將offset
儲存在Zookeeper
中consumer
預設將offset
儲存在Kafka
一個內建的topic
中,該topic
為__consumer_offsets
,這樣可以大量減少和zookeeper
的互動。__consumer_offsets
主題裡面採用 key
和 value
的方式儲存資料。key
是 group.id+topic+
分割區號,value
就是當前 offset
的值。如何檢視__consumer_offsets
主題內容?
config/consumer.properties
中新增設定 exclude.internal.topics=false
,預設是 true
,表示不能消費系統主題。為了檢視該系統主題資料,所以該引數修改為 false
。__consumer_offsets
。bin/kafka-console-consumer.sh --topic
__consumer_offsets --bootstrap-server hadoop102:9092 --
consumer.config config/consumer.properties --formatter
"kafka.coordinator.group.GroupMetadataManager$OffsetsMessageForm
atter" --from-beginning
## topic1 1號分割區
[offset,topic1,1]::OffsetAndMetadata(offset=7,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
## topic1 0號分割區
[offset,topic1,0]::OffsetAndMetadata(offset=8,
leaderEpoch=Optional[0], metadata=, commitTimestamp=1622442520203,
expireTimestamp=None)
二、消費者位移offset提交儲存模式
消費者是如何提交儲存位移offset呢?
為了使我們能夠專注於自己的業務邏輯,kafka預設提供了自動提交offset
的功能。這個由消費者使用者端引數 enable.auto.commit
設定, 預設值為 true
。當然這個預設的自動提交不是每消費一條訊息就提交一次,而是定期提交,這個定期的週期時間由使用者端引數 auto.commit.interval.ms
設定,預設值為 5 秒。
poll()
方法的邏輯裡完成的,在每次真正向伺服器端發起拉取請求之前會檢查是否可以進行位移提交,如果可以,那麼就會提交上一次輪詢的位移。自動提交會帶來什麼問題?
自動提交消費位移的方式非常簡便,但會帶來是重複消費的問題。
假設剛剛提交完一次消費位移,然後拉取一批訊息進行消費,在下一次自動提交消費位移之前,消費者崩潰了,那麼又得從上一次位移提交的地方重新開始消費,這樣便發生了重複消費的現象。
我們可以通過減小位移提交的時間間隔來減小重複訊息的視窗大小,但這樣 並不能避免重複消費的傳送,而且也會使位移提交更加頻繁。
很多時候並不是說拉取到訊息就算消費完成,而是需要將訊息寫入資料庫、寫入本地快取,或者是更 加複雜的業務處理。在這些場景下,所有的業務處理完成才能認為訊息被成功消費。手動的提交方式可以讓開發人員根據程式的邏輯在合適的地方進行位移提交。
// 是否自動提交 offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
手動提交可以細分為同步提交和非同步提交,對應於 KafkaConsumer
中的 commitSync()
和 commitAsync()
兩種型別的方法。
同步提交會阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗),它必須等待offset
提交完畢,再去消費下一批資料。
// 同步提交 offset
consumer.commitSync();
非同步提交則沒有失敗重試機制,故有可能提交失敗。它傳送完提交offset請求後,就開始消費下一批資料了。
// 非同步提交 offset
consumer.commitAsync();
那麼手動提交會帶來什麼問題呢?可能會出現"漏訊息"的情況。
設定offset
為手動提交,當offset
被提交時,資料還在記憶體中未落盤,此時剛好消費者執行緒被kill掉,那麼offset已經提交,但是資料未處理,導致這部分記憶體中的資料丟失。
我們可以通過消費者事物來解決這樣的問題。
其實無論是手動提交還是自動提交,都有可能出現訊息重複和是漏訊息,與我們的程式設計模型有關,需要我們開發的時候根據訊息的重要程度來選擇合適的消費方案。
一個正常的消費邏輯需要具備以下幾個步驟:
(1)設定消費者使用者端引數及建立相應的消費者範例;
(2)訂閱主題;
(3)拉取訊息並消費;
(4)提交消費位移 offset
;
(5)關閉消費者範例。
public class MyConsumer {
public static void main(String[] args) {
Properties props = new Properties();
// 定義 kakfa 服務的地址,不需要將所有 broker 指定上
props.put("bootstrap.servers", "doitedu01:9092");
// 制定 consumer group
props.put("group.id", "g1");
// 是否自動提交 offset
props.put("enable.auto.commit", "true");
// 自動提交 offset 的時間間隔
props.put("auto.commit.interval.ms", "1000");
// key 的反序列化類
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// value 的反序列化類
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 如果沒有消費偏移量記錄,則自動重設為起始 offset:latest, earliest, none
props.put("auto.offset.reset","earliest");
// 定義 consumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 消費者訂閱的 topic, 可同時訂閱多個
consumer.subscribe(Arrays.asList("first", "test","test1"));
while (true) {
// 讀取資料,讀取超時時間為 100ms
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
consumer.subscribe(Arrays.asList(topicl ));
consumer subscribe(Arrays.asList(topic2))
如果消費者採用的是正規表示式的方式(subscribe(Pattern)
)訂閱, 在之後的過程中,如果 有人又建立了新的主題,並且主題名字與正表示式相匹配,那麼這個消費者就可以消費到 新新增的主題中的訊息。
consumer.subscribe(Pattern.compile ("topic.*" ));
消費者不僅可以通過 KafkaConsumer.subscribe()
方法訂閱主題,還可直接訂閱某些主題的指定分割區。
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(「tpc_2」,1))) ;
通過unsubscribe()
方法採取消主題的訂閱。
consumer.unsubscribe();
kafka 中的訊息消費是一個不斷輪詢的過程,消費者所要做的就是重複地呼叫 poll()
方法, poll()
方法返回的是所訂閱的主題(分割區)上的一組訊息。
對於 poll ()
方法而言,如果某些分割區中沒有可供消費的訊息,那麼此分割區對應的訊息拉取的結果就為空。
public ConsumerRecords<K, V> poll(final Duration timeout)
超時時間引數 timeout
,用來控制 poll()
方法的阻塞時間,在消費者的緩衝區裡沒有可用資料時會發生阻塞。
有些時候,我們需要一種更細粒度的掌控,可以讓我們從特定的位移處開始拉取訊息,而 KafkaConsumer
中的 seek(
方法正好提供了這個功能,讓我們可以追前消費或回溯消費。
public void seek(TopicPartiton partition,long offset)
最後我們總結一下消費者中重要的引數設定。
引數名稱 | 描述 |
---|---|
bootstrap.servers | 向 Kafka 叢集建立初始連線用到的 host/port 列表。 |
key.deserializer 和value.deserializer | 指定接收訊息的 key 和 value 的反序列化型別。一定要寫全類名。 |
group.id | 標記消費者所屬的消費者組。 |
enable.auto.commit | 預設值為 true,消費者會自動週期性地向伺服器提交偏移量。 |
auto.commit.interval.ms | 如果設定了 enable.auto.commit 的值為 true, 則該值定義了消費者偏移量向 Kafka 提交的頻率,預設 5s。 |
auto.offset.reset | 當 Kafka 中沒有初始偏移量或當前偏移量在伺服器中不存在(如,資料被刪除了),該如何處理? earliest:自動重置偏移量到最早的偏移量。 latest:預設,自動重置偏移量為最新的偏移量。 none:如果消費組原來的(previous)偏移量不存在,則向消費者拋異常。 anything:向消費者拋異常。 |
offsets.topic.num.partitions | __consumer_offsets 的分割區數,預設是 50 個分割區。 |
heartbeat.interval.ms | Kafka 消費者和 coordinator 之間的心跳時間,預設 3s。該條目的值必須小於 session.timeout.ms ,也不應該高於session.timeout.ms 的 1/3。 |
session.timeout.ms | Kafka 消費者和 coordinator 之間連線超時時間,預設 45s。超過該值,該消費者被移除,消費者組執行再平衡。 |
max.poll.interval.ms | 消費者處理訊息的最大時長,預設是 5 分鐘。超過該值,該消費者被移除,消費者組執行再平衡。 |
fetch.min.bytes | 預設 1 個位元組。消費者獲取伺服器端一批訊息最小的位元組數。 |
fetch.max.wait.ms | 預設 500ms。如果沒有從伺服器端獲取到一批資料的最小位元組數。該時間到,仍然會返回資料。 |
fetch.max.bytes | 預設 Default: 52428800(50 m)。消費者獲取伺服器端一批訊息最大的位元組數。如果伺服器端一批次的資料大於該值(50m)仍然可以拉取回來這批資料,因此,這不是一個絕對最大值。一批次的大小受 message.max.bytes (broker config)or max.message.bytes (topic config)影響。 |
max.poll.records | 一次 poll 拉取資料返回訊息的最大條數,預設是 500 條。 |
kafka消費是很重要的一個環節,本文總結kafka消費者的一些重要機制,包括消費者的整個流程,消費的分割區策略,消費的再平衡以及消費的位移管理。在明白這些機制以後,簡單講解了如何使用消費者consumer
的API以及消費者中重要的引數。
歡迎關注個人公眾號【JAVA旭陽】交流學習!
本文來自部落格園,作者:JAVA旭陽,轉載請註明原文連結:https://www.cnblogs.com/alvinscript/p/17448122.html