詳細解析Python實現定時任務之apscheduler

2022-10-10 18:00:49
本篇文章給大家帶來了關於Python的相關知識,其中主要介紹了關於實現定時任務的相關問題,可以使用第三方包來管理定時任務,相對來說apscheduler使用起來更簡單,下面一起來看一下使用的方法,希望對大家有幫助。

程式設計師必備介面測試偵錯工具:

【相關推薦:Python3視訊教學

初識apscheduler

來個簡單的例子看看apscheduler是如何使用的。

#encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test():
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, 測試apscheduler'.format(now))
task = BlockingScheduler()
task.add_job(func=sch_test, trigger='cron', second='*/10')
task.start()

上述例子很簡單,我們首先要定義一個apscheduler的物件,然後add_job新增任務,最後start開啟任務就行了。

例子是每隔10秒執行一次sch_test任務,執行結果如下:

時間:2022-10-08 15:16:30, 測試apscheduler
時間:2022-10-08 15:16:40, 測試apscheduler
時間:2022-10-08 15:16:50, 測試apscheduler
時間:2022-10-08 15:17:00, 測試apscheduler

如果我們要在執行任務函數時攜帶引數,只要在add_job函數中新增args就行,比如task.add_job(func=sch_test, args=('a'), trigger='cron', second='*/10')。

apscheduler有哪些模組

上面例子中我們初步瞭解到如何使用apschedulerl了,接下來需要知道apscheduler的設計框架。apscheduler有四個主要模組,分別是:觸發器triggers、任務記憶體job_stores、執行器executors、排程器schedulers。

1. 觸發器triggers:

觸發器指的是任務指定的觸發方式,例子中我們用的是「cron」方式。我們可以選擇cron、date、interval中的一個。

cron表示的是定時任務,類似linux crontab,在指定的時間觸發。

可用引數如下:

01.png

除此之外,我們還可用表示式型別去設定cron。比如常用的有:

02.png

使用方法範例,在每天7點20分執行一次:

task.add_job(func=sch_test, args=('定時任務',), trigger='cron',

hour='7', minute='20')

date表示具體到某個時間的一次性任務;

使用方法範例:

# 使用run_date指定執行時間
task.add_job(func='sch_test', trigger='date', run_date=datetime.datetime(2022 ,10 , 8, 16, 1, 30))
# 或者用next_run_time
task.add_job(func=sch_test,trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))

interval表示的是迴圈任務,指定一個間隔時間,每過間隔時間執行一次。

interval可設定如下的引數:

03.png

使用方法範例,每隔3秒執行一次sch_test任務:

task.add_job(func=sch_test, args=('迴圈任務',), trigger='interval', seconds=3)。

來個例子把3種觸發器都使用一遍:

# encoding:utf-8
from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))
task = BlockingScheduler()
task.add_job(func=sch_test, args=('一次性任務',),trigger='date', next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=3))
task.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
task.add_job(func=sch_test, args=('迴圈任務',), trigger='interval', seconds=3)
task.start()

列印部分結果:

時間:2022-10-08 15:45:49, 一次性任務測試apscheduler
時間:2022-10-08 15:45:49, 迴圈任務測試apscheduler
時間:2022-10-08 15:45:50, 定時任務測試apscheduler
時間:2022-10-08 15:45:52, 迴圈任務測試apscheduler
時間:2022-10-08 15:45:55, 定時任務測試apscheduler
時間:2022-10-08 15:45:55, 迴圈任務測試apscheduler
時間:2022-10-08 15:45:58, 迴圈任務測試apscheduler

通過程式碼範例和結果展示,我們可清晰的知道不同觸發器的使用區別。

2. 任務記憶體job_stores

顧名思義,任務記憶體是儲存任務的地方,預設都是儲存在記憶體中。我們也可自定義儲存方式,比如將任務存到mysql中。這裡有以下幾種選擇:

04.png

通常預設儲存在記憶體即可,但若程式故障重新啟動的話,會重新拉取任務執行了,如果你對任務的執行要求高,那麼可以選擇其他的記憶體。

使用SQLAlchemyJobStore記憶體範例:

from apscheduler.schedulers.blocking import BlockingScheduler
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))
sched = BlockingScheduler()
# 使用mysql儲存任務
sql_url = 'mysql+pymysql://root:root@localhost:3306/db_name?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 新增任務
sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
sched.start()

3. 執行器executors

