工作總結:kafka踩過的坑

2022-11-17 06:01:45

餐飲系統每天中午和晚上用餐高峰期,系統的並行量不容小覷。公司規定各部門都要輪流值班,防止出現線上問題時能夠及時處理。

後廚顯示系統屬於訂單的下游業務。

  1. 使用者點完菜下單後,訂單系統會通過發 Kafka 訊息給系統;

  2. 系統讀取訊息後,做業務邏輯處理,持久化訂單和菜品資料,然後展示到劃菜使用者端;

  3. 這樣廚師就知道哪個訂單要做哪些菜,有些菜做好了,就可以通過該系統出菜;

  4. 系統自動通知服務員上菜;

  5. 如果服務員上完菜,修改菜品上菜狀態,使用者就知道哪些菜已經上了,哪些還沒有上。

系統可以大大提高後廚到使用者的效率。

 

這一切的關鍵是訊息中介軟體:Kafka。如果它有問題,將會直接影響到後廚顯示系統的功能。

接下來,我跟大家一起聊聊使用 Kafka 踩過哪些坑?

1. 順序問題

1.1 為什麼要保證訊息的順序?

剛開始我們系統的商戶很少,為了快速實現功能,我們沒想太多。既然是走訊息中介軟體 Kafka 通訊,訂單系統發訊息時將訂單詳細資料放在訊息體,我們後廚顯示系統只要訂閱 topic,就能獲取相關訊息資料,然後處理自己的業務即可。

不過這套方案有個關鍵因素:要保證訊息的順序

訂單有很多狀態,比如下單、支付、完成、復原等。

不可能下單的訊息都沒讀取到,就先讀取支付或復原的訊息吧。所以要保證訊息的順序。

1.2 如何保證訊息順序?

我們都知道 Kafka 的 topic 是無序的,但是一個 topic 包含多個 partition,每個 partition 內部是有序的。

思路:只要保證生產者寫訊息時,按照一定的規則寫到同一個 partition。不同的消費者讀不同的 partition 的訊息,就能保證生產和消費者訊息的順序。

我們剛開始就是這麼做的,同一個商戶編號的訊息寫到同一個 partition。topic 中建立了 4 個 partition,然後部署了 4 個消費者節點,構成消費者組。一個 partition 對應一個消費者節點。

從理論上說,這套方案是能夠保證訊息順序的。

就這樣上線了。

1.3 出現意外

該功能上線剛開始還是比較正常的。

但是很快就收到使用者投訴,說在劃菜使用者端有些訂單和菜品一直看不到,無法劃菜。

我定位到了原因,那段時間網路經常不穩定,業務介面時不時報超時業務請求時不時會連不上資料庫

這種情況對順序訊息的打擊,可以說是毀滅性的。

為什麼這麼說?

假設訂單系統發了「下單」、「支付」、「完成」 三條訊息。

而」下單「訊息由於網路原因我們系統處理失敗了,而後面的兩條訊息的資料是無法入庫的。因為只有」下單「訊息的資料才是完整的資料,其他型別的訊息只會更新狀態。

加上當時沒有做失敗重試機制,使得這個問題被放大了。

問題變成:一旦「下單」訊息的資料入庫失敗,使用者就永遠看不到這個訂單和菜品了。

那麼這個緊急的問題要如何解決呢?

1.4 解決過程

最開始我們的想法是:在消費者處理訊息時,如果處理失敗了,立馬重試 3-5 次。

如果有些請求要第 6 次才能成功怎麼辦

不可能一直重試呀,這種同步重試機制,會阻塞其他商戶訂單訊息的讀取。

顯然,用上面的這種同步重試機制在出現異常的情況,會嚴重影響訊息消費者的消費速度,降低它的吞吐量。

如此看來,我們不得不用非同步重試機制了。

如果用非同步重試機制,處理失敗的訊息就得儲存到重試表下來。

但有個新問題立馬出現:只存一條訊息如何保證順序

存一條訊息的確無法保證順序,假如「下單」訊息失敗了,還沒來得及非同步重試。此時,「支付」訊息被消費了,它肯定是不能被正常消費的。

此時,「支付」訊息該一直等著,每隔一段時間判斷一次,它前面的訊息都有沒有被消費?

如果真的這麼做,會出現兩個問題:

  • 「支付」訊息前面只有「下單」訊息,這種情況比較簡單。但如果某種型別的訊息,前面有 N 多種訊息,需要判斷多少次呀?這種判斷跟訂單系統的耦合性太強了,相當於要把他們系統的邏輯搬一部分到我們系統;
  • 影響消費者的消費速度。

這時有種更簡單的方案浮出水面:消費者在處理訊息時,先判斷該訂單號在重試表有沒有資料,

如果有則直接把當前訊息儲存到重試表;如果沒有,則進行業務處理,如果出現異常,把該訊息儲存到重試表。

後來我們用 elastic-job 建立了失敗重試機制,如果重試了 7 次後還是失敗,則將該訊息的狀態標記為失敗,發郵件通知開發人員。

終於由於網路不穩定,導致使用者在劃菜使用者端有些訂單和菜品一直看不到的問題被解決了。

現在商戶頂多偶爾延遲看到菜品,比一直看不菜品好太多。

2. 訊息積壓

隨著系統的商戶越來越多。隨之而來的是訊息的數量越來越大,導致消費者處理不過來,經常出現訊息積壓的情況。

對商戶的影響非常直觀,劃菜使用者端上的訂單和菜品可能半個小時後才能看到。一兩分鐘還能忍,半個訊息的延遲,哪裡忍得了。我們那段時間經常接到商戶投訴說訂單和菜品有延遲。

雖說加伺服器節點就能解決問題,但是按照公司為了省錢的慣例,要先做系統優化,所以我們開始了訊息積壓問題解決之旅。

2.1 訊息體過大

雖說 Kafka 號稱支援百萬級的 TPS,但從 producer 傳送訊息到 broker 需要一次網路 IO,broker 寫資料到磁碟需要一次磁碟 IO(寫操作),consumer 從 broker 獲取訊息先經過一次磁碟 IO(讀操作),再經過一次網路 IO。

一次簡單的訊息從生產到消費過程,需要經過兩次網路 IO 和兩次磁碟 IO。如果訊息體過大,勢必會增加 IO 的耗時,進而影響 Kafka 生產和消費的速度。消費者速度太慢的結果,就會出現訊息積壓情況。

除了上面的問題之外,訊息體過大還會浪費伺服器的磁碟空間。稍不注意,可能會出現磁碟空間不足的情況。

此時,我們已經到了需要優化訊息體過大問題的時候。

如何優化呢?

我們重新梳理了一下業務,沒有必要知道訂單的中間狀態,只需知道一個最終狀態就可以了。

如此甚好,我們就可以這樣設計了:

  • 訂單系統傳送的訊息體只用包含 id 和狀態等關鍵資訊;
  • 後廚顯示系統消費訊息後,通過 id 呼叫訂單系統的訂單詳情查詢介面獲取資料;
  • 後廚顯示系統判斷資料庫中是否有該訂單的資料,如果沒有則入庫,有則更新。

果然這樣調整之後,訊息積壓問題很長一段時間都沒再出現。

2.2 路由規則不合理

有天中午又有商戶投訴說訂單和菜品有延遲。我們一查 Kafka 的 topic 竟然又出現了訊息積壓。

但這次有點詭異,不是所有 partition 上的訊息都有積壓,而是隻有一個。

剛開始,我以為是消費那個 partition 訊息的節點出了什麼問題導致的。但是經過排查,沒有發現任何異常。

這就奇怪了,到底哪裡有問題呢?

後來,我查紀錄檔和資料庫發現:有幾個商戶的訂單量特別大,剛好這幾個商戶被分到同一個 partition,使得該 partition 的訊息量比其他 partition 要多很多。

這時我們才意識到,發訊息時按商戶編號路由 partition 的規則不合理。可能會導致有些 partition 訊息太多消費者處理不過來,而有些 partition 卻因為訊息太少,消費者出現空閒的情況。

為了避免出現這種分配不均勻的情況,我們需要對發訊息的路由規則做一下調整。

我們思考了一下,用訂單號做路由相對更均勻,不會出現單個訂單發訊息次數特別多的情況。除非是遇到某個人一直加菜的情況,但是加菜是需要花錢的,所以其實同一個訂單的訊息數量並不多。

調整後按訂單號路由到不同的 partition,同一個訂單號的訊息,每次到發到同一個 partition。

調整後,訊息積壓的問題又有很長一段時間都沒有再出現。我們的商戶數量在這段時間,增長的非常快,越來越多了。

2.3 批次操作引起的連鎖反應

在高並行的場景中,訊息積壓問題可以說如影隨形,真的沒辦法從根本上解決。

有天下午,產品過來說:「有幾個商戶投訴過來了,他們說菜品有延遲,快查一下原因」。

這次問題出現得有點奇怪。

為什麼這麼說?

首先這個時間點就有點奇怪,這次問題出現在下午,不是中午或者晚上用餐高峰期。

根據以往積累的經驗,我直接看了 Kafka 的 topic 的資料,果然上面訊息有積壓。

但這次每個 partition 都積壓了十幾萬的訊息沒有消費,比以往加壓的訊息數量增加了幾百倍。這次訊息積壓得極不尋常。

