消費者讀取訊息。在其他基於釋出與訂閱的訊息系統中,消費者可能被稱為訂閱者 或 讀者。
消費者訂閱一個或多個主題,並按照訊息生成的順序讀取它們。消費者通過檢查訊息的偏移量來區分已經讀取過的訊息。
偏移量是一種後設資料,它是一個不斷遞增的整數值,在建立訊息時, Kafka 會把偏移量新增到訊息裡。在給定的分割區裡,每個訊息的偏移量都是唯一的。消費者把每個分割區最後讀取的訊息的偏移量儲存在 Zookeeper 或 Kafka 上,如果消費者關閉或重啟,它的讀取狀態不會丟失。
消費者群組
消費者是消費者群組的一部分。一個群組裡的消費者訂閱的是同一個主題,每個消費者接收主題的一部分分割區的訊息。消費者群組保證每個分割區只能被一個消費者使用 。消費者與分割區之間的對映通常被稱為消費者對分割區的所有權關係。
通過消費者群組的方式,消費者可以消費包含大量訊息的主題。而且,如果一個消費者失效,消費者群組裡的其他消費者可以接管失效消費者的工作。
往群組裡增加消費者是橫向伸縮消費能力的主要方式。Kafka 消費者經常會做一些高延遲的操作,比如把資料寫到資料庫或 HDFS,或者使用資料進行比較耗時的計算。在這些情況下,單個消費者無法跟上資料生成的速度,所以可以增加更多的消費者,讓它們分擔負載,每個消費者只處理部分分割區的訊息,這就是橫向伸縮的主要手段。
當一個消費者被關閉或發生崩潰時,這個消費者就離開群組,原本由它讀取的分割區將由消費者群組裡的其他消費者來讀取。
當一個新的消費者加入消費者群組時,這個新的消費者讀取的是原本由其他消費者讀取的訊息。
在主題發生變化時,比如管理員新增了新的分割區,會發生分割區重分配。分割區的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。分割區再均衡非常重要,它為消費者群組帶來了高可用性和伸縮性(我們可以放心地新增或移除消費者),不過在正常情況下,我們並不希望發生分割區再均衡。原因如下:
分割區再均衡的過程
消費者通過向被指派為群組協調器的 broker(不同的消費者群組可以有不同的協調器)傳送心跳來維持它們和群組的從屬關係以及它們對分割區的所有權關係。消費者會在輪詢訊息(為了獲取訊息)或提交偏移量時傳送心跳。
如果一個消費者發生崩潰,並停止讀取訊息,群組協調器會等待幾秒鐘,確認消費者已經死亡了才會觸發分割區再均衡。在清理消費者時,消費者會通知群組協調器它自己將要離開消費者群組,群組協調器會立即觸發一次分割區再均衡,儘量降低處理停頓。
分配分割區的過程:
當消費者要加入消費者群組時,消費者會向群組協調器傳送一個 JoinGroup 請求。第一個加入群組的消費者將成為「群主」。
群主從群組協調器那裡獲得群組的成員列表(列表中包含了所有最近傳送過心跳的消費者,它們被認為是活躍的),並負責給每一個消費者分配分割區。它使用一個實現了 PartitionAssignor 介面的類來決定哪些分割區應該被分配給哪個消費者,Kafka 內建了兩種分割區分配策略。
群主將分割區分配完畢之後,群主把分割區的分配情況列表傳送給群組協調器,群組協調器再把這些資訊傳送給所有消費者。
每個消費者只能看到自己的分割區分配資訊,只有群主知道群組裡所有消費者的分配資訊。這個過程會在每次分割區再均衡時重複發生。消費者群組的群主應該保證在分配分割區時,儘可能少的改變原有的分割區和消費者的對映關係。
應用程式使用 KafkaConsumer 向 Kafka 訂閱主題,並從訂閱的主題上接收訊息。
應用程式呼叫 kafkaConsumer 的 subscribe() 方法訂閱主題:
輪詢
消費者通過一個簡單的輪詢向伺服器請求資料。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括消費者群組協調、分割區再均衡、傳送心跳和獲取資料,開發者只需要使用一組簡單的 API 來處理從分割區返回的資料。
輪詢不只是獲取資料那麼簡單。在第一次呼叫新消費者的 poll() 方法時,它會負責查詢 GroupCoordinator,然後加入群組,接受分配的分割區。如果發生了分割區再均衡,整個過程也是在輪詢期間進行的。當然,心跳也是從輪詢裡傳送出去的。所以,我們要確保在輪詢期間所做的任何處理工作都應該儘快完成。
我們把更新分割區當前位置的操作叫作提交。那麼消費者是如何提交偏移量的呢?消費者往一個叫作 _consumer_offset 的特殊主題傳送訊息,訊息裡包含每個分割區的偏移量。
如果消費者一直處於執行狀態,那麼偏移量就沒有什麼用處。不過,如果消費者發生崩潰或者有新的消費者加入群組,就會觸發分割區再均衡,完成分割區再均衡之後,每個消費者可能分配到新的分割區,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分割區最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。
所以,處理偏移量的方式對使用者端會有很大的影響。KafkaConsumer API 提供了很多種方式來提交偏移量:自動提交偏移量、手動提交偏移量。
如果 enable.auto.commit 被設為 true,那麼每過 5s,消費者會自動把從 poll() 方法接收到的最大偏移量提交上去。提交的時間間隔由 auto.commit.interval.ms 控制,預設值是 5s。
與消費者裡的其他東西一樣,自動提交也是在輪詢裡進行的。消費者每次在進行輪詢時會檢查是否應該提交偏移量了,如果距離上次的提交時間已經超過了設定引數 auto.commit.interval.ms 指定的值,那麼就會提交上一次輪詢返回的偏移量。
在呼叫 close() 方法之前也會進行自動提交。
讓消費者自動提交偏移量是最簡單的方式。不過,在使用這種簡便的方式之前,需要知道自動提交將會帶來怎樣的結果。
假設我們使用預設的 5s 提交時間間隔,在最近一次提交之後的 3s 發生了分割區再均衡,分割區再均衡之後,消費者從最後一次提交的偏移量位置開始讀取訊息。這個時候偏移量已經落後了 3s,所以在這 3s 內消費者已經處理過的訊息會再被重複處理。我們可以通過修改提交時間間隔來更頻繁地提交偏移量,減小可能出現重複訊息的時間視窗,不過這種情況是無法完全避免的。
手動提交指的是,把 auto.commit.offset 設為 false,讓應用程式決定何時提交偏移量。應用程式可以使用 commitSync()、commitAsync() 方法手動提交偏移量
消費者也可以提交特定的偏移量:消費者 API 允許在呼叫 commitSync() 和 commitAsync() 方法時傳進去希望提交的分割區和偏移量的 map,這樣我們就可以提交特定的偏移量。需要使用期望處理的下一個訊息的偏移量更新 map 裡的偏移量。
非同步提交:同步提交有一個不足之處,在 broker 對提交請求作出迴應之前,應用程式會一直阻塞,這樣會限制應用程式的吞吐量。我們可以通過降低提交頻率來提升吞吐量,但如果發生了分割區再均衡,會增加重複訊息的數量。這個時候我們可以使用非同步提交,我們只管傳送提交請求,無需等待 broker 的響應。
在【分割區再均衡前後】、【消費者開始讀取訊息之前】、【消費者停止讀取訊息之後】我們可以通過消費者 API 執行一些應用程式程式碼,在呼叫 kafkaConsumer 的 subscribe() 方法時傳進去一個 ConsumerRebalanceListener 範例就可以了。
再均衡監聽器 ConsumerRebalanceListener 有兩個需要實現的方法。
如果消費者確定要退出迴圈,需要通過另一個執行緒呼叫 consumer.wakeup() 方法。
consumer.wakeup() 是消費者唯一一個可以從其他執行緒裡安全呼叫的方法。
呼叫 consumer.wakeup() 可以退出 poll(),並丟擲 WakeupException 異常,或者如果呼叫 consumer.wakeup() 時執行緒沒有等待輪詢,那麼異常將在下一輪呼叫 poll() 時丟擲。我們不需要處理 WakeupException,因為它只是用於跳出迴圈的一種方式。
我們可能只需要一個消費者從一個主題的所有分割區或者某個特定的分割區讀取資料。這個時候就不需要消費者群組和分割區再均衡了,只需要把主題或者分割區分配給消費者,然後開始讀取訊息並提交偏移量。
如果是這樣的話,就不需要訂閱主題,取而代之的是為自己分配分割區。一個消費者可以訂閱主題(並加入消費者群組),或者為自己分配分割區,但不能同時做這兩件事情。
獨立消費者除了不會發生分割區再均衡,也不需要手動查詢分割區,其他的看起來一切正常。不過要記住,如果主題增加了新的分割區,消費者並不會收到通知。所以,要麼週期性地呼叫 consumer.partitionsFor() 方法來檢查是否有新分割區加入,要麼在新增新分割區後重啟應用程式。
public void singleCustomer() {
// 向叢集請求主題可用的分割區。如果只打算讀取特定分割區,可以跳過這一步
List<PartitionInfo> partitionInfos = consumer.partitionsFor("topic");
ArrayList<TopicPartition> partitions = new ArrayList<>();
if (partitionInfos != null) {
for (PartitionInfo partition : partitionInfos) {
partitions.add(new TopicPartition(partition.topic(), partition.partition()));
}
// 為自己分配分割區
consumer.assign(partitions);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
consumer.commitSync();
}
}
}
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
private KafkaConsumer consumer;
public void MyConsumerRebalanceListener(KafkaConsumer consumer) {
this.consumer = consumer;
}
public static Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
/**
* 在消費者失去分割區所有權之前,提交偏移量
*
* @param partitions
*/
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Lost partitions in rebalance. Committing current offsets:" + currentOffsets);
consumer.commitSync(currentOffsets);
}
/**
* 在消費者獲取分割區所有權之後,指定讀取訊息的起始偏移量
*
* @param partitions
*/
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
long offset = 0;
for (TopicPartition partition : partitions) {
consumer.seek(partition, offset);
}
}
}
public void customer() {
consumer.subscribe(Collections.singletonList("MyTopic"), new MyConsumerRebalanceListener(consumer));
// 如果不需要手動指定消費者讀取訊息的起始偏移量,下面的程式碼不是必須的
consumer.poll(0);
long offset = 0;
for (TopicPartition partition : consumer.assignment()) {
consumer.seek(partition, offset);
}
try {
while (true) {
// 引數是一個超時時間,用於控制 poll() 方法的阻塞時間(在消費者的緩衝區裡沒有可用資料時會發生阻塞)。
// 如果該引數被設為 0,poll() 會立即返回,否則它會在指定的毫秒數內一直等待 broker 返回資料。
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 每條記錄都包含了記錄所屬主題的資訊、記錄所在分割區的資訊、記錄在分割區裡的偏移量、訊息以及訊息鍵。
System.out.printf("topic = %s, partition = %s, offset = % d, customer = %s, country = %s\n ", record.topic(), record.partition(), record.offset(), record.key(), record.value());
// 將記錄儲存到資料儲存系統裡
System.out.println(record);
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no metadata"));
}
// 如果一切正常,我們使用 commitAsync() 方法來提交。這樣速度更快,而且即使這次提交失敗,下一次提交很可能會成功。
// 使用 commitAsync() 方法只會執行一次提交,不會重試
consumer.commitAsync(currentOffsets, new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e != null) {
log.error("Commit failed for offsets {}", map, e);
}
}
});
}
} catch (WakeupException e) {
// 我們不需要處理 WakeupException,因為它只是用於跳出迴圈的一種方式
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// 如果直接關閉消費者,就沒有所謂的「下一次提交」了。使用 commitSync() 方法會一直重試,直到提交成功或發生無法恢復的錯誤。
consumer.commitSync();
} finally {
// 在退出應用程式之前使用 close() 方法關閉消費者。網路連線和 socket 也會隨之關閉
// 並向群組協調器傳送訊息,告知自己要離開群組,接下來就會觸發再均衡,而不需要等待對談超時
consumer.close();
System.out.println("Closed consumer and we are done");
}
}
}
《Kafka 權威指南》第 4 章:Kafka 消費者——從 Kafka 讀取資料
本文來自部落格園,作者:真正的飛魚,轉載請註明原文連結:https://www.cnblogs.com/feiyu2/p/17254520.html