推薦學習:
程式:例如xxx.py這是程式,是一個靜態的
程序:一個程式執行起來後,程式碼+用到的資源 稱之為程序,它是作業系統分配資源的基本單元。不僅可以通過執行緒完成多工,程序也是可以的
工作中,任務數往往大於cpu的核數,即一定有一些任務正在執行,而另外一些任務在等待cpu進行執行,因此導致了有了不同的狀態
multiprocessing
模組通過建立一個Process
物件然後呼叫它的start()
方法來生成程序,Process
與threading.Thread API
相同。
語法格式:multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
引數說明:
group
:指定行程群組,大多數情況下用不到target
:如果傳遞了函數的參照,可以任務這個子程序就執行這裡的程式碼name
:給程序設定一個名字,可以不設定args
:給target指定的函數傳遞的引數,以元組的方式傳遞kwargs
:給target指定的函數傳遞命名引數multiprocessing.Process 物件具有如下方法和屬性:
方法名/屬性 | 說明 |
---|---|
run() | 程序具體執行的方法 |
start() | 啟動子程序範例(建立子程序) |
join([timeout]) | 如果可選引數 timeout 是預設值 None,則將阻塞至呼叫 join() 方法的程序終止;如果 timeout 是一個正數,則最多會阻塞 timeout 秒 |
name | 當前程序的別名,預設為Process-N,N為從1開始遞增的整數 |
pid | 當前程序的pid(程序號) |
is_alive() | 判斷程序子程序是否還在活著 |
exitcode | 子程序的退出程式碼 |
daemon | 程序的守護標誌,是一個布林值。 |
authkey | 程序的身份驗證金鑰。 |
sentinel | 系統物件的數位控制程式碼,當程序結束時將變為 ready。 |
terminate() | 不管任務是否完成,立即終止子程序 |
kill() | 與 terminate() 相同,但在 Unix 上使用 SIGKILL 訊號。 |
close() | 關閉 Process 物件,釋放與之關聯的所有資源 |
# -*- coding:utf-8 -*-from multiprocessing import Processimport timedef run_proc(): """子程序要執行的程式碼""" while True: print("----2----") time.sleep(1)if __name__=='__main__': p = Process(target=run_proc) p.start() while True: print("----1----") time.sleep(1)
執行結果:
說明:建立子程序時,只需要傳入一個執行函數和函數的引數,建立一個Process
範例,用start()
方法啟動
# -*- coding:utf-8 -*-from multiprocessing import Processimport osimport timedef run_proc(): """子程序要執行的程式碼""" print('子程序執行中,pid=%d...' % os.getpid()) # os.getpid獲取當前程序的程序號 print('子程序將要結束...')if __name__ == '__main__': print('父程序pid: %d' % os.getpid()) # os.getpid獲取當前程序的程序號 p = Process(target=run_proc) p.start()
執行結果:
# -*- coding:utf-8 -*-from multiprocessing import Processimport osfrom time import sleepdef run_proc(name, age, **kwargs): for i in range(10): print('子程序執行中,name= %s,age=%d ,pid=%d...' % (name, age, os.getpid())) print(kwargs) sleep(0.2)if __name__=='__main__': p = Process(target=run_proc, args=('test',18), kwargs={"m":20}) p.start() sleep(1) # 1秒中之後,立即結束子程序 p.terminate() p.join()
執行結果:
# -*- coding:utf-8 -*-from multiprocessing import Processimport osimport time nums = [11, 22]def work1(): """子程序要執行的程式碼""" print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums)) for i in range(3): nums.append(i) time.sleep(1) print("in process1 pid=%d ,nums=%s" % (os.getpid(), nums))def work2(): """子程序要執行的程式碼""" print("in process2 pid=%d ,nums=%s" % (os.getpid(), nums))if __name__ == '__main__': p1 = Process(target=work1) p1.start() p1.join() p2 = Process(target=work2) p2.start()
執行結果:
in process1 pid=11349 ,nums=[11, 22]in process1 pid=11349 ,nums=[11, 22, 0]in process1 pid=11349 , nums=[11, 22, 0, 1]in process1 pid=11349 ,nums=[11, 22, 0, 1, 2]in process2 pid=11350 ,nums=[11, 22]
Process之間有時需要通訊,作業系統提供了很多機制來實現程序間的通訊。
方法名 | 說明 |
---|---|
q=Queue() | 初始化Queue()物件,若括號中沒有指定最大可接收的訊息數量,或數量為負值,那麼就代表可接受的訊息數量沒有上限(直到記憶體的盡頭) |
Queue.qsize() | 返回當前佇列包含的訊息數量 |
Queue.empty() | 如果佇列為空,返回True,反之False |
Queue.full() | 如果佇列滿了,返回True,反之False |
Queue.get([block[, timeout]]) | 獲取佇列中的一條訊息,然後將其從列隊中移除,block預設值為True。1、如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止,如果設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則丟擲"Queue.Empty"異常。2、如果block值為False,訊息列隊如果為空,則會立刻丟擲"Queue.Empty"異常 |
Queue.get_nowait() | 相當Queue.get(False) |
Queue.put(item,[block[, timeout]]) | 將item訊息寫入佇列,block預設值為True。1、如果block使用預設值,且沒有設定timeout(單位秒),訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止,如果設定了timeout,則會等待timeout秒,若還沒空間,則丟擲"Queue.Full"異常。 2、如果block值為False,訊息列隊如果沒有空間可寫入,則會立刻丟擲"Queue.Full"異常 |
Queue.put_nowait(item) | 相當Queue.put(item, False) |
可以使用multiprocessing模組的Queue實現多程序之間的資料傳遞,Queue本身是一個訊息列隊程式,首先用一個小范例來演示一下Queue的工作原理:
#coding=utf-8from multiprocessing import Queue q=Queue(3) #初始化一個Queue物件,最多可接收三條put訊息q.put("訊息1") q.put("訊息2")print(q.full()) #Falseq.put("訊息3")print(q.full()) #True#因為訊息列隊已滿下面的try都會丟擲異常,第一個try會等待2秒後再丟擲異常,第二個Try會立刻丟擲異常try: q.put("訊息4",True,2)except: print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())try: q.put_nowait("訊息4")except: print("訊息列隊已滿,現有訊息數量:%s"%q.qsize())#推薦的方式,先判斷訊息列隊是否已滿,再寫入if not q.full(): q.put_nowait("訊息4")#讀取訊息時,先判斷訊息列隊是否為空,再讀取if not q.empty(): for i in range(q.qsize()): print(q.get_nowait())
執行結果:
FalseTrue訊息列隊已滿,現有訊息數量:3訊息列隊已滿,現有訊息數量:3訊息1訊息2訊息3
我們以Queue為例,在父程序中建立兩個子程序,一個往Queue裡寫資料,一個從Queue裡讀資料:
from multiprocessing import Process, Queueimport os, time, random# 寫資料程序執行的程式碼:def write(q): for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random())# 讀資料程序執行的程式碼:def read(q): while True: if not q.empty(): value = q.get(True) print('Get %s from queue.' % value) time.sleep(random.random()) else: breakif __name__=='__main__': # 父程序建立Queue,並傳給各個子程序: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子程序pw,寫入: pw.start() # 等待pw結束: pw.join() # 啟動子程序pr,讀取: pr.start() pr.join() # pr程序裡是死迴圈,無法等待其結束,只能強行終止: print('') print('所有資料都寫入並且讀完')
執行結果:
鎖是為了確保資料一致性。比如讀寫鎖,每個程序給一個變數增加 1,但是如果在一個程序讀取但還沒有寫入的時候,另外的程序也同時讀取了,並寫入該值,則最後寫入的值是錯誤的,這時候就需要加鎖來保持資料一致性。
通過使用Lock來控制一段程式碼在同一時間只能被一個程序執行。Lock物件的兩個方法,acquire()用來獲取鎖,release()用來釋放鎖。當一個程序呼叫acquire()時,如果鎖的狀態為unlocked,那麼會立即修改為locked並返回,這時該程序即獲得了鎖。如果鎖的狀態為locked,那麼呼叫acquire()的程序則阻塞。
1. Lock的語法說明:
lock = multiprocessing.Lock()
: 建立一個鎖
lock.acquire()
:獲取鎖
lock.release()
:釋放鎖
with lock
:自動獲取、釋放鎖 類似於 with open() as f:
2. 程式不加鎖時:
import multiprocessingimport timedef add(num, value): print('add{0}:num={1}'.format(value, num)) for i in range(0, 2): num += value print('add{0}:num={1}'.format(value, num)) time.sleep(1)if __name__ == '__main__': lock = multiprocessing.Lock() num = 0 p1 = multiprocessing.Process(target=add, args=(num, 1)) p2 = multiprocessing.Process(target=add, args=(num, 2)) p1.start() p2.start()
執行結果:運得沒有順序,兩個程序交替執行
add1:num=0add1:num=1add2:num=0add2:num=2add1:num=2add2:num=4
3. 程式加鎖時:
import multiprocessingimport timedef add(num, value, lock): try: lock.acquire() print('add{0}:num={1}'.format(value, num)) for i in range(0, 2): num += value print('add{0}:num={1}'.format(value, num)) time.sleep(1) except Exception as err: raise err finally: lock.release()if __name__ == '__main__': lock = multiprocessing.Lock() num = 0 p1 = multiprocessing.Process(target=add, args=(num, 1, lock)) p2 = multiprocessing.Process(target=add, args=(num, 2, lock)) p1.start() p2.start()
執行結果:只有當其中一個程序執行完成後,其它的程序才會去執行,且誰先搶到鎖誰先執行
add1:num=0add1:num=1add1:num=2add2:num=0add2:num=2add2:num=4
當需要建立的子程序數量不多時,可以直接利用
multiprocessing
中的Process
動態成生多個程序,但如果是上百甚至上千個目標,手動的去建立程序的工作量巨大,此時就可以用到multiprocessing
模組提供的Pool
方法。
語法格式:multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
引數說明:
processes
:工作程序數目,如果 processes 為 None,則使用 os.cpu_count() 返回的值。
initializer
:如果 initializer 不為 None,則每個工作程序將會在啟動時呼叫 initializer(*initargs)。
maxtasksperchild
:一個工作程序在它退出或被一個新的工作程序代替之前能完成的任務數量,為了釋放未使用的資源。
context
:用於指定啟動的工作程序的上下文。
兩種方式向程序池提交任務:
apply(func[, args[, kwds]])
:阻塞方式。
apply_async(func[, args[, kwds]])
:非阻塞方式。使用非阻塞方式呼叫func(並行執行,堵塞方式必須等待上一個程序退出才能執行下一個程序),args為傳遞給func的參數列,kwds為傳遞給func的關鍵字參數列
multiprocessing.Pool
常用函數:
方法名 | 說明 |
---|---|
close() | 關閉Pool,使其不再接受新的任務 |
terminate() | 不管任務是否完成,立即終止 |
join() | 主程序阻塞,等待子程序的退出, 必須在close或terminate之後使用 |
初始化Pool時,可以指定一個最大程序數,當有新的請求提交到Pool中時,如果池還沒有滿,那麼就會建立一個新的程序用來執行該請求;但如果池中的程序數已經達到指定的最大值,那麼該請求就會等待,直到池中有程序結束,才會用之前的程序來執行新的任務,請看下面的範例:
# -*- coding:utf-8 -*-from multiprocessing import Poolimport os, time, randomdef worker(msg): t_start = time.time() print("%s開始執行,程序號為%d" % (msg,os.getpid())) # random.random()隨機生成0~1之間的浮點數 time.sleep(random.random()*2) t_stop = time.time() print(msg,"執行完畢,耗時%0.2f" % (t_stop-t_start))po = Pool(3) # 定義一個程序池,最大程序數3for i in range(0,10): # Pool().apply_async(要呼叫的目標,(傳遞給目標的引數元祖,)) # 每次迴圈將會用空閒出來的子程序去呼叫目標 po.apply_async(worker,(i,))print("----start----")po.close() # 關閉程序池,關閉後po不再接收新的請求po.join() # 等待po中所有子程序執行完成,必須放在close語句之後print("-----end-----")
執行結果:
----start---- 0開始執行,程序號為21466 1開始執行,程序號為21468 2開始執行,程序號為21467 0 執行完畢,耗時1.01 3開始執行,程序號為21466 2 執行完畢,耗時1.24 4開始執行,程序號為21467 3 執行完畢,耗時0.56 5開始執行,程序號為21466 1 執行完畢,耗時1.68 6開始執行,程序號為21468 4 執行完畢,耗時0.67 7開始執行,程序號為21467 5 執行完畢,耗時0.83 8開始執行,程序號為21466 6 執行完畢,耗時0.75 9開始執行,程序號為21468 7 執行完畢,耗時1.03 8 執行完畢,耗時1.05 9 執行完畢,耗時1.69 -----end-----
如果要使用Pool建立程序,就需要使用multiprocessing.Manager()中的Queue()
而不是multiprocessing.Queue(),否則會得到一條如下的錯誤資訊:RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的範例演示了程序池中的程序如何通訊:
# -*- coding:utf-8 -*-# 修改import中的Queue為Managerfrom multiprocessing import Manager,Poolimport os,time,randomdef reader(q): print("reader啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid())) for i in range(q.qsize()): print("reader從Queue獲取到訊息:%s" % q.get(True))def writer(q): print("writer啟動(%s),父程序為(%s)" % (os.getpid(), os.getppid())) for i in "itcast": q.put(i)if __name__=="__main__": print("(%s) start" % os.getpid()) q = Manager().Queue() # 使用Manager中的Queue po = Pool() po.apply_async(writer, (q,)) time.sleep(1) # 先讓上面的任務向Queue存入資料,然後再讓下面的任務開始從中取資料 po.apply_async(reader, (q,)) po.close() po.join() print("(%s) End" % os.getpid())
執行結果:
(11095) start writer啟動(11097),父程序為(11095)reader啟動(11098),父程序為(11095)reader從Queue獲取到訊息:i reader從Queue獲取到訊息:t reader從Queue獲取到訊息:c reader從Queue獲取到訊息:a reader從Queue獲取到訊息:s reader從Queue獲取到訊息:t(11095) End
程序:能夠完成多工,比如 在一臺電腦上能夠同時執行多個QQ
執行緒:能夠完成多工,比如 一個QQ中的多個聊天視窗
定義的不同
程序是系統進行資源分配和排程的一個獨立單位.
執行緒是程序的一個實體,是CPU排程和分派的基本單位,它是比程序更小的能獨立執行的基本單位.執行緒自己基本上不擁有系統資源,只擁有一點在執行中必不可少的資源(如程式計數器,一組暫存器和棧),但是它可與同屬一個程序的其他的執行緒共用程序所擁有的全部資源.
推薦學習:
以上就是Python多程序知識點總結的詳細內容,更多請關注TW511.COM其它相關文章!