詳解Python如何使用并發(fā)模型編程
關(guān)于什么是并發(fā)模型,我在這里引用 Go 語言聯(lián)合創(chuàng)造者 Rob Pike 的一段話:
并發(fā)是指一次處理多件事。并行是指一次做多件事。二者不同,但是有聯(lián)系。一個關(guān)于結(jié)構(gòu),一個關(guān)于執(zhí)行。并發(fā)用于制定方案,用來解決可能(但未必)并行的問題。
在不涉及并發(fā)概念的情況下,一個單進程單線程的程序執(zhí)行情況可能是這樣的:調(diào)用一個函數(shù),發(fā)出調(diào)用的代碼開始等待函數(shù)執(zhí)行完成,直到函數(shù)返回結(jié)果,如果函數(shù)拋出異常,則可以把調(diào)用函數(shù)的代碼放到 try/except 語句塊中,來捕獲和處理異常。
但是,當(dāng)涉及到并發(fā)時,情況就沒這么簡單了。在啟用多線程(或多進程)后,你無法在一個線程(或進程)中知道另一個線程(或進程)被調(diào)用的函數(shù)何時執(zhí)行完成,也無法輕松得知函數(shù)調(diào)用結(jié)果或捕獲異常。只能采用某種通知的方式,來進行線程(或進程)間通信,這可能是一個信號,也可能是一個消息隊列等。
本文主要講解如何讓 Python 能夠同時處理多個任務(wù),即如何使用并發(fā)模型編程。
目標(biāo)
我們將要實現(xiàn)一個旋轉(zhuǎn)指針程序,啟動一個程序,阻塞 3 秒鐘(模擬耗時任務(wù)),在這期間,終端展示字符動畫,讓用戶知道程序仍在執(zhí)行,并沒有停止,3 秒后程序打印耗時任務(wù)的計算結(jié)果并退出。
實現(xiàn)好的程序效果如下:

