Kafka事務原理剖析

2022-11-23 12:00:43

一、事務概覽

提起事務,我們第一印象可能就是ACID,需要滿足原子性、一致性、事務隔離級別等概念,那kafka的事務能做到什麼程度呢?我們首先看一下如何使用事務

Producer端程式碼如下

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
producer.beginTransaction();

ProducerRecord<String, String> kafkaMsg1 = new ProducerRecord<>(TOPIC1, "msg val");
producer.send(kafkaMsg1);
ProducerRecord<String, String> kafkaMsg2 = new ProducerRecord<>(TOPIC2, "msg val");
producer.send(kafkaMsg2);

producer.commitTransaction();

Consumer端不需要做特殊處理,跟消費普通訊息一樣

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(String.format("Consume partition:%d offset:%d", record.partition(), record.offset()));
    }
}

1.1、事務設定

那需要如何設定呢?

Producer

Consumer

transactional.id

事務ID,型別為String字串,預設為空,使用者端自定義,例如"order_bus"

isolation.level

事務隔離級別,預設為空,開啟事務的話,需要將其設定為"read_committed"

enable.idempotence

訊息冪等開關,true/false,預設為false,當設定了transactional.id,此項一定要設定為true,否則會丟擲使用者端設定異常

   

transaction.timeout.ms

事務超時時間,預設為10秒,最長為15分鐘

   

enable.idempotence設定為true時,kafka會檢查如下一些級聯設定

設定項

內容要求

說明

acks

要求此設定項必須設定為all

響應必須要設定為all,也就是leader儲存訊息,並且所有follower也儲存了訊息後再返回,保證訊息的可靠性

retries

> 0

因為冪等特性保證了資料不會重複,在需要強可靠性的前提下,需要使用者設定的重試次數 > 0

max.in.flight.requests.per.connection

<= 5

此項設定是表明在producer還未收到broker應答的最大訊息批次數量。該值設定的越大,標識可允許的吞吐越高,同時也越容易造成訊息亂序

相關設定約束: org.apache.kafka.clients.producer.ProducerConfig#postProcessAndValidateIdempotenceConfigs()

1.2、事務描述

由此,可以出一張事務的概覽圖

一個簡單的事務可能就是這樣:

  • Producer開啟一個事務
  • 首先向Topic1傳送兩條訊息 msg_a、msg_b
  • 然後向Topic2傳送一條訊息msg_c
  • 提交事務

