在使用Kafka的過程中,消費者斷掉之後,再次開始消費時,消費者會從斷掉時的位置重新開始消費。
場景再現:比如昨天消費者晚上斷掉了,今天上午我們會發現kafka消費的資料不是最新的,而是昨天晚上的資料,由於資料量比較多,也不會及時的消費到今天上午的資料,這個時候就需要我們對偏移量進行重置為最新的,以獲取最新的資料。
前提,我們使用的AutoOffsetReset設定是Latest,即從連線到Kafka那一刻開始消費之後產生的訊息,之前釋出的訊息不在消費,這也是預設的設定。
關於AutoOffsetReset這個列舉的設定項如下:
latest
(default) which means consumers will read messages from the tail of the partitionearliest
which means reading from the oldest offset in the partitionnone
throw exception to the consumer if no previous offset is found for the consumer's group接下來,我們直接使用下面這一段程式碼即可:
使用Assign訂閱指定的分割區,注意最後還需要使用Subscribe方法訂閱
consumer.Assign(new TopicPartitionOffset(new TopicPartition(topic, new Partition(1)),Offset.End));//從指定的Partition訂閱訊息使用Assign方法
consumer.Subscribe(topic);//訂閱訊息使用Subscribe方法
從指定的分割區獲取資料,並且指定了對應的偏移量
關於Offset這個列舉不同設定項的說明如下:
Offset 可以被設定為 Beginning、End、Stored 和 Unset。這些值的含義如下:
Beginning:從 Kafka 分割區的最早訊息(Offset 為 0)開始消費。如果分割區中有新訊息產生,消費者會繼續消費這些訊息。
End:從 Kafka 分割區的最新訊息開始消費。如果消費者在啟動後到達了 Kafka 分割區的末尾,它將停止消費,並等待新訊息的到來。
Stored:從消費者儲存的 Offset 開始消費。這個 Offset 通常是消費者在上次停止消費時儲存的 Offset。如果儲存的 Offset 失效或者已過期,消費者會從最新的訊息(End)開始消費。
Unset:在消費者啟動時,Offset 沒有被設定。在這種情況下,消費者將根據 auto.offset.reset 設定項的值來決定從哪裡開始消費。如果 auto.offset.reset 的值為 latest,則從最新的訊息開始消費;如果 auto.offset.reset 的值為 earliest,則從最早的訊息開始消費。
需要注意的是,如果設定了 Stored 的 Offset,但是在 Kafka 中找不到對應的訊息,消費者將會從最新的訊息(End)開始消費。
因此,儲存的 Offset 必須要有效才能夠被正確地使用