這有點像下載進度條,因為打印旋轉(zhuǎn)指針和耗時任務(wù)是“同時”進行的,這種場景只能通過并發(fā)模型來實現(xiàn)。
我們將分別使用多線程、多進程以及協(xié)程來實現(xiàn)這個程序,以此來演示 Python 的的并發(fā)模型用法。
多線程版
第一版旋轉(zhuǎn)指針程序使用 Python 多線程來編寫,首先,我們需要定義兩個函數(shù) spin、slow 分別用來實現(xiàn)旋轉(zhuǎn)指針和模擬耗時任務(wù)(比如從網(wǎng)上下載一個文件)。
import itertools
import time
from threading import Thread, Event
def spin(msg: str, done: Event) -> None:
for char in itertools.cycle(r'\|/-'):
status = f'\r{char} {msg}'
print(status, end='', flush=True)
if done.wait(0.1):
break
blanks = ' ' * len(status)
print(f'\r{blanks}\r', end='')
def slow() -> int:
time.sleep(3)
return 42threading 模塊提供多線程支持,Thread 實例用來管理一個新的線程,Event 可以用來進行線程間通信。
spin 函數(shù)將作為一個任務(wù)在單獨的線程中執(zhí)行,它接收兩個參數(shù) msg、done,傳遞進來的 msg 將會跟隨旋轉(zhuǎn)指針一起被打印,done 參數(shù)類型為 threading.Event,用來實現(xiàn)多個線程間的通信,以此來同步任務(wù)狀態(tài)。
itertools.cycle(r'\|/-') 是一個無限迭代器,一次產(chǎn)出一個字符,不停的迭代。比如用 for 遍歷 itertools.cycle('123'),將得到無限迭代的數(shù)據(jù) 123123123...。這里 \|/- 字符不停迭代并被打印,就會產(chǎn)生旋轉(zhuǎn)指針的效果。
打印的 status 字符串第一個字符為 \r,可以實現(xiàn)將光標(biāo)移動到行首,這是一個使用文本在控制臺實現(xiàn)動畫的小技巧。
接下來的 done.wait(0.1) 是這個函數(shù)的關(guān)鍵代碼,它是主線程與執(zhí)行當(dāng)前函數(shù)的子線程之間通信的橋梁。done.wait 方法簽名為 Event.wait(self, timeout=None),該方法等待 timeout 指定的時間后返回 False,我們在這里指定為 0.1 秒。如果在其他線程中使用 Event.set() 設(shè)置了這個事件,則當(dāng)前線程該方法將立即返回 True,此時 for 循環(huán)就會被 break 掉。
spin 函數(shù)在退出前,還會打印幾個空格來實現(xiàn)清空當(dāng)前行打印內(nèi)容的效果,并且最終還將光標(biāo)移動到行首。
slow 函數(shù)使用 time.sleep(3) 暫停 3 秒,模擬耗時操作,這個函數(shù)將像我們往常編寫的單線程代碼一樣在主線程中執(zhí)行。
接下來我們要編寫多線程代碼來分別調(diào)用 spin 和 slow 兩個函數(shù),完成這個旋轉(zhuǎn)指針程序。
def supervisor() -> int:
done = Event()
spinner = Thread(target=spin, args=('thinking!', done))
print(f'spinner object: {spinner}')
spinner.start()
result = slow()
done.set()
spinner.join()
return result
def main() -> None:
result = supervisor()
print(f'Answer: {result}')
if __name__ == '__main__':
main()supervisor 函數(shù)中,首先實例化了一個 Event 對象,用于多線程通信。
接著,又實例化了一個 Thread 對象,用來管理子線程,target 參數(shù)接收一個函數(shù) spin,這個函數(shù)將在一個獨立的子線程中執(zhí)行,args 參數(shù)接收一個元組,在子線程中調(diào)用 spin 函數(shù)時,元組的各個參數(shù)將被原樣傳遞給 spin 函數(shù)。
Thread 對象必須要顯式的調(diào)用 start 方法才能啟動,所以代碼執(zhí)行到 spinner.start() 時,子線程才會真正開始執(zhí)行。子線程只會執(zhí)行 spin 函數(shù),至于下方的代碼與子線程無關(guān),都是主線程要執(zhí)行的代碼。
子線程的執(zhí)行對主線程執(zhí)行不會產(chǎn)生影響,主線程代碼會繼續(xù)往下運行,主線程調(diào)用 slow() 時會被耗時任務(wù)所阻塞。此時,子線程內(nèi)部代碼執(zhí)行不受影響,所以子線程會不停的打印旋轉(zhuǎn)指針。
等待 3 秒結(jié)束后,主線程中 slow() 函數(shù)返回結(jié)果,主線程調(diào)用 done.set() 將 Event 對象設(shè)置為 True。此時,子線程 spin 函數(shù)內(nèi)部 done.wait(0.1) 會立即返回 True,隨即 for循環(huán)終止,spin 執(zhí)行完成后子線程也就退出了。
主線程不受子線程退出影響,會接著往下執(zhí)行,調(diào)用 spinner.join() 是為了等待子線程結(jié)束,主線程會阻塞在這里,保證子線程結(jié)束后才會往下執(zhí)行。顯然,子線程在執(zhí)行完 spin 函數(shù)就結(jié)束了,所以主線程代碼會繼續(xù)往下執(zhí)行。
supervisor 函數(shù)最終返回 slow 方法的返回值 result。
入口函數(shù) main 打印 result 值后,主線程也退出了,程序終止。
以上,就是多線程版旋轉(zhuǎn)指針程序的全部邏輯了。
我們來測試下這個程序執(zhí)行效果:

多線程對象 spinner 輸出結(jié)果為 <Thread(Thread-1, initial)>,其中 Thread-1 是線程名稱,initial 是線程狀態(tài),表示當(dāng)前線程剛初始化完成,尚未啟動。
多進程版
Python 提供了 multiprocessing 來支持多進程,這個模塊的 API 基本模仿了多線程的 threading 模塊,所以有了前文的基礎(chǔ),多進程代碼也非常容易看懂。
同多線程一樣,multiprocessing 包也為多進程通信提供了 Event 對象。不同的是,threading.Event 是一個類,multiprocessing.Event 是一個函數(shù),它返回一個 synchronize.Event 類實例。所以 spin 函數(shù)簽名需要進行如下修改:
from multiprocessing import Process, Event
from multiprocessing import synchronize
def spin(msg: str, done: synchronize.Event) -> None:
...spin 函數(shù)內(nèi)部代碼無需調(diào)整,只需要修改參數(shù) done 的類型注解即可。所以不難發(fā)現(xiàn) synchronize.Event 同樣支持 Event.wait(self, timeout=None) 方法。
多進程版本的 supervisor 函數(shù)也要稍作修改:
def supervisor() -> int:
done = Event()
spinner = Process(target=spin, args=('thinking!', done))
print(f'spinner object: {spinner}')
spinner.start()
result = slow()
done.set()
spinner.join()
return result雖然 multiprocessing.Event 和 threading.Event 類型不同,但二者用法和作用則完全相同。
這里使用 Process 實例化一個進程對象,Process 用法和 Thread 用法同樣如出一轍。
只需要對代碼做少量的改動,我們就將程序從多線程遷移到了多進程。這一點,Python 做的非常友好,掌握了多線程編程,基本上就掌握了多進程編程,我們只需要在適當(dāng)?shù)臅r候,使用不同的模塊即可。
下面是多進程版本旋轉(zhuǎn)指針程序測試效果:

