Redis特殊資料型別之stream

2022-10-11 18:00:14
本篇文章給大家帶來了關於的相關知識,其中主要介紹了一個特殊的資料型別stream的相關內容,redis提供了豐富的資料型別,特殊的有四種bitmap、hyperloglog、geospatial、stream,下面就一起來看一下stream的相關問題,希望對大家有幫助。

千萬級資料並行如何處理?進入學習

推薦學習:

Redis Stream 是 Redis 5.0 版本新增加的資料型別,Redis 專門為訊息佇列設計的資料型別。

在 Redis 5.0 Stream 沒出來之前,訊息佇列的實現方式都有著各自的缺陷,例如:

  • 釋出訂閱模式,不能持久化也就無法可靠的儲存訊息,並且對於離線重連的使用者端不能讀取歷史訊息的缺陷;

  • List 實現訊息佇列的方式不能重複消費,一個訊息消費完就會被刪除,而且生產者需要自行實現全域性唯一 ID。

基於以上問題,Redis 5.0 便推出了 Stream 型別也是此版本最重要的功能,用於完美地實現訊息佇列,它支援訊息的持久化、支援自動生成全域性唯一 ID、支援 ack 確認訊息的模式、支援消費組模式等,讓訊息佇列更加的穩定和可靠。

常用命令

Stream 訊息佇列操作命令:

  • XADD : 插入訊息,保證有序,可以自動生成全域性唯一 ID

  • XDEL : 根據訊息 ID 刪除訊息;

  • DEL : 刪除整個 Stream;

# XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|id field value [field value ...]
127.0.0.1:6379> XADD s1 * name sid10t
"1665047636078-0"
127.0.0.1:6379> XADD s1 * name sidiot
"1665047646214-0"
# XDEL key id [id ...]
127.0.0.1:6379> XDEL s1 1665047646214-0
(integer) 1
# DEL key [key ...]
127.0.0.1:6379> DEL s1
(integer) 1
登入後複製

57.png

  • XLEN : 查詢訊息長度;

  • XREAD : 用於讀取訊息,可以按 ID 讀取資料;

  • XRANGE : 讀取區間訊息;

  • XTRIM : 裁剪佇列訊息個數;

# XLEN key
127.0.0.1:6379> XLEN s1
(integer) 2
# XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREAD streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1665047636078-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665047646214-0"
         2) 1) "name"
            2) "sidiot"
127.0.0.1:6379> XREAD count 1 streams s1 0-0
1) 1) "s1"
   2) 1) 1) "1665047636078-0"
         2) 1) "name"
            2) "sid10t"
    # XADD 了一條訊息之後的擴充套件
    127.0.0.1:6379> XREAD streams s1 1665047636078-0
    1) 1) "s1"
       2) 1) 1) "1665047646214-0"
             2) 1) "name"
                2) "sidiot"
          2) 1) "1665053702766-0"
             2) 1) "age"
                2) "18"
# XRANGE key start end [COUNT count]
127.0.0.1:6379> XRANGE s1 - +
1) 1) "1665047636078-0"
   2) 1) "name"
      2) "sid10t"
2) 1) "1665047646214-0"
   2) 1) "name"
      2) "sidiot"
3) 1) "1665053702766-0"
   2) 1) "age"
      2) "18"
127.0.0.1:6379> XRANGE s1 1665047636078-0 1665047646214-0
1) 1) "1665047636078-0"
   2) 1) "name"
      2) "sid10t"
2) 1) "1665047646214-0"
   2) 1) "name"
      2) "sidiot"
# XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]
127.0.0.1:6379> XLEN s1
(integer) 3
127.0.0.1:6379> XTRIM s1 maxlen 2
(integer) 1
127.0.0.1:6379> XLEN s1
(integer) 2
登入後複製
  • XGROUP CREATE : 建立消費者組;

  • XREADGROUP : 按消費組形式讀取訊息;

  • XPENDING 和 XACK :

XPENDING 命令可以用來查詢每個消費組內所有消費者「已讀取、但尚未確認」的訊息;

XACK 命令用於向訊息佇列確認訊息處理已完成;

