Python使用asyncio包處理并發(fā)詳解
阻塞型I/O和GIL
CPython 解釋器本身就不是線程安全的,因此有全局解釋器鎖(GIL),一次只允許使用一個(gè)線程執(zhí)行 Python 字節(jié)碼。因此,一個(gè) Python 進(jìn)程通常不能同時(shí)使用多個(gè) CPU 核心。
然而,標(biāo)準(zhǔn)庫(kù)中所有執(zhí)行阻塞型 I/O 操作的函數(shù),在等待操作系統(tǒng)返回結(jié)果時(shí)都會(huì)釋放GIL。這意味著在 Python 語(yǔ)言這個(gè)層次上可以使用多線程,而 I/O 密集型 Python 程序能從中受益:一個(gè) Python 線程等待網(wǎng)絡(luò)響應(yīng)時(shí),阻塞型 I/O 函數(shù)會(huì)釋放 GIL,再運(yùn)行一個(gè)線程。
asyncio
這個(gè)包使用事件循環(huán)驅(qū)動(dòng)的協(xié)程實(shí)現(xiàn)并發(fā)。 asyncio 大量使用 yield from 表達(dá)式,因此與Python 舊版不兼容。
asyncio 包使用的“協(xié)程”是較嚴(yán)格的定義。適合asyncio API 的協(xié)程在定義體中必須使用 yield from,而不能使用 yield。此外,適合 asyncio 的協(xié)程要由調(diào)用方驅(qū)動(dòng),并由調(diào)用方通過(guò) yield from 調(diào)用;
示例1
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Start Hello', threading.currentThread())
yield from asyncio.sleep(5)
print('End Hello', threading.currentThread())
@asyncio.coroutine
def world():
print('Start World', threading.currentThread())
yield from asyncio.sleep(3)
print('End World', threading.currentThread())
# 獲取EventLoop:
loop = asyncio.get_event_loop()
tasks = [hello(), world()]
# 執(zhí)行coroutine
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
@asyncio.coroutine把生成器函數(shù)標(biāo)記為協(xié)程類(lèi)型。
asyncio.sleep(3) 創(chuàng)建一個(gè)3秒后完成的協(xié)程。
loop.run_until_complete(future),運(yùn)行直到future完成;如果參數(shù)是 coroutine object,則需要使用 ensure_future()函數(shù)包裝。
loop.close() 關(guān)閉事件循環(huán)
示例2
import asyncio
@asyncio.coroutine
def worker(text):
"""
協(xié)程運(yùn)行的函數(shù)
:param text:
:return:
"""
i = 0
while True:
print(text, i)
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
i += 1
@asyncio.coroutine
def client(text, io_used):
worker_fu = asyncio.ensure_future(worker(text))
# 假裝等待I/O一段時(shí)間
yield from asyncio.sleep(io_used)
# 結(jié)束運(yùn)行協(xié)程
worker_fu.cancel()
return 'done'
loop = asyncio.get_event_loop()
tasks = [client('xiaozhe', 3), client('zzzz', 5)]
result = loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('Answer:', result)
解釋?zhuān)?
1. asyncio.ensure_future(coro_or_future, *, loop=None):計(jì)劃安排一個(gè) coroutine object的執(zhí)行,返回一個(gè) asyncio.Task object。
2. worker_fu.cancel(): 取消一個(gè)協(xié)程的執(zhí)行,拋出CancelledError異常。
3. asyncio.wait():協(xié)程的參數(shù)是一個(gè)由期物或協(xié)程構(gòu)成的可迭代對(duì)象; wait 會(huì)分別把各個(gè)協(xié)程包裝進(jìn)一個(gè) Task 對(duì)象。
asyncio.Task 對(duì)象與threading.Thread對(duì)象的比較
asyncio.Task 對(duì)象差不多與 threading.Thread 對(duì)象等效。
Task 對(duì)象用于驅(qū)動(dòng)協(xié)程, Thread 對(duì)象用于調(diào)用可調(diào)用的對(duì)象。
Task 對(duì)象不由自己動(dòng)手實(shí)例化,而是通過(guò)把協(xié)程傳給 asyncio.ensure_future(…) 函數(shù)或loop.create_task(…) 方法獲取。
獲取的 Task 對(duì)象已經(jīng)排定了運(yùn)行時(shí)間;Thread 實(shí)例則必須調(diào)用 start 方法,明確告知讓它運(yùn)行。
如果想終止任務(wù),可以使用 Task.cancel() 實(shí)例方法,在協(xié)程內(nèi)部拋出CancelledError 異常。
線程與協(xié)程的安全比較
如果使用線程做過(guò)重要的編程,因?yàn)檎{(diào)度程序任何時(shí)候都能中斷線程。必須記住保留鎖,去保護(hù)程序中的重要部分,防止多步操作在執(zhí)行的過(guò)程中中斷,防止數(shù)據(jù)處于無(wú)效狀態(tài)。
協(xié)程默認(rèn)會(huì)做好全方位保護(hù),以防止中斷。我們必須顯式產(chǎn)出才能讓程序的余下部分運(yùn)行。對(duì)協(xié)程來(lái)說(shuō),無(wú)需保留鎖,在多個(gè)線程之間同步操作,協(xié)程自身就會(huì)同步,因?yàn)樵谌我鈺r(shí)刻只有一個(gè)協(xié)程運(yùn)行。想交出控制權(quán)時(shí),可以使用 yield 或 yield from 把控制權(quán)交還調(diào)度程序。這就是能夠安全地取消協(xié)程的原因:按照定義,協(xié)程只能在暫停的 yield處取消,因此可以處理 CancelledError 異常,執(zhí)行清理操作。
Future(期物)
通常情況下自己不應(yīng)該創(chuàng)建期物,而只能由并發(fā)框架(concurrent.futures 或 asyncio)實(shí)例化。原因很簡(jiǎn)單:期物表示終將發(fā)生的事情,而確定某件事會(huì)發(fā)生的唯一方式是執(zhí)行的時(shí)間已經(jīng)排定。
asyncio.Future
在 asyncio 包中, BaseEventLoop.create_task(…) 方法接收一個(gè)協(xié)程,排定它的運(yùn)行時(shí)間,然后返回一個(gè) asyncio.Task 實(shí)例——也是 asyncio.Future 類(lèi)的實(shí)例,因?yàn)?Task 是Future 的子類(lèi),用于包裝協(xié)程。
asyncio.ensure_future(coro_or_future, *, loop=None)
這個(gè)函數(shù)統(tǒng)一了協(xié)程和期物:第一個(gè)參數(shù)可以是二者中的任何一個(gè)。如果是 Future 或 Task 對(duì)象,那就原封不動(dòng)地返回。如果是協(xié)程,那么 async 函數(shù)會(huì)調(diào)用loop.create_task(…) 方法創(chuàng)建 Task 對(duì)象。 loop= 關(guān)鍵字參數(shù)是可選的,用于傳入事件循環(huán);如果沒(méi)有傳入,那么 async 函數(shù)會(huì)通過(guò)調(diào)用 asyncio.get_event_loop() 函數(shù)獲取循環(huán)對(duì)象。
BaseEventLoop.create_task(coro)
這個(gè)方法排定協(xié)程的執(zhí)行時(shí)間,返回一個(gè) asyncio.Task 對(duì)象。
asyncio 包中有多個(gè)函數(shù)會(huì)自動(dòng)把參數(shù)指定的協(xié)程包裝在 asyncio.Task 對(duì)象中,例如 BaseEventLoop.run_until_complete(…) 方法。
asyncio.as_completed
為了集成進(jìn)度條,我們可以使用的是 as_completed 生成器函數(shù);幸好, asyncio 包提供了這個(gè)生成器函數(shù)的相應(yīng)版本。
使用asyncio和aiohttp包
從 Python 3.4 起, asyncio 包只直接支持 TCP 和 UDP。如果想使用 HTTP 或其他協(xié)議,那么要借助第三方包 aiohttp 。
cc_list = ['China', 'USA']
@asyncio.coroutine
def get_flag(cc):
url = '{}/{cc}/{cc}.gif'.format(BASE_URL, cc=cc.lower())
resp = yield from aiohttp.request('GET', url)
image = yield from resp.read()
return image
@asyncio.coroutine
def download_one(name):
image = yield from get_flag(name)
save_flag(image, name.lower() + '.gif')
return name
loop = asyncio.get_event_loop()
wait_coro = asyncio.wait([download_one(cc) for cc in sorted(cc_list)])
res, _ = loop.run_until_complete(wait_coro)
loop.close()
使用 asyncio 包時(shí),我們編寫(xiě)的異步代碼中包含由 asyncio 本身驅(qū)動(dòng)的協(xié)程(即委派生成器),而生成器最終把職責(zé)委托給 asyncio 包或第三方庫(kù)(如aiohttp)中的協(xié)程。這種處理方式相當(dāng)于架起了管道,讓 asyncio 事件循環(huán)(通過(guò)我們編寫(xiě)的協(xié)程)驅(qū)動(dòng)執(zhí)行低層異步 I/O 操作的庫(kù)函數(shù)。
避免阻塞型調(diào)用
有兩種方法能避免阻塞型調(diào)用中止整個(gè)應(yīng)用程序的進(jìn)程:
1. 在單獨(dú)的線程中運(yùn)行各個(gè)阻塞型操作
2. 把每個(gè)阻塞型操作轉(zhuǎn)換成非阻塞的異步調(diào)用使用
多個(gè)線程是可以的,但是各個(gè)操作系統(tǒng)線程(Python 使用的是這種線程)消耗的內(nèi)存達(dá)兆字節(jié)(具體的量取決于操作系統(tǒng)種類(lèi))。如果要處理幾千個(gè)連接,而每個(gè)連接都使用一個(gè)線程的話,我們負(fù)擔(dān)不起。
把生成器當(dāng)作協(xié)程使用是異步編程的另一種方式。對(duì)事件循環(huán)來(lái)說(shuō),調(diào)用回調(diào)與在暫停的協(xié)程上調(diào)用 .send() 方法效果差不多。各個(gè)暫停的協(xié)程是要消耗內(nèi)存,但是比線程消耗的內(nèi)存數(shù)量級(jí)小。
上面的腳本為什么會(huì)很快
在上面的腳本中,調(diào)用 loop.run_until_complete 方法時(shí),事件循環(huán)驅(qū)動(dòng)各個(gè)download_one 協(xié)程,運(yùn)行到第一個(gè) yield from 表達(dá)式處時(shí),那個(gè)表達(dá)式驅(qū)動(dòng)各個(gè)get_flag 協(xié)程,然后在get_flag協(xié)程里面運(yùn)行到第一個(gè) yield from 表達(dá)式處時(shí),調(diào)用 aiohttp.request(…)函數(shù)。這些調(diào)用都不會(huì)阻塞,因此在零點(diǎn)幾秒內(nèi)所有請(qǐng)求全部開(kāi)始。
asyncio 的基礎(chǔ)設(shè)施獲得第一個(gè)響應(yīng)后,事件循環(huán)把響應(yīng)發(fā)給等待結(jié)果的 get_flag 協(xié)程。得到響應(yīng)后, get_flag 向前執(zhí)行到下一個(gè) yield from 表達(dá)式處,調(diào)用resp.read() 方法,然后把控制權(quán)還給主循環(huán)。其他響應(yīng)會(huì)陸續(xù)返回。所有 get_ flag 協(xié)程都獲得結(jié)果后,委派生成器 download_one 恢復(fù),保存圖像文件。
async和await
為了簡(jiǎn)化并更好地標(biāo)識(shí)異步IO,從Python 3.5開(kāi)始引入了新的語(yǔ)法async和await,可以讓coroutine的代碼更簡(jiǎn)潔易讀。
async和await是針對(duì)coroutine的新語(yǔ)法,要使用新的語(yǔ)法,只需要做兩步簡(jiǎn)單的替換。
1. 把@asyncio.coroutine替換為async
2. 把yield from替換為await
例如:
@asyncio.coroutine
def hello():
print("Hello world!")
r = yield from asyncio.sleep(1)
print("Hello again!")
等同于
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
網(wǎng)站請(qǐng)求實(shí)例
import asyncio
import aiohttp
urls = [
'http://www.163.com/',
'http://www.sina.com.cn/',
'https://www.hupu.com/',
'http://www.csdn.net/'
]
async def get_url_data(u):
"""
讀取url的數(shù)據(jù)
:param u:
:return:
"""
print('running ', u)
async with aiohttp.ClientSession() as session:
async with session.get(u) as resp:
print(u, resp.status, type(resp.text()))
# print(await resp.text())
return resp.headers
async def request_url(u):
"""
主調(diào)度函數(shù)
:param u:
:return:
"""
res = await get_url_data(u)
return res
loop = asyncio.get_event_loop()
task_lists = asyncio.wait([request_url(u) for u in urls])
all_res, _ = loop.run_until_complete(task_lists)
loop.close()
print(all_res)
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Pycharm如何導(dǎo)入python文件及解決報(bào)錯(cuò)問(wèn)題
這篇文章主要介紹了Pycharm如何導(dǎo)入python文件及解決報(bào)錯(cuò)問(wèn)題,本文通過(guò)示例截圖相結(jié)合給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05
瘋狂上漲的Python 開(kāi)發(fā)者應(yīng)從2.x還是3.x著手?
熱度瘋漲的 Python,開(kāi)發(fā)者應(yīng)從 2.x 還是 3.x 著手?這篇文章就為大家分析一下了Python開(kāi)發(fā)者應(yīng)從2.x還是3.x學(xué)起,感興趣的小伙伴們可以參考一下2017-11-11
Python爬蟲(chóng)設(shè)置代理IP的方法(爬蟲(chóng)技巧)
這篇文章主要介紹了Python爬蟲(chóng)設(shè)置代理IP的方法(爬蟲(chóng)技巧),需要的朋友可以參考下2018-03-03
使用python實(shí)現(xiàn)一個(gè)簡(jiǎn)單ping?pong服務(wù)器
這篇文章主要為大家介紹了使用python實(shí)現(xiàn)一個(gè)簡(jiǎn)單ping?pong服務(wù)器,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
M1芯片安裝python3.9.1的實(shí)現(xiàn)
這篇文章主要介紹了M1芯片安裝python3.9.1的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-02-02
python輸入一個(gè)水仙花數(shù)(三位數(shù)) 輸出百位十位個(gè)位實(shí)例
這篇文章主要介紹了python輸入一個(gè)水仙花數(shù)(三位數(shù)) 輸出百位十位個(gè)位實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-05-05

