Django Web專案中我們經常需要執行耗時的任務比如傳送郵件、呼叫第三方介面、批次處理檔案等等,將這些任務非同步化放在後臺執行可以有效縮短請求響應時間。另外伺服器上經常會有定時任務的需求,比如清除快取、備份資料庫等工作。Celery是一個高效的非同步任務佇列/基於分散式訊息傳遞的作業佇列,可以輕鬆幫我們在Django專案中設定執行非同步和週期性任務。
本文將詳細演示如何在Django專案中整合Celery設定執行非同步和週期性任務並總結下一些高階使用技巧和注意事項。
原文首發大江狗技術部落格(https://pythondjango.cn), 點選閱讀原文可以跳轉。
https://pythondjango.cn/django/advanced/12-sync-periodic-tasks-with-celery/
Celery的工作原理安裝專案依賴檔案Celery設定測試Celery是否工作正常編寫任務非同步呼叫任務檢視任務執行狀態及結果設定定時和週期性任務組態檔新增任務Django Admin新增週期性任務通過Crontab設定定時任務啟動任務排程器beatFlower監控任務執行狀態Celery高階用法與注意事項給任務設定最大重試次數不同任務交由不同Queue處理忽略不想要的結果避免啟動同步子任務Django的模型物件不應該作為引數傳遞使用on_commit函數處理事務小結
Celery是一個高效的基於分散式訊息傳遞的作業佇列。它主要通過訊息(messages)傳遞任務,通常使用一個叫Broker(中間人)來協調client(任務的發出者)和worker(任務的處理者)。clients發出訊息到佇列中,broker將佇列中的資訊派發給 Celery worker來處理。Celery本身不提供訊息服務,它支援的訊息服務(Broker)有RabbitMQ和Redis。小編一般推薦Redis,因為其在Django專案中還是首選的快取後臺。
整個工作流程如下所示:
本專案使用了最新Django(3.2)和Celery版本(Celery 5)。因為本專案使用Redis做訊息佇列的broker,所以還需要安裝redis (Windows下安裝和啟動redis參見菜鳥教學)。另外如果你要設定定時或週期性任務,還需要安裝django-celery-beat
。
# pip安裝必選
Django==3.2
celery==5.0.5
redis==3.5.3
# 可選,windows下執行celery 4以後版本,還需額外安裝eventlet庫
eventlet
# 推薦安裝, 需要設定定時或週期任務時安裝,推薦安裝
django-celery-beat==2.2.0
# 視情況需要,需要儲存任務結果時安裝,視情況需要
django-celery-results==2.0.1
# 視情況需要,需要監控celery執行任務狀態時安裝
folower==0.9.7
在正式使用celery
和django-celery-beat
之前,你需要做基礎的設定。假如你的Django專案資料夾佈局如下所示,你首先需要在myproject/myproject目錄下新增celery.py
並修改__init__.py
。
- myproject/
- manage.py
- project/
- __init__.py # 修改這個檔案
- celery.py # 新增這個檔案
- asgi.py
- settings.py
- urls.py
- wsgi.py
新建celery.py
,新增如下程式碼:
import os
from celery import Celery
# 設定環境變數
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 範例化
app = Celery('myproject')
# namespace='CELERY'作用是允許你在Django組態檔中對Celery進行設定
# 但所有Celery設定項必須以CELERY開頭,防止衝突
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自動從Django的已註冊app中發現任務
app.autodiscover_tasks()
# 一個測試任務
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
修改__init__.py
,如下所示:
from .celery import app as celery_app
__all__ = ('celery_app',)
接下來修改Django專案的settings.py
,新增Celery有關設定選項,如下所示:
# 最重要的設定,設定訊息broker,格式為:db://user:password@host:port/dbname
# 如果redis安裝在本機,使用localhost
# 如果docker部署的redis,使用redis://redis:6379
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
# celery時區設定,建議與Django settings中TIME_ZONE同樣時區,防止時差
# Django設定時區需同時設定USE_TZ=True和TIME_ZONE = 'Asia/Shanghai'
CELERY_TIMEZONE = TIME_ZONE
其它Celery常用設定選項包括:
# 為django_celery_results儲存Celery任務執行結果設定後臺
# 格式為:db+scheme://user:password@host:port/dbname
# 支援資料庫django-db和快取django-cache儲存任務狀態及結果
CELERY_RESULT_BACKEND = "django-db"
# celery內容等訊息的格式設定,預設json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# 為任務設定超時時間,單位秒。超時即中止,執行下個任務。
CELERY_TASK_TIME_LIMIT = 5
# 為儲存結果設定過期日期,預設1天過期。如果beat開啟,Celery每天會自動清除。
# 設為0,儲存結果永不過期
CELERY_RESULT_EXPIRES = xx
# 任務限流
CELERY_TASK_ANNOTATIONS = {'tasks.add': {'rate_limit': '10/s'}}
# Worker並行數量,一般預設CPU核數,可以不設定
CELERY_WORKER_CONCURRENCY = 2
# 每個worker執行了多少任務就會死掉,預設是無限的
CELERY_WORKER_MAX_TASKS_PER_CHILD = 200
完整設定選項見:
https://docs.celeryproject.org/en/stable/userguide/configuration.html#std-setting-result_expires
注意:
在Django中正式編寫和執行自己的非同步任務前,一定要先測試redis和celery是否安裝好並設定成功。
一個無限期阻塞的任務會使得工作單元無法再做其他事情,建議給任務設定超時時間。
首先你要啟動redis服務。windows進入redis所在目錄(比如C:\redis),使用redis-server.exe
啟動redis。Linux下使用./redis-server redis.conf
啟動,也可修改redis.conf將daemonize設定為yes, 確保守護行程開啟。
啟動redis服務後,你要先進入專案所在資料夾執行python manage.py runserver
命令啟動Django伺服器(無需建立任何app),然後再開啟一個終端terminal視窗輸入celery命令,啟動worker。
# Linux下測試,啟動Celery
Celery -A myproject worker -l info
# Windows下測試,啟動Celery
Celery -A myproject worker -l info -P eventlet
# 如果Windows下Celery不工作,輸入如下命令
Celery -A myproject worker -l info --pool=solo
如果你能看到[tasks]下所列非同步任務清單如debug_task
,以及最後一句celery@xxxx ready, 說明你的redis和celery都設定好了,可以開始正式工作了。
-------------- celery@DESKTOP-H3IHAKQ v4.4.2 (cliffs)
--- ***** -----
-- ******* ---- Windows-10-10.0.18362-SP0 2020-04-24 22:02:38
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: myproject:0x456d1f0
- ** ---------- .> transport: redis://127.0.0.1:6379/0
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 4 (eventlet)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. myproject.celery.debug_task
[2020-04-24 22:02:38,484: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2020-04-24 22:02:38,500: INFO/MainProcess] mingle: searching for neighbors
[2020-04-24 22:02:39,544: INFO/MainProcess] mingle: all alone
[2020-04-24 22:02:39,572: INFO/MainProcess] pidbox: Connected to redis://127.0.0.1:6379/0.
[2020-04-24 22:02:39,578: WARNING/MainProcess] c:\users\missenka\pycharmprojects\django-static-html-generator\venv\lib\site-packages\celery\fixups\django.py:203: UserWarning: Using sett
ings.DEBUG leads to a memory
leak, never use this setting in production environments!
leak, never use this setting in production environments!''')
[2020-04-24 22:02:39,579: INFO/MainProcess] celery@DESKTOP-H3IHAKQ ready.
Celery設定完成後,我們就可以編寫任務了。Django專案中所有需要Celery執行的非同步或週期性任務都放在tasks.py
檔案裡,該檔案可以位於project目錄下,也可以位於各個app的目錄下。專屬於某個Celery範例化專案的task可以使用@app.task
裝飾器定義,各個app目錄下可以複用的task建議使用@shared_task
定義。
兩個範例如下所示:
# myproject/tasks.py
# 專屬於myproject專案的任務
app = Celery('myproject')
@ app.task
def test():
pass
# app/tasks.py, 可以複用的task
from celery import shared_task
import time
@shared_task
def add(x, y):
time.sleep(2)
return x + y
上面我們定義一個名為add
的任務,它接收兩個引數,並返回計算結果。為了模擬耗時任務,我們中途讓其sleep 2秒。現在已經定義了一個耗時任務,我們希望在Django的檢視或其它地方中以非同步方式呼叫執行它,應該怎麼做呢? 下面我們將給出答案。
注意:
使用celery定義任務時,避免在一個任務中呼叫另一個非同步任務,容易造成阻塞。
當我們使用@app.task
裝飾器定義我們的非同步任務時,那麼這個任務依賴於根據專案名myproject生成的Celery範例。然而我們在進行Django開發時為了保證每個app的可重用性,我們經常會在每個app資料夾下編寫非同步任務,這些任務並不依賴於具體的Django專案名。使用@shared_task
裝飾器能讓我們避免對某個專案名對應Celery範例的依賴,使app的可移植性更強。
Celery提供了2種以非同步方式呼叫任務的方法,delay
和apply_async
方法,如下所示:
# 方法一:delay方法
task_name.delay(args1, args2, kwargs=value_1, kwargs2=value_2)
# 方法二:apply_async方法,與delay類似,但支援更多引數
task.apply_async(args=[arg1, arg2], kwargs={key:value, key:value})
我們接下來看一個具體的例子。我們編寫了一個Django檢視函數,使用delay
方法呼叫add
任務。
# app/views.py
from .tasks import add
def test_celery(request):
add.delay(3, 5)
return HttpResponse("Celery works")
# app/urls.py
urlpatterns = [
re_path(r'^test/$', views.test_celery, name="test_celery")
]
當你通過瀏覽器存取/test/連結時,你根本感受不到2s的延遲,頁面可以秒開,同時你會發現終端的輸出如下所示,顯示任務執行成功。
我們現在再次使用apply_async
方法呼叫add
任務,不過還要列印初任務的id (task.id)和狀態status。Celery會為每個加入到佇列的任務分配一個獨一無二的uuid, 你可以通過task.status
獲取狀態和task.result
獲取結果。注意:apply_async
傳遞引數的方式與delay方法不同。
# app/views.py
from .tasks import add
def test_celery(request):
result = add.apply_async(args=[3, 5])
return HttpResponse(result.task_id + ' : ' + result.status)
Django返回響應結果如下所示。這是在預期之內的,因為耗時任務還未執行完畢,Django就已經返回了響應。
那麼問題來了,這個非同步任務執行了,返回了個計算結果(8),那麼我們系統性地瞭解任務狀態並獲取這個執行結果呢? 答案是django-celery-results
。
通過pip安裝django-celery-results
後,需要將其加入到INSTALLED_APPS
並使用migrate
命令遷移建立資料表。以下幾項設定選項是與這個庫相關的。
# 支援資料庫django-db和快取django-cache儲存任務狀態及結果
# 建議選django-db
CELERY_RESULT_BACKEND = "django-db"
# celery內容等訊息的格式設定,預設json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
安裝設定完成後,進入Django admin後臺,你就可以詳細看到每個任務的id、名稱及狀態。
點選單個任務id,你可以看到有關這個任務的更多資訊,比如傳遞的引數和返回結果,如下所示:
藉助於裝django-celery-beat
後, 你可以將任一Celery任務設定為定時任務或週期性任務。使用它你只需要通過pip安裝它,並加入INSTALLED_APPS裡去。
django-celery-beat
提供了兩種新增定時或週期性任務的方式,一是直接在settings.py
中新增,二是通過Django admin後臺新增。
同一任務可以設定成不同的呼叫週期,給它們不同的任務名就好了。
from datetime import timedelta
CELERY_BEAT_SCHEDULE = {
"add-every-30s": {
"task": "app.tasks.add",
'schedule': 30.0, # 每30秒執行1次
'args': (3, 8) # 傳遞引數-
},
"add-every-day": {
"task": "app.tasks.add",
'schedule': timedelta(hours=1), # 每小時執行1次
'args': (3, 8) # 傳遞引數-
},
}
如果每次新增或修改週期性任務都要修改組態檔非常不方便,一個更好的方式是使用任務排程器。先在settings.py
中將任務排程器設為DatabaseScheduler
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
然後就可以進入Periodic Task表新增和修改週期性任務了。
如果你希望在特定的時間(某月某周或某天)執行一個任務,你可以通過crontab設定定時任務,如下例所示:
CELERY_BEAT_SCHEDULE = {
# 每週一早上7點半執行
'add-every-monday-morning': {
'task': 'app.tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (7, 8),
},
}
更多Crontab定義案例如下所示:
例子 | 含義 |
---|---|
crontab() | 每分 |
crontab(minute=0, hour=0) | 每天午夜 |
crontab(minute=0, hour='*/3') | 能被3整除的小時數,3,6,9點等等 |
crontab(minute=0,``hour='0,3,6,9,12,15,18,21') | 與前面相同,指定小時 |
crontab(minute='*/15') | 每15分鐘 |
crontab(day_of_week='sunday') | 星期日每分鐘 |
crontab(minute='*',``hour='*', day_of_week='sun') | 同上 |
crontab(minute='*/10',``hour='3,17,22', day_of_week='thu,fri') | 每10分鐘執行一次, 但僅限於週四或週五的 3-4 am, 5-6 pm, 和10-11 pm. |
crontab(minute=0, hour='*/2,*/3') | 可以被2或3整除的小時數,除了 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm |
crontab(minute=0, hour='*/5') | 可以被5整除的小時 |
crontab(minute=0, hour='*/3,8-17') | 8am-5pm之間可以被3整除的小時 |
crontab(0, 0, day_of_month='2') | 每個月的第2天 |
crontab(0, 0,``day_of_month='2-30/2') | 每月的偶數日 |
crontab(0, 0,``day_of_month='1-7,15-21') | 每月的第一和第三週 |
crontab(0, 0, day_of_month='11',``month_of_year='5') | 每年的5月11日 |
crontab(0, 0,``month_of_year='*/3') | 每個季度首個月份每天 |
Crontab也可以通過Django Admin新增,然後與任務進行繫結。
如果你變換了時區timezone,比如從'UTC'變成了'Asia/Shanghai',需重置週期性任務,這非常重要。
# 調整timezone後重置任務
$ python manage.py shell
>>> from django_celery_beat.models import PeriodicTask
>>> PeriodicTask.objects.update(last_run_at=None)
前面我們只是新增了定時或週期性任務,我們還需要啟動任務排程器beat分發定時和週期任務給Celery的worker。
多開幾個終端,一個用來啟動任務排程器beat,另一個啟動celery worker,你的任務就可以在後臺執行啦。
# 開啟任務排程器
Celery -A myproject beat
# Linux下開啟Celery worker
Celery -A myproject worker -l info
# windows下開啟Celery worker
Celery -A myproject worker -l info -P eventlet
# windows下如果報Pid錯誤
Celery -A myproject worker -l info --pool=solo
除了django_celery_results
, 你可以使用flower
監控後臺任務執行狀態。它提供了一個視覺化的介面,在測試環境中非常有用。
pip install flower
安裝好後,你有如下兩種方式啟動伺服器。啟動伺服器後,開啟http://localhost:5555即可檢視監控情況。
# 從terminal終端啟動, proj為專案名
$ flower -A proj --port=5555
# 從celery啟動
$ celery flower -A proj --address=127.0.0.1 --port=5555
定義任務時可以通過max_retries
設定最大重試次數,並呼叫self.retry
方法呼叫。因為要呼叫self
這個引數,定義任務時必須設定bind=True
。
@shared_task(bind=True, max_retries=3)
def send_batch_notifications(self):
try:
something_raising()
raise Exception('Can\'t send email.')
except Exception as exc:
self.retry(exc=exc, countdown=5)
send_mail(
subject='Batch email notifications',
message='Test email',
from_email='no-reply@example.com',
recipient_list=['john@example.com']
)
不同的任務所需要的資源和時間不一樣的。為了防止一些非常佔用資源或耗時的任務阻塞任務佇列導致一些簡單任務也無法執行,可以將不同任務交由不同的Queue處理。下例定義了兩個Queue佇列,default執行普通任務,heavy_tasks執行重型任務。
CELERY_TASK_DEFAULT_QUEUE = 'default'
CELERY_TASK_DEFAULT_ROUTING_KEY = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
Queue('heavy_tasks', Exchange('heavy_tasks'), routing_key='heavy_tasks'),
)
CELERY_TASK_ROUTES = {
'myapp.tasks.heave_tasks': 'heavy_tasks'
}
如果你不在意任務的返回結果,可以設定 ignore_result
選項,因為儲存結果耗費時間和資源。你還可以可以通過 task_ignore_result
設定全域性忽略任務結果。
@app.task(ignore_result=True)
def my_task():
something()
讓一個任務等待另外一個任務的返回結果是很低效的,並且如果工作單元池被耗盡的話這將會導致死鎖。
# 話例子
@app.task
def update_page_info(url):
page = fetch_page.delay(url).get()
info = parse_page.delay(url, page).get()
store_page_info.delay(url, info)
@app.task
def fetch_page(url):
return myhttplib.get(url)
@app.task
def parse_page(url, page):
return myparser.parse_document(page)
@app.task
def store_page_info(url, info):
return PageInfo.objects.create(url, info)
# 好例子
def update_page_info(url):
chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
chain()
@app.task()
def fetch_page(url):
return myhttplib.get(url)
@app.task()
def parse_page(page):
return myparser.parse_document(page)
@app.task(ignore_result=True)
def store_page_info(info, url):
PageInfo.objects.create(url=url, info=info)
在好例子裡,我們將不同的任務簽名連結起來建立一個任務鏈,三個子任務按順序執行。
Django 的模型物件不應該作為引數傳遞給任務。幾乎總是在任務執行時從資料庫獲取物件是最好的,因為老的資料會導致競態條件。假象有這樣一個場景,你有一篇文章,以及自動展開文章中縮寫的任務:
class Article(models.Model):
title = models.CharField()
body = models.TextField()
@app.task
def expand_abbreviations(article):
article.body.replace('Old text', 'New text')
article.save()
首先,作者建立一篇文章並儲存,這時作者點選一個按鈕初始化一個縮寫展開任務:
>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)
現在,佇列非常忙,所以任務在2分鐘內都不會執行。與此同時,另一個作者修改了這篇文章,當這個任務最終執行,因為老版本的文章作為引數傳遞給了這個任務,所以這篇文章會回滾到老的版本。修復這個競態條件很簡單,只要引數傳遞文章的 id 即可,此時可以在任務中重新獲取這篇文章:
@app.task
def expand_abbreviations(article_id):
article = Article.objects.get(id=article_id)
article.body.replace('MyCorp', 'My Corporation')
article.save()
我們再看另外一個celery中處理事務的例子。這是在資料庫中建立一個文章物件的 Django 檢視,此時傳遞主鍵給任務。它使用 commit_on_success
裝飾器,當檢視返回時該事務會被提交,當檢視丟擲異常時會進行回滾。
from django.db import transaction
@transaction.commit_on_success
def create_article(request):
article = Article.objects.create()
expand_abbreviations.delay(article.pk)
如果在事務提交之前任務已經開始執行會產生一個競態條件;資料庫物件還不存在。解決方案是使用 on_commit
回撥函數來在所有事務提交成功後啟動任務。
from django.db.transaction import on_commit
def create_article(request):
article = Article.objects.create()
on_commit(lambda: expand_abbreviations.delay(article.pk))
本文詳細演示瞭如何在Django專案中整合Celery設定執行非同步、定時和週期性任務,並提供了一些高階使用案例和注意事項,希望對你有所幫助。
祝大家五一長假快樂!
大江狗
2021.5
原創不易,轉載請註明來源。我是大江狗,一名Django技術開發愛好者。您可以通過搜尋【CSDN大江狗】、【知乎大江狗】和搜尋微信公眾號【Python Web與Django開發】關注我!
相關閱讀