一起聊聊thinkphp6使用think-queue實現普通佇列和延遲佇列

2022-04-20 16:00:40
本篇文章給大家帶來了關於的相關知識,其中主要介紹了關於使用think-queue來實現普通佇列和延遲佇列的相關內容,think-queue是thinkphp官方提供的一個訊息佇列服務,下面一起來看一下,希望對大家有幫助。

推薦學習:《》

###TP6 佇列

TP6 中使用 think-queue 可以實現普通佇列和延遲佇列。

think-queue 是thinkphp 官方提供的一個訊息佇列服務,它支援訊息佇列的一些基本特性:

  • 訊息的釋出,獲取,執行,刪除,重發,失敗處理,延遲執行,超時控制等
  • 佇列的多佇列, 記憶體限制 ,啟動,停止,守護等
  • 訊息佇列可降級為同步執行

訊息佇列實現過程

1、通過生產者推播訊息到訊息佇列服務中

2、訊息佇列服務將收到的訊息存入redis佇列中(zset)

3、消費者進行監聽佇列,當監聽到佇列有新的訊息時,獲取佇列第一條

4、處理獲取下來的訊息呼叫業務類進行處理相關業務

5、業務處理後,需要從佇列中刪除訊息

composer 安裝 think-queue

composer require topthink/think-queue

組態檔

安裝完 think-queue 後會在 config 目錄中生成 queue.php,這個檔案是佇列的組態檔。

tp6中提供了多種訊息佇列的實現方式,預設使用sync,我這裡選擇使用Redis。

return [
    'default'     => 'redis',
    'connections' => [
        'sync'     => [
            'type' => 'sync',
        ],
        'database' => [
            'type'       => 'database',
            'queue'      => 'default',
            'table'      => 'jobs',
            'connection' => null,
        ],
        'redis'    => [
            'type'       => 'redis',
            'queue'      => 'default',
            'host'       => env('redis.host', '127.0.0.1'),
            'port'       => env('redis.port', '6379'),
            'password'   => env('redis.password','123456'),
            'select'     => 0,
            'timeout'    => 0,
            'persistent' => false,
        ],
    ],
    'failed'      => [
        'type'  => 'none',
        'table' => 'failed_jobs',
    ],
];

建立目錄及佇列消費類檔案

在 app 目錄下建立 queue 目錄,然後在該目錄下新建一個抽象類 Queue.php 檔案,作為基礎類

<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/**
 * Class Queue 佇列消費基礎類
 * @package app\queue
 */abstract class Queue{
    /**
     * @describe:fire是訊息佇列預設呼叫的方法
     * @param \think\queue\Job $job
     * @param $message
     */
    public function fire(Job $job, $data)
    {
        if (empty($data)) {
            Log::error(sprintf('[%s][%s] 佇列無訊息', __CLASS__, __FUNCTION__));
            return ;
        }

        $jobId = $job->getJobId(); // 佇列的資料庫id或者redis key
        // $jobClassName = $job->getName(); // 佇列物件類
        // $queueName = $job->getQueue(); // 佇列名稱

        // 如果已經執行中或者執行完成就不再執行了
        if (!$this->checkJob($jobId, $data)) {
            $job->delete();
            Cache::store('redis')->delete($jobId);
            return ;
        }

        // 執行業務處理
        if ($this->execute($data)) {
            Log::record(sprintf('[%s][%s] 佇列執行成功', __CLASS__, __FUNCTION__));
            $job->delete(); // 任務執行成功後刪除
            Cache::store('redis')->delete($jobId); // 刪除redis中的快取
        } else {
            // 檢查任務重試次數
            if ($job->attempts() > 3) {
                Log::error(sprintf('[%s][%s] 佇列執行重試次數超過3次,執行失敗', __CLASS__, __FUNCTION__));
                 // 第1種處理方式:重新發布任務,該任務延遲10秒後再執行;也可以不指定秒數立即執行
                //$job->release(10); 
                // 第2種處理方式:原任務的基礎上1分鐘執行一次並增加嘗試次數
                //$job->failed();   
                // 第3種處理方式:刪除任務
                $job->delete(); // 任務執行後刪除
                Cache::store('redis')->delete($jobId); // 刪除redis中的快取
            }
        }
    }

    /**
     * 訊息在到達消費者時可能已經不需要執行了
     * @param  string  $jobId
     * @param $message
     * @return bool 任務執行的結果
     * @throws \Psr\SimpleCache\InvalidArgumentException
     */
    protected function checkJob(string $jobId, $message): bool
    {
        // 查詢redis
        $data = Cache::store('redis')->get($jobId);
        if (!empty($data)) {
            return false;
        }
        Cache::store('redis')->set($jobId, $message);
        return true;
    }

    /**
     * @describe: 根據訊息中的資料進行實際的業務處理
     * @param $data 資料
     * @return bool 返回結果
     */
    abstract protected function execute($data): bool;}

