本文首發於公眾號:Hunter後端
原文連結:celery筆記六之worker介紹
前面我們介紹過 celery 的理想的設計方式是幾個 worker 處理特定的任務佇列的資料,這樣可以避免任務在佇列中的積壓。
這一篇筆記我們介紹一下如何使用 worker 提高系統中任務的處理效率。
前面介紹過 worker 的啟動方式,在 celery 組態檔的上一級目錄執行下面的命令:
celery -A hunter worker -l INFO
其中,-l 表示紀錄檔等級,相當於是 --loglevel=INFO
celery -A hunter worker --loglevel=INFO
指定worker的hostname
celery -A hunter worker -l INFO -n worker1@%h
其中,%h 表示主機名,包含域名在內,%n 表示僅包含主機名,%d 表示僅包含域名。
以下是範例:
變數 | 範例 | 結果 |
---|---|---|
%h | worker1@%h | [email protected] |
%n | worker1@%n | worker1@george |
%d | worker1@%d | [email protected] |
指定紀錄檔檔案地址
logfile 引數可以指定紀錄檔檔案地址:
celery -A hunter worker --loglevel=INFO --logfile=/Users/hunter/python/celery_log/celery.log
殺死 worker 程序
我們可以通過獲取 worker 的程序 id 來殺死這些程序:
ps aux | grep 'celery -A hunter' | awk '{print $2}' |xargs sudo kill -9
並行處理
一般來說,當我們直接啟動 worker 的時候,會預設同時起好幾個 worker 程序。
如果不指定 worker 的數量,worker 的程序會預設是所在機器的 CPU 的數量。
我們也可以通過 concurrency 引數來指定啟動 worker 的程序數。
比如說,我們想啟動三個 worker 的程序,可以如下指定:
celery -A hunter worker --concurrency=3 -l INFO
--concurrency 也可以簡寫成 -c:
celery -A hunter worker -c 3 -l INFO
這樣,我們在啟動的命令列裡輸入下面的引數就可以看到啟動了三個 worker 的程序:
ps aux |grep 'celery -A hunter'
這裡有一個關於 worker 程序數啟動多少的問題,是不是我們的 worker 啟動的越多,我們的定時任務和延時任務就會執行得越快呢?
並不是,有實驗證明 worker 的數量啟動得越多,對於 task 處理的效能有可能還會起到一個反向作用,這裡不作展開討論,我們可以設定 CPU 的數量即可。
當然,你也可以根據 worker 處理任務的情況,基於 application,基於工作負載,任務執行時間等試驗出一個最佳的數量。
消費指定佇列的task
我們可以在執行 worker 的時候指定 worker 只消費特定佇列的 task,這個特定佇列,可以是一個,也可以是多個,用逗號分隔開。
指定的方式如下:
celery -A hunter worker -l INFO -Q queue_1,queue_2
列出所有活躍的queues
下面的命令可以列出所有系統活躍的佇列資訊:
celery -A hunter inspect active_queues
假設目前我們相關設定如下:
app.conf.task_queues = (
Queue('default_queue',),
Queue('queue_1'),
Queue('queue_2'),
)
app.conf.task_routes = {
'blog.tasks.add': {
'queue': 'queue_1',
},
'blog.tasks.minus': {
'queue': 'queue_2',
},
}
我們這樣啟動worker:
celery -A hunter worker -l INFO -c 3 -n worker1@%h
然後執行上面的檢視佇列命令:
celery -A hunter inspect active_queues
可以看到如下輸出:
-> worker1@localhost: OK
* {'name': 'default_queue', 'exchange': {...}, 'routing_key': 'default_queue', ...}
* {'name': 'queue_1', 'exchange': {...}, 'routing_key': 'default_queue', ...}
* {'name': 'queue_2', 'exchange': {...}, 'routing_key': 'default_queue', ...}
1 node online.
其中,輸出結果最上面的 worker1@localhost 就是我們啟動 worker 通過 -n 指定的 hostnam,可以通過這個來指定 worker。
我們可以指定 worker 輸出對應的佇列資料:
celery -A hunter inspect active_queues -d worker1@localhost
除了命令列,我們也可以在互動介面來獲取這些資料:
# 獲取所有的佇列資訊
from hunter.celery import app
app.control.inspect().active_queues()
# 獲取指定 worker 的佇列資訊
app.control.inspect(['worker1@localhost']).active_queues()
app.control.inspect() 函數可以檢測正在執行的 worker 資訊,我們可以用下面的命令來操作:
from hunter.celery import app
i = app.control.inspect()
這個操作是獲取所有節點,我們也可以指定單個或者多個節點檢測:
# 輸入陣列引數,表示獲取多個節點worker資訊
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])
# 輸入單個worker名,指定獲取worker資訊
i = app.control.inspect('worker1@localhost')
獲取已經註冊的task列表
用到前面的 app.control.inspect() 函數和其下的 registered() 函數
i.registered()
# 輸出結果為 worker 及其下的 task name
# 輸出範例為 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}
輸出的格式是一個 dict,worker 的名稱為 key,task 列表為 value
正在執行的 task
active() 用於獲取正在執行的 task 函數
i.active()
# 輸出 worker 正在執行的 task
# 輸出範例為 {'worker1@localhost': [{'id': 'xxx', 'name': 'blog.tasks.add', 'args': [3, 4], 'hostname': 'worker1@localhost', 'time_start': 1659450162.58197, ..., 'worker_pid': 41167}
輸出的結果也是一個 dict,每個 worker 下有 n 個正在 worker 中執行的 task 資訊,這個 n 的最大數量取決於前面我們啟動 worker 時的 --concurrency 引數。
在其中的 task 資訊裡包含 task_id,task_name,和輸入的引數,開始時間,worker name 等。
即將執行的 task
比如我們執行 add 延時任務,定時在 20s 之後執行:
add.apply_async((1, 1), countdown=20)
返回的結果每個 worker 下有一個任務列表,每個列表存有任務的資訊:
i.scheduled()
# 輸出資訊如下
# {'worker1@localhost': [{'eta': '2022-08-02T22:56:49.503517+08:00', 'priority': 6, 'request': {'id': '23080c03-a906-4cc1-9ab1-f27890c58adb', 'name': 'blog.tasks.add', 'args': [1, 1], 'kwargs': {}, 'type': 'blog.tasks.add', 'hostname': 'worker1@localhost', 'time_start': None, 'acknowledged': False, 'delivery_info': {...}}]}
queue佇列中等待的 task
如果我們有任務在 queue 中積壓,我們可以使用:
i.reserved()
來獲取佇列中等待的 task 列表
ping-pong
檢測 worker 還活著的 worker
使用 ping() 函數,可以得到 pong 字串的回覆表明該 worker 是存活的。
from hunter.celery import app
app.control.ping(timeout=0.5)
# [{'worker1@localhost': {'ok': 'pong'}}]
我們也可以指定 worker 來操作:
app.control.ping(['worker1@localhost'])
如果你瞭解 redis 的存活檢測操作的話,應該知道在 redis-cli 裡也可以執行這個 ping-pong 的一來一回的檢測操作。
如果想獲取更多後端相關文章,可掃碼關注閱讀: