多分割區原子寫入:
事務能夠保證Kafka topic下每個分割區的原⼦寫⼊。事務中所有的訊息都將被成功寫⼊或者丟棄。
⾸先,我們來考慮⼀下原⼦讀取-處理-寫⼊週期是什麼意思。簡⽽⾔之,這意味著如果某個應⽤程式在某個topic tp0的偏移量X處讀取到了訊息A,並且在對訊息A進⾏了⼀些處理(如B = F(A)),之後將訊息B寫⼊topic tp1,則只有當訊息A和B被認為被成功地消費並⼀起釋出,或者完全不釋出時,整個讀取過程寫⼊操作是原⼦的。
現在,只有當訊息A的偏移量X被標記為已消費,訊息A才從topic tp0消費,消費到的資料偏移量(record offset)將被標記為提交偏移量(Committing offset)。在Kafka中,我們通過寫⼊⼀個名為offsets topic的內部Kafka topic來記錄offset commit。訊息僅在其offset被提交給offsets topic時才被認為成功消費。
由於offset commit只是對Kafka topic的另⼀次寫⼊,並且由於訊息僅在提交偏移量時被視為成功消費,所以跨多個主題和分割區的原⼦寫⼊也啟⽤原⼦讀取-處理-寫⼊迴圈:提交偏移量X到offset topic和訊息B到tp1的寫⼊將是單個事務的⼀部分,所以整個步驟都是原⼦的。
粉碎「殭屍範例」:
我們通過為每個事務Producer分配⼀個稱為transactional.id
的唯⼀識別符號來解決僵⼫範例的問題。在程序重新啟動時能夠識別相同的Producer範例。
API要求事務性Producer的第⼀個操作應該是在Kafka叢集中顯示註冊transactional.id
。 當註冊的時候,Kafka broker⽤給定的transactional.id
檢查開啟的事務並且完成處理。 Kafka也增加了⼀個與transactional.id
相關的epoch。Epoch儲存每個transactional.id
內部後設資料。
⼀旦epoch被觸發,任何具有相同的transactional.id
和舊的epoch的⽣產者被視為僵⼫,Kafka拒絕來⾃這些⽣產者的後續事務性寫⼊。
簡⽽⾔之:Kafka可以保證Consumer最終只能消費⾮事務性訊息或已提交事務性訊息。它將保留來⾃未完成事務的訊息,並過濾掉已中⽌事務的訊息。
在⼀個原⼦操作中,根據包含的操作型別,可以分為三種情況,前兩種情況是事務引⼊的場景,最後⼀種沒⽤:
consume-transform-produce
模式建立消費者程式碼,需要:
auto.commit
)進⾏關閉commitSync()
或者commitAsync()
isolation.level
建立生產者,程式碼如下,需要:
transactional.id
屬性enable.idempotence
屬性事務相關設定
Broker configs:
設定項 | 說明 |
---|---|
transactional.id.timeout.ms | 在ms中,事務協調器在⽣產者TransactionalId提前過期之前等待的最⻓時間,並且沒有從該⽣產者TransactionalId接收到任何事務狀態更新。預設是604800000(7天)。這允許每週⼀次的⽣產者作業維護它們的id |
max.transaction.timeout.ms | 事務允許的最⼤超時。如果使用者端請求的事務時間超過此時間,broke將在InitPidRequest中返回InvalidTransactionTimeout錯誤。這可以防⽌客戶機超時過⼤,從⽽導致⽤戶⽆法從事務中包含的主題讀取內容。 預設值為900000(15分鐘)。這是訊息事務需要傳送的時間的保守上限。 |
transaction.state.log.replication.factor | 事務狀態topic的副本數量。預設值:3 |
transaction.state.log.num.partitions | 事務狀態主題的分割區數。預設值:50 |
transaction.state.log.min.isr | 事務狀態主題的每個分割區ISR最⼩數量。預設值:2 |
transaction.state.log.segment.bytes | 事務狀態主題的segment⼤⼩。預設值:104857600位元組 |
Producer configs:
設定項 | 說明 |
---|---|
enable.idempotence | 開啟冪等 |
transaction.timeout.ms | 事務超時時間 事務協調器在主動中⽌正在進⾏的事務之前等待⽣產者更新事務狀態的最⻓時間。這個設定值將與InitPidRequest⼀起傳送到事務協調器。如果該值⼤於max.transaction.timeout。在broke中設定ms時,請求將失敗,並出現InvalidTransactionTimeout錯誤。 預設是60000。這使得交易不會阻塞下游消費超過⼀分鐘,這在實時應⽤程式中通常是允許的。 |
transactional.id | ⽤於事務性交付的TransactionalId。這⽀持跨多個⽣產者對談的可靠性語意,因為它允許使用者端確保使⽤相同TransactionalId的事務在啟動任何新事務之前已經完成。如果沒有提供TransactionalId,則⽣產者僅限於冪等交付。 |
Consumer configs:
設定項 | 說明 |
---|---|
isolation.level | - read_uncommitted:以偏移順序使⽤已提交和未提交的訊息。 - read_committed:僅以偏移量順序使⽤⾮事務性訊息或已提交事務性訊息。為了維護偏移排序,這個設定意味著我們必須在使⽤者中緩衝訊息,直到看到給定事務中的所有訊息。 |
事務協調器和事務⽇志
事務協調器是每個Kafka內部運⾏的⼀個模組。事務⽇志是⼀個內部的主題。每個協調器擁有事務⽇志所在分割區的⼦集,即這些 borker 中的分割區都是Leader。
每個transactional.id
都通過⼀個簡單的雜湊函數對映到事務⽇志的特定分割區,事務⽇志⽂件__transaction_state-0
。這意味著只有⼀個Broker擁有給定的transactional.id
。
通過這種⽅式,我們利⽤Kafka可靠的複製協定和Leader選舉流程來確保事務協調器始終可⽤,並且所有事務狀態都能夠持久化。
值得注意的是,事務⽇志只儲存事務的最新狀態⽽不是事務中的實際訊息。訊息只儲存在實際的Topic的分割區中。事務可以處於諸如「Ongoing」,「prepare commit」和「Completed」之類的各種狀態中。正是這種狀態和關聯的後設資料儲存在事務⽇志中。
事務資料流
資料流在抽象層⾯上有四種不同的型別
initTransactions API
向coordinator
註冊⼀個transactional.id
。 此時,coordinator
使⽤該transactional.id
關閉所有待處理的事務,並且會避免遇到僵⼫範例,由具有相同的transactional.id
的Producer的另⼀個範例啟動的任何事務將被關閉和隔離。每個Producer對談只發⽣⼀次。coordinator
註冊分割區commitTransaction
或abortTransaction
時,會向coordinator
傳送⼀個請求以開始兩階段提交協定。Kafka在引⼊冪等性之前,Producer向Broker傳送訊息,然後Broker將訊息追加到訊息流中後給Producer返回Ack訊號值。實現流程如下:
⽣產中,會出現各種不確定的因素,⽐如在Producer在傳送給Broker的時候出現⽹絡異常。⽐如以下這種異常情況的出現:
上圖這種情況,當Producer第⼀次傳送訊息給Broker時,Broker將訊息(x2,y2)追加到了訊息流中,但是在返回Ack訊號給Producer時失敗了(⽐如⽹絡異常) 。此時,Producer端觸發重試機制,將訊息(x2,y2)重新傳送給Broker,Broker接收到訊息後,再次將該訊息追加到訊息流中,然後成功返回Ack訊號給Producer。這樣下來,訊息流中就被重複追加了兩條相同的(x2,y2)的訊息。
冪等性
保證在訊息重發的時候,消費者不會重複處理。即使在消費者收到重複訊息的時候,重複處理,也要保證最終結果的⼀致性。
所謂冪等性,數學概念就是:f(f(x)) = f(x)
。f函數表示對訊息的處理。
⽐如,銀⾏轉賬,如果失敗,需要重試。不管重試多少次,都要保證最終結果⼀定是⼀致的。
冪等性實現
新增唯⼀ID,類似於資料庫的主鍵,⽤於唯⼀標記⼀個訊息。
Kafka為了實現冪等性,它在底層設計架構中引⼊了ProducerID
和SequenceNumber
。
同樣,這是⼀種理想狀態下的傳送流程。實際情況下,會有很多不確定的因素,⽐如Broker在傳送Ack訊號給Producer時出現⽹絡異常,導致傳送失敗。異常情況如下圖所示:
當Producer傳送訊息(x2,y2)給Broker時,Broker接收到訊息並將其追加到訊息流中。此時,Broker返回Ack訊號給Producer時,發⽣異常導致Producer接收Ack訊號失敗。對於Producer來說,會觸發重試機制,將訊息(x2,y2)再次傳送,但是,由於引⼊了冪等性,在每條訊息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber傳送給Broker,⽽之前Broker快取過之前傳送的相同的訊息,那麼在訊息流中的訊息就只有⼀條(x2,y2),不會出現重複傳送的情況。
使用者端在⽣成Producer時,會範例化如下程式碼:
// 範例化⼀個Producer物件
Producer<String, String> producer = new KafkaProducer<>(props);
在org.apache.kafka.clients.producer.internals.Sender
類中,在run()中有⼀個maybeWaitForPid()
⽅法,⽤來⽣成⼀個ProducerID,實現程式碼如下:
private void maybeWaitForPid() {
if (transactionState == null)
return;
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. " + "We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. " + "We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
在Kafka事務中,⼀個原⼦性操作,根據操作型別可以分為3種情況。情況如下:
// 初始化事務,需要注意確保transation.id屬性被分配
void initTransactions();
// 開啟事務
void beginTransaction() throws ProducerFencedException;
// 為Consumer提供的在事務內Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException;
// 提交事務
void commitTransaction() throws ProducerFencedException;
// 放棄事務,類似於回滾事務的操作
void abortTransaction() throws ProducerFencedException;
案例1:單個Producer,使⽤事務保證訊息的僅⼀次傳送:
package com.mfc.kafka.demo.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyTransactionalProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 提供使用者端ID
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
// 事務ID
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
// 要求ISR都確認
configs.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
// 初始化事務
producer.initTransactions();
// 開啟事務
producer.beginTransaction();
try {
// producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_01"));
producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
// int i = 1 / 0;
// 提交事務
producer.commitTransaction();
} catch (Exception ex) {
// 中⽌事務
producer.abortTransaction();
} finally {
// 關閉⽣產者
producer.close();
}
}
}
案例2:在消費-轉換-⽣產模式,使⽤事務保證僅⼀次傳送。
package com.mfc.kafka.demo;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class MyTransactional {
public static KafkaProducer<String, String> getProducer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 設定client.id
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
// 設定事務id
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
// 需要所有的ISR副本確認
configs.put(ProducerConfig.ACKS_CONFIG, "all");
// 啟⽤冪等性
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
return producer;
}
public static KafkaConsumer<String, String> getConsumer(String consumerGroupId) {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 設定消費組ID
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
// 不啟⽤消費者偏移量的⾃動確認,也不要⼿動確認
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 唯讀取已提交的訊息
// configs.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);
return consumer;
}
public static void main(String[] args) {
String consumerGroupId = "consumer_grp_id_101";
KafkaProducer<String, String> producer = getProducer();
KafkaConsumer<String, String> consumer = getConsumer(consumerGroupId);
// 事務的初始化
producer.initTransactions();
//訂閱主題
consumer.subscribe(Collections.singleton("tp_tx_01"));
final ConsumerRecords<String, String> records = consumer.poll(1_000);
// 開啟事務
producer.beginTransaction();
try {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
producer.send(new ProducerRecord<String, String>("tp_tx_out_01", record.key(), record.value()));
offsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset() + 1)); // 偏移量表示下⼀條要消費的訊息
}
// 將該訊息的偏移量提交作為事務的⼀部分,隨事務提交和回滾(不提交消費偏移量)
producer.sendOffsetsToTransaction(offsets, consumerGroupId);
// int i = 1 / 0;
// 提交事務
producer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
// 回滾事務
producer.abortTransaction();
} finally {
// 關閉資源
producer.close();
consumer.close();
}
}
}
Kafka叢集包含若⼲個broker,broker.id
指定broker的編號,編號不要重複。
Kafka叢集上建立的主題,包含若⼲個分割區。
每個分割區包含若⼲個副本,副本因⼦包括了Follower副本和Leader副本。
副本⼜分為ISR(同步副本分割區)和OSR(⾮同步副本分割區)。
控制器就是⼀個broker。
控制器除了⼀般broker的功能,還負責Leader分割區的選舉。
Broker 選舉:
叢集⾥第⼀個啟動的broker在Zookeeper中建立臨時節點<KafkaZkChroot>/controller
。
其他broker在該控制器節點建立Zookeeper watch物件,使⽤Zookeeper的監聽機制接收該節點的變更。
即:Kafka通過Zookeeper的分散式鎖特性選舉叢集控制器。
下圖中,節點/myKafka/controller
是⼀個zookeeper臨時節點,其中"brokerid":0,表示當前控制器是broker.id
為 0 的broker。
每個新選出的控制器通過 Zookeeper 的條件遞增操作獲得⼀個全新的、數值更⼤的 controller epoch。其他 broker 在知道當前 controller epoch 後,如果收到由控制器發出的包含較舊epoch 的訊息,就會忽略它們,以防⽌「腦裂」。
⽐如當⼀個Leader副本分割區所在的broker宕機,需要選舉新的Leader副本分割區,有可能兩個具有不同紀元數位的控制器都選舉了新的Leader副本分割區,如果選舉出來的Leader副本分割區不⼀樣,聽誰的?腦裂了。有了紀元數位,直接使⽤紀元數位最新的控制器結果。
當控制器發現⼀個 broker 已經離開叢集,那些失去Leader副本分割區的Follower分割區需要⼀個新Leader(這些分割區的⾸領剛好是在這個 broker 上)。
下圖中,<KafkaChroot>/brokers/ids/0
儲存該broker的資訊,此節點為臨時節點,如果broker節點宕機,該節點丟失。
叢集控制器負責監聽ids
節點,⼀旦節點⼦節點傳送變化,叢集控制器得到通知。
控制器遍歷這些Follower副本分割區,並確定誰應該成為新Leader分割區,然後向所有包含新Leader分割區和現有Follower的 broker 傳送請求。該請求訊息包含了誰是新Leader副本分割區以及誰是Follower副本分割區的資訊。隨後,新Leader分割區開始處理來⾃⽣產者和消費者的請求,⽽跟隨者開始從新Leader副本分割區消費訊息。
當控制器發現⼀個 broker 加⼊叢集時,它會使⽤ broker ID 來檢查新加⼊的 broker 是否包含現有分割區的副本。如果有,控制器就把變更通知傳送給新加⼊的 broker 和其他 broker,新 broker上的副本分割區開始從Leader分割區那⾥消費訊息,與Leader分割區保持同步。
結論:
--replication-factor 3
,表示分割區的副本數,不要超過broker的數量。replica.lag.time.max.ms
預設值:10000)。acks=all
。Follower收到訊息後,會像Leader傳送ACK。⼀旦Leader收到了ISR中所有Replica的ACK,Leader就commit,那麼Leader就向Producer傳送ACK。當某個topic的--replication-factor
為N(N>1)時,每個Partition都有N個副本,稱作replica。原則上是將replica均勻的分配到整個叢集上。不僅如此,partition的分配也同樣需要均勻分配。為了更好的負載均衡。
副本分配的三個⽬標:
在不考慮機架資訊的情況下:
Leader的選舉:
如果Leader宕機了該怎麼辦?很容易想到我們在Follower中重新選舉⼀個Leader,但是選舉哪個作為leader呢?Follower可能已經落後許多了,因此我們要選擇的是」最新」的Follow:新的Leader必須擁有與原來Leader commit過的所有資訊。
kafka動態維護⼀組同步leader資料的副本(ISR),只有這個組的成員才有資格當選leader,kafka副本寫⼊不被認為是已提交,直到所有的同步副本已經接收才認為。這組ISR儲存在zookeeper,正因為如此,在ISR中的任何副本都有資格當選leader。
基於Zookeeper的選舉⽅式:
⼤資料很多元件都有Leader選舉的概念,如HBASE等。它們⼤都基於ZK進⾏選舉,所有Follow都在ZK上⾯註冊⼀個Watch,⼀旦Leader宕機,Leader對應的Znode會⾃動刪除,那些Follow由於在Leader節點上註冊了Watcher,故可以得到通知,就去參與下⼀輪選舉,嘗試去建立該節點,ZK會保證只有⼀個Follow建立成功,成為新的Leader。
但是這種⽅式有⼏個缺點:
基於Controller的選舉⽅式:
Kafka 0.8後的Leader Election⽅案解決了上述問題,它在所有broker中選出⼀個controller,所有Partition的Leader選舉都由controller決定。controller會將Leader的改變直接通過RPC的⽅式(⽐ZooKeeper Queue的⽅式更⾼效)通知需為為此作為響應的Broker。同時controller也負責增刪Topic以及Replica的重新分配。
如何處理Replica的恢復:
如果Replica都死了怎麼辦?
只要⾄少有⼀個replica,就能保證資料不丟失,可是如果某個partition的所有replica都死了怎麼辦?有兩種⽅案:
unclean.leader.election.enable
禁⽤它。Broker宕機怎麼辦?
Controller在Zookeeper的/brokers/ids節點上註冊Watch。⼀旦有Broker宕機,其在Zookeeper對應的Znode會⾃動被刪除,Zookeeper會fire Controller註冊的Watch,Controller即可獲取最新的倖存的Broker列表。
Controller決定set_p,該集合包含了宕機的所有Broker上的所有Partition。
對set_p中的每⼀個Partition:
從/brokers/topics/[topic]/partitions/[partition]/state
讀取該Partition當前的ISR。
決定該Partition的新Leader。如果當前ISR中有⾄少⼀個Replica還倖存,則選擇其中⼀個作為新Leader,新的ISR則包含當前ISR中所有幸存的Replica。否則選擇該Partition中任意⼀個倖存的Replica作為新的Leader以及ISR(該場景下可能會有潛在的資料丟失)。如果該Partition的所有Replica都宕機了,則將新的Leader設定為-1。
將新的Leader,ISR和新的leader_epoch及controller_epoch寫⼊/brokers/topics/[topic]/partitions/[partition]/state
。
[zk: localhost:2181(CONNECTED) 13] get /brokers/topics/bdstar/partitions/0/state
{"controller_epoch":1272,"leader":0,"version":1,"leader_epoch":4,"isr":[0,2]}
直接通過RPC向set_p相關的Broker傳送LeaderAndISRRequest命令。Controller可以在⼀個RPC操作中傳送多個命令從⽽提⾼效率。
Controller宕機怎麼辦?
每個Broker都會在/controller上註冊⼀個Watch。
[zk: localhost:2181(CONNECTED) 19] get /controller
{"version":1,"brokerid":1...............}
當前Controller宕機時,對應的/controller會⾃動消失。所有「活」著的Broker競選成為新的Controller,會建立新的Controller Path
[zk: localhost:2181(CONNECTED) 19] get /controller
{"version":1,"brokerid":2...............}
注意:只會有⼀個競選成功(這點由Zookeeper保證)。競選成功者即為新的Leader,競選失敗者則重新在新的Controller Path上註冊Watch。因為Zookeeper的Watch是⼀次性的,被fire⼀次之後即失效,所以需要重新註冊
Kafka中,⼀個主題可以有多個分割區,增強主題的可延伸性,為了保證靠可⽤,可以為每個分割區設定副本數。
只有Leader副本可以對外提供讀寫服務,Follower副本只負責poll Leader副本的資料,與Leader副本保持資料的同步。
系統維護⼀個ISR副本集合,即所有與Leader副本保持同步的副本列表。
當Leader宕機找不到的時候,就從ISR列表中挑選⼀個分割區做Leader。如果ISR列表中的副本都找不到了,就剩下OSR的副本了。
此時,有兩個選擇:要麼選擇OSR的副本做Leader,優點是可以⽴即恢復該分割區的服務。缺點是可能會丟失資料。
要麼選擇等待,等待ISR列表中的分割區副本可⽤,就選擇該可⽤ISR分割區副本做Leader。優點是不會丟失資料缺點是會影響當前分割區的可⽤性。
水位標記:
⽔位或⽔印(watermark)⼀詞,表示位置資訊,即位移(offset)。Kafka原始碼中使⽤的名字是⾼⽔位,HW(high watermark)。
副本⻆⾊:
Kafka分割區使⽤多個副本(replica)提供⾼可⽤。
LEO和HW:
每個分割區副本物件都有兩個重要的屬性:LEO和HW。
上圖中,HW值是7,表示位移是07的所有訊息都已經處於「已提交狀態」(committed),⽽LEO值是14,813的訊息就是未完全備份(fully replicated)——為什麼沒有14?LEO指向的是下⼀條訊息到來時的位移。
消費者⽆法消費分割區下Leader副本中位移⼤於分割區HW的訊息。
Follower副本不停地向Leader副本所在的broker傳送FETCH請求,⼀旦獲取訊息後寫⼊⾃⼰的⽇志中進⾏備份。那麼Follower副本的LEO是何時更新的呢?Kafka有兩套Follower副本LEO:
Kafka使⽤前者幫助Follower副本更新其HW值;利⽤後者幫助Leader副本更新其HW。
Follower副本的本地LEO何時更新?
Follower副本的LEO值就是⽇志的LEO值,每當新寫⼊⼀條訊息,LEO值就會被更新。當Follower傳送FETCH請求後,Leader將資料返回給Follower,此時Follower開始Log寫資料,從⽽⾃動更新LEO值。
Leader端Follower的LEO何時更新?
Leader端的Follower的LEO更新發⽣在Leader在處理Follower FETCH請求時。⼀旦Leader接收到Follower傳送的FETCH請求,它先從Log中讀取相應的資料,給Follower返回資料前,先更新Follower的LEO。
Follower更新HW發⽣在其更新LEO之後,⼀旦Follower向Log寫完資料,嘗試更新⾃⼰的HW值。
⽐較當前LEO值與FETCH響應中Leader的HW值,取兩者的⼩者作為新的HW值。
即:如果Follower的LEO⼤於Leader的HW,Follower HW值不會⼤於Leader的HW值。
和Follower更新LEO相同,Leader寫Log時⾃動更新⾃⼰的LEO值。
Leader的HW值就是分割區HW值,直接影響分割區資料對消費者的可⻅性 。
Leader會嘗試去更新分割區HW的四種情況:
結論:
當Kafka broker都正常⼯作時,分割區HW值的更新時機有兩個:
Leader如何更新⾃⼰的HW值?Leader broker上儲存了⼀套Follower副本的LEO以及⾃⼰的LEO。當嘗試確定分割區HW時,它會選出所有滿⾜條件的副本,⽐較它們的LEO(包括Leader的LEO),並選擇最⼩的LEO值作為HW值。
需要滿⾜的條件,(⼆選⼀):
replica.lag.time.max.ms
引數值(預設是10s)如果Kafka只判斷第⼀個條件的話,確定分割區HW值時就不會考慮這些未在ISR中的副本,但這些副本已經具備了「⽴刻進⼊ISR」的資格,因此就可能出現分割區HW值越過ISR中副本LEO的情況——不允許。因為分割區HW定義就是ISR中所有副本LEO的最⼩值。
我們假設有⼀個topic,單分割區,副本因⼦是2,即⼀個Leader副本和⼀個Follower副本。我們看下當producer傳送⼀條訊息時,broker端的副本到底會發⽣什麼事情以及分割區HW是如何被更新的。
初始時Leader和Follower的HW和LEO都是0(嚴格來說原始碼會初始化LEO為-1,不過這不影響之後的討論)。Leader中的Remote LEO指的就是Leader端儲存的Follower LEO,也被初始化成0。此時,⽣產者沒有傳送任何訊息給Leader,⽽Follower已經開始不斷地給Leader傳送FETCH請求了,但因為沒有資料因此什麼都不會發⽣。值得⼀提的是,Follower傳送過來的FETCH請求因為⽆資料⽽暫時會被寄存到Leader端的purgatory中,待500ms (replica.fetch.wait.max.ms
引數)超時後會強制完成。倘若在寄存期間⽣產者發來資料,則Kafka會⾃動喚醒該FETCH請求,讓Leader繼續處理。
producer給該topic分割區傳送了⼀條訊息
此時的狀態如下圖所示:
如上圖所示,Leader接收到PRODUCE請求主要做兩件事情:
PRODUCE請求處理完成後各值如下,Leader端的HW值依然是0,⽽LEO是1,Remote LEO也是0。
屬性 | 階段 | 舊值 | 新值 | 備註 |
---|---|---|---|---|
Leader LEO | PRODUCE處理完成 | 0 | 1 | 寫⼊了⼀條資料 |
Remote LEO | PRODUCE處理完成 | 0 | 0 | 還未Fetch |
Leader HW | PRODUCE處理完成 | 0 | 0 | min(LeaderLEO=1, RemoteLEO=0)=0 |
Follower LEO | PRODUCE處理完成 | 0 | 0 | 還未Fetch |
Follower HW | PRODUCE處理完成 | 0 | 0 | min(LeaderHW=0, FollowerLEO=0)=0 |
假設此時follower傳送了FETCH請求,則狀態變更如下:
本例中當follower傳送FETCH請求時,Leader端的處理依次是:
⽽Follower副本接收到FETCH Response後依次執⾏下列操作:
此時,第⼀輪FETCH RPC結束,我們會發現雖然Leader和Follower都已經在Log中儲存了這條訊息,但分割區HW值尚未被更新,仍為0。
屬性 | 階段 | 舊值 | 新值 | 備註 |
---|---|---|---|---|
Leader LEO | PRODUCE和Follower FETCH處理完成 | 0 | 1 | 寫⼊了⼀條資料 |
Remote LEO | PRODUCE和Follower FETCH處理完成 | 0 | 0 | 第⼀次fetch中offset為0 |
Leader HW | PRODUCE和Follower FETCH處理完成 | 0 | 0 | min(LeaderLEO=1,RemoteLEO=0)=0 |
Follower LEO | PRODUCE和Follower FETCH處理完成 | 0 | 1 | 同步了⼀條資料 |
Follower HW | PRODUCE和Follower FETCH處理完成 | 0 | 0 | min(LeaderHW=0,FollowerLEO=1)=0 |
Follower第⼆輪FETCH
分割區HW是在第⼆輪FETCH RPC中被更新的,如下圖所示:
Follower發來了第⼆輪FETCH請求,Leader端接收到後仍然會依次執⾏下列操作:
同樣地,Follower副本接收到FETCH response後依次執⾏下列操作:
屬性 | 階段 | 舊值 | 新值 | 備註 |
---|---|---|---|---|
Leader LEO | 第⼆次Follower FETCH處理完成 | 1 | 1 | 未寫⼊新資料 |
Remote LEO | 第⼆次Follower FETCH處理完成 | 0 | 1 | 第2次fetch中offset為1 |
Leader HW | 第⼆次Follower FETCH處理完成 | 0 | 1 | min(RemoteLEO,LeaderLEO)=1 |
Follower LEO | 第⼆次Follower FETCH處理完成 | 1 | 1 | 未寫⼊新資料 |
Follower HW | 第⼆次Follower FETCH處理完成 | 0 | 1 | 第2次fetch resp中的LeaderHW和本地FollowerLEO都是1 |
此時訊息已經成功地被複制到Leader和Follower的Log中且分割區HW是1,表明消費者能夠消費offset = 0的訊息。
當Leader⽆法⽴即滿⾜FECTH返回要求的時候(⽐如沒有資料),那麼該FETCH請求被暫存到Leader端的purgatory中(煉獄),待時機成熟嘗試再次處理。Kafka不會⽆限期快取,預設有個超時時間(500ms),⼀旦超時時間已過,則這個請求會被強制完成。當寄存期間還沒超時,⽣產者傳送PRODUCE請求從⽽使之滿⾜了條件以致被喚醒。此時,Leader端處理流程如下:
Kafka使⽤HW值來決定副本備份的進度,⽽HW值的更新通常需要額外⼀輪FETCH RPC才能完成。但這種設計是有問題的,可能引起的問題包括:
使⽤HW值來確定備份進度時其值的更新是在下⼀輪RPC中完成的。如果Follower副本在標記上⽅的的第⼀步與第⼆步之間發⽣崩潰,那麼就有可能造成資料的丟失。
上圖中有兩個副本:A和B。開始狀態是A是Leader。
假設⽣產者min.insync.replicas
為1,那麼當⽣產者傳送兩條訊息給A後,A寫⼊Log,此時Kafka會通知⽣產者這兩條訊息寫⼊成功。
代 | 屬性 | 階段 | 舊值 | 新值 | 備註 |
---|---|---|---|---|---|
1 | Leader LEO | PRODUCE和Follower FETCH處理完成 | 0 | 1 | 寫⼊了⼀條資料 |
1 | Remote LEO | PRODUCE和Follower FETCH處理完成 | 0 | 0 | 第⼀次fetch中offset為0 |
1 | Leader HW | PRODUCE和Follower FETCH處理完成 | 0 | 0 | min(LeaderLEO=1,FollowerLEO=0)=0 |
1 | Follower LEO | PRODUCE和Follower FETCH處理完成 | 0 | 1 | 同步了⼀條資料 |
1 | Follower HW | PRODUCE和Follower FETCH處理完成 | 0 | 0 | min(LeaderHW=0, FollowerLEO=1)=0 |
2 | Leader LEO | 第⼆次Follower FETCH處理完成 | 1 | 2 | 寫⼊了第⼆條資料 |
2 | Remote LEO | 第⼆次Follower FETCH處理完成 | 0 | 1 | 第2次fetch中offset為1 |
2 | Leader HW | 第⼆次Follower FETCH處理完成 | 0 | 1 | min(RemoteLEO=1,LeaderLEO=2)=1 |
2 | Follower LEO | 第⼆次Follower FETCH處理完成 | 1 | 2 | 寫⼊了第⼆條資料 |
2 | Follower HW | 第⼆次Follower FETCH處理完成 | 0 | 1 | min(LeaderHW=1,FollowerLEO=2)=1 |
3 | Leader LEO | 第三次Follower FETCH處理完成 | 2 | 2 | 未寫⼊新資料 |
3 | Remote LEO | 第三次Follower FETCH處理完成 | 1 | 2 | 第3次fetch中offset為2 |
3 | Leader HW | 第三次Follower FETCH處理完成 | 1 | 2 | min(RemoteLEO=2,LeaderLEO)=2 |
3 | Follower LEO | 第三次Follower FETCH處理完成 | 2 | 2 | 未寫⼊新資料 |
3 | Follower HW | 第三次Follower FETCH處理完成 | 1 | 2 | 第3次fetch resp中的LeaderHW和本地FollowerLEO都是2 |
但是在broker端,Leader和Follower的Log雖都寫⼊了2條訊息且分割區HW已經被更新到2,但Follower HW尚未被更新還是1,也就是上⾯標記的第⼆步尚未執⾏,表中最後⼀條未執⾏。
倘若此時副本B所在的broker宕機,那麼重啟後B會⾃動把LEO調整到之前的HW值1,故副本B會做⽇志截斷(log truncation),將offset = 1的那條訊息從log中刪除,並調整LEO = 1。此時follower副本底層log中就只有⼀條訊息,即offset = 0的訊息!
B重啟之後需要給A發FETCH請求,但若A所在broker機器在此時宕機,那麼Kafka會令B成為新的Leader,⽽當A重啟回來後也會執⾏⽇志截斷,將HW調整回1。這樣,offset=1的訊息就從兩個副本的log中被刪除,也就是說這條已經被⽣產者認為傳送成功的資料丟失。
丟失資料的前提是min.insync.replicas=1
時,⼀旦訊息被寫⼊Leader端Log即被認為是committed。延遲⼀輪FETCH RPC更新HW值的設計使follower HW值是非同步延遲更新,若在這個過程中Leader發⽣變更,那麼成為新Leader的Follower的HW值就有可能是過期的,導致⽣產者本是成功提交的訊息被刪除。
除了可能造成的資料丟失以外,該設計還會造成Leader的Log和Follower的Log資料不⼀致。
如Leader端記錄序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。
看圖:
假設:A是Leader,A的Log寫⼊了2條訊息,但B的Log只寫了1條訊息。分割區HW更新到2,但B的HW還是1,同時⽣產者min.insync.replicas
仍然為1。
假設A和B所在Broker同時宕機,B先重啟回來,因此B成為Leader,分割區HW = 1。假設此時⽣產者傳送了第3條訊息(紅⾊表示)給B,於是B的log中offset = 1的訊息變成了紅框表示的訊息,同時分割區HW更新到2(A還沒有回來,就B⼀個副本,故可以直接更新HW⽽不⽤理會A)之後A重啟回來,需要執⾏⽇志截斷,但發現此時分割區HW=2⽽A之前的HW值也是2,故不做任何調整。此後A和B將以這種狀態繼續正常⼯作。
顯然,這種場景下,A和B的Log中儲存在offset = 1的訊息是不同的記錄,從⽽引發不⼀致的情形出現。
造成上述兩個問題的根本原因在於
但HW值的更新是非同步延遲的,特別是需要額外的FETCH請求處理流程才能更新,故這中間發⽣的任何崩潰都可能導致HW值的過期。
Kafka從0.11引⼊了leader epoch
來取代HW值。Leader端使⽤記憶體儲存Leader的epoch資訊,即使出現上⾯的兩個場景也能規避這些問題。
所謂Leader epoch實際上是⼀對值:<epoch, offset>:
epoch表示Leader的版本號,從0開始,Leader變更過1次,epoch+1
offset對應於該epoch版本的Leader寫⼊第⼀條訊息的offset。因此假設有兩對值:
<0, 0>
<1, 120>
則表示第⼀個Leader從位移0開始寫⼊訊息;共寫了120條[0, 119];⽽第⼆個Leader版本號是1,從位移120處開始寫⼊訊息。
只需要知道每個副本都引⼊了新的狀態來儲存⾃⼰當leader時開始寫⼊的第⼀條訊息的offset以及leader版本。這樣在恢復的時候完全使⽤這些資訊⽽⾮HW來判斷是否需要截斷⽇志。
依靠Leader epoch的資訊可以有效地規避資料不⼀致的問題。
訊息重複主要發⽣在以下三個階段:
⽣產傳送的訊息沒有收到正確的broke響應,導致producer重試。
producer發出⼀條訊息,broke落盤以後因為⽹絡等種種原因傳送端得到⼀個傳送失敗的響應或者⽹絡中斷,然後producer收到⼀個可恢復的Exception重試訊息導致訊息重複。
說明:
new KafkaProducer()
後建立一個執行緒 KafkaThread
掃描RecordAccumulator
中是否有訊息KafkaProducer.send()
傳送訊息,實際上只是把訊息儲存到RecordAccumulator
中KafkaThread
掃描到RecordAccumulator
中有訊息後,將訊息傳送到Kafka叢集RecordAccumulator
中,等待後臺執行緒KafkaThread
掃描再次傳送異常是 RetriableException
型別或者 TransactionManager
允許重試;RetriableException
類繼承關係如下:
如果設定max.in.flight.requests.per.connection
大於1 (預設5, 單個連線.上傳送的未確認請求的最大數量,表示上一個發出的請求沒有確認下一個請求又發出了)。大於1可能會改變記錄的順序,因為如果將兩個batch傳送到單個分割區,第一個batch處理失敗並重試, 但是第二個batch處理成功,那麼第二個batch處理中的記錄可能先出現被消費。
設定max.in.flight.requests.per.connection
為1,可能會影響吞吐量,可以解決單個生產者傳送順序問題。如果多個生產者,生產者1先傳送一一個請求, 生產者2後傳送請求,此時生產者1返回可恢復異常,重試一定次數成功了。雖然生產者1先傳送訊息,但生產者2傳送的訊息會被先消費。
啟動Kafka的冪等性
要啟動Kafka的冪等性,設定enable.idempotence=true
,以及ack=all
和retries>1
ack=0,不重試
可能會丟失訊息,適用於吞吐量指標重要性高於資料丟失,如:紀錄檔收集
ack=0,不重試
生產者傳送訊息完畢,不管結果,如果傳送失敗也就丟失了
ack=1,Leader crash
生產者傳送訊息完畢,只等待Leader寫入成功就返回了,Leader 分割區丟失了,此時Follower沒來得及同步,訊息丟失
unclean.leader.election.enable
設定true
允許選舉ISR以外的副本作為leader,會導致資料丟失,預設為false。 生產者傳送非同步訊息,只等待Lead寫入成功就返回,Leader分割區丟失,此時ISR中沒有Follower, Leader從OSR中選舉,因為OSR中本來落後於Leader造成訊息丟失。
禁用unclean選舉,ack=all
ack=all / -1,tries > 1,unclean.leader.election.enable:false
生產者發完訊息,等待Follower同步完再返回, 如果異常則重試。副本的數量可能影響吞吐量,不超過5個,一般三個。
不允許unclean Leader選舉。
設定:min.insync.replicas>1
當生產者將acks
設定為all (或-1 )時,min.insync.replicas>1
。指定確認訊息寫成功需要的最小副本數量。達不到這個最小值,生產者將引發-個異常(要麼是NotEnoughReplicas
, 要麼是NotEnoughReplicasAfterAppend
)。
當一起使用時,min.insync.replicas
和ack
允許執行更大的永續性保證。一個典型的場景 是建立一個複製因子為3的主題,設定min.insync
複製到2個, 用 all 設定傳送。將確保如果大多數副本沒有收到寫操作,則生產者將引發異常。
失敗的 offset 單獨記錄
生產者傳送訊息,會自動重試,遇到不可恢復異常會丟擲,這時可以捕獲異常記錄到資料庫或快取,進行單獨處理。
根本原因
資料消費完沒及時提交 offset 到 broker
場景
訊息消費端在消費過程中掛掉沒有及時提交offset到broke,另一個消費端啟動拿之前記錄的offset開始消費,由於offset的滯後性可能會導致新啟動的使用者端有少量重複消費。
取消自動提交
每次消費完或者程式退出時手動提交。這可能也沒法保證一條不重複
下游做冪等
一般是讓 下游做冪等或者儘量每消費-條訊息都記錄offset, 對於少數嚴格的場景可能需要把offset或唯一ID (例如訂單ID)和下游狀態更新放在同一個資料庫裡面做事務來保證精確的一次更新或者在下游資料表裡面同時記錄消費offset,然後更新下游資料的時候用消費位移做樂觀鎖拒絕舊位移的資料更新。
Zookeeper不適合⼤批次的頻繁寫⼊操作。
Kafka 1.0.2將consumer的位移資訊儲存在Kafka內部的topic中,即__consumer_offsets
主題,並且預設提供了kafka_consumer_groups.sh
指令碼供⽤戶檢視consumer資訊。
建立topic 「tp_test_01」
[root@node1 ~]# kafka-topics.sh --zookeeper node1:2181/myKafka --create --topic tp_test_01 --partitions 5 --replication-factor 1
使⽤kafka-console-producer.sh指令碼⽣產訊息
[root@node1 ~]# for i in `seq 100`; do echo "hello lagou $i" >> messages.txt; done
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt
由於預設沒有指定key,所以根據round-robin⽅式,訊息分佈到不同的分割區上。 (本例中⽣產了100條訊息)
驗證訊息⽣產成功
[root@node1 ~]# kafka-console-producer.sh --broker-list node1:9092 --topic tp_test_01 < messages.txt
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
[root@node1 ~]# kafka-run-class.sh kafka.tools.GetOffsetShell --brokerlist node1:9092 --topic tp_test_01 --time -1
tp_test_01:2:20
tp_test_01:4:20
tp_test_01:1:20
tp_test_01:3:20
tp_test_01:0:20
[root@node1 ~]#
結果輸出表明100條訊息全部⽣產成功!
建立⼀個console consumer group
[root@node1 ~]#kafka-console-consumer.sh --bootstrap-server node1:9092 --topic tp_test_01 --from-beginning
獲取該consumer group的group id(後⾯需要根據該id查詢它的位移資訊)
[root@node1 ~]# kafka-consumer-groups.sh --bootstrap-server node1:9092 --list
查詢__consumer_offsets topic所有內容
注意:運⾏下⾯命令前先要在consumer.properties
中設定exclude.internal.topics=false
[root@node1 ~]# kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning
預設情況下__consumer_offsets
有50個分割區,如果你的系統中consumer group
也很多的話,那麼這個命令的輸出結果會很多。
計算指定consumer group在__consumer_offsets topic中分割區資訊
這時候就⽤到了第5步獲取的group.id
(本例中是console-consumer-49366
)。Kafka會使⽤下⾯公式計算該group位移儲存在__consumer_offsets
的哪個分割區上:
Math.abs(groupID.hashCode()) % numPartitions
對應的分割區=Math.abs("console-consumer-49366".hashCode()) % 50 = 19
,即__consumer_offsets
的分割區19儲存了這個consumer group
的位移資訊。
獲取指定consumer group的位移資訊
[root@node1 ~]# kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 19 --broker-list node1:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
下⾯是輸出結果:
...
[console-consumer-49366,tp_test_01,3]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,4]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
[console-consumer-49366,tp_test_01,0]::[OffsetMetadata[20,NO_METADATA],CommitTime 1596424702212,ExpirationTime 1596511102212]
...
上圖可⻅,該consumer group
果然儲存在分割區11上,且位移資訊都是對的(這⾥的位移資訊是已消費的位移,嚴格來說不是第3步中的位移。由於我的consumer已經消費完了所有的訊息,所以這⾥的位移與第3步中的位移相同)。另外,可以看到__consumer_offsets topic
的每⼀⽇志項的格式都是:[Group, Topic, Partition]::[OffsetMetadata[Offset, Metadata], CommitTime, ExpirationTime]
。