用Redis實現延遲佇列,我研究了兩種方案,發現並不簡單

2023-02-14 15:02:40

大家好,我是三友~~

背景

前段時間有個小專案需要使用延遲任務,談到延遲任務,我腦子第一時間一閃而過的就是使用訊息佇列來做,比如RabbitMQ的死信佇列又或者RocketMQ的延遲佇列,但是奈何這是一個小專案,並沒有引入MQ,我也不太想因為一個延遲任務就引入MQ,增加系統複雜度,所以這個方案直接就被pass了。

雖然基於MQ這個方式走不通了,但是這個專案中使用到Redis,所以我就想是否能夠使用Redis來代替MQ實現延遲佇列的功能,於是我就查了一下有沒有現成可用的方案,別說,還真給我查到了兩種方案,並且我還仔細研究對比了這兩個方案,發現要想很好的實現延遲佇列,並不簡單。

監聽過期key

基於監聽過期key的方式來實現延遲佇列是我查到的第一個方案,為了弄懂這個方案實現的細節,我還特地去扒了扒官網,還真有所收穫

1、Redis釋出訂閱模式

一談到釋出訂閱模式,其實一想到的就是MQ,只不過Redis也實現了一套,並且跟MQ賊像,如圖:

圖中的channel的概念跟MQ中的topic的概念差不多,你可以把channel理解成MQ中的topic。

生產者在訊息傳送時需要到指定傳送到哪個channel上,消費者訂閱這個channel就能獲取到訊息。

2、keyspace notifications

在Redis中,有很多預設的channel,只不過向這些channel傳送訊息的生產者不是我們寫的程式碼,而是Redis本身。當消費者監聽這些channel時,就可以感知到Redis中資料的變化。

這個功能Redis官方稱為keyspace notifications,字面意思就是鍵空間通知。

這些預設的channel被分為兩類:

  • __keyspace@<db>__:為字首,後面跟的是key的名稱,表示監聽跟這個key有關的事件。

    舉個例子,現在有個消費者監聽了__keyspace@0__:sanyou這個channel,sanyou就是Redis中的一個普通key,那麼當sanyou這個key被刪除或者發生了其它事件,那麼消費者就會收到sanyou這個key刪除或者其它事件的訊息

  • __keyevent@<db>__:為字首,後面跟的是訊息事件型別,表示監聽某個事件

    同樣舉個例子,現在有個消費者監聽了__keyevent@0__:expired這個channel,代表了監聽key的過期事件。那麼當某個Redis的key過期了(expired),那麼消費者就能收到這個key過期的訊息。如果把expired換成del,那麼監聽的就是刪除事件。具體支援哪些事件,可從官網查。

上述db是指具體的資料庫,Redis不是預設分為16個庫麼,序號從0-15,所以db就是0到15的數位,範例中的0就是指0對應的資料庫。

3、延遲佇列實現原理

通過對上面的兩個概念瞭解之後,應該就對監聽過期key的實現原理一目瞭然了,其實就是當這個key過期之後,Redis會發佈一個key過期的事件到__keyevent@<db>__:expired這個channel,只要我們的服務監聽這個channel,那麼就能知道過期的Key,從而就算實現了延遲佇列功能。

所以這種方式實現延遲佇列就只需要兩步:

  • 傳送延遲任務,key是延遲訊息本身,過期時間就是延遲時間
  • 監聽__keyevent@<db>__:expired這個channel,處理延遲任務

4、demo

好了,基本概念和核心原理都說完了之後,又到了show me the code環節。

好巧不巧,Spring已經實現了監聽__keyevent@*__:expired這個channel這個功能,__keyevent@*__:expired中的*代表萬用字元的意思,監聽所有的資料庫。

所以demo寫起來就很簡單了,只需3步即可

引入pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

設定類

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }

}

KeyExpirationEventMessageListener實現了對__keyevent@*__:expiredchannel的監聽

當KeyExpirationEventMessageListener收到Redis釋出的過期Key的訊息的時候,會發布RedisKeyExpiredEvent事件

所以我們只需要監聽RedisKeyExpiredEvent事件就可以拿到過期訊息的Key,也就是延遲訊息。

對RedisKeyExpiredEvent事件的監聽實現MyRedisKeyExpiredEventListener

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent{

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        byte[] body = event.getSource();
        System.out.println("獲取到延遲訊息:" + new String(body));
    }

}

整個工程目錄也簡單

程式碼寫好,啟動應用

之後我直接通過Redis命令設定訊息,就沒通過程式碼傳送訊息了,訊息的key為sanyou,值為task,值不重要,過期時間為5s

set sanyou task 

expire sanyou 5

如果上面都理論都正確,不出意外的話,5s後MyRedisKeyExpiredEventListener應該可以監聽到sanyou這個key過期的訊息,也就相當於拿到了延遲任務,控制檯會列印出獲取到延遲訊息:sanyou

於是我滿懷希望,靜靜地等待了5s。。