# XGROUP CREATE key groupname id|$ [MKSTREAM] [ENTRIESREAD entries_read]
# 需要注意的是,XGROUP CREATE 的 streams 必須是一個存在的 streams,否則會報錯;
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
(error) ERR The XGROUP subcommand requires the key to exist. Note that for CREATE you may want to use the MKSTREAM option to create an empty stream automatically.
# 0-0 從頭開始消費,$ 從尾開始消費;
127.0.0.1:6379> XADD myStream * name sid10t
"1665057823181-0"
127.0.0.1:6379> XGROUP CREATE myStream cGroup-top 0-0
OK
127.0.0.1:6379> XGROUP CREATE myStream cGroup-tail $
OK
# XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
127.0.0.1:6379> XREADGROUP Group cGroup-top name count 2 STREAMS myStream >
1) 1) "myStream"
   2) 1) 1) "1665058086931-0"
         2) 1) "name"
            2) "sid10t"
      2) 1) "1665058090167-0"
         2) 1) "name"
            2) "sidiot"
登入後複製

應用場景

訊息佇列

生產者通過 XADD 命令插入一條訊息:

# * 表示讓 Redis 為插入的資料自動生成一個全域性唯一的 ID
# 往名稱為 mymq 的訊息佇列中插入一條訊息,訊息的鍵是 name,值是 sid10t
127.0.0.1:6379> XADD mymq * name sid10t
"1665058759764-0"
登入後複製

插入成功後會返回全域性唯一的 ID:"1665058759764-0"。訊息的全域性唯一 ID 由兩部分組成:

  • 第一部分 「1665058759764」 是資料插入時,以毫秒為單位計算的當前伺服器時間;

  • 第二部分表示插入訊息在當前毫秒內的訊息序號,這是從 0 開始編號的。例如,「1665058759764-0」 就表示在 「1665058759764」 毫秒內的第 1 條訊息。

消費者通過 XREAD 命令從訊息佇列中讀取訊息時,可以指定一個訊息 ID,並從這個訊息 ID 的下一條訊息開始進行讀取(注意是輸入訊息 ID 的下一條資訊開始讀取,不是查詢輸入 ID 的訊息)。

127.0.0.1:6379> XREAD STREAMS mymq 1665058759764-0
(nil)
127.0.0.1:6379> XREAD STREAMS mymq 1665058759763-0
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

如果想要實現阻塞讀(當沒有資料時,阻塞住),可以呼叫 XRAED 時設定 BLOCK 設定項,實現類似於 BRPOP 的阻塞讀取操作。

比如,下面這命令,設定了 BLOCK 10000 的設定項,10000 的單位是毫秒,表明 XREAD 在讀取最新訊息時,如果沒有訊息到來,XREAD 將阻塞 10000 毫秒(即 10 秒),然後再返回。

# 命令最後的 $ 符號表示讀取最新的訊息
127.0.0.1:6379> XREAD BLOCK 10000 STREAMS mymq $
(nil)
(10.01s)
登入後複製

Stream 的基礎方法,使用 xadd 存入訊息和 xread 迴圈阻塞讀取訊息的方式可以實現簡易版的訊息佇列,互動流程如下圖所示:

61.png

前面介紹的這些操作 List 也支援的,接下來看看 Stream 特有的功能。

Stream 可以以使用 XGROUP 建立消費組,建立消費組之後,Stream 可以使用 XREADGROUP 命令讓消費組內的消費者讀取訊息。

建立兩個消費組,這兩個消費組消費的訊息佇列是 mymq,都指定從第一條訊息開始讀取:

# 建立一個名為 group1 的消費組,0-0 表示從第一條訊息開始讀取。
127.0.0.1:6379> XGROUP CREATE mymq group1 0-0
OK
# 建立一個名為 group2 的消費組,0-0 表示從第一條訊息開始讀取。
127.0.0.1:6379> XGROUP CREATE mymq group2 0-0
OK
登入後複製

消費組 group1 內的消費者 consumer1 從 mymq 訊息佇列中讀取所有訊息的命令如下:

# 命令最後的引數「>」,表示從第一條尚未被消費的訊息開始讀取。
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

訊息佇列中的訊息一旦被消費組裡的一個消費者讀取了,就不能再被該消費組內的其他消費者讀取了,即同一個消費組裡的消費者不能消費同一條訊息。

比如說,我們執行完剛才的 XREADGROUP 命令後,再執行一次同樣的命令,此時讀到的就是空值了:

127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 STREAMS mymq >
(nil)
登入後複製

但是,不同消費組的消費者可以消費同一條訊息(但是有前提條件,建立訊息組的時候,不同消費組指定了相同位置開始讀取訊息) 。

