輕量級訊息佇列 Django-Q 輕度體驗

2022-09-01 06:03:15

前言

最近做的這個專案(基於Django),需要做個功能,實現定時採集車輛定位。

這讓我想起來幾年前那個OneCat專案,當時我用的是Celery這個很重的元件

Celery實在是太重了,後來我做公眾號採集平臺的時候,又接觸了Django-RQ和Django-Q這倆,前者是對RQ的封裝,讓RQ和Django更好的結合在一起;後者是一個全新的「多程序任務佇列」元件,相比起celery很輕量,當時使用的時候就給我留下不錯的印象。

於是這個專案我決定繼續使用Django-Q來實現一些非同步操作和定時任務。

關於Django-Q

官方介紹:

A multiprocessing task queue for Django

快速開始

安裝

pip install django-q

新增到 INSTALLED_APPS

INSTALLED_APPS = (
    # other apps
    'django_q',
)

資料庫遷移

由於Django-Q會把執行結果放到資料庫裡,所以要執行一下資料庫遷移的操作

python manage.py migrate

這個操作會生成 django_q_ormqdjango_q_scheduledjango_q_task 三個表

設定

因為本身專案用的快取就是Redis,所以我直接用Redis作為訊息佇列的後端(broker)

Django-Q支援很多種後端,除了Redis還有Disque、IronMQ、Amazon SQS、MongoDB或者是Django的ORM~

settings.py 中新增以下設定:

Q_CLUSTER = {
    'name': 'project_name',
    'workers': 4,
    'recycle': 500,
    'timeout': 60,
    'compress': True,
    'cpu_affinity': 1,
    'save_limit': 250,
    'queue_limit': 500,
    'label': 'Django Q',
    'redis': {
        'host': 127.0.0.1',
        'port': 6379,
        'db': 0,
    }
}

啟動服務

python manage.py qcluster

搞定,現在訊息佇列服務已經跑起來了

我們可以新增非同步任務或者定時任務

非同步任務

最簡單的方式是使用它提供的 async_task 方法,新增一個新的非同步任務到佇列中

來寫個例子,輸入一個數,求階乘之後開平方

import math

def demo_task(number: int):
    return math.sqrt(math.factorial(number))

啟動任務

然後來新增一個非同步任務

from django_q.tasks import async_task, Task

def task_finish(task: Task):
    print(f'任務 {task.name}(ID:{task.id})完成!')

task_id = async_task(
    demo_task, 10,
    task_name='任務名稱',
    hook=task_finish,
)

可以看到,直接呼叫 async_task 方法就行

這個方法的定義是

async_task(func: Any, *args: Any, **kwargs: Any)

傳入要非同步執行的方法之後,可以把該方法的引數跟在後面傳進去,也可以用 kwargs 的方式傳入

這兩種方式都可以的:

  • async_task(demo_task, 10)
  • async_task(demo_task, number=10)

我個人比較喜歡第一種,因為Django-Q本身有幾個命名引數,比如 task_namehooktimeout之類的,用第一種方式傳參不容易和Django-Q預設的命名引數衝突。

獲取執行結果

有兩種方式獲取任務的執行結果:

  • admin後臺
  • 使用 result 方法,在程式碼中獲取

第一種方式無需贅述,在安裝Django-Q元件後執行了資料庫遷移,就會生成 Failed tasksScheduled tasksSuccessful tasks 三個admin模組,顧名思義,在 Failed tasksSuccessful tasks 中可以看到任務的執行結果,也就是我們寫在 demo_task 裡的返回值。

第二種方式,程式碼如下:

from django_q.tasks import result

task_result = result(task_id)

task_id 傳入就可以查詢任務執行的結果,如果任務還沒執行完,那結果就是 None

這個 result 方法還有個 wait 引數,可以設定等待時間,單位是毫秒

執行完成回撥

上面程式碼中,我們還設定了 hook 引數

作用就是任務執行完成之後,執行 task_finish 這個函數

task_finish 裡可以通過 task 引數獲取任務資訊

就是這樣~

