該資料結構需要 Redis 5.0.0 + 版本才可用使用
Redis stream 是 Redis 5 引入的一種新的資料結構,它是一個高效能、高可靠性的訊息佇列,主要用於非同步訊息處理和流式資料處理。在此之前,想要使用 Redis 實現訊息佇列,通常可以使用例如:列表,有序集合、釋出與訂閱 3 種資料結構。但是 stream 相比它們具有以下的優勢:
話不多說,接下來具體看看如何使用它。(PS:萬字長文,行駛途中請繫好安全帶)
XADD 命令的語法格式如下:
XADD stream-name id field value [field value]
*
號表示自動生成關於使用 XADD 新增元素,還有以下特點:
my-stream
流不存在時,redis 會自動建立,然後將元素追加在流的末尾處下面是一個使用 XADD 命令新增新訊息的範例:
XADD my-stream * name John age 30 email [email protected]
上述命令的說明:
my-stream
的 Redis stream 中新增了一條新訊息。*
表示使用自動生成的訊息 ID,name
、age
和 email
是訊息的欄位名John
、30
和 [email protected]
是訊息的欄位值。XADD 命令在成功執行後會返回元素 ID 作為結果:
"1681138020163-0"
每個元素的 ID 是一個遞增的唯一識別符號,由兩部分組成:一個時間戳和一個序列號。
為了證明,我們可以指定訊息 ID 向指定流中傳送一條訊息:
XADD my-stream 1681138020163-1 name Mary age 25 email [email protected]
返回結果:
"1681138020163-1"
最後,可以提前使用 XRANGE
指令檢視推入流中的資料
XRANGE my-stream - +
返回結果:
1) 1) "1681138020163-0"
2) 1) "name"
2) "John"
3) "age"
4) "30"
5) "email"
6) "[email protected]"
2) 1) "1681138020163-1"
2) 1) "name"
2) "Mary"
3) "age"
4) "25"
5) "email"
6) "[email protected]"
元素 ID 在 Redis stream 中扮演著非常重要的角色,它不僅保證了元素的唯一性和順序性,還提供了高效的範圍查詢和分析功能。在使用 Redis stream 時,需要特別注意元素 ID 的限制,並保證 ID 的唯一性和遞增性。
限制如下::
還有一些長度和特殊字元的限制等等,不符合上述限制的新增元素操作,會被 redis 拒絕,並且返回一個錯誤等。
最大元素 ID 是如何更新的 ?
在成功執行XADD命令之後,流的最大元素ID也會隨之更新。
為什麼要限制 新元素的 ID 必須比流中所有已有元素的 ID 都要大 ?
限制新元素的 ID 必須比流中所有已有元素的 ID 都要大,是為了保證 stream 中每個元素的唯一性和順序性。這種特性對於使用流實現訊息佇列和事件系統的使用者來說是非常重要的:使用者可以確信,新的訊息和事件只會出現在已有訊息和事件之後,就像現實世界裡新事件總是發生在已有事件之後一樣,一切都是有序進行的。
範例開始就演示自動生成訊息向流中推播資料,在日常使用非常方便,這裡說一下它的生成規則:
流的資料大多隻是臨時儲存的,如果不對流的長度進行限制,會出現以下情況:
為了避免該問題,在使用 Redis stream 時,可以使用 MAXLEN 選項指定 stream 的最大長度,命令格式如下:
XADD stream [MAXLEN len] id field value [field value ...]
範例:
XADD mini-stream MAXLEN 3 * k1 v1
XADD mini-stream MAXLEN 3 * k2 v2
XADD mini-stream MAXLEN 3 * k3 v3
XADD mini-stream MAXLEN 3 * k4 v4
# 我們向一個限制長度為 3 的 `mini-stream` 流中新增 4 條資料,然後檢視流內的訊息:
XRANGE mini-stream - +
1) 1) "1681140898447-0"
2) 1) "k2"
2) "v2"
2) 1) "1681140901790-0"
2) 1) "k3"
2) "v3"
3) 1) "1681140906703-0"
2) 1) "k4"
2) "v4"
最後會看到最早建立的 k1
訊息已經被移除,redis 刪除在流中存在時間最長的元素,從而來保證流的整體長度。
除了在 XADD 命令時限制流,Redis 還提供單獨限制流長度的 MAXLEN 命令,基礎語法如下:
XTRIM stream MAXLEN len
範例:
XTRIM my-stream MAXLEN 2
(integer) 1
這條命令 XTRIM my-stream MAXLEN 2
的作用是將名為 my-stream
的流修剪為最多包含 2 條訊息。換句話說,流中超出這個長度的較舊訊息將被移除。
XDEL 用於從流中刪除特定的訊息。這個命令需要提供流的鍵(key)和一個或多個訊息 ID 作為引數。當訊息被成功刪除時,XDEL
命令會返回被刪除訊息的數量。
XDEL
的基本語法如下:
XDEL key ID [ID ...]
範例:
# 這個命令將從名為 `mystream` 的流中刪除訊息 ID 為 `1681480521617-0` 的訊息。
XDEL my-stream 1681480521617-0
(integer) 1
# 你也可以傳入多個 `id` 引數進行批次刪除
XDEL my-stream 1681480524451-0 1681480526810-0 1681480965273-0
(integer) 3
注意:,XDEL
不會修改流的長度計數,這意味著刪除訊息後,流的長度保持不變。
XLEN 用於獲取流中訊息的數量。這個命令非常簡單且高效,因為它只要一個引數。
XLEN
的基本語法如下:
XLEN key
範例:
XLEN my-stream
(integer) 4
注意:XLEN
命令僅返回流中訊息的數量,並不提供訊息的具體內容。獲取訊息內容的命令,看下面的 XRANGE
XRANG
主要用於獲取流中的一段連續訊息,它還有一個非常相似的 XREVRANGE
命令,區別:
XRANGE
按照訊息 ID 順序返回結果XREVRANGE
按照訊息 ID 逆序返回結果(用來查詢流中最新的訊息,非常有用!)XRANG
的的基本語法如下:
XRANGE key start end [COUNT count]
獲取指定訊息,我們可以把 start
和 end
設定同一條訊息 ID,可以用來達到查詢指定訊息 ID 的效果。使用範例:
# 獲取指定訊息 ID
XRANGE my-stream 1681480968241-0 1681480968241-0
獲取多條訊息,可以利用 COUNT 選項引數,使用範例:
# 獲取流中最早的 5 條訊息
XRANGE my-stream - + COUNT 5
這條命令獲取流中最早的 5 條訊息(按訊息 ID 順序排序)。-
和 +
分別表示最小和最大的訊息 ID,用於獲取流中的所有訊息。
想要讀取流中全部訊息內容,移除 COUNT 即可:
# 獲取全部訊息
XRANGE my-stream - +
XREVRANGE
按照訊息 ID 逆序返回結果,基本語法如下:
XREVRANGE key end start [COUNT count]
用法完全和 XRANGE 一樣,這裡就不過多介紹了,使用範例:
XREVRANGE my-stream + - COUNT 5
這個命令將返回名為 mystream
的流中的最新的 3 條訊息(按訊息 ID 逆序排序)。
在實際業務場景中,可以利用 XRANGE
和 XREVRANGE
命令可以用於實現以下功能:
start
、end
和 COUNT
引數,可以實現對流中訊息的分頁查詢相比 XRANGE,XREVRANGE 類似,XREAD 也是用於從流中讀取訊息的命令,但它們之間有一些關鍵區別:
BLOCK
阻塞等待時間引數,控制阻塞時間XREAD 的阻塞模式,可以更好的構建實時資料處理應用程式,如事件驅動系統、實時分析系統等。
XREAD
命令的基本語法如下:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
查詢的話,除了同時讀取多個流的特點外,其他和 XRANGE,XREVRANGE 類似。
使用範例:
XREAD STREAMS my-stream 0
這個命令將從名為 my-stream
的流中讀取訊息,0 代表讀取所有訊息,如果指定的訊息 ID,表示從該訊息 ID 之後開始讀取
XREAD STREAMS my-stream mini-stream 0 0
這個命令將從名為 my-stream
和 mini-stream
的流中分別讀取所有訊息,後面的 2 個引數 0 分別對應 2 個訊息 ID 0
開始的位置
當使用阻塞模式時,XREAD
命令會在以下幾種情況下表現出不同的行為:
使用範例:
如果流中有滿足條件的訊息(即從指定的訊息 ID 之後的新訊息),那麼 XREAD
命令會立即返回這些訊息,不會發生阻塞。
XREAD BLOCK 1000000 COUNT 1 STREAMS my-stream 0
1) 1) "my-stream"
2) 1) 1) "1681480968241-0"
2) 1) "k5"
2) "v5"
XREAD
命令解除阻塞也分 2 情況:超時,新訊息到達
範例程式碼:
# 超時: 阻塞超時,沒有新訊息到達,解除阻塞
XREAD BLOCK 5000 STREAMS my-stream 1681482023346-0
(nil)
(5.09s)
# 新訊息到達: 新訊息到達,且滿足讀取條件 (新訊息的 ID 大於指定的訊息 ID) 解除阻塞
XREAD BLOCK 50000 STREAMS my-stream 1681482023346-0
1) 1) "my-stream"
2) 1) 1) "1681485525804-0"
2) 1) "newMessage"
2) "v1"
(18.46s)
如果設定的阻塞等待時間為 0,那麼 XREAD
命令會一直阻塞:
範例程式碼:
XREAD BLOCK 0 STREAMS my-stream $
這個命令將一直阻塞等待,直到新訊息到達。$
符號表示唯讀取新訊息。
當然如果使用者端主動斷開連線,阻塞的 XREAD
命令也會被取消
在實際應用中,XREAD
使用阻塞模式,可以在新訊息到達時立即處理,實現實時訊息處理。
在 Redis 流的訊息模型中,是通過消費者組(Consumer Group)來組織和管理多個消費者以協同處理來自同一個流的訊息的機制。消費者組的主要目的是在多個消費者之間分發訊息,實現負載均衡、高可用性和容錯能力。
工作原理:
如圖所示:
使用消費者組這種模型的設計,以為在 Redis Stream 中實現以下功能:
接下來我們再詳細說明消費組相關的命令使用
通過 XGROUP
命令可以為你的 Redis Stream 建立和管理消費組。
命令格式如下:
XGROUP CREATE stream group id
引數說明:
<stream>
:要關聯的流的鍵。<group>
:消費組的名稱。<id>
:開始讀取訊息的起始 ID。通常使用 $
表示僅消費新訊息,或者使用 0
表示消費流中的所有訊息。[MKSTREAM]
(可選):如果流不存在,自動建立一個新的流。使用範例:
# 建立消費組,如果流不存在則自動建立
XGROUP CREATE mystream mygroup $ MKSTREAM
OK
# 檢視流中的消費組
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
以上命令是使用 XGROUP CREATE
命令建立一個名為 mygroup
的消費組,從最新的訊息開始消費,使用 MKSTREAM
選項,如果流不存在則會自動建立流,返回 OK 既代表建立成功。最後使用 XINFO 檢視結果。
在某些情況下,你可能想要消費組忽略某些訊息,或者重新處理某些訊息來重現 bug,那麼可以使用 XGROUP SETID
命令設定消費組的起始訊息 ID。
命令格式非常簡單:
XGROUP SETID stream group id
使用範例:
# 設定 mygroup 組的最新訊息為指定 ID
XGROUP SETID mystream mygroup 1681655893911-0
OK
# 檢視消費組
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1681655893911-0" # 已被改變
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 4
# 設定 mygroup 組的最新訊息為流的最新訊息 ID
XGROUP SETID mystream mygroup $
# 檢視消費組
127.0.0.1:6379> XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "1681655916001-0" # 已更新
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
以上命令將 mygroup
組的最新訊息 ID 更新為指定 ID 和流的最新 ID 的使用範例。
使用 XREADGROUP
命令讀取消費組裡面的訊息,基本語法:
XREADGROUP GROUP <group> <consumer> [COUNT <n>] [BLOCK <ms>] STREAMS <stream_key_1> <stream_key_2> ... <id_1> <id_2> ...
引數說明:
<group>
:消費組的名稱。<consumer>
:消費者的名稱。<n>
(可選):要讀取的最大訊息數。<ms>
(可選):阻塞等待新訊息的時間(以毫秒為單位)。<stream_key_1>
, <stream_key_2>
:要從中讀取訊息的流的鍵。<id_1>
, <id_2>
:從每個流中開始讀取的訊息 ID,通常使用特殊字元 >
表示從上次讀取的位置開始讀取新的訊息。使用範例:
我們建立一個 myconsumer
的消費組讀取上面建立 mygroup
消費組的資訊,以下是多種用法範例:
# 以 myconsumer 消費者身份從 mystream 中讀取分配給 mygroup 的訊息
# 讀取所有最新的訊息(常用)
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
(nil)
# 其他用法:
# 讀取最多 10 條訊息
XREADGROUP GROUP mygroup myconsumer COUNT 10 STREAMS mystream >
# 進行阻塞讀取最新訊息
XREADGROUP GROUP mygroup myconsumer BLOCK 5000 STREAMS mystream >
這裡拿不到資料是因為我們上面把消費組 mygroup 的訊息 ID 設定為最新,我們嘗試修改訊息 ID 重新消費試試
# 設定消費組的訊息 ID,進行重新消費
XGROUP SETID mystream mygroup 1681655893911-0
# 消費組 myconsumer 讀取消費組的訊息
XREADGROUP GROUP mygroup myconsumer STREAMS mystream >
1) 1) "mystream"
2) 1) 1) "1681655897993-0"
2) 1) "k1"
2) "v1"
2) 1) "1681655899297-0"
2) 1) "k1"
2) "v1"
3) 1) "1681655915496-0"
2) 1) "k1"
2) "v1"
4) 1) "1681655916001-0"
2) 1) "k1"
2) "v1"
# 檢視消費組的資訊
XINFO GROUPS mystream
1) 1) "name"
2) "mygroup"
3) "consumers"
4) (integer) 1 # 消費組有一個消費者
5) "pending"
6) (integer) 4 # 有 4 條正在處理的訊息
7) "last-delivered-id"
8) "1681655916001-0"
9) "entries-read"
10) (nil)
11) "lag"
12) (integer) 0
通過以上命令可以確認,myconsumer
消費者拿到 mygroup
消費組的訊息未確認處理,所以看到有 4 條訊息正在等待處理中。
通過 XPENDING 命令,可以獲取指定流的指定消費者組目前的待處理訊息的相關資訊。在很多場景下,你需要通過它來觀察和了解消費者的處理情況,從而做出處理,例如以下場景:
XPENDING
命令檢測到積壓訊息XPENDING
命令,您可以在發現掛起訊息數量超過預設閾值時觸發報警基本語法:
XPENDING stream group [start stop count] [consumer]
引數說明:
<stream>
:流的鍵。<group>
:消費組的名稱。<start>
(可選):掛起訊息範圍的起始 ID。<stop>
(可選):掛起訊息範圍的結束 ID。<count>
(可選):返回的最大掛起訊息數。<consumer>
(可選):篩選特定消費者的掛起訊息。使用範例:
使用 XPENDING
命令檢視上面的 mygroup
組的訊息去哪兒了:
XPENDING mystream mygroup
1) (integer) 4 # 待處理訊息數量
2) "1681655897993-0" # 首條訊息 ID
3) "1681655916001-0" # 最後一條訊息的 ID
4) 1) 1) "myconsumer" # 各消費者正在處理的訊息數量
2) "4"
以上展示的彙總資訊,你還可以通過以下命令,檢視待處理訊息更詳細的資訊:
# 檢視指定待處理訊息
XPENDING mystream mygroup 1681655897993-0 1681655897993-0 1
1) 1) "1681655897993-0" # 訊息 ID
2) "myconsumer" # 所屬消費者
3) (integer) 2397387 # 最後一次投遞時間
4) (integer) 1 # 投遞次數
從以上資訊你可以看到訊息正在被誰處理和處理的時間,你也可以指定消費者檢視資訊:
XPENDING mystream mygroup - + 10 myconsumer
1) 1) "1681655897993-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
2) 1) "1681655899297-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
3) 1) "1681655915496-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
4) 1) "1681655916001-0"
2) "myconsumer"
3) (integer) 2591145
4) (integer) 1
以上命令列出 myconsumer 消費者所有待處理的訊息的詳細資訊
XACK 用於確認消費組中的特定訊息已被處理。在消費者成功處理訊息後,應使用 XACK
命令通知 Redis,以便從消費組的掛起訊息列表中移除該訊息。
命令格式:
XACK stream group id [id id ...]
使用範例:
通過 XACK 命令,我們將上面 myconsumer 消費者的訊息進行確認處理:
# 確認訊息
XACK mystream mygroup 1681655897993-0
(integer) 1
# .....
當消費者對所有訊息進行處理後,再檢視消費組內容進行驗證:
XPENDING mystream mygroup - + 10 myconsumer
(empty array)
XPENDING mystream mygroup
1) (integer) 0
2) (nil)
3) (nil)
4) (nil)
使用 XACK
可以確保訊息不會重複處理防止其他消費者或相同消費者在故障恢復後重復處理該訊息等等好處。
XCLAIM
訊息轉移類似我們生活中的來電轉駁,當一個消費者無法處理某個訊息或出現故障時,XCLAIM
可以確保其他消費者接管並處理這些訊息。命令格式非常簡單:
XCLAIM stream group new_consumer max_pending_time id [id id id]
使用範例:
# 使用 XPENDING 命令查詢消費組中掛起的訊息
XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
2) "2"
# 使用 XCLAIM 命令將訊息轉移
XCLAIM mystream mygroup myconsumer2 10000 1681660259887-0
1) 1) "1681660259887-0" # 被轉移的訊息 ID
2) 1) "k1" # 訊息內容
2) "v1"
上面的命令意思是:如果訊息 ID 1681660259887-0 處理時間超過 10000ms,那麼訊息轉移給 myconsumer2,我們使用 XPENDING 命令來驗證:
XPENDING mystream mygroup
1) (integer) 2
2) "1681660259887-0"
3) "1681660263096-0"
4) 1) 1) "myconsumer"
2) "1"
2) 1) "myconsumer2"
2) "1"
XINFO 用於獲取流或消費組的詳細資訊。XINFO
命令有多個子命令,可以提供不同型別的資訊。
以下是一些常用的 XINFO
子命令及其介紹:
XINFO STREAM:此子命令用於獲取流的詳細資訊,包括長度、消費組數量、第一個和最後一個條目等。例如:
XINFO STREAM mystream
XINFO GROUPS:此子命令用於獲取流中消費組的列表及其相關資訊。例如:
XINFO GROUPS mystream
XINFO CONSUMERS:此子命令用於獲取消費組中消費者的列表及其相關資訊。例如:
XINFO CONSUMERS mystream mygroup
通過使用這些子命令,您可以瞭解流、消費組和消費者的狀態,從而監控和優化 Redis Stream 應用程式的效能。在處理問題或分析系統效能時,這些資訊可能特別有用。
當用戶不再需要某個消費者的時候,可以通過執行以下命令將其刪除,命令格式:
XGROUP DELCONSUMER stream group consumer
使用範例:
# 刪除 myconsumer 消費者
XGROUP DELCONSUMER mystream mygroup myconsumer
(integer) 1
當你不需要消費組時,可以通過以下命令刪除它,命令格式:
XGROUP DESTROY stream group
使用範例:
# 刪除 mygroup 消費組
XGROUP DESTROY mystream mygroup
(integer) 1
以下是本篇文章涉及的 Redis Stream 命令命令和簡要總結:
這些命令提供了對 Redis Stream 的全面操作支援,包括新增、刪除、讀取、修剪訊息以及管理消費組和消費者。通過熟練使用這些命令,您可以實現高效且可延伸的訊息傳遞和紀錄檔處理系統。edis Stream 是 Redis 提供的一種強大、持久且可延伸的資料結構,用於實現訊息傳遞和紀錄檔處理等場景。Stream 資料結構類似於紀錄檔檔案,訊息以有序的方式儲存在流中,同時還支援消費組的概念,允許多個消費者並行處理訊息。