Django 如何使用 Celery 完成非同步任務或定時任務

2023-04-25 18:01:57

以前版本的 Celery 需要一個單獨的庫(django-celery)才能與 Django 一起工作, 但從 Celery 3.1 開始,情況便不再如此,我們可以直接通過 Celery 庫來完成在 Django 中的任務。

安裝 Redis 伺服器端

以 Docker 安裝為例,安裝一個密碼為 mypassword 的 Redis 伺服器端

docker run -itd --name redis -p 127.0.0.1:6379:6379 redis:alpine redis-server --requirepass mypassword

在 Python 中安裝 Celery 和 Redis

pip install celery redis

在 Django 專案中新增 Celery 設定

在 Django 專案中建立一個 celery.py 檔案,並設定 Celery 應用程式。這個檔案應該與 settings.py 檔案位於同一目錄下:

import os

from celery import Celery

# 設定 Django 的預設環境變數
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

# 使用 Django 的 settings.py 檔案設定 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

# 從所有已安裝的應用中自動發現並載入任務模組
app.autodiscover_tasks()

然後在 settings.py 檔案中新增設定:

# 使用 Redis 作為訊息代理(broker)來傳遞任務訊息,連線地址為 localhost:6379/0,並提供密碼 mypassword 進行身份驗證。
CELERY_BROKER_URL = 'redis://:mypassword@localhost:6379/0'

# 使用 Redis 作為結果儲存後端,連線地址同上,使用相同的密碼進行身份驗證。
CELERY_RESULT_BACKEND = 'redis://:mypassword@localhost:6379/0'

# 指定傳送到代理(broker)的任務訊息序列化格式為 JSON 格式。
CELERY_TASK_SERIALIZER = 'json'

# 指定從結果後端獲取的結果序列化格式為 JSON 格式。
CELERY_RESULT_SERIALIZER = 'json'

# 指定支援接收的內容型別為 JSON 格式。
CELERY_ACCEPT_CONTENT = ['json']

# 將時區設定為亞洲/上海時區。
CELERY_TIMEZONE = 'Asia/Shanghai'

# 啟用 UTC 時間。
CELERY_ENABLE_UTC = True

在 Django 應用程式中建立一個 tasks.py 檔案,並編寫要執行的任務函數。例如,此處我們將編寫一個名為 send_email() 的任務,來定期傳送電子郵件:

from django.core.mail import send_mail
from celery import shared_task

@shared_task
def send_email():
    # 傳送電子郵件的程式碼
    pass

如果想要實現非同步任務的功能,在 Django 專案中的任何位置呼叫任務函數即可。例如,在 views.py 檔案中,我們可以從檢視函數中啟動任務,如下所示:

from myapp.tasks import send_email

def my_view(request):
    send_email.delay()
    return HttpResponse('任務已經在後臺執行。')

如果想要實現定時任務的功能,可以在 Celery 的組態檔中設定定時任務的排程方式。例如,要每小時執行一次 send_email() 任務,我們可以新增以下程式碼:

from celery.task.schedules import crontab

app.conf.beat_schedule = {
    'send-email-every-hour': {
        'task': 'myapp.tasks.send_email',
        'schedule': crontab(minute=0, hour='*/1'),
    },
}

定時任務的具體寫法可以參考官方檔案:https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab

執行 Celery-worker 與 Celery-beat

Celery是一個分散式任務佇列,由三個主要元件組成:Celery worker、Celery beat 和訊息代理(例如 Redis 或 RabbitMQ)。這些元件一起共同作業,讓開發者能夠輕鬆地執行非同步任務和定時任務。

Celery worker:負責接收任務請求並執行任務。當您在 Django 應用程式中呼叫 apply_async 方法時,任務將被傳送到 Celery worker,然後由 worker 執行。

Celery beat:負責排程定時任務。它會根據定義的規則定期觸發任務,並將其傳送到 Celery worker 處理。

所以,對於需要執行定時任務的情況,我們需要同時啟動 Celery worker 和 Celery beat 程序來確保所有任務都可以被正確地處理和執行。

如果只需要使用 Celery 來執行非同步任務,那麼只需啟動 Celery worker 即可。但如果需要週期性地執行任務,那麼需要啟動 Celery beat 來幫助完成排程這些任務。

# 執行 worker 與 beat
celery -A proj worker --loglevel=info --detach --pidfile=worker.pid --logfile=./logs/worker.log
celery -A proj beat --loglevel=info --detach --pidfile=beat.pid --logfile=./logs/beat.log
  • -A proj:指定 Celery 應用程式所在的模組或包,這裡假設其名為 proj。
  • worker 或 beat:啟動的程序名稱,分別對應 worker 和 beat 兩種型別的 Celery 程序。
  • --loglevel=info:設定紀錄檔級別為 info,即只記錄 info 級別及以上的紀錄檔資訊。
  • --detach:以守護行程(daemonized)方式啟動 Celery 程序,使其在後臺執行。
  • --pidfile=worker.pid 或 --pidfile=beat.pid:將程序 ID(PID)寫入指定的 PID 檔案,方便後續管理和監控。
  • --logfile=./logs/worker.log 或 --logfile=./logs/beat.log:指定紀錄檔檔案路徑,所有紀錄檔資訊都會輸出到該檔案中。

隨後我們設定的定時任務便會按規則執行,可以通過指定的紀錄檔檔案檢視執行結果。當我們需要停止 Celery worker 與 Celery beat 時,可以執行以下操作:

kill -TERM $(cat worker.pid)
kill -TERM $(cat beat.pid)

參考

[1] [Using Celery with Django]: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html
[2] [Periodic Tasks]: https://docs.celeryq.dev/en/stable/userguide/periodic-tasks.html?highlight=crontab