Redis資料型別之Stream系列一

2023-07-10 12:00:12

一:Stream簡介

​ Redis Stream是5.0版本之後新增的一種資料結構,其結構類似於‘僅追加紀錄檔’。但也實現了多種操作來克服‘僅追加紀錄檔’的一些限制,如讀取策略(xread,xrange....)。.....

二:基本指令與用法

2.1 xadd 向流新增新節點(返回節點id)

127.0.0.1:6379> XADD mystream1 * name a1
"1688888761768-0"
127.0.0.1:6379> XADD mystream1 * name a2
"1688888772896-0"
127.0.0.1:6379> XADD mystream1 * name a3
"1688888782841-0"
127.0.0.1:6379> XADD mystream1 * name a4
"1688888786201-0"
指令解釋

​ 第一個引數=‘mystream’: stream的key。

​ 第二個引數=‘*’: stream節點的id生成策略,能保證自增且格式為(毫秒值-序列號),即使計算機時間不正確。

​ 第三個引數=‘name a1’: 一個key-value的鍵值對。

2.2 XREAD 讀取一個或多個節點,從給定位置開始並向前移動。

127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream1 0
1) 1) "mystream1"
   2) 1) 1) "1688888761768-0"
         2) 1) "name"
            2) "a1"
      2) 1) "1688888772896-0"
         2) 1) "name"
            2) "a2"
指令解釋

​ 第一個引數=‘COUNT 2’: 一次讀取兩條資料。

​ 第二個引數=‘STREAMS mystream1’: 讀取key等於mystream1的stream。

​ 第三個引數=‘0’: 讀取策略-stream節點id大於它的數,根據id生成策略任何id都大於0,故重頭讀取。

​ 若=’$‘ :則可讀取id大於當前最大值的id,即讀取最新的節點。

​ 其他引數:

​ 如: 'BLOCK 0'(堵塞讀取,0表示無限堵塞)

備註: 對於不同的客戶段(client),只需簡單的設定(記錄自己的最大消費id)就可實現fan-out 的消費策略。

2.3 XRANGE 返回兩個提供的條目 ID 之間的節點範圍。

127.0.0.1:6379> XRANGE mystream1 - + COUNT 3
1) 1) "1688888761768-0"
   2) 1) "name"
      2) "a1"
2) 1) "1688888772896-0"
   2) 1) "name"
      2) "a2"
3) 1) "1688888782841-0"
   2) 1) "name"
      2) "a3"
指令解釋

​ 第一個引數=‘mystream1’: key等於mystream1的stream。

​ 第二個引數=‘- +’: 特殊含義,-(表示id最小值),+(表示id最大值)。

​ 第三個引數=‘COUNT 3’: 一次最多讀取3個

2.4 XLEN 返回流的長度(節點數)。

127.0.0.1:6379> XLEN mystream1
(integer) 4

三:消費組(Consumer groups)

​ 與XREAD指令不同,消費組可實現再同一個消費組內的消費者只消費未被消費的訊息。假設我們想象有三個消費者 C1、C2、C3 和一個包含訊息 1、2、3、4、5、6、7 的流,那麼我們想要的是按照下圖提供訊息:

1 -> C1  
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

3.1 看圖理解消費組模式

+----------------------------------------+
| 消費組名: mygroup           			   
| 消費的Stream: somekey         				
| 最近一次消費的id: 1292309234234-92    
|                                        
| 消費者陣列:                             
|    "消費者-1" 未ack的訊息  
|       1292309234234-4                  
|       1292309234232-8                  
|    "消費者-2" 未ack的訊息  
|       ... (等等)               
+----------------------------------------+

四: 消費組相關命令

4.1 建立消費組: XGROUP CREATE

127.0.0.1:6379> XGROUP CREATE mystream1 mygroup $
OK

指令解釋

​ 第一個引數=‘mystream1’: 消費的Stream的key。

​ 第二個引數=‘mygroup’:命名消費組名稱。

​ 第三個引數=‘ $’: 節點的id,表示消費只消費大於該id的訊息,特殊字元$表示只消費該組建立後的訊息。

​ 可選引數'MKSTREAM‘ 表示自動建立stream(當stream不存在時)。

4.2 建立組內消費者,並消費訊息:XREADGROUP GROUP

127.0.0.1:6379> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream1 >
(nil)  #沒有任何訊息
# 因為4.1建立時用的三個引數=‘ $’

指令解釋

​ 第一個引數=‘mygroup’: 消費的Stream的key。

​ 第二個引數=‘Alice’:消費者名稱。

​ 第三個引數=‘ COUNT 1’: 每次消費的最大數量。

​ 第四個引數=‘ mystream1: 消費的Stream的key。

​ 第五個引數=‘ >’: 表示未消費的訊息。

範例2:建立消費組2,從頭(0)消費
127.0.0.1:6379> XGROUP CREATE mystream1 mygroup1 0   # 建立消費組mygroup1
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice COUNT 1 STREAMS mystream1 >  # 第1次消費Alice
1) 1) "mystream1"
   2) 1) 1) "1688957166436-0"
         2) 1) "name"
            2) "a1"
......
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice COUNT 1 STREAMS mystream1 > #第5次消費 Alice
(nil)
127.0.0.1:6379> XADD mystream1 * name a5  #新增訊息
"1688959089805-0"
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice COUNT 1 STREAMS mystream1 > #第6次消費  Alice
1) 1) "mystream1"
   2) 1) 1) "1688959089805-0"
         2) 1) "name"
            2) "a5"
 127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice2 COUNT 1 STREAMS mystream1 > #第7次消費 Alice2
(nil)
127.0.0.1:6379> XADD mystream1 * name a6 #新增訊息
"1688959797006-0"
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice2 COUNT 1 STREAMS mystream1 > #第8次消費 Alice2
1) 1) "mystream1"
   2) 1) 1) "1688959797006-0"
         2) 1) "name"
            2) "a6"

4.3 檢視消費組資訊 XINFO CONSUMERS

127.0.0.1:6379> XINFO CONSUMERS mystream1 mygroup1  
1) 1) "name" #消費者名稱
   2) "Alice" 
   3) "pending" #待確認的訊息數 (ack)
   4) (integer) 5
   5) "idle"
   6) (integer) 711930
2) 1) "name" #消費者名稱
   2) "Alice2"
   3) "pending" #待確認的訊息數(ack)
   4) (integer) 1
   5) "idle"
   6) (integer) 4160

4.4 確認訊息 XACK key group id (返回表示確認成功數)

127.0.0.1:6379> XACK mystream1 mygroup1 1688959797006-0
(integer) 1

參考資料:https://redis.io/docs/data-types/streams/

Stream全部命令:https://redis.io/commands/?group=stream