本文首發於公眾號:Hunter後端
原文連結:celery筆記八之資料庫操作定時任務
前面我們介紹定時任務是在 celery.py 中的 app.conf.beat_schedule
定義,這一篇筆記我們介紹一下如何在 Django 系統中的表裡來操作這些任務。
我們先通過 app.conf.beat_schedule 定義定時任務:
app.conf.beat_schedule = {
'add-every-60-seconds': {
'task': 'blog.tasks.add',
'schedule': 60,
'args': (16, 16),
},
'schedule_minus': {
'task': 'blog.tasks.minus',
'schedule': crontab(minute=5, hour=2),
'args': (12, 24),
},
}
如果我們就這樣啟動 Django 系統,worker 和 beat 服務,系統的定時任務就只有一個,寫死在系統裡。
當然,我們也可以使用一些 celery 的函數來手動向系統裡新增定時任務,但是我們有一個更好的方法來管理操作這些定時任務,那就是將這些定時任務寫入到資料庫中,來進行增刪改查操作,客製化開發。
將定時任務寫入資料庫,我們需要進行以下幾步操作:
安裝依賴
通過 pip 安裝一個 django-celery-beat 依賴:
pip3 install django-celery-beat
INSTALLED_APP新增模組
安裝後,要正常使用還需要將其新增到 settings.py 的 INSTALLED_APPS 中:
# settings.py
INSTALLED_APPS = [
...,
'django_celery_beat',
]
執行migrate
接下來我們執行 migrate 操作將需要建立的表寫入資料庫:
python3 manage.py migrate
可以看到如下輸出:
Running migrations:
Applying django_celery_beat.0001_initial... OK
Applying django_celery_beat.0002_auto_20161118_0346... OK
Applying django_celery_beat.0003_auto_20161209_0049... OK
Applying django_celery_beat.0004_auto_20170221_0000... OK
Applying django_celery_beat.0005_add_solarschedule_events_choices... OK
Applying django_celery_beat.0006_auto_20180322_0932... OK
Applying django_celery_beat.0007_auto_20180521_0826... OK
Applying django_celery_beat.0008_auto_20180914_1922... OK
Applying django_celery_beat.0006_auto_20180210_1226... OK
Applying django_celery_beat.0006_periodictask_priority... OK
Applying django_celery_beat.0009_periodictask_headers... OK
Applying django_celery_beat.0010_auto_20190429_0326... OK
Applying django_celery_beat.0011_auto_20190508_0153... OK
Applying django_celery_beat.0012_periodictask_expire_seconds... OK
Applying django_celery_beat.0013_auto_20200609_0727... OK
Applying django_celery_beat.0014_remove_clockedschedule_enabled... OK
Applying django_celery_beat.0015_edit_solarschedule_events_choices... OK
然後可以看到在 Django 系統對應的資料庫裡新增了幾張表,表的介紹及使用我們在後面再介紹。
在啟動 beat 前,我們需要對時區進行設定,前面我們介紹過在 Django 和 celery 中都需要設定成北京時間:
TIME_ZONE = "Asia/Shanghai"
USE_TZ = False
# celery 時區設定
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = False
DJANGO_CELERY_BEAT_TZ_AWARE = False
啟動 beat 我們需要新增引數將資料指定儲存在資料庫中,可以在啟動 beat 的時候新增引數:
celery -A hunter beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
也可以通過 app.conf.beat_scheduler 指定值:
app.conf.beat_scheduler = 'django_celery_beat.schedulers:DatabaseScheduler'
然後直接啟動 beat:
celery -A hunter beat -l INFO
在執行完 migrate 之後系統會多出幾張表,都是定時任務相關的表:
其中 django_celery_beat_clockedschedule 和 django_celery_beat_solarschedule 暫時不介紹
django_celery_beat_crontabschedule
是我們的週期任務表,比如我們上面定義的:
'schedule_minus': {
'task': 'blog.tasks.minus',
'schedule': crontab(minute=5, hour=2),
'args': (12, 24),
},
執行 celery 的 beat 後,會在該表新增一條資料,表的欄位就是我們設定的 crontab() 裡的值,包括 minute,hour,day_of_week,day_off_month,month_of_year 和 timezone。
除了 timezone 欄位,前面的欄位如何定義和使用上一篇筆記中已經詳細介紹過,timezone 欄位則是我們在 settings.py 裡定義的時區資訊。
django_celery_beat_intervalschedule
這張表的資料是我們定義的間隔時間任務的表,比如每隔多少秒,多少分鐘執行一次。
該表只有 id, every 和 period 欄位,every 表示的是時間的間隔,填寫的數位,period 則是單位,可選項有:
我們在定義間隔任務的時候,除了直接使用數位表示秒之外,還可以使用 datetime.timedelta() 來定義其他時間,比如:
from datetime import timedelta
app.conf.beat_schedule = {
'add-every-60-seconds': {
'task': 'blog.tasks.add',
'schedule': timedelta(minutes=5),
'args': (16, 16),
},
}
但是當我們啟動 beat 的時候,系統在寫入資料庫的時候還是會自動為我們將其轉化為秒數,比如 minutes=5,會給我們加入的資料是:
every=300, period='seconds'
django_celery_beat_periodictask
這張表其實是對前面幾張表的任務的一個彙總,
還有引數,佇列等資訊。
每一條在 django_celery_beat_crontabschedule 和 django_celery_beat_intervalschedule 表中的資料都必須在該表中有一個彙總的資訊記錄才可以正常執行。
也就是說在前面的兩張表中可以新增各種任務執行的策略,然後在 django_celery_beat_periodictask 中有一個資料指向該策略,就可以使用該策略進行週期任務的執行。
其中,name 欄位上是有唯一鍵的,但是 task 可以重複寫入,這也就意味著我們可以針對同一個 task 制定不同的定時策略。
django_celery_beat_periodictasks
這個表就一條資料,儲存的是系統上一次執行任務的時間。
接下來我們自己定義兩個週期任務,一個是 blog.tasks.add 函數,每隔20s執行一次,另一個是 blog.tasks.minus 函數,每天晚上 23點15分執行一次。
我們首先還是執行 beat 和 worke,然後在 python3 manage.py shell 中執行下面的程式碼:
import json
from django_celery_beat.models import IntervalSchedule, CrontabSchedule, PeriodicTask
twenty_second_interval, _ = IntervalSchedule.objects.get_or_create(every=20, period=IntervalSchedule.SECONDS)
eleven_clock_crontab, _ = CrontabSchedule.objects.get_or_create(minute=18, hour=23)
PeriodicTask.objects.get_or_create(
interval_id=twenty_second_interval.id,
name="twenty_second_interval",
task="blog.tasks.add",
args=json.dumps([1, 2]),
)
PeriodicTask.objects.get_or_create(
crontab_id=eleven_clock_crontab.id,
name="eleven_clock_crontab",
task="blog.tasks.minus",
args=json.dumps([8, 2]),
)
然後可以看到執行 beat 的 shell 中或者紀錄檔檔案有輸出下面的資訊:
DatabaseScheduler: Schedule changed.
其實就是系統監測了 PeriodicTask 表,發現它的資料有變化就會重新更改一次,當 beat 服務啟動,系統會去 PeriodicTask 表裡獲取資料。
如果這些任務的資料有更改,系統就會檢測到然後發出 Schedule changed
的資訊。
我這邊測試了 name、enabled、one_off、args 等欄位,發現修改後系統都會捕獲到任務的變化。
其中,one_off 欄位的含義是該任務僅執行一次。
如果想獲取更多後端相關文章,可掃碼關注閱讀: