一篇文章帶你入門celery,內附優先順序、訊號、工作流任務範例程式碼

2021-05-02 03:00:52

訊息佇列是什麼

很多人初次聽說訊息佇列的時候可能會覺得這個詞有點高階,一定充滿了複雜的知識點,對,其實沒錯,生產中使用使的確很複雜,但在學習時,我們可以將其理解的很簡單,怎麼理解呢,拆開來看,訊息(Message)+佇列(Queue):

  • 訊息就很好理解了,微信訊息,簡訊訊息,小道訊息,內幕訊息等等,訊息在日常生活中簡直無所不在,這裡的訊息也並無差別,不過我還是簡單用「黑話」描述一下,一段承載生產實體傳遞到消費實體通訊內容的結構化資料,通常是序列化的位元組陣列。
  • 佇列那就更簡單了,資料結構學過吧,先進先出知道吧,沒了。
    訊息佇列其實是生產者-消費者模型的一種實現方式,如下圖所示,把記憶體緩衝區換成訊息佇列即可很清晰的表達其作用:作為生產者至消費者之間的一個訊息通道。
    image.png
    圖片來源

    為什麼要用訊息佇列

    這裡我用一段文字做一個形象的比喻:

    一陣涼風吹過樹林,幾片枯黃的葉子在樹枝上搖搖欲墜,摩擦中還發出沙沙之聲,彷彿都想把對方先推下去,殊不知無論早晚,結局都是一樣。
    風繼續向前吹去,卻撞在了一座宏大的建築上,不敢再向前。前面的建築像是一座住著吸血鬼的中世紀城堡,牆壁上雕刻著奇異的圖案,最上方的房檐上站立著幾隻黑色的石鷹,城堡兩側隱藏在濃密的迷霧中讓人看不太清,但很明顯一定充滿了極致的對稱美。
    此時城堡的大廳內零零散散站著一些人,但卻異常安靜,安靜到空氣中充滿了肅殺之氣,配合著這秋意的蕭瑟使人不寒而慄。
    這些人有同一個身份--賞金獵人,他們來此只有一個目的,那塊懸浮在大廳上空的木牌。木牌看似沒什麼特別之處,但邊角處暗紅色的血跡和劍痕幾乎在明示其並不簡單。
    木牌有一個很文雅的名字,「見無」,意思一目瞭然,在其上被見到的人馬上就會在這個世界消失。
    「見無」上的任務多是由一些富商巨賈,豪門貴族釋出,一是他們掌握敵國的財富,請得動這些賞金獵人,二是總還得維持在江湖上那點偽善。雖說「見無」,但事情總有例外,高居任務榜首的天級、地級、玄級三個任務已在那裡七百年之久,而今天讓這些神龍見首不見尾的獵人們齊聚一廳,擡首觀望的原因就是,玄級任務被完成了。
    玄級任務不知由誰釋出,但任務內容從釋出至今卻未曾變化:當代姑蘇慕容家家主的項上人頭。歲月更迭,慕容家主不知換了多少代,卻未有一人是死於這個任務,反而因任務丟掉性命的賞金獵人那真是一茬又一茬,甚至慕容家已經將此作為其震懾武林的手段。
    關於玄級任務的釋出者,坊間猜測頗多,有位於西南把控天下古玩的星家,虎踞遼北鎮守邊疆的岳家,甚至還有猜測是隱世不出根植燕京的王家,每種猜測都有其緣由,不必細說。
    此時人們更關注的是,慕容家主被殺了?那可是慕容家,一門冷月劍法傳承了千年,也震懾了武林千年的慕容家,有道是,「冷光浮照千萬裡,月下再無一絲聲「。不過「見無」萬千年來從未出過差錯,獵人們不存在一絲對結果的質疑,至於為什麼沒有聚在一起討論是誰完成了任務,是因為所有人心中都是同一個答案,他叫「若」,沒有姓氏沒有出身,甚至沒人見過他的臉,只知道他的名字是「若」,一個仿若虛無的人。**
    「若」的強大沒有人敢質疑,他曾。。

