通過 Pulsar 原始碼徹底解決重複消費問題

2023-02-28 12:01:18

背景

最近真是和 Pulsar 槓上了,業務團隊反饋說是線上有個應用訊息重複消費。

而且在測試環境是可以穩定復現的,根據經驗來看一般能穩定復現的都比較好解決。

定位問題

接著便是定位問題了,根據之前的經驗讓業務按照這幾種情況先排查一下:

通過排查:1,2可以排除了。

  1. 沒有相關紀錄檔
  2. 存在異常,但最外層也捕獲了,所以不管有無異常都會 ACK。

第三個也在消費的入口和提交訊息出計算了時間,最終發現都是在2s左右 ACK 的。

虛擬碼如下:

        Consumer consumer = client.newConsumer()
                .subscriptionType(SubscriptionType.Shared)
                .enableRetry(true)
                .topic(topic)
                .ackTimeout(30, TimeUnit.SECONDS)
                .subscriptionName("my-sub")
                .messageListener(new MessageListener<byte[]>() {
                    @SneakyThrows
                    @Override
                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                        log.info("msg_id{}",msg.getMessageId().toString());
                        TimeUnit.SECONDS.sleep(2);
                        consumer.acknowledge(msg);
                    }
                })
                .subscribe();

那這就很奇怪了,因為程式碼裡設定的 ackTimeout 是 30s,理論上來說是不會存在超時導致訊息重發的。

為了排除是否是超時引起的,直接將業務程式碼註釋掉了,等於是訊息收到後立即就 ACK,經過測試發現這樣確實就沒有重複消費了。

為了再次確認是不是和 ackTimeout 有關,直接將 .ackTimeout(30, TimeUnit.SECONDS) 註釋掉後測試,發現也沒有重複消費了。

確認原因

既然如此那一定是和這個設定有關了,但看程式碼確實沒有超時,為了定位具體原因只有去看 client 的原始碼了。

這裡簡單梳理下訊息的消費的流程:

  1. 根據 .receiverQueueSize(1000) 的設定,預設情況下 broker 會直接給使用者端推播 1000 條訊息。
  2. 使用者端將這 1000 條訊息儲存到內部佇列中。
  3. 如果使用同步消費 receive() 時,本質上就是去 take 這個內部佇列。
  4. 如果是使用的是 messageListener 非同步消費並設定 ackTimeout,每當從佇列裡獲得一條訊息後便會把這條訊息加入 UnAckedMessageTracker 內部的一個時間輪中,定時檢測頂部是否存在訊息,如果存在則會觸發重新投遞。
    4.1 加入時間輪後,非同步呼叫我們自定義的事件,這個非同步操作是提交到一個無界佇列中由單個執行緒依次排隊執行(這點是這次問題的關鍵)
  5. 業務 ACK 的時候會從時間輪中刪除訊息,所以如果訊息 ACK 的足夠快,在第四步就不會獲取到訊息進行重新投遞。

整體流程如上圖,程式碼細節如下圖:

所以問題的根本原因就是寫入時間輪(UnAckedMessageTracker)開始倒計時的執行緒和回撥業務邏輯的不是同一個執行緒。

如果業務執行耗時,等到訊息從那個單執行緒的無界佇列中取出來的時候很有可能已經過了 ackTimeou 的時間,從而導致了超時重發。

也就是使用者所理解的 ackTimeout 週期(應該進入回撥時候開始計時)和 SDK 實現的不一致造成的。

之後我再次確認同樣的程式碼換為同步消費是沒有問題的,不會導致重複消費:

while (true) {
Message msg = consumer.receive();
            log.info(
                    "consumer Message received: " + new String(msg.getData()) + msg.getMessageId().toString());
            TimeUnit.SECONDS.sleep(2);
            consumer.acknowledge(msg);	
}

檢視程式碼後發現同步程式碼的獲取訊息和加入 UnAckedMessageTracker 時間輪是同步的,也就不會出現超時的問題。

總結

所以其實 是messageListener 非同步消費的 ackTimeout 的語意是有問題的,需要將加入 UnAckedMessageTracker 處移動到回撥函數中同步呼叫。

我檢視了最新的 2.11.x 版本的程式碼依然沒有修復,正準備提個 PR 切換到 master 時才發現已經有相關的 PR 了,只是還沒有發版。

修復的背景和思路也是類似的,具體參考:

https://github.com/apache/pulsar/pull/18911

其實業務中並不推薦使用 ackTimeout 這個設定了,不好預估時間從而導致超時,而且我相信大部分業務設定好 ackTImeout 後直到後續出問題的時候才想起來要改。
所以乾脆一開始就不要使用。

在 go 版本的 SDK 中直接廢棄掉了這個引數,推薦使用 nack API 替換。