Python多進(jìn)程之進(jìn)程同步及通信詳解
上篇文章介紹了什么是進(jìn)程、進(jìn)程與程序的關(guān)系、進(jìn)程的創(chuàng)建與使用、創(chuàng)建進(jìn)程池等,接下來就來介紹一下進(jìn)程同步及進(jìn)程通信。
進(jìn)程同步
當(dāng)多個進(jìn)程使用同一份數(shù)據(jù)資源的時候,因為進(jìn)程的運行沒有順序,運行起來也無法控制,如果不加以干預(yù),往往會引發(fā)數(shù)據(jù)安全或順序混亂的問題,所以要在多個進(jìn)程讀寫共享數(shù)據(jù)資源的時候加以適當(dāng)?shù)牟呗?,來保證數(shù)據(jù)的一致性問題。
Lock(鎖)
一個Lock對象有兩個方法:acquire()和release()來控制共享數(shù)據(jù)的讀寫權(quán)限, 看下面這張圖片,使用多進(jìn)程的時候會經(jīng)常出現(xiàn)這種情況,這是因為多個進(jìn)程都在搶占輸出資源,共享同一打印終端,從而造成了輸出信息的錯亂。

那么就可以使用Lock機(jī)制:
import multiprocessing
import random
import time
def work(lock, i):
lock.acquire()
print("work'{}'執(zhí)行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
time.sleep(random.randint(0, 2))
print("work'{}'執(zhí)行完畢......".format(i))
lock.release()
if __name__ == '__main__':
lock = multiprocessing.Lock()
for i in range(5):
p = multiprocessing.Process(target=work, args=(lock, i))
p.start()
由于引入了Lock機(jī)制,同一時間只能有一個進(jìn)程搶占到輸出資源,其他進(jìn)程等待該進(jìn)程結(jié)束,鎖釋放到,才可以搶占,這樣會解決多進(jìn)程間資源競爭導(dǎo)致數(shù)據(jù)錯亂的問題,但是由并發(fā)執(zhí)行變成了串行執(zhí)行,會犧牲運行效率。
進(jìn)程通信
上篇文章說過,進(jìn)程之間互相隔離,數(shù)據(jù)是獨立的,默認(rèn)情況下互不影響,那要如何實現(xiàn)進(jìn)程間通信呢?Python提供了多種進(jìn)程通信的方式,下面就來說一下。
Queue(隊列)
multiprocessing模塊提供的Queue多進(jìn)程安全的消息隊列,可以實現(xiàn)多進(jìn)程之間的數(shù)據(jù)傳遞。
說明
- 初始化Queue()對象時(例如:q=Queue()),若括號中沒有指定最?可接收的消息數(shù)量,或數(shù)量為負(fù)值,那么就代表可接受的消息數(shù)量沒有上限(直到內(nèi)存的盡頭)。
Queue.qsize():返回當(dāng)前隊列包含的消息數(shù)量。Queue.empty():如果隊列為空,返回True,反之False。Queue.full():如果隊列滿了,返回True,反之False。Queue.get(block, timeout):獲取隊列中的?條消息,然后將其從列隊中移除,block默認(rèn)值為True。如果block使?默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息列隊如果為空,此時程序?qū)⒈蛔枞ㄍT谧x取狀態(tài)),直到從消息列隊讀到消息為?,如果設(shè)置了timeout,則會等待timeout秒,若還沒讀取到任何消息,則拋出Queue.Empty異常;如果block值為False,消息列隊如果為空,則會?刻拋出Queue.Empty異常。Queue.get_nowait():相當(dāng)Queue.get(False)。Queue.put(item, block, timeout):將item消息寫?隊列,block默認(rèn)值為True,如果block使?默認(rèn)值,且沒有設(shè)置timeout(單位秒),消息列隊如果已經(jīng)沒有空間可寫?,此時程序?qū)⒈蛔枞ㄍT趯?狀態(tài)),直到消息列隊騰出空間為?,如果設(shè)置了timeout,則會等待timeout秒,若還沒空間,則拋出Queue.Full異常;如果block值為False,消息列隊如果沒有空間可寫?,則會?刻拋出Queue.Full異常。Queue.put_nowait(item):相當(dāng)于Queue.put(item, False)。
from multiprocessing import Process, Queue
import time
def write_task(queue):
"""
向隊列中寫入數(shù)據(jù)
:param queue: 隊列
:return:
"""
for i in range(5):
if queue.full():
print("隊列已滿!")
message = "消息{}".format(str(i))
queue.put(message)
print("消息{}寫入隊列".format(str(i)))
def read_task(queue):
"""
從隊列讀取數(shù)據(jù)
:param queue: 隊列
:return:
"""
while True:
print("從隊列讀取:{}".format(queue.get(True)))
if __name__ == '__main__':
print("主進(jìn)程執(zhí)行......")
# 主進(jìn)程創(chuàng)建Queue,最大消息數(shù)量為3
queue = Queue(3)
pw = Process(target=write_task, args=(queue, ))
pr = Process(target=read_task, args=(queue, ))
pw.start()
pr.start()
運行結(jié)果為:

從結(jié)果我們可以看出,隊列最大可以放入3條消息,后面再來消息,要等read_task從隊列里取出后才行。
Pipe(管道)
Pipe常用于兩個進(jìn)程,兩個進(jìn)程分別位于管道的兩端,Pipe(duplex)方法返回(conn1,conn2)代表一個管道的兩端,duplex參數(shù)默認(rèn)為True,即全雙工模式,若為False,conn1只負(fù)責(zé)接收信息,conn2負(fù)責(zé)發(fā)送。
send()和recv()方法分別是發(fā)送和接受消息的方法。
import multiprocessing
import time
import random
def proc_send(pipe):
"""
發(fā)送消息
:param pipe:管道一端
:return:
"""
for i in range(10):
print("process send:{}".format(str(i)))
pipe.send(i)
time.sleep(random.random())
def proc_recv(pipe):
"""
接收消息
:param pipe:管道一端
:return:
"""
while True:
print("Process recv:{}".format(pipe.recv()))
time.sleep(random.random())
if __name__ == '__main__':
# 主進(jìn)程創(chuàng)建pipe
pipe = multiprocessing.Pipe()
p1 = multiprocessing.Process(target=proc_send,args=(pipe[0], ))
p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.terminate()
執(zhí)行結(jié)果為:

Semaphore(信號量)
Semaphore用來控制對共享資源的訪問數(shù)量,和進(jìn)程池的最大連接數(shù)類似。
import multiprocessing
import random
import time
def work(s, i):
s.acquire()
print("work'{}'執(zhí)行中......".format(i), multiprocessing.current_process().name, multiprocessing.current_process().pid)
time.sleep(i*2)
print("work'{}'執(zhí)行完畢......".format(i))
s.release()
if __name__ == '__main__':
s = multiprocessing.Semaphore(2)
for i in range(1, 7):
p = multiprocessing.Process(target=work, args=(s, i))
p.start()
上面的代碼中使用Semaphore限制了最多有2個進(jìn)程同時執(zhí)行,那么來一個進(jìn)程獲得一把鎖,計數(shù)加1,當(dāng)計數(shù)等于2時,后面再來的進(jìn)程均需要等待,等前面的進(jìn)程釋放掉,才可以獲得鎖。
信號量與進(jìn)程池的概念上類似,但是要區(qū)分開來,信號量涉及到加鎖的概念。
Event(事件)
Event用來實現(xiàn)進(jìn)程間同步通信的。運行的機(jī)制是:全局定義了一個flag,如果flag值為False,當(dāng)程序執(zhí)行event.wait()方法時就會阻塞,如果flag值為True時,程序執(zhí)行event.wait()方法時不會阻塞繼續(xù)執(zhí)行。
Event常?函數(shù):
event.wait():在進(jìn)程中插入一個標(biāo)記(flag),默認(rèn)為False,可以設(shè)置timeout。event.set():使flag為Ture。event.clear():使flag為False。event.is_set():判斷flag是否為True。
import multiprocessing
import time
def wait_for_event(e):
print("wait_for_event執(zhí)行")
e.wait()
print("wait_for_event: e.is_set():{}".format(e.is_set()))
def wait_for_event_timeout(e, t):
print("wait_for_event_timeout執(zhí)行")
# 只會阻塞2s
e.wait(t)
print("wait_for_event_timeout:e.is_set:{}".format(e.is_set()))
if __name__ == "__main__":
e = multiprocessing.Event()
p1 = multiprocessing.Process(target=wait_for_event, args=(e,))
p1.start()
p2 = multiprocessing.Process(target=wait_for_event_timeout, args=(e, 2))
p2.start()
time.sleep(4)
# 4s之后使用e.set()將flag設(shè)為Ture
e.set()
print("主進(jìn)程:flag設(shè)置為True")
執(zhí)行結(jié)果如下:

總結(jié)
本篇文章就到這里了,希望能夠給你帶來幫助,也希望您能夠多多關(guān)注腳本之家的更多內(nèi)容!
相關(guān)文章
python利用蒙版摳圖(使用PIL.Image和cv2)輸出透明背景圖
這篇文章主要介紹了python利用蒙版摳圖(使用PIL.Image和cv2)輸出透明背景圖,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
Python基于sklearn庫的分類算法簡單應(yīng)用示例
這篇文章主要介紹了Python基于sklearn庫的分類算法,結(jié)合簡單實例形式分析了Python使用sklearn庫封裝樸素貝葉斯、K近鄰、邏輯回歸、SVM向量機(jī)等常見機(jī)器學(xué)習(xí)算法的分類調(diào)用相關(guān)操作技巧,需要的朋友可以參考下2018-07-07
python科學(xué)計算之numpy——ufunc函數(shù)用法
今天小編就為大家分享一篇python科學(xué)計算之numpy——ufunc函數(shù)用法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2019-11-11
使用OpenCV對運動員的姿勢進(jìn)行檢測功能實現(xiàn)
2022年奧林匹克運動會如期舉行,以不正確的方式進(jìn)行運動風(fēng)險在增加,人體姿勢估計是計算機(jī)視覺領(lǐng)域的重要問題,接下來通過本文給大家介紹下使用OpenCV對運動員的姿勢進(jìn)行檢測功能,感興趣的朋友一起看看吧2022-02-02