在上面的小故事中,任務榜單對應訊息佇列,任務釋出人是生產者,賞金獵人是消費者。
先來想像一下如果沒有任務榜單,一個富商想殺死某人只能挨個給獵人們打電話,「喂,你有空不啦,我要殺xxx,你要多少錢」,「沒空」。發現問題了嗎,這種方式效率是極其低下的並且想給人家打電話還得知道電話號碼,而採用任務榜單就很優雅的解決了這個問題,有任務放上去就好,自然會有人處理。
這就是訊息佇列的第一個好處,實現了邏輯上的解耦,沒有依賴,每個實體各司其職,豈不美哉。並且如果任務極多的話,通過榜單可以分發到很多賞金獵人,每個賞金獵人都可以滿負荷幹活,這就是分散式
再思考一下,榜單上的任務其實並不是要馬上完成,任務完成後獵人再通過榜單通知釋出者即可,這就是非同步,非同步也是計算機世界應用非常廣泛的設計。
當然還可以利用訊息佇列做更多事情,像上面故事中的任務分級等等,還可以設定不同的路由,某個任務只針對某些獵人可見。當任務過多時,只需要再招募獵人,實現了水平擴充套件。
總結一下,三大好處:解耦,分散式,非同步。應用時可延伸出,不同路由策略、優先順序、限流、水平擴充套件等等好處。

AMQP & rabbitMQ

