如何用Python實現設定熱載入?

2022-07-09 18:01:27

背景

由於最近工作需求,需要在已有專案新增一個新功能,實現設定熱載入的功能。所謂的設定熱載入,也就是說當服務收到設定更新訊息之後,我們不用重啟服務就可以使用最新的設定去執行任務。

如何實現

下面我分別採用多程序多執行緒協程的方式去實現設定熱載入。

使用多程序實現設定熱載入

如果我們程式碼實現上使用多程序, 主程序1來更新設定並行送指令,任務的呼叫是程序2,如何實現設定熱載入呢?

使用signal號誌來實現熱載入

當主程序收到設定更新的訊息之後(設定讀取是如何收到設定更新的訊息的? 這裡我們暫不討論), 主程序就向進子程1傳送kill訊號,子程序1收到kill的訊號就退出,之後由訊號處理常式來啟動一個新的程序,使用最新的組態檔來繼續執行任務。

main 函數

def main():
    # 啟動一個程序執行任務
    p1 = Process(target=run, args=("p1",))
    p1.start()

    monitor(p1, run) # 註冊訊號
    processes["case100"] = p1 #將程序pid儲存
    num = 0 
    while True: # 模擬獲取設定更新
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        sleep(2)
        if num == 4:
            kill_process(processes["case100"]) # kill 當前程序
        if num == 8:
            kill_process(processes["case100"]) # kill 當前程序
        if num == 12:
            kill_process(processes["case100"]) # kill 當前程序
        num += 1

signal_handler 函數

def signal_handler(process: Process, func, signum, frame):
    # print(f"{signum=}")
    global counts

    if signum == 17:  # 17 is SIGCHILD 
        # 這個迴圈是為了忽略SIGTERM發出的訊號,避免搶佔了主程序發出的SIGCHILD
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)

        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1

    if signum == 2:
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")

完整程式碼如下:

#! /usr/local/bin/python3.8
from multiprocessing import Process
from typing import Dict
import signal
from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
import multiprocessing
from multiprocessing import Process
from typing import Callable
from data import processes
import sys
from functools import partial
import time

processes: Dict[str, Process] = {}
counts = 2


def run(process: Process):
    while True:
        print(f"{process} running...")
        time.sleep(1)


def kill_process(process: Process):
    print(f"kill {process}")
    process.terminate()


def monitor(process: Process, func: Callable):
    for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
        # SIGTERM is kill signal.
        # No SIGCHILD is not trigger singnal_handler,
        # No SIGINT is not handler ctrl+c,
        # No SIGQUIT is RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'>
        signal.signal(signame, partial(signal_handler, process, func))


def signal_handler(process: Process, func, signum, frame):
    print(f"{signum=}")
    global counts

    if signum == 17:  # 17 is SIGTERM
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1

    if signum == 2:
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")


def main():
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)
    processes["case100"] = p1
    num = 0
    while True:
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])
        if num == 8:
            kill_process(processes["case100"])
        if num == 12:
            kill_process(processes["case100"])
        num += 1


if __name__ == '__main__':
    main()

執行結果如下:

multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1

processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}

p1 running...
p1 running...
kill <Process name='Process-1' pid=2533 parent=2532 started>
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1

processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}

signum=17
Launch a new process
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1

processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}

p2 running...
p2 running...
kill <Process name='Process-2' pid=2577 parent=2532 started>
signum=17
Launch a new process
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1

processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>}

p3 running...
p3 running...
multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1

總結:

好處:使用號誌可以處理多程序之間通訊的問題。
壞處:程式碼不好寫,寫出來程式碼不好理解。號誌使用必須要很熟悉,不然很容易自己給自己寫了一個bug.(所有初學者慎用,老司機除外。)
還有一點不是特別理解的就是process.terminate() 傳送出訊號是SIGTERM number是15,但是第一次signal_handler收到訊號卻是number=17,如果我要去處理15的訊號,就會導致前一個程序不能kill掉的問題。歡迎有對號誌比較熟悉的大佬,前來指點迷津,不甚感謝。

採用multiprocessing.Event 來實現設定熱載入

實現邏輯是主程序1 更新設定並行送指令。程序2啟動排程任務。
這時候當主程序1更新好設定之後,傳送指令給程序2,這時候的指令就是用Event一個非同步事件通知。

直接上程式碼

scheduler 函數

def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")

        task_schedule_event.set() # 設定set之後, is_set 為True

        print(f"Schedule will start ...")
        while task_schedule_event.is_set(): # is_set 為True的話,那麼任務就會一直執行
            run(case_configurations)

        print("Clearing all scheduling job ...") 

event_scheduler 函數

def event_scheduler(case_config):

    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")

    task_schedule_event.clear() # clear之後,is_set 為False
    print(f"Clear scheduler jobs ...")

    print(f"Schedule job ...")

完整程式碼如下:

import multiprocessing
import time


scheduler_notify_queue = multiprocessing.Queue()
task_schedule_event = multiprocessing.Event()


def run(case_configurations: str):
    print(f'{case_configurations} running...')
    time.sleep(3)


def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()

        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()

        print(f"Schedule will start ...")
        while task_schedule_event.is_set():
            run(case_configurations)

        print("Clearing all scheduling job ...")


def event_scheduler(case_config: str):

    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")

    task_schedule_event.clear()
    print(f"Clear scheduler jobs ...")

    print(f"Schedule job ...")


def main():
    scheduler_notify_queue.put('1')
    p = multiprocessing.Process(target=scheduler)
    p.start()

    count = 1
    print(f'{count=}')
    while True:
        if count == 5:
            event_scheduler('100')
        if count == 10:
            event_scheduler('200')
        count += 1
        time.sleep(1)


if __name__ == '__main__':
    main()

執行結果如下:

wait message...
Got case configurations case_configurations='1'...
Schedule will start ...
1 running...
1 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='100'...
Schedule will start ...
100 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='200'...
Schedule will start ...
200 running...
200 running...

總結:

使用Event事件通知,程式碼不易出錯,程式碼編寫少,易讀。相比之前號誌的方法,推薦大家多使用這種方式。

使用多執行緒或協程的方式,其實和上述實現方式一致。唯一區別就是呼叫了不同庫中,queueevent.

# threading
scheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()

# async
scheduler_notify_queue = asyncio.Queue()
task_schedule_event = asyncio.Event()

結語:

具體的實現的方式有很多,也各自有各自的優劣勢。我們需要去深刻理解到需求本身,才去做技術選型。