執行器的功能就是將任務放到執行緒池或程序池中執行。有以下幾種選擇:

05.png

預設是ThreadPoolExecutor, 常用的也就是第執行緒和程序池執行器。如果應用是CPU密集型操作,可用ProcessPoolExecutor來執行。

4. 排程器schedulers

排程器屬於apscheduler的核心,它扮演著統籌整個apscheduler系統的角色,記憶體、執行器、觸發器在它的排程下正常執行。排程器有以下幾個:

06.png

不是特定場景下,我們最常用的是BlockingScheduler排程器。

異常監聽

定時任務在執行時,若出現錯誤,需要設定監聽機制,我們通常結合logging模組記錄錯誤資訊。

使用範例:

from apscheduler.schedulers.blocking import BlockingScheduler
import datetime
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
# logging紀錄檔設定列印格式及儲存位置
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='sche.log',
                    filemode='a')
def log_listen(event):
if event.exception :
print ( '任務出錯,報錯資訊:{}'.format(event.exception))
else:
print ( '任務正常執行...' )
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))
    print(1/0)
sched = BlockingScheduler()
# 使用mysql儲存任務
sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
sched.add_jobstore('sqlalchemy',url=sql_url)
# 新增任務
sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
# 設定任務執行完成及錯誤時的監聽
sched.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# 設定紀錄檔監聽
sched._logger = logging
sched.start()

apscheduler的封裝使用

上面介紹了apscheduler框架的主要模組,我們基本能掌握怎樣使用apscheduler了。下面就來封裝一下apscheduler吧,以後要用直接在這份程式碼上修改就行了。

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import EVENT_JOB_EXECUTED , EVENT_JOB_ERROR
import logging
import logging.handlers
import os
import datetime
class LoggerUtils():
    def init_logger(self, logger_name):
        # 紀錄檔格式
        formatter = logging.Formatter('%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s')
        log_obj = logging.getLogger(logger_name)
        log_obj.setLevel(logging.INFO)
        # 設定log儲存位置
        path = '/data/logs/'
        filename = '{}{}.log'.format(path, logger_name)
        if not os.path.exists(path):
            os.makedirs(path)
        # 設定紀錄檔按照時間分割
        timeHandler = logging.handlers.TimedRotatingFileHandler(
           filename,
           when='D',  # 按照什麼維度切割, S:秒,M:分,H:小時,D:天,W:周
           interval=1, # 多少天切割一次
           backupCount=10  # 保留幾天
        )
        timeHandler.setLevel(logging.INFO)
        timeHandler.setFormatter(formatter)
        log_obj.addHandler(timeHandler)
        return log_obj
class Scheduler(LoggerUtils):
    def __init__(self):
        # 執行器設定
        executors = {
            'default': ThreadPoolExecutor(10),  # 設定一個名為「default」的ThreadPoolExecutor,其worker值為10
            'processpool': ProcessPoolExecutor(5)  # 設定一個名為「processpool」的ProcessPoolExecutor,其worker值為5
        }
        self.scheduler = BlockingScheduler(timezone="Asia/Shanghai", executors=executors)
        # 記憶體設定
        # 這裡使用sqlalchemy記憶體,將任務儲存在mysql
        sql_url = 'mysql+pymysql://root:root@localhost:3306/db?charset=utf8'
        self.scheduler.add_jobstore('sqlalchemy',url=sql_url)
        def log_listen(event):
            if event.exception:
                # 紀錄檔記錄
                self.scheduler._logger.error(event.traceback)
    
        # 設定任務執行完成及錯誤時的監聽
        self.scheduler.add_listener(log_listen, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        # 設定紀錄檔監聽
        self.scheduler._logger = self.init_logger('sche_test')
    def add_job(self, *args, **kwargs):
        """新增任務"""
        self.scheduler.add_job(*args, **kwargs)
    def start(self):
        """開啟任務"""
        self.scheduler.start()
# 測試任務
def sch_test(job_type):
    now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    print('時間:{}, {}測試apscheduler'.format(now, job_type))
    print(1/0)
# 新增任務,開啟任務
sched = Scheduler()
# 新增任務
sched.add_job(func=sch_test, args=('定時任務',), trigger='cron', second='*/5')
# 開啟任務
sched.start()

【相關推薦:Python3視訊教學

以上就是詳細解析Python實現定時任務之apscheduler的詳細內容,更多請關注TW511.COM其它相關文章!