現在假定這麼一個業務場景,從kafka
中的topic
獲取訊息資料,經過一定加工處理後,傳送到另外一個topic
中,要求整個過程訊息不能丟失,也不能重複傳送,即實現端到端的Exactly-Once
精確一次訊息投遞。這該如何實現呢?
針對上面的業務場景,kafka已經替我們想到了,在kafka 0.11版本以後,引入了一個重大的特性:冪等性和事務。
這裡提到冪等性的原因,主要是因為事務的啟用必須要先開啟冪等性,那麼什麼是冪等性呢?
冪等性是指生產者無論向kafka broker
傳送多少次重複的資料,broker
端只會持久化一條,保證資料不會重複。
冪等性通過生產者設定項enable.idempotence=true
開啟,預設情況下為true。
冪等性實現原理
<PID, Partition, SeqNumber>
組成。PID
:ProducerID
,每個生產者啟動時,Kafka 都會給它分配一個 ID
,ProducerID
是生產者的唯一標識,需要注意的是,Kafka
重啟也會重新分配 PID
。Partition
:訊息需要發往的分割區號。SeqNumber
:生產者,他會記錄自己所傳送的訊息,給他們分配一個自增的 ID
,這個 ID
就是 SeqNumber
,是該訊息的唯一標識,每傳送一條訊息,序列號加 1。冪等性缺點
根據冪等性的原理,我們發現它存在下面的缺點:
PID
,還是有可能產生重複的資料那麼如何實現跨分割區、kafka broker重啟也能保證不重複呢?這就要使用事務了。
所謂事務,就是要求保證原子性,要麼全部成功,要麼全部失敗。那麼具體該如何開啟呢?
kafka
要想開啟事務必須要啟用冪等性,即生產者設定enable.idempotence=true
kafka
生產者需要設定唯一的事務idtransactional.id
, 最好為其設定一個有意義的名字。kafka
消費端也有一個設定項isolation.level
和事務有很大關係。read_uncommitted
:預設值,消費端應用可以看到(消費到)未提交的事務,當然對於已提交的事務也是可見的。read_committed
:消費端應用只能消費到提交的事務內的訊息。現在我們用java的api來實現一下前面這個「消費-處理-生產「的例子吧。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
</dependency>
Properties prodcuerProps = new Properties();
// kafka地址
prodcuerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
// key序列化
prodcuerProps.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// value序列化
prodcuerProps.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
// 啟用冪等性
producerProps.put("enable.idempotence", "true");
// 設定事務id
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer(prodcuerProps);
enable.idempotence
設定專案為truetransactional.id
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put("group.id", "my-group-id");
// 設定consumer手動提交
consumerProps.put("enable.auto.commit", "false");
// 設定隔離級別,讀取事務已提交的訊息
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
//訂閱主題
consumer.subscribe(Collections.singletonList("topic1"));
enable.auto.commit=false
,設定手動提交消費者offset
isolation.level=read_committed
,消費事務已提交的訊息// 初始化事務
producer.initTransactions();
while(true) {
// 拉取訊息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000L));
if(!records.isEmpty()){
// 準備一個 hashmap 來記錄:"分割區-消費位移" 鍵值對
HashMap<TopicPartition, OffsetAndMetadata> offsetsMap = new HashMap<>();
// 開啟事務
producer.beginTransaction();
try {
// 獲取本批訊息中所有的分割區
Set<TopicPartition> partitions = records.partitions();
// 遍歷每個分割區
for (TopicPartition partition : partitions) {
// 獲取該分割區的訊息
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
// 遍歷每條訊息
for (ConsumerRecord<String, String> record : partitionRecords) {
// 執行資料的業務處理邏輯
ProducerRecord<String, String> outRecord = new ProducerRecord<>("topic2", record.key(), record.value().toUpperCase());
// 將處理結果寫入 kafka
producer.send(outRecord);
}
// 將處理完的本分割區對應的消費位移記錄到 hashmap 中
long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
// 事務提交的是即將到來的偏移量,這意味著我們需要加 1
offsetsMap.put(partition,new OffsetAndMetadata(offset+1));
}
// 向事務管理器提交消費位移
producer.sendOffsetsToTransaction(offsetsMap,"groupid");
// 提交事務
producer.commitTransaction();
} catch(Exeception e) {
e.printStackTrace();
// 終止事務
producer.abortTransaction();
}
}
}
initTransactions()
: 初始化事務beginTransaction()
: 開啟事務sendOffsetsToTransaction()
: 在事務內提交已經消費的偏移量(主要用於消費者)commitTransaction()
: 提交事務abortTransaction()
: 放棄事務kafka事務的實現引入了事務協調器,如下圖所示:
commit
或 abort
請求,等待 kafka 響應__transaction_state
中,__transaction_state
預設有50個分割區,每個分割區負責一部分事務。事務劃分是根據transactional.id
的hashcode
值%50
,計算出該事務屬於哪個分割區。 該分割區Leader
副本所在的broker節點即為這個transactional.id
對應的Transaction Coordinator
節點,這也是上面第一步中的計算邏輯。kafka broker
設定max.transaction.timeout.ms
之前既不提交也不中止事務, kafka broker
將中止事務本身。 此屬性的預設值為 15 分鐘。本文講解了通過kafka事務可以實現端到端的精確一次的訊息語意,通過事務機制,KAFKA 實現了對多個 topic
的多個 partition
的原子性的寫入,通過一個例子瞭解了一下如何使用事物。同時也簡單介紹了事務實現的原理,它底層必須要依賴kafka的冪等性機制,同時通過類似「二段提交」的方式保證事務的原子性。
歡迎關注個人公眾號【JAVA旭陽】交流學習!
本文來自部落格園,作者:JAVA旭陽,轉載請註明原文連結:https://www.cnblogs.com/alvinscript/p/17459920.html