Redis系列1:深刻理解高效能Redis的本質
Redis系列2:資料持久化提高可用性
Redis系列3:高可用之主從架構
Redis系列4:高可用之Sentinel(哨兵模式)
Redis系列5:深入分析Cluster 叢集模式
追求效能極致:Redis6.0的多執行緒模型
追求效能極致:使用者端快取帶來的革命
Redis系列8:Bitmap實現億萬級資料計算
Redis系列9:Geo 型別賦能億級地圖位置計算
Redis系列10:HyperLogLog實現海量資料基數統計
Redis系列11:記憶體淘汰策略
Redis系列12:Redis 的事務機制
Redis系列13:分散式鎖實現
在分散式系統中,很重要的一個能力就是訊息中介軟體。我們通過訊息佇列實現 功能解耦、訊息有序性、訊息路由、非同步處理、流量削峰 等能力。
目前主流的Mq主要有 RabbitMQ 、RocketMQ、kafka,可以參考這篇《MQ系列2:訊息中介軟體技術選型》。
那除了這些主流MQ之外,咱們的這一節要說的Redis也具備實現訊息佇列的能力。
我們來看看訊息佇列主要要實現哪些能力,原理是什麼,以及如何在 Redission 中應用。
訊息中介軟體是指在分散式系統中完成訊息的傳送和接收的基礎軟體。
訊息中介軟體也可以稱訊息佇列(Message Queue / MQ),用高效可靠的訊息傳遞機制進行與平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息佇列模型,可以在分散式環境下擴充套件程序的通訊。
簡而言之,網際網路場景中經常使用訊息中介軟體進行訊息路由、訂閱釋出、非同步處理等操作,來緩解系統的壓力。
1、解耦: 比如說系統A會交給系統B去處理一些事情,但是A不想直接跟B有關聯,避免耦合太強,就可以通過在A,B中間加入訊息佇列,A將要任務的事情交給訊息佇列 ,B訂閱訊息佇列來執行任務。
這種場景很常見,比如A是訂單系統,B是庫存系統,可以通過訊息佇列把削減庫存的工作交予B系統去處理。如果A系統同時想讓B、C、D...多個系統處理問題的時候,這種優勢就更加明顯了。
2、有序性: 先進先出原理,先來先處理,比如一個系統處理某件事需要很長一段時間,但是在處理這件事情時候,有其他人也發出了請求,可以把請求放在訊息隊裡,一個一個來處理。
對資料的順序性和一致性有強需求的業務,比如同一張銀行卡同時被多個入口使用,需要保證入賬出賬的順序性,避免出現資料不一致。
3、訊息路由: 按照不同的規則,將佇列中訊息傳送到不同的其他佇列中
通過訊息佇列將不同染色的請求傳送到不同的服務去操作。這樣達成了流量按照業務拆分的目的。
4、非同步處理: 處理一項任務的時候,有3個步驟A、B、C,需要先完成A操作, 然後做B、C 操作。任務執行成功與否強依賴A的結果,但不依賴B、C 的結果。
如果我們使用序列的執行方式,那處理任務的週期就會變長,系統的整體吞吐能力也會降低(在同一個系統中做非同步其實也是比較大的開銷),所以使用訊息佇列是比較好的辦法。
登入操作就是典型的場景:A:執行登入並得到結果、B:記錄登入紀錄檔、C:將使用者資訊和Token寫入快取。 執行完A就可以從登入頁跳到首頁了,B、C讓服務慢慢去消化,不阻塞當前操作。
5、削峰: 將峰值期間的操作削減,比如A同學的整個操作流程包含12個步驟,後續的11個步驟是不需要強關注結果的資料,可以放在訊息佇列中。
詳細可參考筆者這篇《MQ系列1:訊息中介軟體執行原理》。
正如上面提到的有序性一樣,他能夠保證訊息按照生產的順序進行處理和消費,避免訊息被無序處理的情況發生。
同樣的,生產和消費的訊息需要保證冪等性原理。避免出現重複執行的情況,
而訊息佇列的去重機制,也需要確保避免訊息被重複消費的問題。
訊息佇列的資料可以實現重試、持久化儲存、死信佇列記錄等,以避免訊息無法成功傳遞所產生的不一致現象。
當訊息伺服器或者消費者恢復健康的時候,可以繼續讀取訊息進行處理,防止訊息遺漏。
稍微學過資料結構都知道。我們經常說Queue(佇列),他的儲存和使用規則是【先進先出】,棧的儲存和使用規則是【先進後出】。
所以List本質上是一個線性的有序結構,也就是Queue的儲存關係,它能夠保證消費的有序性,按照順序進行處理。
即進行訊息生產,入列操作語法:
LPUSH key element[element...]
如果key存在,Producer 通過 LPUSH 將訊息插入該佇列的頭部;如果 key 不存在,則是先建立一個空佇列,然後在進行資料插入。
下面舉個例子,往佇列中插入幾個訊息,然後得到的返回值是插入訊息的個數。
> LPUSH msg_queue msg1 msg2 msg3
(integer) 3
這邊往 key 為 msg_queue 的佇列中插入了三個訊息 msg1、msg2、msg3。
即進行訊息消費,消費的順序是先進先出(先生產先消費),出列使用的語法如下:
> RPOP msg_queue
"msg1"
> RPOP msg_queue
"msg2"
> RPOP msg_queue
"msg3"
> RPOP msg_queue
(nil)
都消費完成之後,就是nil了。
不同於常規的MQ,具備訂閱模式,消費者可以感知到有新的訊息生產出來了,再進行消費。
List的問題在於,生產者向佇列插入資料的時候,List 並不會主動通知消費者,所以消費者做不到及時消費。
為了保證消費的及時,可能需要做一個心跳包(1秒執行一次),不斷地執行 RPOP 指令,當探測到有新訊息就會取出訊息進行消費,沒有訊息的時候就返回nil。
但是這種也存在明顯的短板,就是不斷的呼叫 RPOP 指令,佔用 I/O 資源和CPU資源。
比較好的解決辦法就是在佇列為空佇列的時候,暫停讀取,等有訊息入列的時候,恢復取數和消費的工作,這樣也避免了無效的資源浪費。
Redis 提供了 BLPOP、BRPOP ,無資料的時候自動阻塞讀取的命令,有新訊息進入的時候,恢復訊息取數,如下:
# BRPOP key timeout
BRPOP msg_queue 0
命令最後一個引數 timeout 是超時時間,單位是秒,如果 timeout 大於0,則到達指定的秒數即使沒有彈出成功也會返回,如果 timeout 的值為0,則會一直阻塞等待其他連線向列表中插入元素, timeout 引數不允許為負數。
目前 List 沒有純冪等的鑑別能力,但是可以通過以下兩種方法來實現:
可靠性傳輸我們在MQ篇章用了一整節來介紹持久化儲存、訊息ACK 、二次記錄保障。這邊我們也來看看Redis List中的可靠性傳輸的保障。
Redis中缺少了一個訊息確認(ACK)的機制,如果消費資料的時候執行崩潰了,沒有確認機制,很可能這條訊息就被錯過了,無法保證資料的一致性。
解決方案:Redis 提供了 RPOPLPUSH
指令,當List讀取訊息的時候,會同步的把該訊息複製到另外一個List以作備份。
整個操作過程是具備原子性的,避免讀取訊息了,但是同步備份不成功。
如果出現處理訊息出現故障的情況,在故障回覆之後,可以從備份的List中複製訊息繼續消費。操作如下:
# 生產訊息 msg1 msg2
> LPUSH list_queue msg1 msg2
(integer) 2
# 消費訊息並同步到備份
> RPOPLPUSH list_queue list_queue_bak
"msg1"
# 當發生故障的時候去消費備份的資料,可以消費到
> RPOP list_queue_bak
"msg1"
如果消費成功則把 list_queue_bak 訊息刪除即可,如果發生故障,則可以繼續從 list_queue_bak 再次讀取訊息處理。
這邊以Java SpringBoot為例子進行說明,可以參考官方檔案。
# maven資訊
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.8</version>
</dependency>
# 基本設定
spring:
application:
name: redission_test
redis:
host: x.x.x.x
port: 6379
ssl: false
password: xxxx.xxxx
@Slf4j
@Service
public class RedisQueueService {
@Autowired
private RedissonClient redissonClient;
private static final String REDIS_QUEUE = "listQueue";
/**
* 訊息生產
*
* @param msg
*/
public void msgProduce(String msg) {
RBlockingDeque<String> blockDeque = redissonClient.getBlockingDeque(REDIS_QUEUE);
try {
blockDeque.putFirst(msg); // 訊息寫入佇列頭部
} catch (InterruptedException e) {
log.error(e.printStackTrace());
}
}
/**
* 訊息消費:阻塞
*/
public void msgConsume() {
RBlockingDeque<String> blockDeque = redissonClient.getBlockingDeque(REDIS_QUEUE);
Boolen isCheck = true;
while (isCheck) {
try {
String msg = blockDeque.takeLast(); // 從佇列中取出訊息
} catch (InterruptedException e) {
log.error(e.printStackTrace());
}
}
}