celery筆記三之task和task的呼叫

2023-06-14 06:00:24

本文首發於公眾號:Hunter後端
原文連結:celery筆記三之task和task的呼叫

這一篇筆記介紹 task 和 task 的呼叫。

以下是本篇筆記目錄:

  1. 基礎的 task 定義方式
  2. 紀錄檔處理
  3. 任務重試
  4. 忽略任務執行結果
  5. task 的呼叫

1、基礎的 task 定義方式

前面兩篇筆記中介紹了最簡單的定義方式,使用 @app.task 作為裝飾器:

@app.task
def add(x, y):
    return x + y

如果是在 Django 系統中使用 celery,需要定義一個延時任務或者週期定時任務,可以使用 @shared_task 來修飾

from celery import shared_task

@shared_task
def add(x, y):
    return x + y

在 Django 系統中使用 celery 的方式會在接下來的幾篇筆記中介紹道。

多個裝飾器

如果是 celery 的任務和其他裝飾器一起聯用,記得將 celery 的裝飾器放在最後使用,也就是列表的最前面:

@app.task
@decorator1
@decorator2
def add(x, y):
    return x + y

task名稱

每個 task 都有一個唯一的名稱用來標識這個 task,如果我們在定義的時候不指定,系統會為我們預設一個名稱,這些名稱會在 celery 的 worker 啟動的時候被系統掃描然後輸出一個列表展示。

還是上一篇筆記中我們定義的兩個 task,我們給其中一個指定 name:

#tasks1.py
from .celery import app


@app.task(name="tasks1.add")
def add(x, y):
    return x + y

可以觀察在 celery 的 worker 啟動的時候,會有一個輸出:

[tasks]
  . proj.tasks2.mul
  . tasks1.add

可以看到這個地方,系統就會使用我們定義的 name 了。

2、紀錄檔處理

我們可以在啟動 worker 的時候指定紀錄檔的輸出,定義格式如下:

celery -A proj worker -l INFO --logfile=/Users/hunter/python/celery_log/celery.log

在 task 中的定義可以使用 celery 中方法:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

也可以直接使用 logging 模組:

import logging

logger1 = logging.getLogger(__name__)

直接在 task 中輸出:

@app.task(name="tasks1.add")
def add(x, y):
    logger.info("this is from logger")
    return x + y

然後在 worker 啟動時指定的紀錄檔檔案就會有我們列印出的紀錄檔內容:

[2022-07-24 16:28:33,210: INFO/ForkPoolWorker-7] tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a]: this is from logger
[2022-07-24 16:28:33,224: INFO/ForkPoolWorker-7] Task tasks1.add[4db4b0fc-c6ca-472a-8847-ae42e0a7959a] succeeded in 0.016244667931459844s: 3

3、任務重試

對於一個 task,我們可以對其設定 retry 引數來指定其在任務執行失敗後會重試幾次,以及隔多長時間重試。

比如對於下面的 div() 函數,我們來輸入除數為 0 的情況檢視重試的功能。

當然,這裡我們是故意輸入引數錯誤,在實際的專案中可能會是其他的原因造成任務失敗,比如資料庫連線失敗等

任務重試的引數也都在 @app.task() 中定義:

# tasks1.py

@app.task(autoretry_for=(Exception, ),  default_retry_delay=10, retry_kwargs={'max_retries': 5})
def div(x, y):
    return x / y

在這裡,autoretry_for 表示的是某種報錯情況下重試,我們定義的 Exception 表示任何錯誤都重試。

如果只是想在某種特定的 exception 情況下重試,將那種 exception 的值替換 Exception 即可。

default_retry_delay 表示重試間隔時長,預設值是 3 * 60s,即三分鐘,是以秒為單位,這裡我們設定的是 10s。

retry_kwargs 是一個 dict,其中有一個 max_retries 引數,表示的是最大重試次數,我們定為 5

然後可以嘗試呼叫這個延時任務:

from proj.tasks1 import div
div.delay(1, 0)

然後可以看到在紀錄檔檔案會有如下輸出:

[2022-07-24 16:59:35,653: INFO/ForkPoolWorker-7] Task proj.tasks1.div[1f65c410-1b2a-4127-9d83-a84b1ad9dd2c] retry: Retry in 10s: ZeroDivisionError('division by zero',)

且每隔 10s 執行一次,一共執行 5 次,5次之後還是不成功則會報錯。

retry_backoff 和 retry_backoff_max