5、4、3、2、1,時間一到,我檢視控制檯,但是控制檯並沒有按照預期列印出上面那句話。

為什麼會沒列印出?難道是程式碼寫錯了?正當我準備檢查程式碼的時候,官網的一段話道出了真實原因。

我給大家翻譯一下上面這段話講的內容。

上面這段話主要討論的是key過期事件的時效性問題,首先提到了Redis過期key的兩種清除策略,就是面試八股文常背的兩種:

  • 惰性清除。當這個key過期之後,存取時,這個Key才會被清除
  • 定時清除。後臺會定期檢查一部分key,如果有key過期了,就會被清除

再後面那段話是核心,意思是說,key的過期事件釋出時機並不是當這個key的過期時間到了之後就釋出,而是這個key在Redis中被清理之後,也就是真正被刪除之後才會釋出。

到這我終於明白了,上面的例子中即使我設定了5s的過期時間,但是當5s過去之後,只要兩種清除策略都不滿足,沒人存取sanyou這個key,後臺的定時清理的任務也沒掃描到sanyou這個key,那麼就不會發布key過期的事件,自然而然也就監聽不到了。

至於後臺的定時清理的任務什麼時候能掃到,這個沒有固定時間,可能一到過期時間就被掃到,也可能等一定時間才會被掃到,這就可能會造成了使用者端從釋出到監聽到的訊息時間差會大於等於過期時間,從而造成一定時間訊息的延遲,這就著實有點坑了。。

5、坑

除了上面測試demo的時候遇到的坑之外,在我深入研究之後,還發現了一些更離譜的坑。

丟訊息太頻繁

Redis的丟訊息跟MQ不一樣,因為MQ都會有訊息的持久化機制,可能只有當機器宕機了,才會丟點訊息,但是Redis丟訊息就很離譜,比如說你的服務在重啟的時候就訊息會丟訊息。

Redis實現的釋出訂閱模式,訊息是沒有持久化機制,當訊息釋出到某個channel之後,如果沒有使用者端訂閱這個channel,那麼這個訊息就丟了,並不會像MQ一樣進行持久化,等有消費者訂閱的時候再給消費者消費。

所以說,假設服務重啟期間,某個生產者或者是Redis本身釋出了一條訊息到某個channel,由於服務重啟,沒有監聽這個channel,那麼這個訊息自然就丟了。

訊息消費只有廣播模式

Redis的釋出訂閱模式訊息消費只有廣播模式一種。

所謂的廣播模式就是多個消費者訂閱同一個channel,那麼每個消費者都能消費到釋出到這個channel的所有訊息。

如圖,生產者釋出了一條訊息,內容為sanyou,那麼兩個消費者都可以同時收到sanyou這條訊息。

所以,如果通過監聽channel來獲取延遲任務,那麼一旦服務範例有多個的話,還得保證訊息不能重複處理,額外地增加了程式碼開發量。

接收到所有key的某個事件

這個不屬於Redis釋出訂閱模式的問題,而是Redis本身事件通知的問題。

當消費者監聽了以__keyevent@<db>__:開頭的訊息,那麼會導致所有的key發生了事件都會被通知給消費者。

舉個例子,某個消費者監聽了__keyevent@*__:expired這個channel,那麼只要key過期了,不管這個key是張三還會李四,消費者都能收到。

所以如果你只想消費某一類訊息的key,那麼還得自行加一些標記,比如訊息的key加個字首,消費的時候判斷一下帶字首的key就是需要消費的任務。

所以,綜上能夠得出一個非常重要的結論,那就是監聽Redis過期Key這種方式實現延遲佇列,不穩定,坑賊多!

那有沒有比較靠譜的延遲佇列的實現方案呢?這就不得不提到我研究的第二種方案了。

Redisson實現延遲佇列

Redisson他是Redis的兒子(Redis son),基於Redis實現了非常多的功能,其中最常使用的就是Redis分散式鎖的實現,但是除了實現Redis分散式鎖之外,它還實現了延遲佇列的功能。

先來個demo,後面再來說說這種實現的原理。

1、demo

引入pom

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

