最近做的這個專案(基於Django),需要做個功能,實現定時採集車輛定位。
這讓我想起來幾年前那個OneCat專案,當時我用的是Celery這個很重的元件
Celery實在是太重了,後來我做公眾號採集平臺的時候,又接觸了Django-RQ和Django-Q這倆,前者是對RQ的封裝,讓RQ和Django更好的結合在一起;後者是一個全新的「多程序任務佇列」元件,相比起celery很輕量,當時使用的時候就給我留下不錯的印象。
於是這個專案我決定繼續使用Django-Q來實現一些非同步操作和定時任務。
官方介紹:
A multiprocessing task queue for Django
pip install django-q
INSTALLED_APPS
INSTALLED_APPS = (
# other apps
'django_q',
)
由於Django-Q會把執行結果放到資料庫裡,所以要執行一下資料庫遷移的操作
python manage.py migrate
這個操作會生成 django_q_ormq
、django_q_schedule
、django_q_task
三個表
因為本身專案用的快取就是Redis,所以我直接用Redis作為訊息佇列的後端(broker)
Django-Q支援很多種後端,除了Redis還有Disque、IronMQ、Amazon SQS、MongoDB或者是Django的ORM~
在 settings.py
中新增以下設定:
Q_CLUSTER = {
'name': 'project_name',
'workers': 4,
'recycle': 500,
'timeout': 60,
'compress': True,
'cpu_affinity': 1,
'save_limit': 250,
'queue_limit': 500,
'label': 'Django Q',
'redis': {
'host': 127.0.0.1',
'port': 6379,
'db': 0,
}
}
python manage.py qcluster
搞定,現在訊息佇列服務已經跑起來了
我們可以新增非同步任務或者定時任務
最簡單的方式是使用它提供的 async_task
方法,新增一個新的非同步任務到佇列中
來寫個例子,輸入一個數,求階乘之後開平方
import math
def demo_task(number: int):
return math.sqrt(math.factorial(number))
然後來新增一個非同步任務
from django_q.tasks import async_task, Task
def task_finish(task: Task):
print(f'任務 {task.name}(ID:{task.id})完成!')
task_id = async_task(
demo_task, 10,
task_name='任務名稱',
hook=task_finish,
)
可以看到,直接呼叫 async_task
方法就行
這個方法的定義是
async_task(func: Any, *args: Any, **kwargs: Any)
傳入要非同步執行的方法之後,可以把該方法的引數跟在後面傳進去,也可以用 kwargs
的方式傳入
這兩種方式都可以的:
async_task(demo_task, 10)
async_task(demo_task, number=10)
我個人比較喜歡第一種,因為Django-Q本身有幾個命名引數,比如 task_name
、hook
、timeout
之類的,用第一種方式傳參不容易和Django-Q預設的命名引數衝突。
有兩種方式獲取任務的執行結果:
result
方法,在程式碼中獲取第一種方式無需贅述,在安裝Django-Q元件後執行了資料庫遷移,就會生成 Failed tasks
、Scheduled tasks
、Successful tasks
三個admin模組,顧名思義,在 Failed tasks
和Successful tasks
中可以看到任務的執行結果,也就是我們寫在 demo_task
裡的返回值。
第二種方式,程式碼如下:
from django_q.tasks import result
task_result = result(task_id)
把 task_id
傳入就可以查詢任務執行的結果,如果任務還沒執行完,那結果就是 None
這個 result
方法還有個 wait
引數,可以設定等待時間,單位是毫秒
上面程式碼中,我們還設定了 hook
引數
作用就是任務執行完成之後,執行 task_finish
這個函數
在 task_finish
裡可以通過 task
引數獲取任務資訊
就是這樣~
async_task
的其他引數建立非同步任務的這個方法還有很多引數,官網檔案寫得還算可以,很多引數都是 Q_CLUSTER
設定裡面有的,在 async_task
裡設定這些引數就會覆蓋預設的設定。
我直接搬運一波,權當翻譯檔案了~
除了上面介紹到的 task_name
、hook
還有這些引數:
group: str
任務的分組名稱save
設定任務執行結果的儲存後端,不過檔案裡只是一句話的介紹,具體如何設定還得研究一下。(稍微看了一下原始碼,沒搞懂,動態語言太折磨人了)timeout: int
任務超時時間,單位是秒。回顧一下前面的 Q_CLUSTER
設定,裡面有 timeout
設定,設定這個引數可以覆蓋前面的設定,如果任務執行超出了這個時間,就會被直接終止。ack_failures: bool
設定為True時,也承認失敗的任務。這會導致失敗的任務被視為成功交付,從而將其從任務佇列中刪除。預設值為False。(說實話我沒看懂是啥意思)sync: bool
設定為True的時候,所有非同步任務會變成同步執行,這個功能在測試的時候比較有用。預設是False。cached
這個引數既可以設定為True,也可以傳入數位,代表快取過期時間。根據檔案描述,非同步任務的執行結果會存在資料庫裡,當這個引數設定為True的時候,結果不寫入資料庫,而是儲存在快取裡。這個功能在短時間內要大量執行非同步任務,且不需要把結果立刻寫入資料庫的情況下比較有用,可以提高效能。broker
需要傳入一個 Broker
物件的範例,用來控制這個非同步任務在哪個Broker裡執行。q_options: dict
這是最後一個引數了。我下面單獨介紹一下q_options
引數根據前面啟動任務的部分,我們啟動非同步任務的時候,可以通過命名引數向任務方法傳遞引數,比如:
async_task(demo_task, number=10)
但 async_task
這個方法本身又有很多引數,如果這個引數名稱和我們要執行的任務 demo_task
引數重名的話,這些引數就被 async_task
拿走了,我們的任務 demo_task
就拿不到這些引數了。
怎麼辦?
q_options
引數就是為了解決這個問題
可以把要傳給 async_task
的引數都包裝在一個 dict 裡面,然後通過 q_options
引數傳入
假如我們的 demo_task
是這樣的:
def demo_task(number: int, timeout: int):
...
除了 number
這個引數,還要接收一個跟 async_task
自有引數重名的 timeout
引數,使用 q_options
的解決方案如下
opts = {
'hook': 'hooks.print_result',
'group': 'math',
'timeout': 30
}
async_task(demo_task, number=10, timeout=100, q_options=opts)
這樣既能……又能……,完美啊~
當然我還是建議用 *args
的方式傳參,這樣就沒有引數重名的問題了。
有兩種方式新增定時任務
比較簡單,直接上程式碼
from django_q.tasks import schedule
schedule(
'demo_task',
schedule_type=Schedule.MINUTES,
minutes=1,
task_name='任務名稱',
)
有一點注意的是,因為新增後的定時任務是要儲存在資料庫中的
所以需要把要執行的方法(包含完整包名),以字串的形式傳入
假如在我們的Django專案中,要執行的是在 apps/test/tasks.py
檔案中的 demo_task
方法
那麼需要把 apps.test.tasks.demo_task
這個完整的名稱傳入
在admin中新增也是一樣
Django-Q的定時任務有很多型別:
注意,即使是Cron表示式,定時任務執行的最短間隔也是1分鐘
這點我一開始不知道,用Cron表示式寫了個15秒的任務,但執行時間根本不對,然後我翻了一下github上的issues,看到作者的解答才知道~
那個Issues的地址:https://github.com/Koed00/django-q/issues/179
作者的回覆:
The current design has a heartbeat of 30 seconds, which means the schedule table can't have schedules below that. Most of this is explained in the architecture docs. Because of the way the internal loop is set up, a resolution under a dozen seconds or so, quickly becomes unreliable.
I always imagined tasks that need accuracy measured in seconds, would use a delayed tasks strategy where a few seconds delay is either added through the broker or inside the task itself.
The problem with all this, is that a task is currently always added to the back of the queue.
So even with a 1 second resolution on the schedule, the task still has to wait it's execution time. Which can of course vary wildly depending on the broker type, worker capacity and current workload.
這點感覺有些雞肋,如果要高頻執行的任務,那隻能選擇Celery了
這個更簡單,傻瓜式操作
所以這部分略過了~
現在後端服務基本是用docker部署的
為了能在docker中使用Django-Q
我們需要在原有Django容器的基礎上,再起一個同樣的容器,然後入口改成qcluster的啟動命令
這裡有個issues也有討論這個問題:https://github.com/Koed00/django-q/issues/513
來個 docker-compose.yml
的例子
version: "3.9"
services:
redis:
image: redis:alpine
ports:
- 6379:6379
web:
build: .
command: python manage.py runserver 0.0.0.0:8000
volumes:
- .:/code
ports:
- "8000:8000"
depends_on:
- redis
- django_q
django_q:
build: .
command: python manage.py qcluster
volumes:
- .:/code
depends_on:
- redis
一個簡單的例子
其他的類似環境變數這些,根據實際情況來
注意:
docker-compose restart django_q
,才能使修改的程式碼生效Django-Q還提供了一些命令列工具
python manage.py qmonitor
python manage.py qmemory
python manage.py qinfo
除了使用命令監控,還可以在程式碼裡做監控,不過我暫時沒用到,所以還沒研究,有需要的同學可以直接看檔案
安裝完Django-Q後,會在admin出現三個選單,跟普通的Django app一樣,這些也是通過 admin 註冊進去的,因此我們可以重新註冊這些 ModelAdmin
來自定義admin上的操作介面
來一段官方關於失敗任務介面的程式碼:
from django_q import models as q_models
from django_q import admin as q_admin
admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
list_display = (
'name',
'func',
'result',
'started',
# add attempt_count to list_display
'attempt_count'
)
跟普通的 ModelAdmin
是一樣的
我們可以自行新增搜尋方塊、過濾欄位之類的。記得要先執行 admin.site.unregister([q_models.Failure])
取消之前Django-Q自己註冊的 ModelAdmin
物件。
Django內建訊號系統,我之前有寫過一篇簡單的文章介紹:3分鐘看懂Python後端必須知道的Django的訊號機制
Django-Q提供了兩類訊號:
例子程式碼如下:
from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute
@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
print("Task {} will be enqueued".format(task["name"]))
@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
print("Task {} will be executed by calling {}".format(
task["name"], func))
有需要的話可以註冊訊息接收器,做一些處理。(不過我暫時是沒用上)
搞定~
Django-Q使用下來的體驗還是不錯的,足夠輕量,部署足夠方便,足以應付大部分場景了~