還有一個 retry_backoff 和 retry_backoff_max 引數,這兩個引數是用於這種情況:如果你的 task 依賴另一個 service 服務,比如會呼叫其他系統的 API,然後這兩個引數可以用於避免請求過多的佔用服務。

retry_backoff 引數可以設定成一個 布林型資料,為 True 的話,自動重試的時間間隔會成倍的增長

第一次重試是 1 s後
第二次是 2s 後
第三次是 4s 後
第四次是 8s 後
...

如果 retry_backoff 引數是一個數位,比如是 3,那麼後續的間隔時間則是 3 的倍數增長

第一次重試 3s 後
第二次是 6s 後
第三次是 12s 後
第四次是 24s 後

retry_backoff_max 是重試的最大的間隔時間,比如重試次數設定的很大,retry_backoff 的間隔時間重複達到了這個值之後就不再增大了。

這個值預設是 600s,也就是 10分鐘。

我們看一下下面這個例子:

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

關於重試的機制,理論上應該是按照我們前面列出來的重試時間間隔進行重試,但是如果我們這樣直接執行 div.delay(),得出的間隔時間是不定的,是在 0 到 最大值之間得出的一個隨機值。

這樣產生的原因是因為還有一個 retry_jitter 引數,這個引數預設是 True,所以時間間隔會是一個隨機值。

如果需要任務延時的間隔值是按照 retry_backoff 和 retry_backoff_max 兩個設定值來執行,那麼則需要將 retry_jitter 值設為 False。

# tasks1.py

@app.task(autoretry_for=(Exception, ), retry_backoff=2, retry_backoff_max=40, retry_jitter=False, retry_kwargs={'max_retries': 8})
def div(x, y):
    return x / y

然後執行 div 的延時任務,就可以看到延時任務按照規律的間隔時間重試了,以下是紀錄檔:

[2022-07-24 19:00:38,588: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 2s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:40,662: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:40,664: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 4s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:44,744: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:44,746: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 8s: ZeroDivisionError('division by zero',)
[2022-07-24 19:00:52,870: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:00:52,872: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 16s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:09,338: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:09,340: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 32s: ZeroDivisionError('division by zero',)
[2022-07-24 19:01:41,843: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:01:41,845: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:02:21,923: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:02:21,925: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)
[2022-07-24 19:03:02,001: INFO/MainProcess] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] received
[2022-07-24 19:03:02,003: INFO/ForkPoolWorker-7] Task proj.tasks1.div[7e689dcf-8069-4f17-8815-fe58f9800fc0] retry: Retry in 40s: ZeroDivisionError('division by zero',)

因為我們設定的重試間隔時間最大為 40s,所以這個地方延時間隔時間到了 40 之後,就不再往上繼續增長了。

4、忽略任務執行結果

有時候延時任務的結果我們並不想儲存,但是我們設定了 result_backend 引數,這個時候我們有三種方式不儲存執行結果。

1.ignore_result=True 不儲存任務執行的結果

@app.task(ignore_result=True)
def add(x, y):
    return x + y

2.app.conf 設定

也可以通過 app.conf 的設定來禁用結果的儲存:

app.conf.update(
    task_ignore_result=True
)

3.執行單個任務的時候禁用

from proj.tasks1 import add
add.apply_async((1, 2), ignore_result=True)

apply_async() 函數的作用相當於是帶引數的 delay(),或者 delay() 是簡化版的 apply_async(),這個我們下面會介紹。

5、task 的呼叫

前面簡單兩個簡單的呼叫方法,一個是 apply_async(),一個是 delay()。

簡單來說就是 delay() 是不帶引數執行的 apply_async()。

以下用 add() 函數為例介紹一下他們的用法:

delay()

純粹的延時任務,只能如下操作:

add.delay(1, 2)

apply_async()

帶引數的用法,add() 函數的引數用 () 包起來:

add.apply_async((1, 2))

也可以帶其他引數,比如上面介紹的不儲存執行結果:

add.apply_async((1, 2), ignore_result=True)

這個函數還可以指定延時的時間:

countdown引數

現在開始 10s 後開始執行:

add.apply_async((1, 2), countdown=10)

eta引數

也可以用 eta 引數來指定 10s 後執行:

from datetime import datetime, timedelta

now = datetime.now()
add.apply_async((1, 2), eta=now + timedelta(seconds=10))

expires引數

這個是用來設定過期的引數:

add.apply_async((1, 2), countdown=60, expires=120)

上面的參數列示,距現在60秒後開始執行,兩分鐘後過期

如果想獲取更多後端相關文章,可掃碼關注閱讀: