For a recent work requirement I had to add a new feature to an existing project: configuration hot-reloading. Hot-reloading means that when the service receives a configuration-update message, it can run its tasks with the latest configuration without being restarted.
Below I implement configuration hot-reloading with multiple processes, with multiple threads, and with coroutines.
If the implementation uses multiple processes, where the main process (process 1) updates the configuration and sends commands while process 2 runs the tasks, how do we hot-reload the configuration?
When the main process receives a configuration-update message (how it receives that message is out of scope here), it sends a kill signal to child process 1. The child exits on receiving the kill signal, and the signal handler then starts a new process that continues running the task with the latest configuration file.
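Before walking through the real code, here is the core idea stripped down to a minimal sketch (my own simplification, POSIX only, not the post's code; names such as respawn, worker and counter exist only in this sketch): the parent registers a SIGCHLD handler, and whenever the current worker dies, the handler starts a replacement worker, which in a real service would pick up the newest configuration.

import multiprocessing
import signal
import time

counter = 0
worker = None


def run(name: str):
    while True:
        print(f"{name} running...")
        time.sleep(1)


def respawn(signum, frame):
    # Runs in the parent whenever a child exits (SIGCHLD): start a replacement
    # worker, which in a real service would re-read the latest configuration.
    global worker, counter
    counter += 1
    worker = multiprocessing.Process(target=run, args=(f"p{counter}",))
    worker.start()


if __name__ == "__main__":
    signal.signal(signal.SIGCHLD, respawn)
    worker = multiprocessing.Process(target=run, args=("p0",))
    worker.start()

    time.sleep(3)
    worker.terminate()       # pretend the config changed: kill the current worker
    time.sleep(3)            # the SIGCHLD handler has already launched "p1"

    signal.signal(signal.SIGCHLD, signal.SIG_DFL)  # stop respawning before shutdown
    worker.terminate()
    worker.join()

The post's actual implementation follows the same pattern, with the extra bookkeeping needed to track processes and handle several signals.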
The main function:

def main():
    # Start a process to run the task
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)  # register the signal handlers
    processes["case100"] = p1  # keep a handle to the process
    num = 0
    while True:  # simulate receiving configuration updates
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])  # kill the current process
        if num == 8:
            kill_process(processes["case100"])  # kill the current process
        if num == 12:
            kill_process(processes["case100"])  # kill the current process
        num += 1
The signal_handler function:

def signal_handler(process: Process, func, signum, frame):
    # print(f"{signum=}")
    global counts
    if signum == 17:  # 17 is SIGCHLD
        # Reset these to the default handlers to ignore the signal raised by
        # SIGTERM, so it does not preempt the SIGCHLD delivered to the main process.
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1
    if signum == 2:  # 2 is SIGINT (Ctrl+C)
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")
The full code:

#! /usr/local/bin/python3.8
import multiprocessing
import signal
import sys
import time
from functools import partial
from multiprocessing import Process
from signal import SIGCHLD, SIGTERM, SIGINT, SIGQUIT, SIG_DFL, SIG_IGN
from typing import Callable, Dict

processes: Dict[str, Process] = {}
counts = 2
def run(process: str):
    while True:
        print(f"{process} running...")
        time.sleep(1)


def kill_process(process: Process):
    print(f"kill {process}")
    process.terminate()


def monitor(process: Process, func: Callable):
    for signame in [SIGTERM, SIGCHLD, SIGINT, SIGQUIT]:
        # SIGTERM is the kill signal.
        # Without SIGCHLD, signal_handler is never triggered.
        # Without SIGINT, Ctrl+C is not handled.
        # Without SIGQUIT: RuntimeError: reentrant call inside <_io.BufferedWriter name='<stdout>'>
        signal.signal(signame, partial(signal_handler, process, func))
def signal_handler(process: Process, func, signum, frame):
    print(f"{signum=}")
    global counts
    if signum == 17:  # 17 is SIGCHLD
        for signame in [SIGTERM, SIGCHLD, SIGQUIT]:
            signal.signal(signame, SIG_DFL)
        print("Launch a new process")
        p = multiprocessing.Process(target=func, args=(f"p{counts}",))
        p.start()
        monitor(p, run)
        processes["case100"] = p
        counts += 1
    if signum == 2:  # 2 is SIGINT (Ctrl+C)
        if process.is_alive():
            print(f"Kill {process} process")
            process.terminate()
        signal.signal(SIGCHLD, SIG_IGN)
        sys.exit("kill parent process")
def main():
    p1 = Process(target=run, args=("p1",))
    p1.start()
    monitor(p1, run)
    processes["case100"] = p1
    num = 0
    while True:
        print(
            f"{multiprocessing.active_children()=}, count={len(multiprocessing.active_children())}\n")
        print(f"{processes=}\n")
        time.sleep(2)
        if num == 4:
            kill_process(processes["case100"])
        if num == 8:
            kill_process(processes["case100"])
        if num == 12:
            kill_process(processes["case100"])
        num += 1


if __name__ == '__main__':
    main()

Running it prints output like this:
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1
processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}
p1 running...
p1 running...
kill <Process name='Process-1' pid=2533 parent=2532 started>
multiprocessing.active_children()=[<Process name='Process-1' pid=2533 parent=2532 started>], count=1
processes={'case100': <Process name='Process-1' pid=2533 parent=2532 started>}
signum=17
Launch a new process
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1
processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1
processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}
p2 running...
p2 running...
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 started>], count=1
processes={'case100': <Process name='Process-2' pid=2577 parent=2532 started>}
p2 running...
p2 running...
kill <Process name='Process-2' pid=2577 parent=2532 started>
signum=17
Launch a new process
multiprocessing.active_children()=[<Process name='Process-2' pid=2577 parent=2532 stopped exitcode=-SIGTERM>], count=1
processes={'case100': <Process name='Process-3' pid=2675 parent=2532 started>}
p3 running...
p3 running...
multiprocessing.active_children()=[<Process name='Process-3' pid=2675 parent=2532 started>], count=1
Pros: signals can be used to communicate between processes.
Cons: the code is hard to write, and the result is hard to read. You must know signals very well, otherwise it is easy to write a bug for yourself (beginners beware; veterans excepted).
One thing I still do not fully understand: process.terminate() sends SIGTERM, whose number is 15, yet the first signal signal_handler receives is signum=17. If I handle signal 15 instead, the previous process cannot be killed. If anyone knows signals well, please enlighten me; many thanks.
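To at least see where each signal actually lands, here is a small standalone demo (my own code, POSIX only, not from the original post): process.terminate() delivers SIGTERM (15) to the child, so only a handler inside the child sees 15, while the parent is notified with SIGCHLD (17) once that child has exited.

import multiprocessing
import os
import signal
import sys
import time


def handle_sigterm(signum, frame):
    # Runs inside the child: terminate() delivers SIGTERM here.
    print(f"child {os.getpid()} got signum={signum} (SIGTERM)")
    sys.exit(0)


def child():
    signal.signal(signal.SIGTERM, handle_sigterm)
    while True:
        time.sleep(1)


def handle_sigchld(signum, frame):
    # Runs inside the parent: fires only once the child has exited.
    print(f"parent {os.getpid()} got signum={signum} (SIGCHLD)")


if __name__ == "__main__":
    signal.signal(signal.SIGCHLD, handle_sigchld)
    p = multiprocessing.Process(target=child)
    p.start()
    time.sleep(1)
    p.terminate()   # SIGTERM goes to the child, not to the parent
    p.join()        # the child's exit is what raises SIGCHLD in the parent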
Using multiprocessing.Event to implement configuration hot-reloading, the logic is the same: the main process (process 1) updates the configuration and sends a command, and process 2 runs the scheduled tasks. Once the main process has finished updating the configuration, it sends the command to process 2 in the form of an Event, an asynchronous event notification.
Straight to the code.
The scheduler function:

def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()  # after set(), is_set() returns True
        print(f"Schedule will start ...")
        while task_schedule_event.is_set():  # while is_set() is True, the task keeps running
            run(case_configurations)
        print("Clearing all scheduling job ...")
The event_scheduler function:

def event_scheduler(case_config):
    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")
    task_schedule_event.clear()  # after clear(), is_set() returns False
    print(f"Clear scheduler jobs ...")
    print(f"Schedule job ...")
The full code:

import multiprocessing
import time

scheduler_notify_queue = multiprocessing.Queue()
task_schedule_event = multiprocessing.Event()


def run(case_configurations: str):
    print(f'{case_configurations} running...')
    time.sleep(3)


def scheduler():
    while True:
        print('wait message...')
        case_configurations = scheduler_notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")
        task_schedule_event.set()
        print(f"Schedule will start ...")
        while task_schedule_event.is_set():
            run(case_configurations)
        print("Clearing all scheduling job ...")


def event_scheduler(case_config: str):
    scheduler_notify_queue.put(case_config)
    print(f"Put cases config to the Queue ...")
    task_schedule_event.clear()
    print(f"Clear scheduler jobs ...")
    print(f"Schedule job ...")
def main():
    scheduler_notify_queue.put('1')
    p = multiprocessing.Process(target=scheduler)
    p.start()

    count = 1
    print(f'{count=}')
    while True:
        if count == 5:
            event_scheduler('100')
        if count == 10:
            event_scheduler('200')
        count += 1
        time.sleep(1)


if __name__ == '__main__':
    main()

Running it prints output like this:
wait message...
Got case configurations case_configurations='1'...
Schedule will start ...
1 running...
1 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='100'...
Schedule will start ...
100 running...
Put cases config to the Queue ...
Clear scheduler jobs ...
Schedule job ...
Clearing all scheduling job ...
wait message...
Got case configurations case_configurations='200'...
Schedule will start ...
200 running...
200 running...
With Event notifications the code is less error-prone, shorter, and easier to read. Compared with the signal-based approach above, I recommend this way.
To implement the same thing with threads or coroutines, you mainly just swap out the queue and the event (see the asyncio sketch after this block for the extra changes a coroutine version needs):

# threading
scheduler_notify_queue = queue.Queue()
task_schedule_event = threading.Event()

# async
scheduler_notify_queue = asyncio.Queue()
task_schedule_event = asyncio.Event()
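For the coroutine variant in particular, swapping the queue and event is not quite enough by itself, because asyncio.Queue.get() and put() must be awaited from coroutines running inside an event loop. Below is a minimal asyncio sketch of the same pattern (my adaptation, not code from the original post; the function names simply mirror the multiprocessing version):

import asyncio


async def run(case_configurations: str):
    print(f"{case_configurations} running...")
    await asyncio.sleep(3)


async def scheduler(notify_queue: asyncio.Queue, schedule_event: asyncio.Event):
    while True:
        print("wait message...")
        case_configurations = await notify_queue.get()
        print(f"Got case configurations {case_configurations=}...")
        schedule_event.set()
        while schedule_event.is_set():
            await run(case_configurations)
        print("Clearing all scheduling job ...")


async def event_scheduler(case_config: str,
                          notify_queue: asyncio.Queue,
                          schedule_event: asyncio.Event):
    await notify_queue.put(case_config)
    schedule_event.clear()


async def main():
    # Create the queue and event inside the running loop so they are bound to it.
    notify_queue = asyncio.Queue()
    schedule_event = asyncio.Event()
    scheduler_task = asyncio.create_task(scheduler(notify_queue, schedule_event))

    await notify_queue.put("1")
    await asyncio.sleep(7)
    await event_scheduler("100", notify_queue, schedule_event)
    await asyncio.sleep(7)
    await event_scheduler("200", notify_queue, schedule_event)
    await asyncio.sleep(7)

    scheduler_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())

The control flow is the same as in the multiprocessing version: clear() lets the current run() finish, then the scheduler goes back to the queue for the new configuration.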
There are many concrete ways to implement this, each with its own strengths and weaknesses. We need to understand the requirement itself thoroughly before making a technology choice.
This post was originally published on cnblogs by 煙燻柿子學程式設計. Please include the original link when reposting: https://www.cnblogs.com/aaron-948/p/16459059.html