維基百科:高階訊息佇列協定Advanced Message Queuing Protocol(AMQP)是訊息導向中介軟體提供的開放的應用層協定,其設計目標是對於訊息的排序、路由(包括對等和訂閱-釋出)、保持可靠性、保證安全性[[1]](https://zh.wikipedia.org/wiki...。AMQP規範了訊息傳遞方和接收方的行為,以使訊息在不同的提供商之間實現互操作性,就像SMTPHTTPFTP等協定可以建立互動系統一樣。與先前的中介軟體標準(如Java訊息服務)不同的是,JMS在特定的API介面層面和實現行為上進行了統一,而高階訊息佇列協定則關注於各種訊息如何以位元組流的形式進行傳遞。因此,使用了符合協定實現的任意應用程式之間可以保持對訊息的建立、傳遞。
官網:The Advanced Message Queuing Protocol (AMQP) is an open standard for passing business messages between applications or organizations. It connects systems, feeds business processes with the information they need and reliably transmits onward the instructions that achieve their goals.
概念性的描述看看就好,直接理解起來還是有些困難的,從拆分元件的角度看就簡單多了。
AMQP中的元件:
Broker:rabbitMQ服務就是Broker,是一個比較大的概念,描述的是整個應用服務。
交換機(exchange):用於接收來自生產者的訊息,並把訊息轉發到訊息佇列中。AMQP中存在四種交換機,Direct exchange、Fanout exchange、Topic exchange、Headers exchange,
圖片來源
而RabbitMQ是使用基於AMQP來實現的開源訊息佇列伺服器,具備極高的穩定性和可靠性,自帶一個監控平臺,等下會用到。

本文主角--celery

celery概述

celery是python實現的一個輕量級分散式框架系統,使用celery可以很簡單快速的實現任務分散式下發。
celery還提供了極其完善的檔案,讓開發者可以很快速的上手和深入學習。
celery的應用場景就不說了,看了上面訊息佇列的介紹應該很清楚,什麼場景可以用celery。

broker,backend,worker是什麼

broker在上文也有提到,可以在此簡單理解為用來傳遞celery任務訊息的中介軟體。最新的celery5中支援四種broker:RabbitMQ、Redis、Amazon SQS、Zookeeper(實驗性質)。一般最常用的就是redis和rabbitMQ。
backend是用來儲存任務結果和中間狀態的實體,backend的選擇就很多了,redis/mongoDB/elsticsearch/rabbitMQ,甚至還可以自己宣告。如果生產者或者其他服務需要關心非同步任務的結果則一定要設定backend。
worker,顧名思義,其作用就是執行任務,需要注意的是啟動worker時一般需要設定其監聽的佇列和最大並行數。

一個簡單的demo

root_dir
├── celery_demo
│   ├── __init__.py

__init__.py

import time

from celery import Celery
from celery.exceptions import TimeoutError
from celery.result import AsyncResult
from kombu import Queue, Exchange

# celery設定,4.0之後引入了小寫設定,這種大寫設定在6.0之後將不再支援
# 可以參考此連結
# https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=worker#std-setting-enable_utc
CONFIG = {
    # 設定時區
    'CELERY_TIMEZONE': 'Asia/Shanghai',
    # 預設為true,UTC時區
    'CELERY_ENABLE_UTC': False,
    # broker,注意rabbitMQ的VHOST要給你使用的使用者加許可權
    'BROKER_URL': 'amqp://root:[email protected]:5672/dev',
    # backend設定,注意指定redis資料庫
    'CELERY_RESULT_BACKEND': 'redis://192.168.1.5:30412/4',
    # worker最大並行數
    'CELERYD_CONCURRENCY': 10,
    # 如果不設定,預設是celery佇列,此處使用預設的直連交換機,routing_key完全一致才會排程到celery_demo佇列
    # 此處注意,元組中只有一個值的話,需要最後加逗號
    'CELERY_QUEUES': (
        Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo"),
    )
}
app = Celery()
app.config_from_object(CONFIG)


@app.task(name='demo_task')
def demo_task(x, y):
    print(f"這是一個demo任務,睡了10秒,並返回了{x}+{y}的結果。")
    time.sleep(10)
    return x + y


def call():
    def get_result(task_id):
        res = AsyncResult(task_id)
        try:
            # 拿到非同步任務的結果,需要用task_id範例化AsyncResult,再呼叫get方法,get預設是阻塞方法,提供timeout引數,此處設定為0.1秒
            res.get(0.1)
            return res.get(0.1)
        except TimeoutError:
            return None

    tasks = []
    print("開始下發11個任務")
    for _ in range(11):
        tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo'))
    print("等待10秒後查詢結果")
    time.sleep(10)
    for index, task in enumerate(tasks):
        task_result = get_result(task.id)
        if task_result is not None:
            print(f"任務{index}的返回值是:{task_result}")
        else:
            print(f"任務{index}還沒執行結束")
    print("再等待10秒")
    time.sleep(10)
    print(f"任務10的返回值是:{get_result(tasks[-1].id)}")


if __name__ == '__main__':
    call()

pycharm中啟動worker
pycharm-worker.png
celery_demo_queue.png
啟動後可以在rabbitMQ的監控看板上看到出現了celery_demo佇列。
因為設定了最大並行為10,接下來下發11個任務看是什麼結果。
display.png
可以看到在第一個10秒等待後,任務10(第11個任務)並未結束,繼續等待10秒後才能拿到結果,說明最大並行數的確生效了。

常用設定說明

介紹一些常用的設定

  • CELERYD_PREFETCH_MULTIPLIER:預取訊息數量,預設會取4*並行數,在上面的例子中則會最多預取40個訊息,如果設定為1,則表示禁止預取。
  • CELERY_ACKS_LATE:預設為FALSE,這個引數的理解需要先了解下rabbitMQ的ACK機制,簡單說就是如果設定True則會在任務執行完成後才會對訊息佇列傳送確認,表明訊息已被消費,如果任務執行中發生了異常情況,未傳送確認訊息,則訊息佇列會繼續保留此訊息,直到下一個worker取走併成功執行。此時衍生出了一個問題,需要保證此訊息是冪等的,也就是無論執行多少次結果都一樣,否則可能會得到一些預料之外的結果。
  • CELERYD_MAX_TASKS_PER_CHILD:worker執行多少個任務會重新啟動程序,預設為無限制,建議設定此值,可避免記憶體漏失。
  • CELERY_ROUTES:可以在設定中指定每個任務的路由規則,上面例子使用的動態指定佇列的方式,在呼叫時指定路由規則。

其餘設定參考官方檔案

訊號(Signals)

celery中的訊號我理解為就是勾點函數,celery提供了不同型別的勾點函數,分別對應不同元件,常用的有三種:任務型別、worker型別、紀錄檔型別。
下面附上程式碼範例,以演示訊號的效果:

@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))
@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    worker_logger.info(f"為worker新增一個監控佇列:{queue_name}")
    instance.app.amqp.queues.select_add(queue_name)
    worker_logger.info(f"worker當前監控佇列:{','.join(instance.app.amqp.queues.keys())}")
