Celery的基本使用

2023-06-30 06:01:11

day11——celery

celery介紹架構和安裝

# celery:分散式的非同步任務框架,主要用來做:
	非同步任務
    延時任務
    定時任務---》如果只想做定時任務,可以不使用celery,有別的選擇

# celery 框架,原理
1)可以不依賴任何伺服器,通過自身命令,啟動服務(內部支援socket)
2)celery服務為為其他專案服務提供非同步解決任務需求的
注:會有兩個服務同時執行,一個是專案服務,一個是celery服務,專案服務將需要非同步處理的任務交給celery服務,celery就會在需要時非同步完成專案的需求

人是一個獨立執行的服務 | 醫院也是一個獨立執行的服務
	正常情況下,人可以完成所有健康情況的動作,不需要醫院的參與;但當人生病時,就會被醫院接收,解決人生病問題
	人生病的處理方案交給醫院來解決,所有人不生病時,醫院獨立執行,人生病時,醫院就來解決人生病的需求
   
# celery架構
訊息中介軟體(broker):訊息佇列:可以使用redis,rabbitmq,咱們使用redis
任務執行單元(worker):真正的執行 提交的任務
任務執行結果儲存(banckend):可以使用mysql,redis,咱們使用redis

# 安裝celery
	-pip install Celery
    -釋放出可執行檔案:celery,由於 python直譯器的script資料夾再環境變數,任意路徑下執行celery都能找到
   
# celery不支援win,所以想再win上執行,需要額外安裝eventlet
windows系統需要eventlet支援:pip3 install eventlet
Linux與MacOS直接執行:
	3.x,4.x版本:celery worker -A demo -l info
    5.x版本:     celery -A demo worker -l info -P eventlet

celery執行非同步任務

# 基本使用
	1 在虛擬環境中裝celery和eventlet
    2 寫個demo.py檔案,範例化得到app物件,註冊任務
    	from celery import Celery
        import time
        broker = 'redis://127.0.0.1:6379/1'  # 訊息中介軟體 redis
        backend = 'redis://127.0.0.1:6379/2'  # 結果儲存 redis
        app = Celery(__name__,broker=broker,backend=backend)
        @app.task  # 變成celery的任務了
        def add(a,b):
            print('運算結果是',a+b)
            time.sleep(1)
            return a + b
    3 啟動worker(worker監聽訊息佇列,等待別人提交任務,如果有則直接執行,沒有則卡在這等待任務)
    	celery -A demo worker -l info -P eventlet
      
    4 別人提交任務,提交完成會返回一個id號,後期使用id號查詢,至於這個任務有沒有被執行,取決於worker有沒有啟動,如果worker沒有啟動會存放在訊息佇列中等待
    	from demo import add
        res = add.delay(2,2)
    	print(res)
    
    5 提交任務的人,使用id號檢視結果
    	from demo import app
        # celery的包下
        from celery.result import AsyncResult

        id = '042a8fc1-6b0f-4ad6-bf72-edefa657a52f'
        if __name__ == '__main__':
            a = AsyncResult(id=id, app=app)
            if a.successful():  # 正常執行完成
                result = a.get()  # 任務返回的結果
                print(result)
            elif a.failed():
                print('任務失敗')
            elif a.status == 'PENDING':
                print('任務等待中被執行')
            elif a.status == 'RETRY':
                print('任務異常後正在重試')
            elif a.status == 'STARTED':
                print('任務已經開始被執行')

包結構celery