假設有2個消費端此時正在消費這兩個topic對應的分割區,在事務提交前,所有的事務訊息對兩個consumer均不可見,事務一旦提交,在同一時刻,consumer1可以看到a、b訊息,consumer2可看到c訊息(這裡首先作個申明,顯而易見,kafka實現的是分散式事務,既然是分散式事務就脫離不了CAP定理,而kafka的事務也只是做到了最終一致性,後文還會詳細展開

那麼整個事務是如何實現的呢?

二、事務流程

如上圖所示,整個事務流程分一下幾個步驟:

  • 事務初始化 initTransactions
  • 啟動事務 beginTransaction
  • 傳送訊息,一般傳送多條,向1個或多個topic producer.send
  • 事務提交 commitTransaction
  • 事務回滾abortTransaction
  • 消費事務訊息

當Producer傳送N多條事務的話

  • 事務初始化是一次性的
  • 而事務啟動、傳送訊息、事務提交/回滾則會一直迴圈執行

而這裡面很多步驟都是需要多個角色參與的,例如「事務初始化」,就需要Producer及Broker協同實現

三、事務初始化

事務初始化由Producer端觸發,程式碼為

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

事務初始化經歷了兩個階段:

  1. 定位TransactionCoordinator
  2. 初始化ProducerId

兩者是遞進關係,步驟2是嚴格依賴步驟1的,下面的流程圖示注了它們的呼叫關係

3.1、定位TransactionCoordinator

參與方:ProducerBroker

什麼是TransactionCoordinator?

TransactionCoordinator與GroupCoordinator類似,其本質也是一個後端的broker,只是這個broker起到了針對當前事物的協調作用,所有事務操作都需要直接傳送給這個指定的broker

剛開始的時候,Producer並不知道哪個broker是TransactionCoordinator,那麼目標broker是如何選擇出來的呢?

Producer雖然不知道Coordinato的地址,但是他有所有broker的連結串,因此初始化時,整體步驟如下:

  1. 向任意一個節點傳送獲取Coordinato的請求,引數中攜帶使用者端自定義的TransactionId;對應ApiKey為 ApiKeys.FIND_COORDINATOR
  2. Broker收到請求後,取TransactionId的hashCode,然後將其對50取模,(注:50為kafka內部topic __transaction_state的預設分割區數,該topic是kafka實現事務的關鍵,後文還會多次提及)獲取對應的Partition,該Partition從屬的Broker,即為TransactionCoordinator

獲取Partition程式碼如下: kafka.coordinator.transaction.TransactionStateManager#partitionFor()

def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount

3.2、初始化ProducerId

參與方:ProducerCoordinator

獲取TransactionCoordinator後,便需要向其傳送請求獲取ProducerIdEpoch,對應的API為ApiKeys.INIT_PRODUCER_ID。可以認為ProducerId+Epoch是對事物型Producer的唯一標識,後續向broker發起的請求,也都需要攜帶這兩個關鍵引數。這兩個引數含義如下

引數

型別

含義

ProducerId

Long

從0開始,對應Producer端設定的TransactionId,他們存在對映關係,可以通過TransactionId來查詢ProducerId;對映關係儲存在kafka內部topic __transaction_state

Epoch

Short

從0開始,Producer每次重啟,此項值都會+1;當超過short最大值後,ProducerId+1

比如當前的ProducerId為2000,Epoch為10,Producer重啟後,ProducerId為2000不變,Epoch變為11;如果此時Broker端再次收到epoch為10的資料,那麼將會認為是過期資料不予處理

由此可見ProducerId與Epoch是持久化在Broker端的,主要目的就是為了應對Coordinator宕機;接下來就要引出非常重要的一個kafka內部compact topic:__transaction_state

__transaction_state 是一個compact topic,即最新key對應的value內容會將舊值覆蓋,可以簡單將其看做一個KV儲存

Key

Value

TransactionId

producerId

8

從0開始,依次遞增

epoch

2

從0開始,依次遞增

transactionTimeoutMs

4

事務超時時間,預設10秒,最大15分鐘

transactionStatus

1

事務狀態(

0-Empty 事務剛開始時init是這個狀態

1-Ongoing

2-PrepareCommit

3-PrepareAbort

4-CompleteCommit

5-CompleteAbort

6-Dead

7-PrepareEpochFence

topicTotalNum

4

當前事務關聯的所有topic總和

topicNameLen

2

topic長度

topicName

X

topic內容

partitionNum

4

partition的個數

partitionIds

X

例如有n個partition,X = n * 4,每個partition佔用4 byte

transactionLastUpdateTimestampMs

8

最近一次事務操作的更新時間戳

transactionStartTimestampMs

8

事務啟動的時間戳

這個Topic的可以讓broker隨時檢視事務的當前狀態,以及是否超時

相關程式碼 scala/kafka/coordinator/transaction/TransactionLog.scala#valueToBytes()

此步驟會讓Broker向__transaction_state中寫入一條資料(由於當前Coordinator是通過分割區數取模得到的,因此向topic寫入資料是直接寫入本地盤的,沒有網路開銷),事務狀態為Empty,同時向Producer返回ProducerId+Epoch。當前步驟在Broker端還有很多事務狀態異常的判斷,此處不再展開

四、事務啟動-Transaction Begin

參與方:Producer

程式碼範例

producer.beginTransaction();

注:此步驟Producer不會向Broker傳送請求,只是將原生的事務狀態修改為 State.IN_TRANSACTION

Broker也並沒有獨立的步驟來處理事務啟動,Broker在收到第一條訊息時,才認為事物啟動;那麼Kafka為何要設計這樣一個看起來很雞肋的功能呢?直接傳送訊息不行麼

一個正常的事務流程是這樣的:

  • a、初始化
  • b、事務開始
  • c、傳送訊息
  • d、事務提交

因為事務訊息可能是傳送多次的,每次通過producer.beginTransaction()開啟事務,可以使得程式碼更清晰,也更容易理解;因此多次傳送的順序會是這樣

  1. ab、c、d
  2. b、c、d
  3. b、c、d
  4. b、c、d
  5. ......

五、事務訊息傳送-Transaction Send Msg

參與方:ProducerBroker

事務訊息的傳送是非常非常重要的環節,不論是Producer端還是Broker端,針對事務都做了大量的工作,不過在闡述核心功能前,還是需要對一些基礎知識進行鋪墊

5.1、訊息協定

與RocketMQ不同,kafka訊息協定的組裝是在Producer端完成的,kafka訊息協定經歷了3個版本(v0、v1、v2)的迭代,我們看一下現存3個版本的協定對比

  • V0 版本相當整潔,不寫註釋都能明白每個欄位的含義,而且除了key、value外,其他欄位均為定長編碼。這裡簡單闡述下attribute欄位,該欄位的前3個bit用來標誌訊息壓縮型別,剩下5個bit為保留欄位
  • V1 版本只是新增了時間戳欄位,並啟用了attribute欄位的第4個bit,用來標誌timestamp欄位是訊息born的時間,還是儲存的時間

然而V2版本做了相當大的改動,甚至可以說是「面目全非」

V2版本引入了Record Batch的概念,同時也引入了可變長儲存型別(本文不再展開),同一個Producer的訊息會按照一定的策略歸併入同一個Record Batch中;如果兩個Producer,一個開啟事務,一個關閉事務,分別向同一個Topic的同一個Partititon傳送訊息,那麼存在在Broker端的訊息會長什麼樣呢?

可見,同一個Record Batch中的Producer id、epoch、訊息型別等都是一樣的,所以不存在同一個Batch中,既有事務訊息,又有非事務訊息;換言之,某個Batch,要麼是事務型別的,要麼是非事務型別的,這點相當重要,在Consumer端消費訊息時,還要依賴這個特性。因此在Producer端,即便是同一個程序內的2個producer範例,向同一個Topic的同一個Partition,一個傳送事務訊息,一個傳送普通訊息,兩者間隔傳送,這時會發現Record Batch的數量與訊息的數量相同,即一個Record Batch中只會存放一條訊息

5.2、訊息冪等

眾所周知,kafka是有訊息超時重試機制的,既然存在重試,那麼就有可能存在訊息重複

  1. Producer傳送Record Batch A
  2. Broker收到訊息後儲存並持久化下來,但是傳送給Producer的response網路超時
  3. Producer發現傳送訊息超時,便重新傳送該訊息
  4. Broker並不知道收到的訊息是重複訊息,故再次將其儲存下來,因此產生了重複資料

注:上述整個過程,Client的業務方並不知曉,重試邏輯由Producer內部控制,給業務方的感觀便是訊息傳送了一份,卻收到了兩份資料

kafka要實現事務語意的話,訊息重複肯定是接受不了的,因此保證訊息冪等也就成了事務的前置條件。如何實現冪等呢,比較直觀的思路便是給訊息編號,這樣Broker就可以判重了,事實上kafka也是這樣做的;在Producer啟動時,會進行初始化動作,此時會拿到(ProduceId+Epoch),然後在每條訊息上新增Sequence欄位(從0開始),之後的請求都會攜帶Sequence屬性

  • 如果存在重複的RecordBatch(通過produceId+epoch+sequence),那麼Broker會直接返回重複記錄,client收到後丟棄重複資料
    • scala/kafka/log/ProducerStateManager.scala#findDuplicateBatch()
  • 如果Broker收到的RecordBatch與預期不匹配,例如比預期Sequence小或者大,都會丟擲OutOfOrderSequenceException異常
    • 比預期Sequence小:這種請求就是典型的重複傳送,直接拒絕掉並扔出異常
    • 比預期Sequence大:因為設定了冪等引數後,max.in.flight.requests.per.connection 引數的設定最大值即為5,即Producer可能同時傳送了5個未ack的請求,Sequence較大的請求先來到了,依舊扔出上述異常

處理重複資料的關鍵程式碼如下 kafka.log.ProducerStateEntry#findDuplicateBatch()

  def findDuplicateBatch(batch: RecordBatch): Option[BatchMetadata] = {
    if (batch.producerEpoch != producerEpoch)
       None
    else
      batchWithSequenceRange(batch.baseSequence, batch.lastSequence)
  }

  // Return the batch metadata of the cached batch having the exact sequence range, if any.
  def batchWithSequenceRange(firstSeq: Int, lastSeq: Int): Option[BatchMetadata] = {
    val duplicate = batchMetadata.filter { metadata =>
      firstSeq == metadata.firstSeq && lastSeq == metadata.lastSeq
    }
    duplicate.headOption
  }

處理Sequence過大或過小程式碼如下 kafka.log.ProducerAppendInfo#checkSequence()

  private def checkSequence(producerEpoch: Short, appendFirstSeq: Int, offset: Long): Unit = {
    if (producerEpoch != updatedEntry.producerEpoch) {
      ......
    } else {
      ......
      // If there is no current producer epoch (possibly because all producer records have been deleted due to
      // retention or the DeleteRecords API) accept writes with any sequence number
      if (!(currentEntry.producerEpoch == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) {
        throw new OutOfOrderSequenceException(s"Out of order sequence number for producer $producerId at " +
          s"offset $offset in partition $topicPartition: $appendFirstSeq (incoming seq. number), " +
          s"$currentLastSeq (current end sequence number)")
      }
    }
  }

  private def inSequence(lastSeq: Int, nextSeq: Int): Boolean = {
    nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Int.MaxValue)
  }

然而單純依靠訊息冪等,真正能夠實現訊息不重複、訊息全域性冪等嗎?答案是否定的,假定這樣的一個前置條件: 「Produer傳送了一條冪等訊息,在收到ACK前重啟了

  • 新啟動的Produer範例會擁有新的Producer id,Broker並不能區分前後兩個Producer是同一個,因此此條訊息重發的話,就會產生訊息重複
  • 新啟動的Produer可能直接將此條訊息傳送給了其他Partition,Broker會將資料儲存在另外的這個Partition,這樣從全域性來看,這條訊息重複了

因此訊息冪等能只夠保證在單對談(session)單partition的場景下能保證訊息冪等

5.3、訊息傳送-Producer

參與方:ProducerBroker

Producer端在傳送訊息階段,Producer與Broker的互動分兩部分:

  1. 向當前事物的Coordinator傳送新增Partiton的請求
    1. 對應的API為ApiKeys.ADD_PARTITIONS_TO_TXN
    2. 這個請求同步傳送結束後,才會真正傳送訊息
  1. 向對應的分割區傳送訊息
    1. 對應的API為ApiKeys.PRODUCE

也是事務訊息比較影響效能的一個點,在每次真正傳送Record Batch訊息之前,都會向Coordinator同步傳送Partition,之後才會真正傳送訊息。而這樣做的好處也顯而易見,當Producer掛掉後,Broker是儲存了當前事物全量Partition列表的,這樣不論是事務提交還是回滾,亦或是事務超時取消,Coordinator都擁有絕對的主動權

貼少量關鍵原始碼(本人不太喜歡大篇幅貼上原始碼,這樣會破會行文的連貫性,相信讀者也不會通過此文去翻看原始碼。不過在不影響閱讀的前提下,本文還是會黏貼一些關鍵程式碼

這裡是訊息確定了最終Partition後,向transactionManager註冊

  • org/apache/kafka/clients/producer/KafkaProducer.java#doSend()
// Add the partition to the transaction (if in progress) after it has been successfully
// appended to the accumulator. We cannot do it before because the partition may be
// unknown or the initially selected partition may be changed when the batch is closed
// (as indicated by `abortForNewBatch`). Note that the `Sender` will refuse to dequeue
// batches from the accumulator until they have been added to the transaction.
if (transactionManager != null) {
    transactionManager.maybeAddPartition(appendCallbacks.topicPartition());
}

Sender執行緒構建add partition請求

  • org/apache/kafka/clients/producer/internals/Sender.java#maybeSendAndPollTransactionalRequest()
TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequest(accumulator.hasIncomplete());
if (nextRequestHandler == null)
    return false;

5.4、訊息傳送-Coordinator

在訊息傳送階段,Coordinator的參與主要是記錄當前事務訊息所在的Parition資訊,即更新topic __transaction_state 的狀態,正如前文所述,__transaction_state 為compact型別,以下屬性將會被更新

topicTotalNum

4

當前事務關聯的所有topic總和

topicNameLen

2

topic長度

topicName

X

topic內容

partitionNum

4

partition的個數

partitionIds

X

例如有n個partition,X = n * 4,每個partition佔用4 byte

transactionLastUpdateTimestampMs

8

最近一次事務操作的更新時間戳

題外話:如果Coordinator記錄了某個Partition參與了事務,但卻沒有向該Partition傳送事務訊息,這樣會有影響嗎?

  • 其實不會有影響的,在後文事務提交/取消模組會做詳細說明,因為在topic__transaction_state中雖然記錄了某個Partition參與了事務,但在事務提交階段,只會向該Partition傳送marker型別的控制訊息,Consumer在收到controller型別的訊息後會自動過濾,另外也不會影響當前Partition的LSO向前推進

5.5、訊息傳送-Broker

訊息傳送時,Broker做的很重要的一個工作是維護 LSO(log stable offset),一個Partition中可能存了多個事務訊息,也有可能儲存了很多非事務的普通訊息,而LSO為第一個正在進行中(已經commit/abort的事務不算)的事務訊息的offset

如上圖:

  • a: 已經無效的事務
  • b: 已經提交的事務
  • c: 正在進行中的事務(不確定最終是取消還是提交)
  • d: 普通訊息,非事務訊息

因此LSO的位置就在第一個正在進行中的事務的首訊息的offset。訊息不斷寫入,Broker需要實時維護LSO的位置,而在LSO以下的位置的訊息是不可以被標記為READ_COMMITED的consumer消費的。

這裡稍微引申一下Consumer端的邏輯,LSO標記之前的訊息都可以被consumer看到,那麼如上圖,LSO之前有3條訊息,2個a(事務取消),1個b(事務提交),consumer讀到這3條訊息後怎麼處理呢?無非就是以下兩種處理邏輯:

  1. 暫存在consumer端,直至讀取到事務最終狀態,再來判斷是吐給業務端(事務成功),還是訊息扔掉(事務取消)
    1. 這樣設計是沒有問題的,可以保證訊息的準確性,但是如果某個事物提交的資料量巨大(事務最長超時時間可達15分鐘),這樣勢必造成consumer端記憶體吃緊,甚至OOM
  1. 實時判斷當前訊息是該成功消費還是被扔掉
    1. 能夠實時判斷肯定是非常理想的結果,可是如何實時判斷呢?難道每次消費時都要再向broker傳送請求獲取訊息的狀態嗎?

具體採用哪種策略,我們在訊息消費的章節再來展開

六、事務提交-Transaction Commit

參與方:ProducerBroker

6.1、事務提交-Producer

事務提交時Producer端觸發的,程式碼如下

producer.commitTransaction();

事務提交對應的API為ApiKeys.END_TXN,Producer向Broker請求的入參為

  • transactionalId 事務id,即客戶自定義的字串
  • producerId producer id,由coordinator生成,遞增
  • epoch 由coordinator生成
  • committed true:commit false:abort

可以看到,在事務提交階段,Producer只是觸發了提交動作,並攜帶了事務所需的引數,所做的操作相當有限,重頭還是在Coordinator端

注:這裡的提交動作是直接提交給Coordinator的,就跟事務初始化階段,獲取Producer id一樣

6.2、事務提交-Coordinator

在內部Topic __transaction_state 中儲存了當前事物所關聯的所有Partition資訊,因此在提交階段,就是向這些Partition傳送control marker資訊,用來標記當前事物的結束。而事務訊息的標誌正如前文訊息協定所述,在attribute欄位的第5個bit

attribute欄位:

           

control

   

如前文所說,LSO以下的訊息是不會被消費到,這樣控制了事務訊息的可見性,想控制這點,難度應該不大;但事務提交後,所有當前事物的訊息均可見了,那事務提交時,具體發生了什麼,是如何控制可能分佈在多臺broker上的訊息同時可見呢?

上圖以3個Broker組成的事務舉例:

  • 1、Producer提交事務
  • 2、Coordinator收到請求後 ,將事務狀態修改為PrepareCommit(其實就是向__transaction_state追加一條訊息)
  • 3.1、向Producer響應,事務提交成功
  • 3.2、之後向各個Broker傳送control marker訊息,Broker收到後將訊息儲存下來,用來比較當前事物已經成功提交
  • 4、待各個Broker儲存control marker訊息後,Coordinator將事物狀態修改為commit,事務結束

看起來是兩階段提交,且一切正常,但卻有一些疑問:

問題1: 3.1向__transaction_state寫完事務狀態後,便給Producer迴應說事務提交成功,假如說3.2執行過程中被hang住了,在Producer看來,既然事務已經提交成功,為什麼還是讀不到對應訊息呢?

的確是這樣,這裡成功指的是Coordinator收到了訊息,並且成功修改了事務狀態。因此返回成功的語意指的是一階段提交成功,因為後續向各個Partition傳送寫marker的會無限重試,直至成功

問題2: 3.2中向多個Broker傳送marker訊息,如果Broker1、Broker2均寫入成功了,但是Broker3因為網路抖動,Coordinator還在重試,那麼此時Broker1、Broker2上的訊息對Consumer來說已經可見了,但是Broker3上的訊息還是看不到,這不就不符合事務語意了嗎?

事實確實如此,所以kafka的事務不能保證強一致性,並不是說kafka做的不夠完美,而是這種分散式事務統一存在類似的問題,CAP鐵律限制,這裡只能做到最終一致性了。不過對於常規的場景這裡已經夠用了,Coordinator會不遺餘力的重試,直至成功

kafka.coordinator.transaction.TransactionCoordinator#endTransaction() 這裡是當__transaction_state狀態改為PrepareCommit後,就向Producer返回成功

case Right((txnMetadata, newPreSendMetadata)) =>
  // we can respond to the client immediately and continue to write the txn markers if
  // the log append was successful
  responseCallback(Errors.NONE)

  txnMarkerChannelManager.addTxnMarkersToSend(coordinatorEpoch, txnMarkerResult, txnMetadata, newPreSendMetadata)

七、事務取消-Transaction Abort

參與方:ProducerBroker

7.1、事務取消-Producer

事務取消如果是Producer端觸發的,程式碼如下

producer.abortTransaction();

事務提交對應的API為ApiKeys.END_TXN(與事務提交是同一個API,不過引數不一樣),Producer向Broker請求的入參為

  • transactionalId 事務id,即客戶自定義的字串
  • producerId producer id,由coordinator生成,遞增
  • epoch 由coordinator生成
  • committed false:abort

7.2、事務取消-Coordinator

事務取消除了由Producer觸發外,還有可能由Coordinator觸發,例如「事務超時」,Coordinator有個定時器,定時掃描那些已經超時的事務

kafka.coordinator.transaction.TransactionCoordinator#startup()

  def startup(retrieveTransactionTopicPartitionCount: () => Int, enableTransactionalIdExpiration: Boolean = true): Unit = {
    info("Starting up.")
    scheduler.startup()
    scheduler.schedule("transaction-abort",
      () => abortTimedOutTransactions(onEndTransactionComplete),
      txnConfig.abortTimedOutTransactionsIntervalMs,
      txnConfig.abortTimedOutTransactionsIntervalMs
    )
    txnManager.startup(retrieveTransactionTopicPartitionCount, enableTransactionalIdExpiration)
    txnMarkerChannelManager.start()
    isActive.set(true)

    info("Startup complete.")
  }

其實事務取消的流程在Coordinator端,跟事務提交大同小異,不過事務取消會向.txnindex檔案寫入資料,也就是.txnindex檔案儲存了所有已取消的事務詳情。對應原始碼檔案為 kafka.log.AbortedTxn.scala.txnindex檔案儲存協定如下

  • currentVersion 當前檔案版本號,目前為0
  • producerId producerId
  • firstOffset 當前事務的開始offset
  • lastOffset 當前事務的結束offset
  • lastStableOffset 儲存時的LSO

儲存詳情中,不需要記錄epoch、sequence等資訊,因為這個檔案的目的是配合Consumer進行訊息過濾的,有了事務的起止offset已經足夠

firstOffset 與 lastOffset 可能跨度很長,之間如果有多個事務如何區分呢?

其實首先明確一點,同一個ProducerId在同一個時間段,只會存在一個事物,例如某條記錄是這樣儲存:(producerId:1000, firstOffset:20, lastOffset:80) ,也就是offset在20與80之間,producerId為1000的記錄只會存在一條,當然也有可能出現如下記錄

  • (producerId:1001, firstOffset:30, lastOffset:40)
  • (producerId:1001, firstOffset:50, lastOffset:60)

但是producerId一定不是1000了,這點很關鍵,因為在事務訊息消費時,還要依賴這個

append「事務取消記錄」入口 kafka.log.LogSegment#updateTxnIndex()

八、事務消費

參與方:ConsumerBroker

前文所有的工作,其實都體現在事務消費上,消費事務訊息,也是kafka非常重要的課題

8.1、消費策略對比

當consumer的事務隔離級別(isolation.level)設定為 read_committed 後,便只能拉取LSO以下的記錄,且返回的資訊中還會攜帶已取消的事務

kafka.log.UnifiedLog#read

  def read(startOffset: Long,
           maxLength: Int,
           isolation: FetchIsolation,
           minOneMessage: Boolean): FetchDataInfo = {
    checkLogStartOffset(startOffset)
    val maxOffsetMetadata = isolation match {
      case FetchLogEnd => localLog.logEndOffsetMetadata
      case FetchHighWatermark => fetchHighWatermarkMetadata
      case FetchTxnCommitted => fetchLastStableOffsetMetadata
    }
    localLog.read(startOffset, maxLength, minOneMessage, maxOffsetMetadata, isolation == FetchTxnCommitted)
  }

正如前文所說,LSO之前的記錄,均是已提交或已取消的事務;因此在一個事物未完成之前,是永遠都不會被consumer拉取到的。此時還要引出前文提出的問題,即consumer訊息策略

  • 策略一:拉取位點設定為High Water Mark,consumer不斷拉取訊息,不論是已經完結的事務訊息還是未完結,亦或是普通訊息,統一進行拉取;然後在consumer端進行過濾,發現某事物訊息未完結,那麼暫存在consumer,等收到control mark訊息後,再判斷將所有訊息返回給業務方,或是丟棄
  • 策略二:拉取位點設定為Last Stable Offset,consumer只返回最後一個已完結事務之前的訊息,consumer拉取訊息後,即便是事務marker還未拉取,也可以判斷是提交還是丟棄

其實很明顯,現在kafka最新版本採用的是策略二,不過我們還是有必要比較一下兩者優缺點

策略一

策略二

優點

  • 效能相對較高,比如LSO之後存在一些已提交的事務訊息,或者普通訊息,能夠及時消費到
  • 不會造成consume端OOM;只消費LSO以下的訊息,因此在拿到訊息後便可以判斷是commit還是abort
  • consumer退出或重啟,走常規應對即可,降低位點管理的複雜度

缺點

  • 如果事務跨度過長,容易造成consumer端的訊息積壓,從而OOM
  • consumer退出或重啟,對於已積累但未吐出的訊息很難處理,需要使用複雜的邏輯來管理位點
  • 效能較低,由於consumer只能看到LSO以下的訊息,故一些非事務訊息(或已完結的事務訊息,但在LSO之上)不能及時消費。

綜合考慮後,kafka還是選擇了可控性較強,且沒有致命bug的策略二,雖然有一些效能損失,但換來的是整個叢集的穩定性

8.2、常規消費事務訊息

當consumer設定了read_committed消費訊息時,除了返回常規的RecordBatch集合外,還會返回拉取區間已取消的事務列表。假定consumer收到了一段資料:

其中白色的為非事務訊息,即普通訊息,彩色的為事務訊息,相同顏色的訊息為同一事務。下面表格中,abortTxns的格式為 (producerId, startOffset, endOffset)

abortTxns

有效訊息

無效訊息

說明

empty

100-115

當取消事務列表為空時,說明當前讀取到事務訊息均為提交成功的事務訊息

[(10, 101, 115)]

100,

103-114

101,102,103

abort列表表明producerId為10的事務已經取消,因此掃描整個列表,發現符合abort條件的記錄是101、102、115

[(11, 110, 112)]

100-109,

111,

113-115

110, 112

雖然103、106的producerId也是11,但是offset range並不匹配;雖然111的offset range匹配,但是其producerId不匹配

[(10, 101, 115),

(11, 103, 106),

(12, 104, 111)]

100,105,109,110,112,113,114

101-104,

106-108,

111, 115

不再贅述,無效訊息通過producerId+offset range統一來確定

注:consumer在讀取以上資訊的時候,可能並內有讀取到control marker資訊,但是已經能夠確定目標訊息是事務完結狀態,且已經知道事務是commit或abort了,因此可以直接處理;而control訊息是由coordinator傳送給各個partition的,屬於內部訊息,consumer對於control訊息是會自動過濾掉的

org.apache.kafka.clients.consumer.internals.Fetcher.CompletedFetch#nextFetchedRecord()

// control records are not returned to the user
if (!currentBatch.isControlBatch()) {
    return record;
} else {
    // Increment the next fetch offset when we skip a control batch.
    nextFetchOffset = record.offset() + 1;
}

8.3、業務方事務

既然kafka已經實現了事務,那麼我們的業務系統中是否可以直接依賴這一特性?

假如這樣使用kafka:

  1. 業務方通過consumer拉取一條訊息
  2. 業務程式通過這條訊息處理業務,可能將結果存入mysql或寫入檔案或其他儲存媒介

如果業務方將1、2整體當做是一個事務的話,那麼理解就有偏差了,因為這個過程當中還缺少提交位點的步驟,假如步驟2已經執行完畢,但還未提交位點,consumer發生了重啟了,那麼這條訊息還會被再次消費,因此kafka所說的事務支援,指的是讀取、寫入都在kafka叢集上

8.4、Exactly Once

訊息的消費可以分為三種型別

  • At Least Once(至少一次)
    • 也就是某條訊息,至少會被消費一次,潛臺詞就是訊息可能會被消費多次,也就是重複消費;kafka預設的消費型別,實現它的原理很簡單,就是在業務方將訊息消費掉後,再提交其對應的位點,業務方只要做好訊息去重,執行起來還是很嚴謹的
  • At Most Once (至多一次)
    • 與至少一次相對,不存在重複消費的情況,某條訊息最多被消費一次,潛臺詞就是可能會丟訊息;實現原理還是控制位點,在消費某條訊息之前,先提交其位點,再消費,如果提交了位點,consumer重啟了,重啟後從最新位點開始消費資料,也就是之前的資料丟失了,並沒有真正消費
  • Exactly Once(精確一次)
    • 不論是「至少一次」還是「至多一次」都不如精確一次來的生猛,有文章說kafka事務實現了精確一次,但這樣評論是不夠嚴謹的,如果業務方將一次「拉取訊息+業務處理」當做一次處理的話,那即便是開啟了事務也不能保證精確一次;這裡的精確一次指的讀取、寫入都是操作的kafka叢集,而不能引入業務處理

關於Exactly Once,這裡參照一下官方對其描述,Exactly-once Semantics in Apache Kafka

  • Idempotent producer: Exactly-once, in-order, delivery per partition.
  • Transactions: Atomic writes across partitions.
  • Exactly-once stream processing across read-process-write tasks.

簡單概括一下就是 1、冪等型的Producer,在單分割區的前提下支援精準一次、有序的訊息投遞;2、事務,跨多分割區的原子寫入 3、Stream任務,型別為read-process-write形式的,可做到精確一次

舉Stream中的例子:從1個Topic中讀取資料,經過業務方的加工後,寫入另外Topic中

producer.initTransactions();
producer.beginTransaction();

ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(1000));
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition partition : consumerRecords.partitions()) {
    List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(partition);
    for (ConsumerRecord<String, String> record : partitionRecords) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("topic-sink", record.key(), record.value());
        producer.send(producerRecord);
    }
    long lastConsumedOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
    offsets.put(partition, new OffsetAndMetadata(lastConsumedOffset + 1));
}
producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("groupId"));