我趕緊查服務監控看看消費者掛了沒,還好沒掛。又查服務紀錄檔沒有發現異常。這時我有點迷茫,碰運氣問了問訂單組下午發生了什麼事情沒?他們說下午有個促銷活動,跑了一個 Job 批次更新過有些商戶的訂單資訊。

這時,我一下子如夢初醒:是他們在 Job 中批次發訊息導致的問題。沒有通知我們呢?實在太坑了。

雖說知道問題的原因了,倒是眼前積壓的這十幾萬的訊息該如何處理呢?

此時,如果直接調大 partition 數量是不行的,歷史訊息已經儲存到4個固定的 partition,只有新增的訊息才會到新的 partition。我們重點需要處理的是已有的 partition。

直接加服務節點也不行,因為 Kafka 允許同組的多個 partition 被一個 consumer 消費,但不允許一個 partition 被同組的多個 consumer 消費,可能會造成資源浪費。

看來只有用多執行緒處理了。

為了緊急解決問題,我改成了用執行緒池處理訊息,核心執行緒和最大執行緒數都設定成了 50。

調整之後,果然,訊息積壓數量不斷減少。

但此時有個更嚴重的問題出現:我收到了報警郵件,有兩個訂單系統的節點宕機了。

不久,訂單組的同事過來找我說,我們系統呼叫他們訂單查詢介面的並行量突增,超過了預計的好幾倍,導致有 2 個服務節點掛了。他們把查詢功能單獨整成了一個服務,部署了 6 個節點,掛了 2 個節點。再不處理,另外 4 個節點也會掛。訂單服務可以說是公司最核心的服務,它掛了公司損失會很大,情況萬分緊急。

為了解決這個問題,只能先把執行緒數調小。

幸好,執行緒數是可以通過 ZooKeeper 動態調整的。我把核心執行緒數調成了 8 個,核心執行緒數改成了 10 個。

後面,運維把訂單服務掛的 2 個節點重啟後恢復正常了。以防萬一,再多加了 2 個節點。為了確保訂單服務不會出現問題,就保持目前的消費速度,後廚顯示系統的訊息積壓問題,1 小時候後也恢復正常了。

後來,我們開了一次覆盤會,得出的結論是:
  • 訂單系統的批次操作一定提前通知下游系統團隊;
  • 下游系統團隊多執行緒呼叫訂單查詢介面一定要做壓測;
  • 這次給訂單查詢服務敲響了警鐘。它作為公司的核心服務,應對高並行場景做的不夠好,需要做優化;
  • 對訊息積壓情況加監控。

順便說一下,對於要求嚴格保證訊息順序的場景,可以將執行緒池改成多個佇列,每個佇列用單執行緒處理。

2.4 表過大

為了防止後面再次出現訊息積壓問題,消費者後面就一直用多執行緒處理訊息。

但有天中午我們還是收到很多報警郵件,提醒我們 Kafka 的 topic 訊息有積壓。我們正在查原因,此時產品跑過來說:「又有商戶投訴說菜品有延遲,趕緊看看」。

這次她看起來有些不耐煩,確實優化了很多次還是出現了同樣的問題。

在外行看來:為什麼同一個問題一直解決不了?

導致訊息積壓的原因其實有很多種,這也許是使用訊息中介軟體的通病吧。

查紀錄檔發現消費者消費一條訊息的耗時長達 2 秒。以前是 500 毫秒,現在怎麼會變成 2 秒呢?

消費者的程式碼也沒有做大的調整,為什麼會出現這種情況呢?

查了一下線上菜品表,單表資料量竟然到了幾千萬,其他的劃菜表也是一樣,現在單表儲存的資料太多了。

我們組梳理了一下業務,其實菜品在使用者端只展示最近 3 天的即可。

這就好辦了,我們伺服器端存著多餘的資料,不如把表中多餘的資料歸檔。於是 DBA 幫我們把資料做了歸檔,只保留最近 7 天的資料。

如此調整後,訊息積壓問題被解決了,又恢復了往日的平靜。

3. 主鍵衝突

其他的問題。比如報警郵件經常報出資料庫異常:Duplicate entry '6' for key 'PRIMARY',說主鍵衝突。

出現這種問題一般是由於有兩個以上相同主鍵的 SQL,同時插入資料,第一個插入成功後,第二個插入的時候會報主鍵衝突。表的主鍵是唯一的,不允許重複。

我仔細檢查了程式碼,發現程式碼邏輯會先根據主鍵從表中查詢訂單是否存在,如果存在則更新狀態,不存在才插入資料,沒得問題。

這種判斷在並行量不大時,是有用的。

但是如果在高並行的場景下,兩個請求同一時刻都查到訂單不存在,一個請求先插入資料,另一個請求再插入資料時就會出現主鍵衝突的異常。

解決這個問題最常規的做法是:加鎖。

