應用場景
為什麼要用呢,有什麼好處?這應該放在最開頭說,一件東西你只有了解它是幹什麼的,適合幹什麼,才能更好的與自己的專案相結合,用到哪裡學到哪裡,學了不用等於不會,我們平時就應該多考慮一些這樣的問題:自己做個什麼專案功能能跟 xx 技術相結合呢?這個 xx 技術放在這種業務場景下行不行呢?而不是 「學了這個 xx 技術能幹嘛呢,公司現在也沒有用這個的呀,學了也沒用啊」,帶著這樣心情去學習 xx 技術,肯定很痛苦。
佇列大家都知道是將一些耗時的操作先不去做,先埋點,再非同步去處理,這樣對一些發郵件傳簡訊之類的耗時操作,使用者是感覺不到的,因為埋點結束,操作也就結束了,消費佇列都是在伺服器上做的。主要應用在簡訊或郵件通知,存取第三方介面訂閱訊息,商城的一些秒殺活動,都可以結合佇列來完成。
Beanstalkd 介紹
Beanstalkd 是一個高效能,輕量級的分散式記憶體佇列,C 程式碼,典型的類 Memcached 設計,協定和使用方式都是同樣的風格,所以使用過 memcached 的使用者會覺得 Beanstalkd 似曾相識。
beanstalkd 的最初設計意圖是在高並行的網路請求下,通過非同步執行耗時較多的請求,及時返回結果,減少請求的響應延遲。
Ubuntu 安裝
sudo apt-get install beanstalkd
組態檔
vim /etc/default/beanstalkd
檢視狀態
service beanstalkd status # 命令回顯 # [email protected]:/www/server/php/72/etc# service beanstalkd status ● beanstalkd.service - Simple, fast work queue Loaded: loaded (/lib/systemd/system/beanstalkd.service; enabled; vendor preset: enabled) Active: active (running) since Tue 2018-10-16 10:42:28 CST; 6 days ago Docs: man:beanstalkd(1) Main PID: 7033 (beanstalkd) Tasks: 1 (limit: 4634) CGroup: /system.slice/beanstalkd.service └─7033 /usr/bin/beanstalkd -l 0.0.0.0 -p 11300 -b /var/lib/beanstalkd Oct 16 10:42:28 ip-10-93-2-137 systemd[1]: Started Simple, fast work queue.
設定連通性 + 持久化
ip 用 0.0.0.0 允許所有連線,靠設定安全組或防火牆去約束連線,放開 -b 引數 (預設沒有持久化),記憶體的佇列訊息可以落地到硬碟 binlog 實現持久化,斷電可重新讀取佇列訊息。
vim /etc/default/beanstalkd BEANSTALKD_LISTEN_ADDR=0.0.0.0 BEANSTALKD_LISTEN_PORT=11300 BEANSTALKD_EXTRA="-b /var/lib/beanstalkd"
beanstalkd 任務狀態
狀態 | 注釋 |
---|---|
delayed | 延遲狀態 |
ready | 準備好狀態 |
reserved | 消費者把任務讀出來,處理時 |
buried | 預留狀態 |
delete | 刪除狀態 |
管理工具
親測了很多網上能找到的 beanstalkd 工具,這兩款是我最中意的了,一個命令列,一個 web 的。
命令列:https://github.com/src-d/beanstool
web 介面:https://github.com/ptrofimov/beanstalk_console
程式語言用戶端
PHP 用戶端
https://packagist.org/packages/pda/pheanstalk
composer require pda/pheanstalk
寫入 job
<?php //建立佇列訊息 require_once('./vendor/autoload.php'); use PheanstalkPheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; $jobData = [ 'email' => '[email protected]', 'message' => 'Hello World !!', 'dtime' => date('Y-m-d H:i:s'), ]; $pheanstalk->useTube( $tubeName)->put( json_encode( $jobData ) );
消費 job
<?php ini_set('default_socket_timeout', 86400*7); ini_set( 'memory_limit', '256M' ); // 消費佇列訊息 require_once('./vendor/autoload.php'); use PheanstalkPheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $tubeName = 'email_list'; while ( true ) { // 獲取佇列資訊, reserve 阻塞獲取 $job = $pheanstalk->watch( $tubeName )->ignore( 'default' )->reserve(); if ( $job !== false ) { $data = $job->getData(); /* TODO 邏輯操作 */ /* 處理完成,刪除 job */ $pheanstalk->delete( $job ); } }
default_socket_timeout 這個引數是一定要加的,php 預設一般是 60s,假如您沒有在程式碼裡面設定,採用預設的話(60s),60s 之內如果沒有 job 產生,指令碼就會報 socket 錯誤,我寫的是 7 天超時,您可以根據業務去調整,記住一定要設定,網上很多蒐的 consumer 指令碼都沒有設定這個,根本不能投入生產環境使用,這是我親自實踐的結果。
關於 while true 是否死迴圈,很明確告訴你是死迴圈,但是不會一直耗效能的那樣執行下去,它會在 reserve 這裡阻塞不動,直到有訊息產生才會往下走,所以大可放心使用,我的專案程式碼裡面是使用了方法呼叫方法自身去實現回圈的。
就是這樣的程式碼,供參考:
public function watchJob() { $job = $this->pheanstalk->watch( config( 'tube' ) )->ignore( 'default' )->reserve(); if ( $job !== false ) { $job_data = $job->getData(); $this->subscribe( $job_data ); $this->pheanstalk->delete( $job ); /* 繼續 Watch 下一個 job */ $this->watchJob(); } else { $this->log->error( 'reserve false', 'reserve false' ); } }
監控 beanstalkd 狀態
<?php //監控服務狀態 require_once('./vendor/autoload.php'); use PheanstalkPheanstalk; $pheanstalk = new Pheanstalk('127.0.0.1',11300); $isAlive = $pheanstalk->getConnection()->isServiceListening(); var_dump( $isAlive );
可以配合 email 做一個報警郵件,指令碼每分鐘去執行,判斷狀態是 false,就給管理員傳送郵件報警。
一些相關命令
檢視 beanstalkd 服務記憶體占用
top -u beanstalkd
後台執行 consumer 指令碼
nohup php googlehome_subscribe.php &
檢視 consumer 指令碼執行時間
ps -A -opid,stime,etime,args | grep consumer.php
手工重新啟動 consumer 指令碼
ps auxf|grep 'googlehome_subscribe.php'|grep -v grep|awk '{print $2}'|xargs kill -9 nohup php googlehome_subscribe.php &
一些總結
php 要把錯誤紀錄檔開啟,方便收集 consumer 指令碼 crash 的 log,指令碼跑出一些致命的 error 一定要及時修復,因為一旦有錯就會掛掉,這會影響你指令碼的可用性,後期穩定之後可以上 supervisor 這種進程管理程式來管控指令碼生命週期。
一些網路請求操作,一定要 try catch 到所有錯誤,一旦沒有 catch 到,指令碼就崩。我用的是 Guzzle 去做的網路請求,下面是我 catch 的一些錯誤,程式碼片段供參考。
try { /* TODO: 邏輯操作 */ } catch ( ClientException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ClientException', $results ); } catch ( ServerException $e ) { $results['mid'] = $this->mid; $results['code'] = $e->getResponse()->getStatusCode(); $results['reason'] = $e->getResponse()->getReasonPhrase(); $this->log->error( 'properties-changed ServerException', $results ); } catch ( ConnectException $e ) { $results['mid'] = $this->mid; $this->log->error( 'properties-changed ConnectException', $results ); }
job 消費之後一定要刪除掉,如果長時間不刪除,php 用戶端會有 false 返回,是因為有 DEADLINE_SOON 這個超時錯誤產生,所以處理完任務,一定要記得刪除,這一點跟 kafka 不一樣,beanstalkd 需要開發者自己去刪除 job。
以上就是PHP7 生產環境佇列 Beanstalkd 正確使用姿勢的詳細內容,更多請關注TW511.COM其它相關文章!