producer.commitTransaction();

可以簡單認為,將一次資料讀取,轉換為了資料寫入,並統一歸併至當前事務中;關鍵程式碼為

producer.sendOffsetsToTransaction(offsets, new ConsumerGroupMetadata("groupId"));

這個請求對應的API是ApiKeys.ADD_OFFSETS_TO_TXN,參數列為

  • transactionalId
  • producerId
  • epoch
  • groupId

核心思想就是算出groupId在__consumer_offsets中對應的partition,然後將該partition加入事務中,在事務提交/取消時,再統一操作,這樣便實現了讀與寫的原子性。

不過這樣做的前提是consumer需要將enable.auto.commit引數設定為false,並使用producer.sendOffsetsToTransaction()來提交offset

九、事務狀態流轉

事務總共有8種狀態

state

desc

0-Empty

Transaction has not existed yet

  • received AddPartitionsToTxnRequest => Ongoing
  • received AddOffsetsToTxnRequest => Ongoing

1-Ongoing

Transaction has started and ongoing

  • received EndTxnRequest with commit => PrepareCommit
  • received EndTxnRequest with abort => PrepareAbort
  • received AddPartitionsToTxnRequest => Ongoing
  • received AddOffsetsToTxnRequest => Ongoing

2-PrepareCommit