# 使用步驟
	1 新建包:celery_task
    2 再包下新建 celery.py 必須叫它,裡面範例化得到app物件
    from celery import Celery
    broker = 'redis://127.0.0.1:6379/1'  # 訊息中介軟體 redis
    backend = 'redis://127.0.0.1:6379/2'  # 結果儲存 redis
    app = Celery(__name__, broker=broker, backend=backend, include=['celery_task.course_task','celery_task.home_task','celery_task.user_task'])
    
    3 新建任務py檔案:user_task.py   course_task.py  home_task.py
    	-以後跟誰相關的任務,就寫在誰裡面
        from .celery import app
        import time
        @app.task
        def send_sms(mobile, code):
            time.sleep(2)
            print('%s手機號,傳送簡訊成功,驗證碼是:%s' % (mobile, code))
            return True
    4 啟動worker,以包啟動,來到包所在路徑下
    	celery -A 包名 worker -l info -P eventlet
        celery -A celery_task worker -l info -P eventlet
       
    5 其他程式,匯入任務,提交任務即可
    	from celery_task.user_task import send_sms
        res = send_sms.delay(1999999333, 8888)
        print(res)  # f33ba3c5-9b78-467a-94d6-17b9074e8533
    
    6 其它程式,查詢結果
    from celery_task.celery import app
    # celery的包下
    from celery.result import AsyncResult

    id = '51a669a3-c96c-4f8c-a5fc-e1b8e2189ed0'
    if __name__ == '__main__':
        a = AsyncResult(id=id, app=app)
        if a.successful():  # 正常執行完成
            result = a.get()  # 任務返回的結果
            print(result)
        elif a.failed():
            print('任務失敗')
        elif a.status == 'PENDING':
            print('任務等待中被執行')
        elif a.status == 'RETRY':
            print('任務異常後正在重試')
        elif a.status == 'STARTED':
            print('任務已經開始被執行')

celery執行延遲任務和定時任務

# celery 可以做
	-非同步任務
    -延遲任務---》延遲多長時間幹任務
    -定時任務:每天12點鐘,每隔幾秒。。。
   		-如果只做定時任務,不需要使用celery這麼重,apscheduler(自己去研究)
        
# 非同步任務
	-匯入非同步任務的函數
    -函數.delay(引數)
   
# 延遲任務
	-匯入非同步任務的函數
    -函數.apply_async(kwargs={'mobile':'1896334234','code':8888},eta=時間物件)
   
# 定時任務:在app所在的檔案下設定
	- 1 設定
	app.conf.beat_schedule = {
        'send_sms': {
            'task': 'celery_task.user_task.send_sms',
            'schedule': timedelta(seconds=5),
            'args': ('1822344343', 8888),
        },
        'add_course': {
            'task': 'celery_task.course_task.add_course',
            # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
            'schedule': crontab(hour=11, minute=38),  # 每天11點35,執行
            'args': (),
        }
    }
    -2 啟動beat
	celery  -A celery_task beat -l info 
       啟動worker
    celery -A celery_task worker -l info -P eventlet
    -3 到了時間,beat程序負責提交任務到訊息佇列---》worker執行

django中使用celery

# 使用步驟
	1 把之前寫好的包,copy到專案根路徑下
    2 在xx_task.py 中寫任務
    	from .celery import app
        @app.task
        def add_banner():
            from home.models import Banner
            Banner.objects.create(title='測試', image='/1.png', link='/banner', info='xxx',orders=99)
            return 'banner增加成功'
    3 在celery.py 中載入django設定
    import os
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffy_api.settings.dev")
    
    4 檢視函數中,匯入任務,提交即可
    	class CeleryView(APIView):
            def get(self, request):
                res = add_banner.delay()
                return APIResponse(msg='新增banner的任務已經提交了')
    5 啟動worker,等待執行即可
    	celery -A celery_task worker -l info -P eventlet

介面快取

# 所有介面都可以改造,尤其是查詢所有的這種介面,如果加入快取,會極大的提高查詢速度

# 首頁輪播圖介面:獲取輪播圖資料,加快取---》咱們只是以它為例


class BannerView(GenericViewSet, ListModelMixin):
    queryset = Banner.objects.all().filter(is_delete=False, is_show=True).order_by('orders')[:settings.BANNER_COUNT]
    serializer_class = BannerSerializer

    def list(self, request, *args, **kwargs):
        '''
          1 先去快取中查一下有沒有資料
          2 如果有,直接返回,不走父類別的list了(list在走資料庫)
          3 如果沒有,走父類別list,查詢資料庫
          4 把返回的資料,放到快取中
        '''

        data = cache.get('home_banner_list')
        if not data:  # 快取中沒有
            print('走了資料庫')
            res = super().list(request, *args, **kwargs)  # 查詢資料庫
            # 返回的資料,放到快取中
            data = res.data.get('data')  # {code:100,msg:成功,data:[{},{}]}
            cache.set('home_banner_list', data)
        return APIResponse(data=data)


    
# 公司裡可能會這麼寫
	-寫一個查詢所有帶快取的基礎類別
    -寫個裝飾器,只要一設定,就自動帶快取
    
    
# 雙寫一致性問題:快取資料和資料庫資料不一致了
	-寫入資料庫,刪除快取
    -寫入資料庫,更新快取
    -定時更新快取