Redis Stream是5.0版本之後新增的一種資料結構,其結構類似於‘僅追加紀錄檔’。但也實現了多種操作來克服‘僅追加紀錄檔’的一些限制,如讀取策略(xread,xrange....)。.....
xadd
向流新增新節點(返回節點id)127.0.0.1:6379> XADD mystream1 * name a1
"1688888761768-0"
127.0.0.1:6379> XADD mystream1 * name a2
"1688888772896-0"
127.0.0.1:6379> XADD mystream1 * name a3
"1688888782841-0"
127.0.0.1:6379> XADD mystream1 * name a4
"1688888786201-0"
第一個引數=‘mystream’: stream的key。
第二個引數=‘*’: stream節點的id生成策略,能保證自增且格式為(毫秒值-序列號),即使計算機時間不正確。
第三個引數=‘name a1’: 一個key-value的鍵值對。
XREAD
讀取一個或多個節點,從給定位置開始並向前移動。127.0.0.1:6379> XREAD COUNT 2 STREAMS mystream1 0
1) 1) "mystream1"
2) 1) 1) "1688888761768-0"
2) 1) "name"
2) "a1"
2) 1) "1688888772896-0"
2) 1) "name"
2) "a2"
第一個引數=‘COUNT 2’: 一次讀取兩條資料。
第二個引數=‘STREAMS mystream1’: 讀取key等於mystream1的stream。
第三個引數=‘0’: 讀取策略-stream節點id大於它的數,根據id生成策略任何id都大於0,故重頭讀取。
若=’$‘ :則可讀取id大於當前最大值的id,即讀取最新的節點。
其他引數:
如: 'BLOCK 0'(堵塞讀取,0表示無限堵塞)
備註: 對於不同的客戶段(client),只需簡單的設定(記錄自己的最大消費id)就可實現fan-out 的消費策略。
XRANGE
返回兩個提供的條目 ID 之間的節點範圍。127.0.0.1:6379> XRANGE mystream1 - + COUNT 3
1) 1) "1688888761768-0"
2) 1) "name"
2) "a1"
2) 1) "1688888772896-0"
2) 1) "name"
2) "a2"
3) 1) "1688888782841-0"
2) 1) "name"
2) "a3"
第一個引數=‘mystream1’: key等於mystream1的stream。
第二個引數=‘- +’: 特殊含義,-(表示id最小值),+(表示id最大值)。
第三個引數=‘COUNT 3’: 一次最多讀取3個
XLEN
返回流的長度(節點數)。127.0.0.1:6379> XLEN mystream1
(integer) 4
與XREAD指令不同,消費組可實現再同一個消費組內的消費者只消費未被消費的訊息。假設我們想象有三個消費者 C1、C2、C3 和一個包含訊息 1、2、3、4、5、6、7 的流,那麼我們想要的是按照下圖提供訊息:
1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1
+----------------------------------------+
| 消費組名: mygroup
| 消費的Stream: somekey
| 最近一次消費的id: 1292309234234-92
|
| 消費者陣列:
| "消費者-1" 未ack的訊息
| 1292309234234-4
| 1292309234232-8
| "消費者-2" 未ack的訊息
| ... (等等)
+----------------------------------------+
127.0.0.1:6379> XGROUP CREATE mystream1 mygroup $
OK
指令解釋
第一個引數=‘mystream1’: 消費的Stream的key。
第二個引數=‘mygroup’:命名消費組名稱。
第三個引數=‘ $’: 節點的id,表示消費只消費大於該id的訊息,特殊字元$表示只消費該組建立後的訊息。
可選引數'MKSTREAM‘ 表示自動建立stream(當stream不存在時)。
127.0.0.1:6379> XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream1 >
(nil) #沒有任何訊息
# 因為4.1建立時用的三個引數=‘ $’
指令解釋
第一個引數=‘mygroup’: 消費的Stream的key。
第二個引數=‘Alice’:消費者名稱。
第三個引數=‘ COUNT 1’: 每次消費的最大數量。
第四個引數=‘ mystream1: 消費的Stream的key。
第五個引數=‘ >’: 表示未消費的訊息。
127.0.0.1:6379> XGROUP CREATE mystream1 mygroup1 0 # 建立消費組mygroup1
OK
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice COUNT 1 STREAMS mystream1 > # 第1次消費Alice
1) 1) "mystream1"
2) 1) 1) "1688957166436-0"
2) 1) "name"
2) "a1"
......
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice COUNT 1 STREAMS mystream1 > #第5次消費 Alice
(nil)
127.0.0.1:6379> XADD mystream1 * name a5 #新增訊息
"1688959089805-0"
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice COUNT 1 STREAMS mystream1 > #第6次消費 Alice
1) 1) "mystream1"
2) 1) 1) "1688959089805-0"
2) 1) "name"
2) "a5"
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice2 COUNT 1 STREAMS mystream1 > #第7次消費 Alice2
(nil)
127.0.0.1:6379> XADD mystream1 * name a6 #新增訊息
"1688959797006-0"
127.0.0.1:6379> XREADGROUP GROUP mygroup1 Alice2 COUNT 1 STREAMS mystream1 > #第8次消費 Alice2
1) 1) "mystream1"
2) 1) 1) "1688959797006-0"
2) 1) "name"
2) "a6"
127.0.0.1:6379> XINFO CONSUMERS mystream1 mygroup1
1) 1) "name" #消費者名稱
2) "Alice"
3) "pending" #待確認的訊息數 (ack)
4) (integer) 5
5) "idle"
6) (integer) 711930
2) 1) "name" #消費者名稱
2) "Alice2"
3) "pending" #待確認的訊息數(ack)
4) (integer) 1
5) "idle"
6) (integer) 4160
127.0.0.1:6379> XACK mystream1 mygroup1 1688959797006-0
(integer) 1
參考資料:https://redis.io/docs/data-types/streams/
Stream全部命令:https://redis.io/commands/?group=stream