Group is preparing to commit

  • received acks from all partitions => CompleteCommit

3-PrepareAbort

Group is preparing to abort

  • received acks from all partitions => CompleteAbort

4-CompleteCommit

Group has completed commit

Will soon be removed from the ongoing transaction cache

5-CompleteAbort

Group has completed abort

Will soon be removed from the ongoing transaction cache

6-Dead

TransactionalId has expired and is about to be removed from the transaction cache

7-PrepareEpochFence

We are in the middle of bumping the epoch and fencing out older producers.

最常見的狀態流轉

  • Empty->Ongong->PrepareCommit->CompleteCommit->Empty
  • Empty->Ongong->PrepareAbort->CompleteAbort->Empty

十、事務Topic及檔案

10.1、簡單總結

總結一下kafka事務相關的一些topic及檔案。topic只有一個,是專門為事務特性服務的,而檔案有兩個,這裡的檔案指的是所有參與事務的topic下檔案

  • Topic
    • __transaction_state內部compact topic,主要是將事務狀態持久化,避免Transactional Coordinator重啟或切換後事務狀態丟失
  • 檔案
    • .txnindex 存放已經取消事務的記錄,請問已經提到過,如果當前logSegment沒有取消的事務,那麼這個檔案也不會存在
    • .snapshot 正如其名,因為Broker端要存放每個ProducerId與Sequence的對映關係,目的是sequence num的驗重

