PHP7 生產環境佇列 Beanstalkd 正確使用姿勢

2020-07-16 10:06:14

應用場景

  為什麼要用呢,有什麼好處?這應該放在最開頭說,一件東西你只有了解它是幹什麼的,適合幹什麼,才能更好的與自己的專案相結合,用到哪裡學到哪裡,學了不用等於不會,我們平時就應該多考慮一些這樣的問題:自己做個什麼專案功能能跟 xx 技術相結合呢?這個 xx 技術放在這種業務場景下行不行呢?而不是 「學了這個 xx 技術能幹嘛呢,公司現在也沒有用這個的呀,學了也沒用啊」,帶著這樣心情去學習 xx 技術,肯定很痛苦。

  佇列大家都知道是將一些耗時的操作先不去做,先埋點,再非同步去處理,這樣對一些發郵件傳簡訊之類的耗時操作,使用者是感覺不到的,因為埋點結束,操作也就結束了,消費佇列都是在伺服器上做的。主要應用在簡訊或郵件通知,存取第三方介面訂閱訊息,商城的一些秒殺活動,都可以結合佇列來完成。

Beanstalkd 介紹

  Beanstalkd 是一個高效能,輕量級的分散式記憶體佇列,C 程式碼,典型的類 Memcached 設計,協定和使用方式都是同樣的風格,所以使用過 memcached 的使用者會覺得 Beanstalkd 似曾相識。

  beanstalkd 的最初設計意圖是在高並行的網路請求下,通過非同步執行耗時較多的請求,及時返回結果,減少請求的響應延遲。

Ubuntu 安裝

sudo apt-get install beanstalkd

組態檔

vim /etc/default/beanstalkd

檢視狀態

service beanstalkd status
# 命令回顯 #
[email protected]:/www/server/php/72/etc# service beanstalkd status
● beanstalkd.service - Simple, fast work queue
   Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled)
   Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago
     Docs: man:beanstalkd(1)
 Main PID: 7033 (beanstalkd)
    Tasks: 1 (limit: 4634)
   CGroup: /system.slice/beanstalkd.service
           └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd
Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.

設定連通性 + 持久化

ip 用 0.0.0.0 允許所有連線,靠設定安全組或防火牆去約束連線,放開 -b 引數 (預設沒有持久化),記憶體的佇列訊息可以落地到硬碟 binlog 實現持久化,斷電可重新讀取佇列訊息。

vim /etc/default/beanstalkd
BEANSTALKD_LISTEN_ADDR=0.0.0.0
BEANSTALKD_LISTEN_PORT=11300
BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"

beanstalkd 任務狀態

狀態注釋
delayed延遲狀態
ready準備好狀態
reserved消費者把任務讀出來,處理時
buried預留狀態
delete刪除狀態

管理工具

親測了很多網上能找到的 beanstalkd 工具,這兩款是我最中意的了,一個命令列,一個 web 的。

命令列:https://github.com/src-d/beanstool

web 介面:https://github.com/ptrofimov/beanstalk_console

程式語言用戶端

PHP 用戶端

https://packagist.org/packages/pda/pheanstalk

composer require pda/pheanstalk

寫入 job

<?php
//建立佇列訊息
require_once('./vendor/autoload.php');
use PheanstalkPheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
$jobData = [
    'email' => '[email protected]',
    'message' => 'Hello World !!',
    'dtime' => date('Y-m-d H:i:s'),
];
$pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );

消費 job

<?php
ini_set('default_socket_timeout', 86400*7);
ini_set( 'memory_limit', '256M' );
// 消費佇列訊息
require_once('./vendor/autoload.php');
use PheanstalkPheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$tubeName = 'email_list';
while ( true )
{
    // 獲取佇列資訊, reserve 阻塞獲取
    $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve();
    if ( $job !== false )
    {
        $data = $job->getData();
        /* TODO 邏輯操作 */
        /* 處理完成,刪除 job */
        $pheanstalk->delete( $job );
    }
}

default_socket_timeout 這個引數是一定要加的,php 預設一般是 60s,假如您沒有在程式碼裡面設定,採用預設的話(60s),60s 之內如果沒有 job 產生,指令碼就會報 socket 錯誤,我寫的是 7 天超時,您可以根據業務去調整,記住一定要設定,網上很多蒐的 consumer 指令碼都沒有設定這個,根本不能投入生產環境使用,這是我親自實踐的結果。

  關於 while true 是否死迴圈,很明確告訴你是死迴圈,但是不會一直耗效能的那樣執行下去,它會在 reserve 這裡阻塞不動,直到有訊息產生才會往下走,所以大可放心使用,我的專案程式碼裡面是使用了方法呼叫方法自身去實現回圈的。

就是這樣的程式碼,供參考:

    public function watchJob()
    {
        $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve();
        if ( $job !== false )
        {
            $job_data = $job->getData();
            $this->subscribe( $job_data );
            $this->pheanstalk->delete( $job );
            /* 繼續 Watch 下一個 job */
            $this->watchJob();
        }
        else
        {
            $this->log->error( 'reserve false', 'reserve false' );
        }
    }

監控 beanstalkd 狀態

<?php
//監控服務狀態
require_once('./vendor/autoload.php');
use PheanstalkPheanstalk;
$pheanstalk = new Pheanstalk('127.0.0.1',11300);
$isAlive = $pheanstalk->getConnection()->isServiceListening();
var_dump( $isAlive );

可以配合 email 做一個報警郵件,指令碼每分鐘去執行,判斷狀態是 false,就給管理員傳送郵件報警。

一些相關命令

檢視 beanstalkd 服務記憶體占用

top -u beanstalkd

後台執行 consumer 指令碼

nohup php googlehome_subscribe.php &

檢視 consumer 指令碼執行時間

ps -A -opid,stime,etime,args | grep consumer.php

手工重新啟動 consumer 指令碼

ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 
nohup php googlehome_subscribe.php &

一些總結

  php 要把錯誤紀錄檔開啟,方便收集 consumer 指令碼 crash 的 log,指令碼跑出一些致命的 error 一定要及時修復,因為一旦有錯就會掛掉,這會影響你指令碼的可用性,後期穩定之後可以上 supervisor 這種進程管理程式來管控指令碼生命週期。

  一些網路請求操作,一定要 try catch 到所有錯誤,一旦沒有 catch 到,指令碼就崩。我用的是 Guzzle 去做的網路請求,下面是我 catch 的一些錯誤,程式碼片段供參考。

try
{
    /* TODO: 邏輯操作 */
}
catch ( ClientException $e )
{
    $results['mid']    = $this->mid;
    $results['code']   = $e->getResponse()->getStatusCode();
    $results['reason'] = $e->getResponse()->getReasonPhrase();
    $this->log->error( 'properties-changed ClientException', $results );
}
catch ( ServerException $e )
{
    $results['mid']    = $this->mid;
    $results['code']   = $e->getResponse()->getStatusCode();
    $results['reason'] = $e->getResponse()->getReasonPhrase();
    $this->log->error( 'properties-changed ServerException', $results );
}
catch ( ConnectException $e )
{
    $results['mid'] = $this->mid;
    $this->log->error( 'properties-changed ConnectException', $results );
}

  job 消費之後一定要刪除掉,如果長時間不刪除,php 用戶端會有 false 返回,是因為有 DEADLINE_SOON 這個超時錯誤產生,所以處理完任務,一定要記得刪除,這一點跟 kafka 不一樣,beanstalkd 需要開發者自己去刪除 job。

以上就是PHP7 生產環境佇列 Beanstalkd 正確使用姿勢的詳細內容,更多請關注TW511.COM其它相關文章!