@after_setup_logger.connect
def setup_logger(logger, loglevel, logfile, **kwargs):
    worker_logger.info(f"worker紀錄檔級別是:{loglevel}")
    worker_logger.info(
        f"logger中目前有{len(logger.handlers)}個handler,分別是:{','.join(type(_).__name__ for _ in logger.handlers)}")

結果如下所示:
task_signals.png
logger_signals.png

設定任務優先順序

celery使用rabbitMQ支援任務優先順序非常簡單,只需要在Queue的設定中加一個引數,並新增CELERY_ACKS_LATE,CELERYD_PREFETCH_MULTIPLIER設定,作用在上個章節有講,如下

# 優先順序範圍設定為0-9,最大設定為255,數位越大優先順序越高
'CELERY_ACKS_LATE': True,
'CELERYD_PREFETCH_MULTIPLIER': 1,
'CELERY_QUEUES': (
        Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", queue_arguments={'x-max-priority': 9}),
    )

需要注意的是,x-max-priority是rabbitMQ3.5.0版本後才支援的,不要用錯版本喲
首先刪除之前的celery_demo佇列,再次啟動worker後可以發現celery_demo佇列多了Pri標識,表明已經支援優先順序。
priority_queue.png
接下來驗證下優先順序是否有效,數位越大,優先順序越高
連線任務型別訊號task_received,當任務被worker接收到時執行

@task_received.connect
def on_task_received(request, **kwargs):
    # 函數的一個引數就是任務編號
    worker_logger.info(f"任務{request.args[0]}已被worker接收,開始執行")

再改造一下call方法,因為worker的並行數是10,所以先下發10個任務,讓worker滿並行,再以優先順序由低到高下發10個任務,按照預期任務的執行順序應該task19到task10排列。

def priority_call():
    tasks = []
    print("同時下發20個任務")
    for _ in range(10):
        # apply_async提供priority引數指定優先順序
        tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=0))
    time.sleep(1)
    for _ in range(10, 20):
        # apply_async提供priority引數指定優先順序
        tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=_ % 10))

結果如下,很明顯可以看到,符合預期
priority_result.png
不知道有沒有人對這兩個設定有疑問,看到網上有一些文章提到了一定要設定這兩個引數才能實現優先順序,但是並沒有說具體原因,簡單說下我的理解。上文已經說明這兩個設定的作用,不再贅述,只說下和優先順序的關係。

'CELERY_ACKS_LATE': True,
'CELERYD_PREFETCH_MULTIPLIER': 1,

先假設如果不設定CELERY_ACKS_LATE,celery為提高效能會在任務真正執行前就會向佇列傳送確認訊息,這會導致儘管一個worker設定了並行數是10,但實際上在此worker上最多同時會有20個任務,其中10個正在執行,另外10個是還沒傳送確認訊息(ACK)的,這10個實際上並未開始執行,所以如果其優先順序很高,但是卻並未執行,也沒有分配到其他worker,反而可能低優先順序卻在其他worker開始執行了,顯然不符合優先順序的預期。
CELERYD_PREFETCH_MULTIPLIER的作用也是如此,你可以按照上面的分析自行理解下,其目的都是為了保證每個worker的並行都只會分配一個任務。

工作流任務

官方檔案寫的非常好,有耐心還是去讀檔案比較好。
我們先來了解下signature,翻譯過來就是簽名,這裡和java的方法簽名概念類似,java中用方法名和引數型別組成了一個方法的簽名,celery中的signature同樣是包裝了task和指定的引數,方便可以可以對其進行傳遞,比如作為引數傳遞到某個函數。簽名包裝後還可以進行二次修改,比如新增或更新某個引數,當然還可以通過immutable=True將其設定為不可修改。