10.2、.snapshot 檔案

.snapshot 跟其他索引檔案不同,其他索引檔案都是隨著記錄的增加,動態append到檔案中的;而.snapshot檔案則是在logSegment roll時,也就是切換下一個log檔案時,將當前快取中的所有producerId及Sequence的對映關係儲存下來。一旦發生Broker宕機,重啟後只需要將最近一個.snapshot讀取出來,並通過log檔案將後續的資料補充進來,這樣快取中就可以儲存當前分割區的全量索引

field

desc

Version

Version of the snapshot file

Crc

CRC of the snapshot data

Number

The entries in the producer table

ProducerId

The producer ID

ProducerEpoch

Current epoch of the producer

LastSequence

Last written sequence of the producer

LastOffset

Last written offset of the producer

OffsetDelta

The difference of the last sequence and first sequence in the last written batch

Timestamp

Max timestamp from the last written entry

CoordinatorEpoch

The epoch of the last transaction coordinator to send an end transaction marker

CurrentTxnFirstOffset

The first offset of the on-going transaction (-1 if there is none)

附錄

事務中使用的API

API KEY

描述

ApiKeys.FIND_COORDINATOR

尋找transaction coordinator

ApiKeys.INIT_PRODUCER_ID

初始化producerId及epoch

ApiKeys.ADD_PARTITIONS_TO_TXN