我剛開始也是這樣想的,加資料庫悲觀鎖肯定是不行的,太影響效能。加資料庫樂觀鎖,基於版本號判斷,一般用於更新操作,像這種插入操作基本上不會用。

剩下的只能用分散式鎖了,我們系統在用 Redis,可以加基於 Redis 的分散式鎖,鎖定訂單號。

但後面仔細思考了一下:

  • 加分散式鎖也可能會影響消費者的訊息處理速度;
  • 消費者依賴於 Redis,如果 Redis 出現網路超時,我們的服務就悲劇了。

所以,我也不打算用分散式鎖。

而是選擇使用 MySQL 的 INSERT INTO ...ON DUPLICATE KEY UPDATE 語法:

INSERTINTOtable (column_list)

它會先嚐試把資料插入表,如果主鍵衝突的話那麼更新欄位。

把以前的 insert 語句改造之後,就沒再出現過主鍵衝突問題。

4. 資料庫主從延遲

不久之後的某天,又收到商戶投訴說下單後,在劃菜使用者端上看得到訂單,但是看到的菜品不全,有時甚至訂單和菜品資料都看不到。

這個問題跟以往的都不一樣,根據以往的經驗先看 Kafka 的 topic 中訊息有沒有積壓,但這次並沒有積壓。

再查了服務紀錄檔,發現訂單系統介面返回的資料有些為空,有些只返回了訂單資料,沒返回菜品資料。

這就非常奇怪了,我直接過去找訂單組的同事。他們仔細排查服務,沒有發現問題。這時我們不約而同的想到,會不會是資料庫出問題了,一起去找 DBA。果然 DBA發現資料庫的主庫同步資料到從庫,由於網路原因偶爾有延遲,有時延遲有 3 秒。

如果我們的業務流程從發訊息到消費訊息耗時小於 3 秒,呼叫訂單詳情查詢介面時,可能會查不到資料,或者查到的不是最新的資料。

這個問題非常嚴重,會導致直接我們的資料錯誤。

為了解決這個問題,我們也加了重試機制。呼叫介面查詢資料時,如果返回資料為空,或者只返回了訂單沒有菜品,則加入重試表。

調整後,問題被解決了。

5. 重複消費

Kafka消費訊息時支援三種模式:

  • at most once 模式:最多一次。保證每一條訊息 commit 成功之後,再進行消費處理。訊息可能會丟失,但不會重複;
  • at least once 模式:至少一次。保證每一條訊息處理成功之後,再進行 commit。訊息不會丟失,但可能會重複;
  • exactly once 模式:精確傳遞一次。將 offset 作為唯一 id 與訊息同時處理,並且保證處理的原子性。訊息只會處理一次,不丟失也不會重複。但這種方式很難做到。

Kafka 預設的模式是 at least once,但這種模式可能會產生重複消費的問題。所以我們的業務邏輯必須做冪等設計。

而我們的業務場景儲存資料時使用了 INSERT INTO ...ON DUPLICATE KEY UPDATE 語法,不存在時插入,存在時更新,是天然支援冪等性的。

6. 多環境消費問題

我們當時線上環境分為:pre(預釋出環境)和 prod(生產環境),兩個環境共用同一個資料庫,並且共用同一個 Kafka 叢集。

需要注意的是,在設定 Kafka 的 topic 的時候,要加字首用於區分不同環境。pre環境的以 pre_ 開頭,比如 pre_order。生產環境以 prod_開頭,比如 prod_order,防止訊息在不同環境中串了。

但有次運維在 pre 環境切換節點,設定 topic 的時候,錯誤地配成了 prod 的 topic。剛好那天我們有新功能上 pre 環境,結果悲劇了:prod 的有些訊息被 pre 環境的 consumer 消費了。而由於訊息體做了調整,導致 pre 環境的 consumer 處理訊息一直失敗。

其結果是生產環境丟了部分訊息。不過還好,最後生產環境消費者通過重 置offset,重新讀取了那一部分訊息解決了問題,沒有造成太大損失。

後記

除了上述問題之外,我還遇到過:

  • Kafka 的 consumer 使用自動確認機制,導致 CPU 使用率 100%;
  • Kafka 叢集中的一個 broker 節點掛了,重啟後又一直掛。

這兩個問題說起來有些複雜,我就不一一列舉了。非常感謝那兩年使用訊息中介軟體 Kafka 的經歷,雖說遇到過挺多問題,踩了很多坑,走了很多彎路,但是實打實的讓我積累了很多寶貴的經驗,快速成長了。

其實 Kafka 是一個非常優秀的訊息中介軟體,我所遇到的絕大多數問題都並非 Kafka 自身的問題(除了 CPU 使用率 100% 是它的一個 bug 導致的之外)。