C# Kafka重置到最新的偏移量,即從指定的Partition訂閱訊息使用Assign方法

2023-04-11 18:01:52

在使用Kafka的過程中,消費者斷掉之後,再次開始消費時,消費者會從斷掉時的位置重新開始消費。

場景再現:比如昨天消費者晚上斷掉了,今天上午我們會發現kafka消費的資料不是最新的,而是昨天晚上的資料,由於資料量比較多,也不會及時的消費到今天上午的資料,這個時候就需要我們對偏移量進行重置為最新的,以獲取最新的資料。

前提,我們使用的AutoOffsetReset設定是Latest,即從連線到Kafka那一刻開始消費之後產生的訊息,之前釋出的訊息不在消費,這也是預設的設定。

關於AutoOffsetReset這個列舉的設定項如下:

    • latest (default) which means consumers will read messages from the tail of the partition
      最新(預設) ,這意味著使用者將從分割區的尾部讀取訊息,只消費最新的資訊,即自從消費者上線後才開始推播來的訊息。那麼會導致忽略掉之前沒有處理的訊息。
    • earliest which means reading from the oldest offset in the partition
      這意味著從分割區中最早的偏移量讀取;自動從消費者上次開始消費的位置開始,進行消費。
    • none throw exception to the consumer if no previous offset is found for the consumer's group
      如果沒有為使用者的組找到以前的偏移量,則不會向使用者丟擲異常。

接下來,我們直接使用下面這一段程式碼即可:

使用Assign訂閱指定的分割區,注意最後還需要使用Subscribe方法訂閱

consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1)),Offset.End));//從指定的Partition訂閱訊息使用Assign方法

consumer.Subscribe(topic);//訂閱訊息使用Subscribe方法

從指定的分割區獲取資料,並且指定了對應的偏移量

 關於Offset這個列舉不同設定項的說明如下:

Offset 可以被設定為 Beginning、End、Stored 和 Unset。這些值的含義如下:

  1. Beginning:從 Kafka 分割區的最早訊息(Offset 為 0)開始消費。如果分割區中有新訊息產生,消費者會繼續消費這些訊息。

  2. End:從 Kafka 分割區的最新訊息開始消費。如果消費者在啟動後到達了 Kafka 分割區的末尾,它將停止消費,並等待新訊息的到來。

  3. Stored:從消費者儲存的 Offset 開始消費。這個 Offset 通常是消費者在上次停止消費時儲存的 Offset。如果儲存的 Offset 失效或者已過期,消費者會從最新的訊息(End)開始消費。

  4. Unset:在消費者啟動時,Offset 沒有被設定。在這種情況下,消費者將根據 auto.offset.reset 設定項的值來決定從哪裡開始消費。如果 auto.offset.reset 的值為 latest,則從最新的訊息開始消費;如果 auto.offset.reset 的值為 earliest,則從最早的訊息開始消費。

需要注意的是,如果設定了 Stored 的 Offset,但是在 Kafka 中找不到對應的訊息,消費者將會從最新的訊息(End)開始消費。

因此,儲存的 Offset 必須要有效才能夠被正確地使用