將某個partition新增進入事務

ApiKeys.PRODUCE

傳送訊息

ApiKeys.END_TXN

事務結束,包括事務提交跟事務取消

ApiKeys.FETCH

拉取訊息

ApiKeys.ADD_OFFSETS_TO_TXN

read-process-write模式時使用,用於將一次讀操作轉換為寫行為

部分程式碼記錄

注:本文所有程式碼擷取均基於開源v3.3.1版本

  • kafka topic 中的檔案 kafka.log.UnifiedLog#1767
object UnifiedLog extends Logging {
  val LogFileSuffix = LocalLog.LogFileSuffix
  val IndexFileSuffix = LocalLog.IndexFileSuffix
  val TimeIndexFileSuffix = LocalLog.TimeIndexFileSuffix
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = LocalLog.TxnIndexFileSuffix
  val DeletedFileSuffix = LocalLog.DeletedFileSuffix
  val CleanedFileSuffix = LocalLog.CleanedFileSuffix
  val SwapFileSuffix = LocalLog.SwapFileSuffix
  val DeleteDirSuffix = LocalLog.DeleteDirSuffix
  val FutureDirSuffix = LocalLog.FutureDirSuffix
  • 根據TransactionId計算partition kafka.coordinator.transaction.TransactionStateManager#partitionFor
def partitionFor(transactionalId: String): Int = Utils.abs(transactionalId.hashCode) % transactionTopicPartitionCount
  • 生成ProducerId kafka.coordinator.transaction.ZkProducerIdManager#generateProducerId
  def generateProducerId(): Long = {
    this synchronized {
      // grab a new block of producerIds if this block has been exhausted
      if (nextProducerId > currentProducerIdBlock.lastProducerId) {
        allocateNewProducerIdBlock()
        nextProducerId = currentProducerIdBlock.firstProducerId
      }
      nextProducerId += 1
      nextProducerId - 1
    }
  }
  • 過濾control訊息 org.apache.kafka.clients.consumer.internals.Fetcher.CompletedFetch#nextFetchedRecord
if (record.offset() >= nextFetchOffset) {
    // we only do validation when the message should not be skipped.
    maybeEnsureValid(record);

    // control records are not returned to the user
    if (!currentBatch.isControlBatch()) {
        return record;
    } else {
        // Increment the next fetch offset when we skip a control batch.
        nextFetchOffset = record.offset() + 1;
    }
}

參考:

https://www.confluent.io/blog/simplified-robust-exactly-one-semantics-in-kafka-2-5/

https://www.slideshare.net/ConfluentInc/exactlyonce-semantics-in-apache-kafka

https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8/edit

http://matt33.com/2018/11/04/kafka-transaction/

http://www.jasongj.com/kafka/transaction/