celery通過繼承Signature實現了幾個易用的工作流任務類:

  • chain:鏈式任務,序列執行,父任務的返回值會作為引數傳遞給子任務
  • group:組任務,並行執行,使用celery.result.GroupResult獲取結果
  • chord:依賴一個group任務,group任務結束後,將所有子任務的返回值作為引數傳遞給chord任務
  • chunks:一般用於將同一任務的極多次執行分組下發,以降低訊息傳輸的成本

    工作流任務通常在一組任務有執行順序的要求時才會用到,做過DAG任務排程工具的同學肯定會容易理解,我舉個小例子說明下:
    早上起床後的流程:穿衣服->洗漱->吃早餐,這就是一個序列執行的鏈式任務,可能吃早餐的時候還會看下新聞,吃早餐和看新聞就是一個並行執行的組任務
    四個範例

    from celery import chain, group, chord, chunks
    
    @app.task(name='demo_task2')
    def demo_task2(x, y):
      return x * y
    
    
    @app.task(name='tsum')
    def tsum(nums):
      return sum(nums)
    
    
    def chain_call():
      # 1 * 2 * 3 * 4 = 24
      # .s()是.signature()的縮寫
      # 還可通過管道符呼叫chain,具體參考檔案
      res = chain(
          *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3,), (4,)]])()
      print(res.id)
      print(f"chain任務:1 * 2 * 3 * 4={res.get()}")
    
    
    def group_call():
      res = group(
          *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]])()
      print(res.id)
      print(f"chain任務:1 * 2, 3 * 4, 5 * 6={res.get()}")
    
    
    def chord_call():
      res = chord(
          (demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]),
          tsum.s().set(routing_key='celery_demo', queue='celery_demo')
      )()
      print(res.id)
      print(f"chord任務:sum(1 * 2, 3 * 4, 5 * 6)={res.get()}")
    
    
    def chunk_call():
      res = chunks(demo_task2.s(), [(1, 2), (3, 4), (5, 6)], 2).apply_async(routing_key='celery_demo', queue='celery_demo')
      print(res.id)
      print(f"chunk任務:1 * 2, 3 * 4={res.get()[0]}, 5 * 6={res.get()[1]}")
    

全部程式碼,可直接執行

import time

from celery import Celery
from celery.exceptions import TimeoutError
from celery.result import AsyncResult, GroupResult
from kombu import Queue, Exchange
from celery.signals import after_task_publish, celeryd_after_setup, after_setup_logger, task_received, task_success
from celery.utils.log import get_logger, worker_logger
from celery import chain, group, chord, chunks
import logging

# celery設定,4.0之後引入了小寫設定,這種大寫設定在6.0之後將不再支援
# 可以參考此連結
# https://docs.celeryproject.org/en/stable/userguide/configuration.html?highlight=worker#std-setting-enable_utc
CONFIG = {
    # 設定時區
    'CELERY_TIMEZONE': 'Asia/Shanghai',
    # 預設為true,UTC時區
    'CELERY_ENABLE_UTC': False,
    # broker,注意rabbitMQ的VHOST要給你使用的使用者加許可權
    'BROKER_URL': 'amqp://root:[email protected]:5672/dev',
    # backend設定,注意指定redis資料庫
    'CELERY_RESULT_BACKEND': 'redis://192.168.1.5:30412/4',
    # worker最大並行數
    'CELERYD_CONCURRENCY': 10,
    # 如果不設定,預設是celery佇列,此處使用預設的直連交換機,routing_key完全一致才會排程到celery_demo佇列
    'CELERY_ACKS_LATE': True,
    'CELERYD_PREFETCH_MULTIPLIER': 1,
    # 此處注意,元組中只有一個值的話,需要最後加逗號
    'CELERY_QUEUES': (
        Queue("celery_demo", Exchange("celery_demo"), routing_key="celery_demo", queue_arguments={'x-max-priority': 9}),
    )
}
app = Celery()
app.config_from_object(CONFIG)


@app.task(name='demo_task')
def demo_task(x, y):
    time.sleep(10)
    return x + y


@app.task(name='demo_task2')
def demo_task2(x, y):
    return x * y


@app.task(name='tsum')
def tsum(nums):
    return sum(nums)


@celeryd_after_setup.connect
def setup_direct_queue(sender, instance, **kwargs):
    queue_name = '{0}.dq'.format(sender)  # sender is the nodename of the worker
    worker_logger.info(f"為worker新增一個監控佇列:{queue_name}")
    instance.app.amqp.queues.select_add(queue_name)
    worker_logger.info(f"worker當前監控佇列:{','.join(instance.app.amqp.queues.keys())}")


@after_task_publish.connect
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
    # information about task are located in headers for task messages
    # using the task protocol version 2.
    info = headers if 'task' in headers else body
    print('after_task_publish for task id {info[id]}'.format(
        info=info,
    ))


@task_received.connect
def on_task_received(request, **kwargs):
    worker_logger.info(f"任務{request.args[0]}已被worker接收,開始執行")


@after_setup_logger.connect
def setup_logger(logger, loglevel, logfile, **kwargs):
    worker_logger.info(f"worker紀錄檔級別是:{loglevel}")
    worker_logger.info(
        f"logger中目前有{len(logger.handlers)}個handler,分別是:{','.join(type(_).__name__ for _ in logger.handlers)}")


def call():
    def get_result(task_id):
        res = AsyncResult(task_id)
        try:
            # 拿到非同步任務的結果,需要用task_id範例化AsyncResult,再呼叫get方法,get預設是阻塞方法,提供timeout引數,此處設定為0.1秒
            res.get(0.1)
            return res.get(0.1)
        except TimeoutError:
            return None

    tasks = []
    print("開始下發11個任務")
    for _ in range(11):
        tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo'))
    print("等待10秒後查詢結果")
    time.sleep(10)
    for index, task in enumerate(tasks):
        task_result = get_result(task.id)
        if task_result is not None:
            print(f"任務{index}的返回值是:{task_result}")
        else:
            print(f"任務{index}還沒執行結束")
    print("再等待10秒")
    time.sleep(10)
    print(f"任務10的返回值是:{get_result(tasks[-1].id)}")


def priority_call():
    tasks = []
    print("先下發10個任務,佔滿worker的並行")
    for _ in range(10):
        # apply_async提供priority引數指定優先順序
        tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=0))
    # 保險起見,sleep 1
    time.sleep(1)
    print("再以優先順序由低到高的順序下發10個任務,預期任務將逆序執行")
    for _ in range(10, 20):
        # apply_async提供priority引數指定優先順序
        tasks.append(demo_task.apply_async((_, _), routing_key='celery_demo', queue='celery_demo', priority=_ % 10))


def chain_call():
    # 1 * 2 * 3 * 4 = 24
    # .s()是.signature()的縮寫
    # 還可通過管道符呼叫chain,具體參考檔案
    res = chain(
        *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3,), (4,)]])()
    print(res.id)
    print(f"chain任務:1 * 2 * 3 * 4={res.get()}")


def group_call():
    res = group(
        *[demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]])()
    print(res.id)
    print(f"chain任務:1 * 2, 3 * 4, 5 * 6={res.get()}")


def chord_call():
    res = chord(
        (demo_task2.signature(_, routing_key='celery_demo', queue='celery_demo') for _ in [(1, 2), (3, 4), (5, 6)]),
        tsum.s().set(routing_key='celery_demo', queue='celery_demo')
    )()
    print(res.id)
    print(f"chord任務:sum(1 * 2, 3 * 4, 5 * 6)={res.get()}")


def chunk_call():
    res = chunks(demo_task2.s(), [(1, 2), (3, 4), (5, 6)], 2).apply_async(routing_key='celery_demo', queue='celery_demo')
    print(res.id)
    print(f"chunk任務:1 * 2, 3 * 4={res.get()[0]}, 5 * 6={res.get()[1]}")


if __name__ == '__main__':
    call()
    priority_call()
    chain_call()
    group_call()
    chord_call()
    chunk_call()

斜體