比如說,剛才 group1 消費組裡的 consumer1 消費者消費了一條 id 為 1665058759764-0 的訊息,現在用 group2 消費組裡的 consumer1 消費者消費訊息:

127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665058759764-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

因為我建立兩組的消費組都是從第一條訊息開始讀取,所以可以看到第二組的消費者依然可以消費 id 為 1665058759764-0 的這一條訊息。因此,不同的消費組的消費者可以消費同一條訊息。

使用消費組的目的是讓組內的多個消費者共同分擔讀取訊息,所以,我們通常會讓每個消費者讀取部分訊息,從而實現訊息讀取負載在多個消費者間是均衡分佈的。

例如,我們執行下列命令,讓 group2 中的 consumer1、2、3 各自讀取一條訊息。

# 讓 group2 中的 consumer1 從 mymq 訊息佇列中消費一條訊息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060632864-0"
         2) 1) "name"
            2) "sid10t"
            
# 讓 group2 中的 consumer2 從 mymq 訊息佇列中消費一條訊息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer2 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060633903-0"
         2) 1) "name"
            2) "sid10t"
            
# 讓 group2 中的 consumer3 從 mymq 訊息佇列中消費一條訊息
127.0.0.1:6379> XREADGROUP GROUP group2 consumer3 COUNT 1 STREAMS mymq >
1) 1) "mymq"
   2) 1) 1) "1665060634962-0"
         2) 1) "name"
            2) "sid10t"
登入後複製

基於 Stream 實現的訊息佇列,如何保證消費者在發生故障或宕機再次重新啟動後,仍然可以讀取未處理完的訊息?

Streams 會自動使用內部佇列(也稱為 PENDING List)留存消費組裡每個消費者讀取的訊息,直到消費者使用 XACK 命令通知 Streams 「訊息已經處理完成」。

消費確認增加了訊息的可靠性,一般在業務處理完成之後,需要執行 XACK 命令確認訊息已經被消費完成,整個流程的執行如下圖所示:

62.png

如果消費者沒有成功處理訊息,它就不會給 Streams 傳送 XACK 命令,訊息仍然會留存。此時,消費者可以在重新啟動後,用 XPENDING 命令檢視已讀取、但尚未確認處理完成的訊息。

例如,我們來檢視一下 group2 中各個消費者已讀取、但尚未確認的訊息個數,命令如下:

127.0.0.1:6379> XPENDING mymq group2
1) (integer) 4
2) "1665058759764-0"
3) "1665060634962-0"
4) 1) 1) "consumer1"
      2) "2"
   2) 1) "consumer2"
      2) "1"
   3) 1) "consumer3"
      2) "1"
登入後複製

如果想檢視某個消費者具體讀取了哪些資料,可以執行下面的命令:

# 檢視 group2 裡 consumer2 已從 mymq 訊息佇列中讀取了哪些訊息
127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
1) 1) "1665060633903-0"
   2) "consumer2"
   3) (integer) 1888805
   4) (integer) 1
登入後複製

可以看到,consumer2 已讀取的訊息的 ID 是 1665060633903-0。

一旦訊息 1665060633903-0 被 consumer2 處理了,consumer2 就可以使用 XACK 命令通知 Streams,然後這條訊息就會被刪除。

127.0.0.1:6379> XACK mymq group2 1665060633903-0
(integer) 1
登入後複製

當我們再使用 XPENDING 命令檢視時,就可以看到,consumer2 已經沒有已讀取、但尚未確認處理的訊息了。

127.0.0.1:6379> XPENDING mymq group2 - + 10 consumer2
(empty array)
登入後複製

小結

好了,基於 Stream 實現的訊息佇列就說到這裡了,小結一下:

  • 訊息保序:XADD/XREAD

  • 阻塞讀取:XREAD block

  • 重複訊息處理:Stream 在使用 XADD 命令,會自動生成全域性唯一 ID;

  • 訊息可靠性:內部使用 PENDING List 自動儲存訊息,使用 XPENDING 命令檢視消費組已經讀取但是未被確認的訊息,消費者使用 XACK 確認訊息;

  • 支援消費組形式消費資料

Redis 基於 Stream 訊息佇列與專業的訊息佇列有哪些差距?

一個專業的訊息佇列,必須要做到兩大塊:

  • 訊息不可丟。

  • 訊息可堆積。

1、Redis Stream 訊息會丟失嗎?