async_task 的其他引數

建立非同步任務的這個方法還有很多引數,官網檔案寫得還算可以,很多引數都是 Q_CLUSTER 設定裡面有的,在 async_task 裡設定這些引數就會覆蓋預設的設定。

我直接搬運一波,權當翻譯檔案了~

除了上面介紹到的 task_namehook 還有這些引數:

  • group: str 任務的分組名稱
  • save 設定任務執行結果的儲存後端,不過檔案裡只是一句話的介紹,具體如何設定還得研究一下。(稍微看了一下原始碼,沒搞懂,動態語言太折磨人了)
  • timeout: int 任務超時時間,單位是秒。回顧一下前面的 Q_CLUSTER 設定,裡面有 timeout 設定,設定這個引數可以覆蓋前面的設定,如果任務執行超出了這個時間,就會被直接終止。
  • ack_failures: bool 設定為True時,也承認失敗的任務。這會導致失敗的任務被視為成功交付,從而將其從任務佇列中刪除。預設值為False。(說實話我沒看懂是啥意思)
  • sync: bool 設定為True的時候,所有非同步任務會變成同步執行,這個功能在測試的時候比較有用。預設是False。
  • cached 這個引數既可以設定為True,也可以傳入數位,代表快取過期時間。根據檔案描述,非同步任務的執行結果會存在資料庫裡,當這個引數設定為True的時候,結果不寫入資料庫,而是儲存在快取裡。這個功能在短時間內要大量執行非同步任務,且不需要把結果立刻寫入資料庫的情況下比較有用,可以提高效能。
  • broker 需要傳入一個 Broker 物件的範例,用來控制這個非同步任務在哪個Broker裡執行。
  • q_options: dict 這是最後一個引數了。我下面單獨介紹一下

q_options 引數

根據前面啟動任務的部分,我們啟動非同步任務的時候,可以通過命名引數向任務方法傳遞引數,比如:

async_task(demo_task, number=10)

async_task 這個方法本身又有很多引數,如果這個引數名稱和我們要執行的任務 demo_task 引數重名的話,這些引數就被 async_task 拿走了,我們的任務 demo_task 就拿不到這些引數了。

怎麼辦?

q_options 引數就是為了解決這個問題

可以把要傳給 async_task 的引數都包裝在一個 dict 裡面,然後通過 q_options 引數傳入

假如我們的 demo_task 是這樣的:

def demo_task(number: int, timeout: int):
  ...

除了 number 這個引數,還要接收一個跟 async_task 自有引數重名的 timeout 引數,使用 q_options 的解決方案如下

opts = {
    'hook': 'hooks.print_result',
    'group': 'math',
    'timeout': 30
}

async_task(demo_task, number=10, timeout=100, q_options=opts)

這樣既能……又能……,完美啊~

當然我還是建議用 *args 的方式傳參,這樣就沒有引數重名的問題了。

定時任務

有兩種方式新增定時任務

  • 在程式碼新增
  • admin後臺

在程式碼中新增

比較簡單,直接上程式碼

from django_q.tasks import schedule

schedule(
  'demo_task',
  schedule_type=Schedule.MINUTES,
  minutes=1,
  task_name='任務名稱',
)

有一點注意的是,因為新增後的定時任務是要儲存在資料庫中的

所以需要把要執行的方法(包含完整包名),以字串的形式傳入

假如在我們的Django專案中,要執行的是在 apps/test/tasks.py 檔案中的 demo_task 方法

那麼需要把 apps.test.tasks.demo_task 這個完整的名稱傳入

在admin中新增也是一樣

時間間隔設定

Django-Q的定時任務有很多型別:

  • 一次性
  • 按x分鐘執行一次
  • 每小時一次
  • 每天
  • 每週
  • 每月
  • 每季度
  • 每年
  • Cron表示式

注意,即使是Cron表示式,定時任務執行的最短間隔也是1分鐘

這點我一開始不知道,用Cron表示式寫了個15秒的任務,但執行時間根本不對,然後我翻了一下github上的issues,看到作者的解答才知道~

