推薦學習:《》
###TP6 佇列
TP6 中使用 think-queue 可以實現普通佇列和延遲佇列。
think-queue 是thinkphp 官方提供的一個訊息佇列服務,它支援訊息佇列的一些基本特性:
1、通過生產者推播訊息到訊息佇列服務中
2、訊息佇列服務將收到的訊息存入redis佇列中(zset)
3、消費者進行監聽佇列,當監聽到佇列有新的訊息時,獲取佇列第一條
4、處理獲取下來的訊息呼叫業務類進行處理相關業務
5、業務處理後,需要從佇列中刪除訊息
composer require topthink/think-queue
安裝完 think-queue 後會在 config 目錄中生成 queue.php,這個檔案是佇列的組態檔。
tp6中提供了多種訊息佇列的實現方式,預設使用sync,我這裡選擇使用Redis。
return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => env('redis.host', '127.0.0.1'), 'port' => env('redis.port', '6379'), 'password' => env('redis.password','123456'), 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ];
在 app 目錄下建立 queue 目錄,然後在該目錄下新建一個抽象類 Queue.php 檔案,作為基礎類
<?phpnamespace app\queue;use think\facade\Cache;use think\queue\Job;use think\facade\Log;/** * Class Queue 佇列消費基礎類 * @package app\queue */abstract class Queue{ /** * @describe:fire是訊息佇列預設呼叫的方法 * @param \think\queue\Job $job * @param $message */ public function fire(Job $job, $data) { if (empty($data)) { Log::error(sprintf('[%s][%s] 佇列無訊息', __CLASS__, __FUNCTION__)); return ; } $jobId = $job->getJobId(); // 佇列的資料庫id或者redis key // $jobClassName = $job->getName(); // 佇列物件類 // $queueName = $job->getQueue(); // 佇列名稱 // 如果已經執行中或者執行完成就不再執行了 if (!$this->checkJob($jobId, $data)) { $job->delete(); Cache::store('redis')->delete($jobId); return ; } // 執行業務處理 if ($this->execute($data)) { Log::record(sprintf('[%s][%s] 佇列執行成功', __CLASS__, __FUNCTION__)); $job->delete(); // 任務執行成功後刪除 Cache::store('redis')->delete($jobId); // 刪除redis中的快取 } else { // 檢查任務重試次數 if ($job->attempts() > 3) { Log::error(sprintf('[%s][%s] 佇列執行重試次數超過3次,執行失敗', __CLASS__, __FUNCTION__)); // 第1種處理方式:重新發布任務,該任務延遲10秒後再執行;也可以不指定秒數立即執行 //$job->release(10); // 第2種處理方式:原任務的基礎上1分鐘執行一次並增加嘗試次數 //$job->failed(); // 第3種處理方式:刪除任務 $job->delete(); // 任務執行後刪除 Cache::store('redis')->delete($jobId); // 刪除redis中的快取 } } } /** * 訊息在到達消費者時可能已經不需要執行了 * @param string $jobId * @param $message * @return bool 任務執行的結果 * @throws \Psr\SimpleCache\InvalidArgumentException */ protected function checkJob(string $jobId, $message): bool { // 查詢redis $data = Cache::store('redis')->get($jobId); if (!empty($data)) { return false; } Cache::store('redis')->set($jobId, $message); return true; } /** * @describe: 根據訊息中的資料進行實際的業務處理 * @param $data 資料 * @return bool 返回結果 */ abstract protected function execute($data): bool;}
所有真正的消費類繼承基礎抽象類
<?phpnamespace app\queue\test;use app\queue\Queue;class Test extends Queue{ protected function execute($data): bool { // 具體消費業務邏輯 }}
use think\facade\Queue; // 普通佇列生成呼叫方式 Queue::push($job, $data, $queueName); // 例: Queue::push(Test::class, $data, $queueName); // 延時佇列生成呼叫方式 Queue::later($delay, $job, $data, $queueName); // 例如使用延時佇列 10 秒後執行: Queue::later(10 , Test::class, $data, $queueName);
php think queue:listen php think queue:work
queue:work 命令
work 命令: 該命令將啟動一個 work 程序來處理訊息佇列。
php think queue:work --queue TestQueue
queue:listen 命令
listen 命令: 該命令將會建立一個 listen 父程序 ,然後由父程序通過 proc_open(‘php think queue:work’)
的方式來建立一個work 子 程序來處理訊息佇列,且限制該work程序的執行時間。
php think queue:listen --queue TestQueue
Work 模式
php think queue:work \ --daemon //是否迴圈執行,如果不加該引數,則該命令處理完下一個訊息就退出 --queue helloJobQueue //要處理的佇列的名稱 --delay 0 \ //如果本次任務執行丟擲異常且任務未被刪除時,設定其下次執行前延遲多少秒,預設為0 --force \ //系統處於維護狀態時是否仍然處理任務,並未找到相關說明 --memory 128 \ //該程序允許使用的記憶體上限,以 M 為單位 --sleep 3 \ //如果佇列中無任務,則sleep多少秒後重新檢查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任務已經超過嘗試次數上限,則觸發‘任務嘗試次數超限’事件,預設為0
Listen 模式
php think queue:listen \ --queue helloJobQueue \ //監聽的佇列的名稱 --delay 0 \ //如果本次任務執行丟擲異常且任務未被刪除時,設定其下次執行前延遲多少秒,預設為0 --memory 128 \ //該程序允許使用的記憶體上限,以 M 為單位 --sleep 3 \ //如果佇列中無任務,則多長時間後重新檢查,daemon模式下有效 --tries 0 \ //如果任務已經超過重發次數上限,則進入失敗處理邏輯,預設為0 --timeout 60 //建立的work子程序的允許執行的最長時間,以秒為單位
可以看到 listen 模式下,不包含 --deamon
引數,原因下面會說明
訊息佇列的開始,停止與重新啟動
開始一個訊息佇列:
php think queue:work
停止所有的訊息佇列:
php think queue:restart
重新啟動所有的訊息佇列:
php think queue:restart php think queue:work
推薦學習:《》
以上就是一起聊聊thinkphp6使用think-queue實現普通佇列和延遲佇列的詳細內容,更多請關注TW511.COM其它相關文章!