使用一個訊息佇列,其實就分為三大塊:生產者、佇列中介軟體、消費者,所以要保證訊息就是保證三個環節都不能丟失資料。

63.png

Redis Stream 訊息佇列能不能保證三個環節都不丟失資料?

  • Redis 生產者會不會丟訊息?生產者會不會丟訊息,取決於生產者對於異常情況的處理是否合理。 從訊息被生產出來,然後提交給 MQ 的過程中,只要能正常收到 ( MQ 中介軟體) 的 ack 確認響應,就表示傳送成功,所以只要處理好返回值和異常,如果返回異常則進行訊息重發,那麼這個階段是不會出現訊息丟失的。

  • Redis 消費者會不會丟訊息?不會,因為 Stream ( MQ 中介軟體)會自動使用內部佇列(也稱為 PENDING List)留存消費組裡每個消費者讀取的訊息,但是未被確認的訊息。消費者可以在重新啟動後,用 XPENDING 命令檢視已讀取、但尚未確認處理完成的訊息。等到消費者執行完業務邏輯後,再傳送消費確認 XACK 命令,也能保證訊息的不丟失。

  • Redis 訊息中介軟體會不會丟訊息?會,Redis 在以下 2 個場景下,都會導致資料丟失:

AOF 持久化設定為每秒寫盤,但這個寫盤過程是非同步的,Redis 宕機時會存在資料丟失的可能;

主從複製也是非同步的,主從切換時,也存在丟失資料的可能 (opens new window)。

可以看到,Redis 在佇列中介軟體環節無法保證訊息不丟。像 RabbitMQ 或 Kafka 這類專業的佇列中介軟體,在使用時是部署一個叢集,生產者在釋出訊息時,佇列中介軟體通常會寫「多個節點」,也就是有多個副本,這樣一來,即便其中一個節點掛了,也能保證叢集的資料不丟失。

2、Redis Stream 訊息可堆積嗎?

Redis 的資料都儲存在記憶體中,這就意味著一旦發生訊息積壓,則會導致 Redis 的記憶體持續增長,如果超過機器記憶體上限,就會面臨被 OOM 的風險。

所以 Redis 的 Stream 提供了可以指定佇列最大長度的功能,就是為了避免這種情況發生。

當指定佇列最大長度時,佇列長度超過上限後,舊訊息會被刪除,只保留固定長度的新訊息。這麼來看,Stream 在訊息積壓時,如果指定了最大長度,還是有可能丟失訊息的。

但 Kafka、RabbitMQ 專業的訊息佇列它們的資料都是儲存在磁碟上,當訊息積壓時,無非就是多佔用一些磁碟空間。

因此,把 Redis 當作佇列來使用時,會面臨的 2 個問題:

  • Redis 本身可能會丟資料;

  • 面對訊息擠壓,記憶體資源會緊張;

所以,能不能將 Redis 作為訊息佇列來使用,關鍵看你的業務場景:

  • 如果你的業務場景足夠簡單,對於資料丟失不敏感,而且訊息積壓概率比較小的情況下,把 Redis 當作佇列是完全可以的。

  • 如果你的業務有海量訊息,訊息積壓的概率比較大,並且不能接受資料丟失,那麼還是用專業的訊息佇列中介軟體吧。

補充:Redis 釋出/訂閱機制為什麼不可以作為訊息佇列?

釋出訂閱機制存在以下缺點,都是跟丟失資料有關:

  • 釋出/訂閱機制沒有基於任何資料型別實現,所以不具備「資料持久化」的能力,也就是釋出/訂閱機制的相關操作,不會寫入到 RDB 和 AOF 中,當 Redis 宕機重新啟動,釋出/訂閱機制的資料也會全部丟失。

  • 釋出訂閱模式是 「發後既忘」 的工作模式,如果有訂閱者離線重連之後不能消費之前的歷史訊息。

  • 當消費端有一定的訊息積壓時,也就是生產者傳送的訊息,消費者消費不過來時,如果超過 32M 或者是 60s 內持續保持在 8M 以上,消費端會被強行斷開,這個引數是在組態檔中設定的,預設值是 client-output-buffer-limit pubsub 32mb 8mb 60。

所以,釋出/訂閱機制只適合即使通訊的場景,比如構建哨兵叢集 (opens new window)的場景採用了釋出/訂閱機制。

推薦學習:

以上就是Redis特殊資料型別之stream的詳細內容,更多請關注TW511.COM其它相關文章!