多進程對象 spinner 輸出結(jié)果為 <Process name='Process-1' parent=94367 initial>,進程名稱為 Process-1,parent 代表父進程 ID 為 94367(即主進程 ID),initial 是進程狀態(tài),表示當(dāng)前進程剛初始化完成,尚未啟動。
協(xié)程版
最后我們將使用協(xié)程實現(xiàn)旋轉(zhuǎn)指針程序,這一版本代碼改動會比較大。
Python 在 3.5 版本提供了 async、await 關(guān)鍵字(可以參考 PEP 492),開始原生支持了協(xié)程,我們不再需要編寫難懂的 yeild from 來使用生成器實現(xiàn)協(xié)程功能了。
Python 協(xié)程通常在單線程的事件循環(huán)中運行。協(xié)程是一個可以掛起自身并在以后恢復(fù)的“函數(shù)”,async 用來定義協(xié)程,一個協(xié)程必須顯式的使用 await 關(guān)鍵字主動讓出控制權(quán),另一個協(xié)程才有機會在主事件循環(huán)的調(diào)度下并發(fā)的執(zhí)行。
協(xié)程版本旋轉(zhuǎn)指針程序需要對 spin 和 slow 兩個函數(shù)做如下修改:
import asyncio
import itertools
async def spin(msg: str) -> None:
for char in itertools.cycle(r'\|/-'):
status = f'\r{char} {msg}'
print(status, end='', flush=True)
try:
await asyncio.sleep(0.1)
except asyncio.CancelledError:
break
blanks = ' ' * len(status)
print(f'\r{blanks}\r', end='')
async def slow() -> int:
await asyncio.sleep(3)
return 42首先我們使用 async def 將 spin 定義為一個協(xié)程,讓其不再是一個常規(guī)的函數(shù)。
spin 協(xié)程取消了第二個參數(shù),因為 Python 沒有為協(xié)程提供 Event 對象來進行通信,我們需要采用其他招式。
在原來使用 Event 通信的地方替換成了由 try/except 包裹的 await asyncio.sleep(0.1) 語句塊代碼。這段代碼塊有如下三個作用:
await asyncio.sleep(0.1)的作用類似time.sleep,可以讓程序暫停 0.1 秒。不同的是,使用await asyncio.sleep暫停時不阻塞其他協(xié)程。- 因為這里加入了
await關(guān)鍵字,代碼執(zhí)行到這里時,當(dāng)前協(xié)程會主動讓出控制權(quán),不再繼續(xù)往下執(zhí)行,由事件循環(huán)來調(diào)度其他協(xié)程執(zhí)行。 - 如果在控制當(dāng)前協(xié)程的
Task實例中調(diào)用cancel方法(有關(guān)Task的內(nèi)容稍后會進行講解),await asyncio.sleep(0.1)會拋出CancelledError異常,這里使用try/except捕獲異常后退出循環(huán)。這樣,我們就在多個協(xié)程間利用異常機制完成了通信,而不必借助于額外的Event對象。
slow 函數(shù)也被改造為一個協(xié)程,其內(nèi)部原來編寫的阻塞代碼 time.sleep(3) 被替換為了 await asyncio.sleep(3)。
可以發(fā)現(xiàn),其實協(xié)程與普通的函數(shù)在定義上差別不大,只不過多了兩個關(guān)鍵字 async 和 await。但二者在執(zhí)行方式上大有不同,普通函數(shù)在使用 () 運算符調(diào)用時(即 spin())會立刻執(zhí)行,而協(xié)程在使用 spin() 時只會創(chuàng)建一個協(xié)程對象,不會執(zhí)行。
要執(zhí)行上面兩個協(xié)程對象,我們還要對 supervisor 和 main 函數(shù)進行改造:
async def supervisor() -> int:
spinner = asyncio.create_task(spin('thinking!'))
print(f'spinner object: {spinner}')
result = await slow()
spinner.cancel()
return result
def main() -> None:
result = asyncio.run(supervisor())
print(f'Answer: {result}')
if __name__ == '__main__':
main()supervisor 函數(shù)同樣被修改為協(xié)程,spin('thinking!') 并不會像函數(shù)一樣立即執(zhí)行,只會創(chuàng)建一個協(xié)程對象,將它傳遞給 asyncio.create_task,我們可以得到一個 asyncio.Task 對象,這個 Task 對象包裝了協(xié)程對象并調(diào)度其執(zhí)行,它還提供控制和查詢協(xié)程對象運行狀態(tài)的方法。
使用 await 關(guān)鍵字來調(diào)用 slow 協(xié)程,這將阻塞 supervisor 程序(但會讓出控制權(quán),使其他協(xié)程得以執(zhí)行),直到 slow 返回,返回結(jié)果賦值給 result 變量。
接著調(diào)用了 spinner.cancel(),Task.cancel 方法調(diào)用后,將立即在 Task 所包裝的協(xié)程對象即 spin 協(xié)程中拋出 CancelledError 異常,spin 中需要使用 try/except 捕獲 await asyncio.sleep(0.1) 拋出的異常,這樣,就實現(xiàn)了不同協(xié)程之間通過異常進行通信。
main 是唯一的普通函數(shù),沒有被改造為協(xié)程。main 函數(shù)中的 asyncio.run 是整個協(xié)程的啟動入口,asyncio.run 函數(shù)啟動事件循環(huán),驅(qū)動 supervisor() 協(xié)程運行,最終也將啟動其他協(xié)程。
在以上示例代碼中,我們可以總結(jié)出運行協(xié)程的 3 種方式:
asyncio.run(coroutine()):在一個常規(guī)函數(shù)中調(diào)用,是協(xié)程啟動入口,將開啟協(xié)程的事件循環(huán),調(diào)用后保持阻塞,直至拿到coroutine()的返回結(jié)果。asyncio.create_task(coroutine()):在協(xié)程中調(diào)用,接收另一個協(xié)程對象并調(diào)度其最終執(zhí)行,返回的Task對象是對協(xié)程對象的包裝,并且提供控制和查詢協(xié)程對象運行狀態(tài)的方法。await coroutine():在協(xié)程中調(diào)用,await關(guān)鍵字主動讓出執(zhí)行控制權(quán),終止當(dāng)前協(xié)程執(zhí)行,直至拿到coroutine()的返回結(jié)果。同時這也是一個表達式,返回結(jié)果即為coroutine()返回值。
下面是協(xié)程版本旋轉(zhuǎn)指針程序測試效果:

在協(xié)程版本中,spinner 是一個 Task 對象,其字符串表示形式為 <Task pending name='Task-2' coro=<spin() running at /Users/jianghushinian/spin/spinner_async.py:8>>。
根據(jù)以上示例代碼,我們可以總結(jié)出 Python 協(xié)程的最大特點:一處異步,處處異步。在協(xié)程中任何耗時操作都會減慢事件循環(huán),由于事件循環(huán)是單線程管理的,所以這會影響其他所有協(xié)程。在編寫協(xié)程代碼,要格外小心,不要寫出同步阻塞的代碼。好在如今 Python 已經(jīng)從語法層面原生支持協(xié)程,比使用生成器實現(xiàn)協(xié)程的年代要好多了。
給你留個小作業(yè):嘗試將 slow 協(xié)程中 await asyncio.sleep(3) 替換成普通的 time.sleep(3) 觀察下效果并思考為什么。
總結(jié)
本文我們分別使用了多線程、多進程以及協(xié)程這三種不同的并發(fā)模型實現(xiàn)了旋轉(zhuǎn)指針程序。三者比較起來,多線程、多進程在語法上差別不大,協(xié)程則大為不同,理解起來也更加困難。
由 supervisor 中打印的 spinner 對象結(jié)果可以看出,線程對象使用 Thread 來表示,進程對象使用 Process 來表示,協(xié)程對象則使用 Task 來表示。協(xié)程的定位是用戶態(tài)線程,相比傳統(tǒng)意義上的線程更加輕量,所以叫作 Task 也合理,代表同一個線程下的不同任務(wù)。
線程和進程無法在外部終止,即主線程和主進程無法終止由子線程或子進程來執(zhí)行的 spin 函數(shù),只能通過 Event 來進行通信,然后由 spin 函數(shù)內(nèi)部自己終止。協(xié)程則可以通過任務(wù)實例方法 Task.cancel() 進行通信,spin 協(xié)程中捕獲 CancelledError 異常后終止自身代碼。
記住,使用協(xié)程的代碼只有一個執(zhí)行流,就如同單線程代碼,同樣只有一個執(zhí)行流,只不過單線程代碼執(zhí)行流永遠是從上到下,而協(xié)程的執(zhí)行流則由事件循環(huán)來控制。
多線程和多進程模型是搶占式的,由操作系統(tǒng)進行調(diào)度執(zhí)行。用戶通常需要控制的是不要讓多個線程(進程)同時操作同一個數(shù)據(jù),常使用互斥鎖來解決這一問題。而協(xié)程只有一個控制循環(huán),協(xié)程的控制權(quán)在我們自己手里,我們決定什么時候切換其他任務(wù)來執(zhí)行。所以在編寫協(xié)程代碼時,要時刻注意不要寫出同步阻塞代碼。
到此這篇關(guān)于詳解Python如何使用并發(fā)模型編程的文章就介紹到這了,更多相關(guān)Python并發(fā)模型內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python數(shù)據(jù)結(jié)構(gòu)之鏈表詳解
這篇文章主要為大家詳細介紹了python數(shù)據(jù)結(jié)構(gòu)之鏈表的相關(guān)資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-09-09
python3使用tkinter實現(xiàn)ui界面簡單實例
使用tkinter創(chuàng)建一個小窗口,布置2個按鈕,一個btn關(guān)閉窗口,另一個btn用于切換執(zhí)行傳入的2個函數(shù),簡單的小代碼,大家參考使用吧2014-01-01
python保存字典數(shù)據(jù)到csv文件的完整代碼
在實際數(shù)據(jù)分析過程中,我們分析用Python來處理數(shù)據(jù)(海量的數(shù)據(jù)),我們都是把這個數(shù)據(jù)轉(zhuǎn)換為Python的對象的,比如最為常見的字典,下面這篇文章主要給大家介紹了關(guān)于python保存字典數(shù)據(jù)到csv的相關(guān)資料,需要的朋友可以參考下2022-06-06
Python監(jiān)測屏幕界面內(nèi)容變化并發(fā)送通知方法詳解
這篇文章主要為大家介紹了Python監(jiān)測屏幕界面內(nèi)容變化并發(fā)送通知,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-06-06
Pytorch使用DataLoader實現(xiàn)批量加載數(shù)據(jù)
這篇文章主要介紹了Pytorch使用DataLoader實現(xiàn)批量加載數(shù)據(jù)方式,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02
Django+vue+vscode前后端分離搭建的實現(xiàn)
本文以一個非常簡單的demo為例,介紹了利用django+drf+vue的前后端分離開發(fā)模式,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2023-08-08