所有真正的消費類繼承基礎抽象類

<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{
    protected function execute($data): bool
    {
       // 具體消費業務邏輯
    }}

生產者邏輯

use think\facade\Queue;

// 普通佇列生成呼叫方式
Queue::push($job, $data, $queueName);
// 例:
Queue::push(Test::class, $data, $queueName);

// 延時佇列生成呼叫方式
Queue::later($delay, $job, $data, $queueName);
// 例如使用延時佇列 10 秒後執行:
Queue::later(10 , Test::class, $data, $queueName);

開啟程序監聽任務並執行

php think queue:listen
php think queue:work

命令模式介紹

命令模式

  • queue:work 命令

    work 命令: 該命令將啟動一個 work 程序來處理訊息佇列。

    php think queue:work --queue TestQueue
  • queue:listen 命令

    listen 命令: 該命令將會建立一個 listen 父程序 ,然後由父程序通過 proc_open(‘php think queue:work’) 的方式來建立一個work 子 程序來處理訊息佇列,且限制該work程序的執行時間。

    php think queue:listen --queue TestQueue

命令列引數

  • Work 模式

    php think queue:work \
    --daemon            //是否迴圈執行,如果不加該引數,則該命令處理完下一個訊息就退出
    --queue  helloJobQueue  //要處理的佇列的名稱
    --delay  0 \        //如果本次任務執行丟擲異常且任務未被刪除時,設定其下次執行前延遲多少秒,預設為0
    --force  \          //系統處於維護狀態時是否仍然處理任務,並未找到相關說明
    --memory 128 \      //該程序允許使用的記憶體上限,以 M 為單位
    --sleep  3 \        //如果佇列中無任務,則sleep多少秒後重新檢查(work+daemon模式)或者退出(listen或非daemon模式)
    --tries  2          //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,預設為0
  • Listen 模式

    php think queue:listen \
    --queue  helloJobQueue \   //監聽的佇列的名稱
    --delay  0 \         //如果本次任務執行丟擲異常且任務未被刪除時,設定其下次執行前延遲多少秒,預設為0
    --memory 128 \       //該程序允許使用的記憶體上限,以 M 為單位
    --sleep  3 \         //如果佇列中無任務,則多長時間後重新檢查,daemon模式下有效
    --tries  0 \         //如果任務已經超過重發次數上限,則進入失敗處理邏輯,預設為0
    --timeout 60         //建立的work子程序的允許執行的最長時間,以秒為單位

    可以看到 listen 模式下,不包含 --deamon 引數,原因下面會說明

  • 訊息佇列的開始,停止與重新啟動

    • 開始一個訊息佇列:

      php think queue:work
    • 停止所有的訊息佇列:

      php think queue:restart
    • 重新啟動所有的訊息佇列:

      php think queue:restart 
      php think queue:work

推薦學習:《》

以上就是一起聊聊thinkphp6使用think-queue實現普通佇列和延遲佇列的詳細內容,更多請關注TW511.COM其它相關文章!