那個Issues的地址:https://github.com/Koed00/django-q/issues/179

作者的回覆:

The current design has a heartbeat of 30 seconds, which means the schedule table can't have schedules below that. Most of this is explained in the architecture docs. Because of the way the internal loop is set up, a resolution under a dozen seconds or so, quickly becomes unreliable.

I always imagined tasks that need accuracy measured in seconds, would use a delayed tasks strategy where a few seconds delay is either added through the broker or inside the task itself.

The problem with all this, is that a task is currently always added to the back of the queue.
So even with a 1 second resolution on the schedule, the task still has to wait it's execution time. Which can of course vary wildly depending on the broker type, worker capacity and current workload.

這點感覺有些雞肋,如果要高頻執行的任務,那隻能選擇Celery了

在admin後臺新增

這個更簡單,傻瓜式操作

所以這部分略過了~

docker部署

現在後端服務基本是用docker部署的

為了能在docker中使用Django-Q

我們需要在原有Django容器的基礎上,再起一個同樣的容器,然後入口改成qcluster的啟動命令

這裡有個issues也有討論這個問題:https://github.com/Koed00/django-q/issues/513

來個 docker-compose.yml 的例子

version: "3.9"
services:  
  redis:
    image: redis:alpine
    ports:
      - 6379:6379
  web:
    build: .
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - .:/code
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - django_q
  django_q:
    build: .
    command: python manage.py qcluster
    volumes:
      - .:/code
    depends_on:
      - redis

一個簡單的例子

其他的類似環境變數這些,根據實際情況來

注意:

  • Django容器有的東西(環境變數、依賴),Django-Q也要同步加進去
  • Django專案程式碼修改之後,如果是通過uwsgi之類的自動重啟服務,那要注意Django-Q不會自動重啟,需要手動執行 docker-compose restart django_q ,才能使修改的程式碼生效

其他

命令列工具

Django-Q還提供了一些命令列工具

  • 監控cluster執行情況:python manage.py qmonitor
  • 監控內容:python manage.py qmemory
  • 檢視當前狀態資訊:python manage.py qinfo

除了使用命令監控,還可以在程式碼裡做監控,不過我暫時沒用到,所以還沒研究,有需要的同學可以直接看檔案

admin自定義

安裝完Django-Q後,會在admin出現三個選單,跟普通的Django app一樣,這些也是通過 admin 註冊進去的,因此我們可以重新註冊這些 ModelAdmin 來自定義admin上的操作介面

來一段官方關於失敗任務介面的程式碼:

from django_q import models as q_models
from django_q import admin as q_admin

admin.site.unregister([q_models.Failure])
@admin.register(q_models.Failure)
class ChildClassAdmin(q_admin.FailAdmin):
    list_display = (
        'name',
        'func',
        'result',
        'started',
        # add attempt_count to list_display
        'attempt_count'
    )

跟普通的 ModelAdmin 是一樣的

我們可以自行新增搜尋方塊、過濾欄位之類的。記得要先執行 admin.site.unregister([q_models.Failure]) 取消之前Django-Q自己註冊的 ModelAdmin 物件。

訊號

Django內建訊號系統,我之前有寫過一篇簡單的文章介紹:3分鐘看懂Python後端必須知道的Django的訊號機制

Django-Q提供了兩類訊號:

  • 任務加入訊息佇列前
  • 任務執行前

例子程式碼如下:

from django.dispatch import receiver
from django_q.signals import pre_enqueue, pre_execute

@receiver(pre_enqueue)
def my_pre_enqueue_callback(sender, task, **kwargs):
    print("Task {} will be enqueued".format(task["name"]))

@receiver(pre_execute)
def my_pre_execute_callback(sender, func, task, **kwargs):
    print("Task {} will be executed by calling {}".format(
          task["name"], func))

有需要的話可以註冊訊息接收器,做一些處理。(不過我暫時是沒用上)

小結

搞定~

Django-Q使用下來的體驗還是不錯的,足夠輕量,部署足夠方便,足以應付大部分場景了~

參考資料