celery筆記六之worker介紹

2023-06-24 06:00:51

本文首發於公眾號:Hunter後端
原文連結:celery筆記六之worker介紹

前面我們介紹過 celery 的理想的設計方式是幾個 worker 處理特定的任務佇列的資料,這樣可以避免任務在佇列中的積壓。

這一篇筆記我們介紹一下如何使用 worker 提高系統中任務的處理效率。

  1. worker啟動
  2. worker與佇列
  3. worker檢測
  4. 其他worker命令

1、worker 啟動

前面介紹過 worker 的啟動方式,在 celery 組態檔的上一級目錄執行下面的命令:

celery -A hunter worker -l INFO

其中,-l 表示紀錄檔等級,相當於是 --loglevel=INFO

celery -A hunter worker --loglevel=INFO

指定worker的hostname

celery -A hunter worker -l INFO -n worker1@%h

其中,%h 表示主機名,包含域名在內,%n 表示僅包含主機名,%d 表示僅包含域名。

以下是範例:

變數 範例 結果
%h worker1@%h [email protected]
%n worker1@%n worker1@george
%d worker1@%d [email protected]

指定紀錄檔檔案地址

logfile 引數可以指定紀錄檔檔案地址:

celery -A hunter worker --loglevel=INFO --logfile=/Users/hunter/python/celery_log/celery.log

殺死 worker 程序

我們可以通過獲取 worker 的程序 id 來殺死這些程序:

ps aux | grep 'celery -A hunter' | awk '{print $2}' |xargs sudo kill -9

並行處理

一般來說,當我們直接啟動 worker 的時候,會預設同時起好幾個 worker 程序。

如果不指定 worker 的數量,worker 的程序會預設是所在機器的 CPU 的數量。

我們也可以通過 concurrency 引數來指定啟動 worker 的程序數。

比如說,我們想啟動三個 worker 的程序,可以如下指定:

celery -A hunter worker --concurrency=3 -l INFO

--concurrency 也可以簡寫成 -c:

celery -A hunter worker -c 3 -l INFO

這樣,我們在啟動的命令列裡輸入下面的引數就可以看到啟動了三個 worker 的程序:

ps aux |grep 'celery -A hunter'

這裡有一個關於 worker 程序數啟動多少的問題,是不是我們的 worker 啟動的越多,我們的定時任務和延時任務就會執行得越快呢?

並不是,有實驗證明 worker 的數量啟動得越多,對於 task 處理的效能有可能還會起到一個反向作用,這裡不作展開討論,我們可以設定 CPU 的數量即可。

當然,你也可以根據 worker 處理任務的情況,基於 application,基於工作負載,任務執行時間等試驗出一個最佳的數量。

2、worker與佇列

消費指定佇列的task

我們可以在執行 worker 的時候指定 worker 只消費特定佇列的 task,這個特定佇列,可以是一個,也可以是多個,用逗號分隔開。

指定的方式如下:

celery -A hunter worker -l INFO -Q queue_1,queue_2

列出所有活躍的queues

下面的命令可以列出所有系統活躍的佇列資訊:

celery -A hunter inspect active_queues

假設目前我們相關設定如下:

app.conf.task_queues = (
    Queue('default_queue',),
    Queue('queue_1'),
    Queue('queue_2'),
)

app.conf.task_routes = {
    'blog.tasks.add': {
        'queue': 'queue_1',
    },
    'blog.tasks.minus': {
        'queue': 'queue_2',
    },
}

我們這樣啟動worker:

celery -A hunter worker -l INFO -c 3 -n worker1@%h

然後執行上面的檢視佇列命令:

celery -A hunter inspect active_queues

可以看到如下輸出:

->  worker1@localhost: OK
    * {'name': 'default_queue', 'exchange': {...}, 'routing_key': 'default_queue', ...}
    * {'name': 'queue_1', 'exchange': {...}, 'routing_key': 'default_queue', ...}
    * {'name': 'queue_2', 'exchange': {...}, 'routing_key': 'default_queue', ...}

1 node online.

其中,輸出結果最上面的 worker1@localhost 就是我們啟動 worker 通過 -n 指定的 hostnam,可以通過這個來指定 worker。

我們可以指定 worker 輸出對應的佇列資料:

celery -A hunter inspect active_queues -d worker1@localhost

除了命令列,我們也可以在互動介面來獲取這些資料:

# 獲取所有的佇列資訊
from hunter.celery import app
app.control.inspect().active_queues()

# 獲取指定 worker 的佇列資訊
app.control.inspect(['worker1@localhost']).active_queues()

3、worker 的檢測

app.control.inspect() 函數可以檢測正在執行的 worker 資訊,我們可以用下面的命令來操作:

from hunter.celery import app

i = app.control.inspect()

這個操作是獲取所有節點,我們也可以指定單個或者多個節點檢測:

# 輸入陣列引數,表示獲取多個節點worker資訊
i = app.control.inspect(['worker1@localhost', 'worker2@localhost'])

# 輸入單個worker名,指定獲取worker資訊
i = app.control.inspect('worker1@localhost')

獲取已經註冊的task列表

用到前面的 app.control.inspect() 函數和其下的 registered() 函數

i.registered()

# 輸出結果為 worker 及其下的 task name 
# 輸出範例為 {'worker1@localhost': ['blog.tasks.add', 'blog.tasks.minus', 'polls.tasks.multi']}

輸出的格式是一個 dict,worker 的名稱為 key,task 列表為 value

正在執行的 task

active() 用於獲取正在執行的 task 函數

i.active()

# 輸出 worker 正在執行的 task
# 輸出範例為 {'worker1@localhost': [{'id': 'xxx', 'name': 'blog.tasks.add', 'args': [3, 4], 'hostname': 'worker1@localhost', 'time_start': 1659450162.58197, ..., 'worker_pid': 41167}

輸出的結果也是一個 dict,每個 worker 下有 n 個正在 worker 中執行的 task 資訊,這個 n 的最大數量取決於前面我們啟動 worker 時的 --concurrency 引數。

在其中的 task 資訊裡包含 task_id,task_name,和輸入的引數,開始時間,worker name 等。

即將執行的 task

比如我們執行 add 延時任務,定時在 20s 之後執行:

add.apply_async((1, 1), countdown=20)

返回的結果每個 worker 下有一個任務列表,每個列表存有任務的資訊:

i.scheduled()

# 輸出資訊如下
# {'worker1@localhost': [{'eta': '2022-08-02T22:56:49.503517+08:00', 'priority': 6, 'request': {'id': '23080c03-a906-4cc1-9ab1-f27890c58adb', 'name': 'blog.tasks.add', 'args': [1, 1], 'kwargs': {}, 'type': 'blog.tasks.add', 'hostname': 'worker1@localhost', 'time_start': None, 'acknowledged': False, 'delivery_info': {...}}]}

queue佇列中等待的 task

如果我們有任務在 queue 中積壓,我們可以使用:

i.reserved()

來獲取佇列中等待的 task 列表

4、其他 worker 命令

ping-pong

檢測 worker 還活著的 worker

使用 ping() 函數,可以得到 pong 字串的回覆表明該 worker 是存活的。

from hunter.celery import app

app.control.ping(timeout=0.5)

# [{'worker1@localhost': {'ok': 'pong'}}]

我們也可以指定 worker 來操作:

app.control.ping(['worker1@localhost'])

如果你瞭解 redis 的存活檢測操作的話,應該知道在 redis-cli 裡也可以執行這個 ping-pong 的一來一回的檢測操作。

如果想獲取更多後端相關文章,可掃碼關注閱讀: