淺析Redis中怎麼使用訊息佇列

2022-01-05 10:00:09
本篇文章帶大家瞭解一下Redis進階用法-訊息佇列,介紹一下Redis中的延時佇列,希望對大家有所幫助!

說到訊息佇列中介軟體,我們都會想到RabbitMQ、RocketMQ和Kafka,來給應用實現非同步訊息傳遞的功能。這些都是專業的訊息佇列中介軟體,其特性之多超出了我們的理解能力。

而這些訊息中介軟體使用起來的是複雜的,例如RabbitMQ,發訊息之前要建立Exchange,還要建立Queue,然後將Exchange和Queue通過某種規則繫結起來,傳送訊息的時候還要制定routing-key,還要 控制頭訊息。這僅是生產者,消費者在消費訊息之前也要將上面一系列的繁瑣步驟再操作一遍。

那麼對於那些並不要求百分百可靠,並且希望實現簡單的訊息佇列需求時,我們可以通過Redis將我們從訊息佇列的中介軟體的繁瑣步驟中解脫出來。

Redis的訊息佇列不是專業的訊息佇列,他並沒有訊息佇列中許多的高階特性,也沒有ack保證。如果對訊息的可靠性有著極致的追求,請轉向專業的MQ中介軟體。【相關推薦:Redis視訊教學

非同步訊息佇列

從最簡單的非同步訊息佇列開始,Redis的list資料結構常用來作為非同步訊息佇列,通過lrpush/lpush來操作入列,通過rpop/lpop來出列。

1.png

問題一:空佇列

對於pop操作來說,當訊息佇列空了的時候,使用者端會陷入pop的死迴圈,造成大量的浪費生命的空輪詢,導致使用者端CPU拉高,同時Redis的QPS也被拉高。

對於以上問題的解決辦法就是通過list結構的blpop/brpop來操作出列,其中b字首代表的就是blocking,阻塞讀。對於阻塞讀在佇列沒有資料的時候就會進入休眠狀態,一旦資料到來就會立刻醒來。完美的解決了上面這個問題。

問題二:空閒連線斷開

阻塞讀的方案看似完美,緊接著引出了另外一個問題:空閒連線。 如果執行緒一直阻塞在哪哪裡,Redis的使用者端連線就變成了空閒連線。空閒時間過長,Redis伺服器就會主動斷開連線,以減少閒置資源佔用。這時候blpop/brpop就會丟擲異常來。

所以,我們在編寫使用者端(應用程式)消費者的時候需要小心,注意捕獲異常,並進行重試。

應用一:延時佇列

在Redis的分散式鎖中一般有三種策略來處理加鎖失敗的情況:

  • 直接丟擲異常,前端提醒使用者是否要繼續操作;

  • sleep一會再重試;

  • 將請求放到延時佇列中,一會再重試;

而Redis中延時佇列,我們可以通過zset(有序列表)資料結構來實現。我們將訊息序列化作為一個字串作為zse的value,而訊息的到期處理時間(延時時間)作為score。然後通過輪詢zset獲取到期時間進行處理,通過zrem將key從zset移除代表成功消費,進而處理任務。

核心程式碼如下:

// 生產\
public void delay(T msg) {\
  TaskItem task = new TaskItem();\
  task.id = UUID.randomUUID().toString(); // 分配唯一的 uuid\
  task.msg = msg;\
  String s = JSON.toJSONString(task); // fastjson 序列化\
  jedis.zadd(queueKey, System.currentTimeMillis() + 5000, s); // 塞入延時佇列 ,5s 後再試\
}\
// 消費\
public void loop() {\
  while (!Thread.interrupted()) {\
   // zrangeByScore引數中0, System.currentTimeMills()代表從redis中去score範圍在0到系統當前時間的資料, 0,1表示從0開始取1個 拓展傳入的score為-inf, +inf 分別表示zset中的最大值和最小值,當你不知道zset中的score最值時就可以使用inf作為引數變數\
   Set values = jedis.zrangeByScore(queueKey, 0, System.currentTimeMillis(), 0, 1);\
   if (values.isEmpty()) {\
     try {\
       Thread.sleep(500); // 歇會繼續\
    }\
     catch (InterruptedException e) {\
       break;\
    }\
     continue;\
  }\
   String s = values.iterator().next();  //消費佇列\
   if (jedis.zrem(queueKey, s) > 0) { // 搶到了,要考慮到多執行緒下鎖爭搶的情況,只有rem成功代表成功的消費了一條訊息。\
     TaskItem task = JSON.parseObject(s, TaskType); // fastjson 反序列化\
     this.handleMsg(task.msg);\
  }\
}\
}

以上的程式碼在多執行緒中對於同一個任務被多個執行緒爭搶的情況,雖然能夠通過zrem後在處理任務來避免一個任務被多次消費的情況。但是對於那些獲取到了任務但是沒有成功消費的執行緒來說,都是白白浪費時間取了一次任務。所以可以考慮通過lua scripting來優化這個邏輯。將zrangeByScore和zrem一同挪到伺服器進行原子操作,就能夠完美解決了。

更多程式設計相關知識,請存取:!!

以上就是淺析Redis中怎麼使用訊息佇列的詳細內容,更多請關注TW511.COM其它相關文章!