封裝了一個RedissonDelayQueue類

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("接收到延遲任務:{}", task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("新增延遲任務:{} 延遲時間:{}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}

這個類在建立的時候會去初始化延遲佇列,建立一個RedissonClient物件,之後通過RedissonClient物件獲取到RDelayedQueue和RBlockingQueue物件,傳入的佇列名字叫SANYOU,這個名字無所謂。

當延遲佇列建立之後,會開啟一個延遲任務的消費執行緒,這個執行緒會一直從RBlockingQueue中通過take方法阻塞獲取延遲任務。

新增任務的時候是通過RDelayedQueue的offer方法新增的。

controller類,通過介面新增任務,延遲時間為5s

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}

啟動專案,在瀏覽器輸入如下連線,新增任務

http://localhost:8080/add?task=sanyou

靜靜等待5s,成功獲取到任務。

2、實現原理

如下圖就是上面demo中,一個延遲佇列會在Redis內部使用到的channel和資料型別

SANYOU前面的字首都是固定的,Redisson建立的時候會拼上字首。

  • redisson_delay_queue_timeout:SANYOU,sorted set資料型別,存放所有延遲任務,按照延遲任務的到期時間戳(提交任務時的時間戳 + 延遲時間)來排序的,所以列表的最前面的第一個元素就是整個延遲佇列中最早要被執行的任務,這個概念很重要
  • redisson_delay_queue:SANYOU,list資料型別,也是存放所有的任務,但是研究下來發現好像沒什麼用。。
  • SANYOU,list資料型別,被稱為目標佇列,這個裡面存放的任務都是已經到了延遲時間的,可以被消費者獲取的任務,所以上面demo中的RBlockingQueue的take方法是從這個目標佇列中獲取到任務的
  • redisson_delay_queue_channel:SANYOU,是一個channel,用來通知使用者端開啟一個延遲任務

有了這些概念之後,再來看看整體的執行原理圖

  • 生產者在提交任務的時候將任務放到redisson_delay_queue_timeout:SANYOU中,分數就是提交任務的時間戳+延遲時間,就是延遲任務的到期時間戳
  • 使用者端會有一個延遲任務,為了區分,後面我都說是使用者端延遲任務。這個延遲任務會向Redis Server傳送一段lua指令碼,Redis執行lua指令碼中的命令,並且是原子性的

這段lua指令碼主要乾了兩件事:

  • 將到了延遲時間的任務從redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU這個目標佇列
  • 獲取到redisson_delay_queue_timeout:SANYOU中目前最早到過期時間的延遲任務的到期時間戳,然後釋出到redisson_delay_queue_channel:SANYOU這個channel中

當用戶端監聽到redisson_delay_queue_channel:SANYOU這個channel的訊息時,會再次提交一個使用者端延遲任務,延遲時間就是訊息(最早到過期時間的延遲任務的到期時間戳)- 當前時間戳,這個時間其實也就是redisson_delay_queue_channel:SANYOU中最早到過期時間的任務還剩餘的延遲時間。

此處可以等待10s,好好想想。。

這樣,一旦時間來到了上面說的最早到過期時間任務的到期時間戳,redisson_delay_queue_timeout:SANYOU中上面說的最早到過期時間的任務已經到期了,使用者端的延遲任務也同時到期,於是開始執行lua指令碼操作,及時將到了延遲時間的任務放到目標佇列中。然後再次釋出剩餘的延遲任務中最早到期的任務到期時間戳到channe中,如此迴圈往復,一直執行下去,保證redisson_delay_queue_timeout:SANYOU中到期的資料能及時放到目標佇列中。

所以,上述說了一大堆的主要的作用就是保證到了延遲時間的任務能夠及時被放到目標佇列。

這裡再補充兩個特殊情況,圖中沒有畫出:

第一個就是如果redisson_delay_queue_timeout:SANYOU是新新增的任務(佇列之前有或者沒有任務)是佇列中最早需要被執行的,也會發布訊息到channel,之後就按時上面說的流程走了。

新增任務程式碼如下,也是通過lua指令碼來的

第二種特殊情況就是專案啟動的時候會執行一次使用者端延遲任務。專案在重啟時,由於沒有使用者端延遲任務的執行,可能會出現redisson_delay_queue_timeout:SANYOU佇列中有到期但是沒有被放到目標佇列的可能,重啟就執行一次就是為了保證到期的資料能被及時放到目標佇列中。

3、與第一種方案比較

現在來比較一下第一種方案和Redisson的這種方案,看看有沒有第一種方案的那些坑。

第一個任務延遲的問題,Redisson方案理論上是沒有延遲的,但是當訊息數量增加,消費者消費緩慢這個情況下可能會導致延遲任務消費的延遲。

第二個丟訊息的問題,Redisson方案很大程度上減輕了丟訊息的可能性,因為所有的任務都是存在list和sorted set兩種資料型別中,Redis有持久化機制,就算Redis宕機了,也就可能會丟一點點資料。

第三個廣播消費任務的問題,這個是不會出現的,因為每個使用者端都是從同一個目標佇列中獲取任務的。

第四個問題是Redis內部channel釋出事件的問題,跟這種方案不沾邊,就更不可能存在了。

所以,通過上面的對比可以看出,Redisson這種實現方案就顯得更加的靠譜了。

往期熱門文章推薦

寫出漂亮程式碼的45個小技巧

扒一扒Bean注入到Spring的那些姿勢

三萬字盤點Spring/Boot的那些常用擴充套件點

兩萬字盤點那些被玩爛了的設計模式

RocketMQ保姆級教學

RocketMQ訊息短暫而又精彩的一生

掃碼或者搜尋關注公眾號 三友的java日記 ,及時乾貨不錯過,公眾號致力於通過畫圖加上通俗易懂的語言講解技術,讓技術更加容易學習,回覆 面試 即可獲得一套面試真題。