Python多進(jìn)程并發(fā)與多線程并發(fā)編程實(shí)例總結(jié)
本文實(shí)例總結(jié)了Python多進(jìn)程并發(fā)與多線程并發(fā)。分享給大家供大家參考,具體如下:
這里對(duì)python支持的幾種并發(fā)方式進(jìn)行簡(jiǎn)單的總結(jié)。
Python支持的并發(fā)分為多線程并發(fā)與多進(jìn)程并發(fā)(異步IO本文不涉及)。概念上來(lái)說(shuō),多進(jìn)程并發(fā)即運(yùn)行多個(gè)獨(dú)立的程序,優(yōu)勢(shì)在于并發(fā)處理的任務(wù)都由操作系統(tǒng)管理,不足之處在于程序與各進(jìn)程之間的通信和數(shù)據(jù)共享不方便;多線程并發(fā)則由程序員管理并發(fā)處理的任務(wù),這種并發(fā)方式可以方便地在線程間共享數(shù)據(jù)(前提是不能互斥)。Python對(duì)多線程和多進(jìn)程的支持都比一般編程語(yǔ)言更高級(jí),最小化了需要我們完成的工作。
一.多進(jìn)程并發(fā)
Mark Summerfield指出,對(duì)于計(jì)算密集型程序,多進(jìn)程并發(fā)優(yōu)于多線程并發(fā)。計(jì)算密集型程序指的程序的運(yùn)行時(shí)間大部分消耗在CPU的運(yùn)算處理過(guò)程,而硬盤和內(nèi)存的讀寫消耗的時(shí)間很短;相對(duì)地,IO密集型程序指的則是程序的運(yùn)行時(shí)間大部分消耗在硬盤和內(nèi)存的讀寫上,CPU的運(yùn)算時(shí)間很短。
對(duì)于多進(jìn)程并發(fā),python支持兩種實(shí)現(xiàn)方式,一種是采用進(jìn)程安全的數(shù)據(jù)結(jié)構(gòu):multiprocessing.JoinableQueue,這種數(shù)據(jù)結(jié)構(gòu)自己管理“加鎖”的過(guò)程,程序員無(wú)需擔(dān)心“死鎖”的問(wèn)題;python還提供了一種更為優(yōu)雅而高級(jí)的實(shí)現(xiàn)方式:采用進(jìn)程池。下面一一介紹。
1.隊(duì)列實(shí)現(xiàn)——使用multiprocessing.JoinableQueue
multiprocessing是python標(biāo)準(zhǔn)庫(kù)中支持多進(jìn)程并發(fā)的模塊,我們這里采用multiprocessing中的數(shù)據(jù)結(jié)構(gòu):JoinableQueue,它本質(zhì)上仍是一個(gè)FIFO的隊(duì)列,它與一般隊(duì)列(如queue中的Queue)的區(qū)別在于它是多進(jìn)程安全的,這意味著我們不用擔(dān)心它的互斥和死鎖問(wèn)題。JoinableQueue主要可以用來(lái)存放執(zhí)行的任務(wù)和收集任務(wù)的執(zhí)行結(jié)果。舉例來(lái)看(以下皆省去導(dǎo)入包的過(guò)程):
def read(q):
while True:
try:
value = q.get()
print('Get %s from queue.' % value)
time.sleep(random.random())
finally:
q.task_done()
def main():
q = multiprocessing.JoinableQueue()
pw1 = multiprocessing.Process(target=read, args=(q,))
pw2 = multiprocessing.Process(target=read, args=(q,))
pw1.daemon = True
pw2.daemon = True
pw1.start()
pw2.start()
for c in [chr(ord('A')+i) for i in range(26)]:
q.put(c)
try:
q.join()
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
對(duì)于windows系統(tǒng)的多進(jìn)程并發(fā),程序文件里必須含有“入口函數(shù)”(如main函數(shù)),且結(jié)尾處必須調(diào)用入口點(diǎn)。例如以if __name__ == '__main__': main()結(jié)尾。
在這個(gè)最簡(jiǎn)單的多進(jìn)程并發(fā)例子里,我們用多進(jìn)程實(shí)現(xiàn)將26個(gè)字母打印出來(lái)。首先定義一個(gè)存放任務(wù)的JoinableQueue對(duì)象,然后實(shí)例化兩個(gè)Process對(duì)象(每個(gè)對(duì)象對(duì)應(yīng)一個(gè)子進(jìn)程),實(shí)例化Process對(duì)象需要傳送target和args參數(shù),target是實(shí)現(xiàn)每個(gè)任務(wù)工作中的具體函數(shù),args是target函數(shù)的參數(shù)。
pw1.daemon = True pw2.daemon = True
這兩句話將子進(jìn)程設(shè)置為守護(hù)進(jìn)程——主進(jìn)程結(jié)束后隨之結(jié)束。
pw1.start() pw2.start()
一旦運(yùn)行到這兩句話,子進(jìn)程就開始獨(dú)立于父進(jìn)程運(yùn)行了,它會(huì)在單獨(dú)的進(jìn)程里調(diào)用target引用的函數(shù)——在這里即read函數(shù),它是一個(gè)死循環(huán),將參數(shù)q中的數(shù)一一讀取并打印出來(lái)。
value = q.get()
這是多進(jìn)程并發(fā)的要點(diǎn),q是一個(gè)JoinableQueue對(duì)象,支持get方法讀取第一個(gè)元素,如果q中沒(méi)有元素,進(jìn)程就會(huì)阻塞,直至q中被存入新元素。
因此執(zhí)行完pw1.start() pw2.start()這兩句話后,子進(jìn)程雖然開始運(yùn)行了,但很快就堵塞住。
for c in [chr(ord('A')+i) for i in range(26)]:
q.put(c)
將26個(gè)字母依次放入JoinableQueue對(duì)象中,這時(shí)候兩個(gè)子進(jìn)程不再阻塞,開始真正地執(zhí)行任務(wù)。兩個(gè)子進(jìn)程都用value = q.get()來(lái)讀取數(shù)據(jù),它們都在修改q對(duì)象,而我們并不用擔(dān)心同步問(wèn)題,這就是multiProcessing.Joinable數(shù)據(jù)結(jié)構(gòu)的優(yōu)勢(shì)所在——它是多進(jìn)程安全的,它會(huì)自動(dòng)處理“加鎖”的過(guò)程。
try:
q.join()
q.join()方法會(huì)查詢q中的數(shù)據(jù)是否已讀完——這里指的就是任務(wù)是否執(zhí)行完,如果沒(méi)有,程序會(huì)阻塞住等待q中數(shù)據(jù)讀完才開始繼續(xù)執(zhí)行(可以用Ctrl+C強(qiáng)制停止)。
對(duì)Windows系統(tǒng),調(diào)用任務(wù)管理器應(yīng)該可以看到有多個(gè)子進(jìn)程在運(yùn)行。
2.進(jìn)程池實(shí)現(xiàn)——使用concurrent.futures.ProcessPoolExecutor
Python還支持一種更為優(yōu)雅的多進(jìn)程并發(fā)方式,直接看例子:
def read(q):
print('Get %s from queue.' % q)
time.sleep(random.random())
def main():
futures = set()
with concurrent.futures.ProcessPoolExecutor() as executor:
for q in (chr(ord('A')+i) for i in range(26)):
future = executor.submit(read, q)
futures.add(future)
try:
for future in concurrent.futures.as_completed(futures):
err = future.exception()
if err is not None:
raise err
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
這里我們采用concurrent.futures.ProcessPoolExecutor對(duì)象,可以把它想象成一個(gè)進(jìn)程池,子進(jìn)程往里“填”。我們通過(guò)submit方法實(shí)例一個(gè)Future對(duì)象,然后把這里Future對(duì)象都填到池——futures里,這里futures是一個(gè)set對(duì)象。只要進(jìn)程池里有future,就會(huì)開始執(zhí)行任務(wù)。這里的read函數(shù)更為簡(jiǎn)單——只是把一個(gè)字符打印并休眠一會(huì)而已。
try:
for future in concurrent.futures.as_completed(futures):
這是等待所有子進(jìn)程都執(zhí)行完畢。子進(jìn)程執(zhí)行過(guò)程中可能拋出異常,err = future.exception()可以收集這些異常,便于后期處理。
可以看出用Future對(duì)象處理多進(jìn)程并發(fā)更為簡(jiǎn)潔,無(wú)論是target函數(shù)的編寫、子進(jìn)程的啟動(dòng)等等,future對(duì)象還可以向使用者匯報(bào)其狀態(tài),也可以匯報(bào)執(zhí)行結(jié)果或執(zhí)行時(shí)的異常。
二.多線程并發(fā)
對(duì)于IO密集型程序,多線程并發(fā)可能要優(yōu)于多進(jìn)程并發(fā)。因?yàn)閷?duì)于網(wǎng)絡(luò)通信等IO密集型任務(wù)來(lái)說(shuō),決定程序效率的主要是網(wǎng)絡(luò)延遲,這時(shí)候是使用進(jìn)程還是線程就沒(méi)有太大關(guān)系了。
1.隊(duì)列實(shí)現(xiàn)——使用queue.Queue
程序與多進(jìn)程基本一致,只是這里我們不必使用multiProcessing.JoinableQueue對(duì)象了,一般的隊(duì)列(來(lái)自queue.Queue)就可以滿足要求:
def read(q):
while True:
try:
value = q.get()
print('Get %s from queue.' % value)
time.sleep(random.random())
finally:
q.task_done()
def main():
q = queue.Queue()
pw1 = threading.Thread(target=read, args=(q,))
pw2 = threading.Thread(target=read, args=(q,))
pw1.daemon = True
pw2.daemon = True
pw1.start()
pw2.start()
for c in [chr(ord('A')+i) for i in range(26)]:
q.put(c)
try:
q.join()
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
并且這里我們實(shí)例化的是Thread對(duì)象,而不是Process對(duì)象,程序的其余部分看起來(lái)與多進(jìn)程并沒(méi)有什么兩樣。
2. 線程池實(shí)現(xiàn)——使用concurrent.futures.ThreadPoolExecutor
直接看例子:
def read(q):
print('Get %s from queue.' % q)
time.sleep(random.random())
def main():
futures = set()
with concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count()*4) as executor:
for q in (chr(ord('A')+i) for i in range(26)):
future = executor.submit(read, q)
futures.add(future)
try:
for future in concurrent.futures.as_completed(futures):
err = future.exception()
if err is not None:
raise err
except KeyboardInterrupt:
print("stopped by hand")
if __name__ == '__main__':
main()
用ThreadPoolExecutor與用ProcessPoolExecutor看起來(lái)沒(méi)什么區(qū)別,只是改了一下簽名而已。
不難看出,不管是使用隊(duì)列還是使用進(jìn)/線程池,從多進(jìn)程轉(zhuǎn)化到多線程是十分容易的——僅僅是修改了幾個(gè)簽名而已。當(dāng)然內(nèi)部機(jī)制完全不同,只是python的封裝非常好,使我們可以不用關(guān)心這些細(xì)節(jié),這正是python優(yōu)雅之處。
更多關(guān)于Python相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《Python進(jìn)程與線程操作技巧總結(jié)》、《Python Socket編程技巧總結(jié)》、《Python數(shù)據(jù)結(jié)構(gòu)與算法教程》、《Python函數(shù)使用技巧總結(jié)》、《Python字符串操作技巧匯總》、《Python入門與進(jìn)階經(jīng)典教程》及《Python文件與目錄操作技巧匯總》
希望本文所述對(duì)大家Python程序設(shè)計(jì)有所幫助。
相關(guān)文章
Django將默認(rèn)的SQLite更換為MySQL的實(shí)現(xiàn)
今天小編就為大家分享一篇Django將默認(rèn)的SQLite更換為MySQL的實(shí)現(xiàn),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-11-11
python模仿網(wǎng)頁(yè)版微信發(fā)送消息功能
這篇文章主要介紹了python模仿網(wǎng)頁(yè)版微信發(fā)送消息功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02
Python中Matplotlib圖像添加標(biāo)簽的方法實(shí)現(xiàn)
本文主要介紹了Python中Matplotlib圖像添加標(biāo)簽的方法實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
手動(dòng)安裝Anaconda環(huán)境變量的實(shí)現(xiàn)教程
這篇文章主要介紹了手動(dòng)安裝Anaconda環(huán)境變量的實(shí)現(xiàn)教程,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-01-01
解決pycharm remote deployment 配置的問(wèn)題
今天小編就為大家分享一篇解決pycharm remote deployment 配置的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-06-06
Python HTMLTestRunner如何下載生成報(bào)告
這篇文章主要介紹了Python HTMLTestRunner如何下載生成報(bào)告,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-09-09
利用Python實(shí)現(xiàn)在同一網(wǎng)絡(luò)中的本地文件共享方法
今天小編就為大家分享一篇利用Python實(shí)現(xiàn)在同一網(wǎng)絡(luò)中的本地文件共享方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-06-06

