簡單有效上手Python3異步asyncio問題
Python3異步asyncio問題
官方文檔:
https://docs.python.org/zh-cn/3/library/asyncio-task.html#asyncio.run
看了一大堆相關(guān)的資料和教程,針對的Python版本不同,寫法也各不一致,翻了翻官方的文檔,發(fā)現(xiàn)其實(shí)越高版本的Python對異步進(jìn)行封裝的越方便,官方說法叫高層級API,甚至都不用去理解什么Future\task\loop之類的概念了,我現(xiàn)在用的是Python 3.7.5,可以這樣很簡單的實(shí)現(xiàn)阻塞等待\異步并發(fā):如果沒有復(fù)雜需求的話,用高層級API就可以了。
如果涉及到回調(diào)的話貌似還得用低層級的API,后面單獨(dú)記錄。
import asyncio
async def first():
print('first函數(shù)調(diào)用開始,下面將會等待3秒模擬函數(shù)運(yùn)行完畢')
await asyncio.sleep(3)
print('first函數(shù)執(zhí)行完畢')
async def last():
print('last函數(shù)調(diào)用開始')
await asyncio.sleep(2)
print('last函數(shù)執(zhí)行完畢')
async def func(delay):
print('開始異步同時(shí)執(zhí)行的函數(shù)+延遲: ' + str(delay))
await asyncio.sleep(delay)
print('--異步函數(shù)執(zhí)行完畢+延遲: ' + str(delay))
async def main():
await first() # 這里先調(diào)用first()函數(shù),并且等它執(zhí)行完了才會開始
await asyncio.gather(
func(1),
func(2),
func(3)
)
await last()
asyncio.run(main())
上面代碼實(shí)際執(zhí)行的過程是:
- 開始執(zhí)行
first()函數(shù) - 等
first()執(zhí)行完畢后開始并發(fā)執(zhí)行下面gather中加入的 - 三個(gè)函數(shù)(任務(wù))三個(gè)函數(shù)全部并發(fā)執(zhí)行完畢后執(zhí)行
last()
官方文檔中給的建議是只創(chuàng)建一個(gè)主入口的main()函數(shù)(當(dāng)然這個(gè)函數(shù)名可以自定義的),將要調(diào)用的其他函數(shù)都放在這個(gè)函數(shù)中,然后再使用asyncio.run()啟動,理想情況下應(yīng)當(dāng)只被調(diào)用一次.
上圖:

更新
上面所謂的高階層API用法最后一行asyncio.run(main())和下面使用低階層API實(shí)現(xiàn)效果是一樣的:
loop = asyncio.get_event_loop() task = loop.create_task(main()) loop.run_until_complete(task)
下面是學(xué)習(xí)過程中記錄的偏低層實(shí)現(xiàn)的資料
最基本的定義和應(yīng)用
import asyncio
# 定義一個(gè)可以異步調(diào)用的函數(shù),其類型為coroutine
async def func1():
pass
if __name__ == '__main__':
loop = asyncio.get_event_loop() # 定義一個(gè)用來循環(huán)異步函數(shù)的loop對象
task = asyncio.ensure_future(func1()) # 創(chuàng)建一個(gè)調(diào)用異步函數(shù)的任務(wù),task類型為future
# task = loop.create_task(func1()) # 使用loop的.create_task()創(chuàng)建任務(wù),效果一樣
loop.run_until_complete(task) # 開始在loop循環(huán)中執(zhí)行異步函數(shù),直到該函數(shù)運(yùn)行完畢
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創(chuàng)建一個(gè)task,run_until_complete的參數(shù)是一個(gè)futrue對象。
當(dāng)傳入一個(gè)協(xié)程,其內(nèi)部會自動封裝成task,task是Future的子類。
isinstance(task, asyncio.Future)將會輸出True。
future類型的任務(wù)可以在loop.run_until_complete中執(zhí)行,也可以直接用await+任務(wù)變量名阻塞?調(diào)用
什么時(shí)候使用異步
import asyncio
# 這是一個(gè)耗時(shí)很少的異步函數(shù)
async def msg(text):
await asyncio.sleep(0.1)
print(text)
# 這是一個(gè)耗時(shí)較長的異步函數(shù)
async def long_operation():
print('long_operation started')
await asyncio.sleep(3)
print('long_operation finished')
# 主函數(shù)部分,同樣需要聲明為async異步類型
async def main():
await msg('first')
# 現(xiàn)在需要調(diào)用一個(gè)耗時(shí)較長的函數(shù)操作,不希望阻塞的等待它執(zhí)行完畢
# 希望long_operation在開始執(zhí)行后,立即調(diào)用msg,這里就可以將long_operation封裝到task任務(wù)中
task = asyncio.ensure_future(long_operation())
await msg('second')
# 開始task中的long_operation函數(shù)
await task
# task執(zhí)行完畢后會繼續(xù)下面的代碼
print('All done.')
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
運(yùn)行結(jié)果:

并發(fā)和并行
并發(fā)和并行一直是容易混淆的概念。并發(fā)通常指有多個(gè)任務(wù)需要同時(shí)進(jìn)行,并行則是同一時(shí)刻有多個(gè)任務(wù)執(zhí)行。
用上課來舉例就是,并發(fā)情況下是一個(gè)老師在同一時(shí)間段輔助不同的人功課。
并行則是好幾個(gè)老師分別同時(shí)輔助多個(gè)學(xué)生功課。
簡而言之就是一個(gè)人同時(shí)吃三個(gè)饅頭還是三個(gè)人同時(shí)分別吃一個(gè)的情況,吃一個(gè)饅頭算一個(gè)任務(wù)。
asyncio實(shí)現(xiàn)并發(fā),就需要多個(gè)協(xié)程來完成任務(wù),每當(dāng)有任務(wù)阻塞的時(shí)候就await,然后其他協(xié)程繼續(xù)工作。
創(chuàng)建多個(gè)協(xié)程的列表,然后將這些協(xié)程注冊到事件循環(huán)中。
import asyncio
import time
now = lambda: time.time()
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)
start = now()
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task ret: ', task.result())
print('TIME: ', now() - start)
結(jié)果如下
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.003541946411133
總時(shí)間為4s左右。4s的阻塞時(shí)間,足夠前面兩個(gè)協(xié)程執(zhí)行完畢。如果是同步順序的任務(wù),那么至少需要7s。此時(shí)我們使用了aysncio實(shí)現(xiàn)了并發(fā)。
asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個(gè)task列表,后者接收一堆task。
異步結(jié)果回調(diào)
找了個(gè)別人寫的例子,大致理解了下實(shí)現(xiàn)過程:
- 定義
async異步函數(shù) - 定義普通函數(shù)用于處理回調(diào)
- 獲取異步函數(shù)的
coroutine協(xié)程對象(其實(shí)就是不帶await修飾直接執(zhí)行異步函數(shù)返回的那個(gè)對象) - 獲取
loop循環(huán)對象 - 使用低階層API手動創(chuàng)建
task任務(wù) - 為
task任務(wù)對象注冊回調(diào)函數(shù) - 啟動
loop循環(huán)調(diào)用異步函數(shù)
import time
import asyncio
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
return 'Done after {}s'.format(x)
def callback(future):
print('Callback: ', future.result())
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
#task = asyncio.ensure_future(coroutine) # 貌似和上面用loop創(chuàng)建任務(wù)效果一樣
task.add_done_callback(callback)
loop.run_until_complete(task)
print('TIME: ', now() - start)
這里需要注意,在使用低層級API手動創(chuàng)建異步任務(wù)的時(shí)候,不能同時(shí)使用高層級API的簡單操作了,比如這里創(chuàng)建task任務(wù)對象的時(shí)候,就不能用asyncio.create_task(),否則會找不到loop對象,返回下面的錯誤
RuntimeError: no running event loop
翻了一下asyncio\tasks.py源代碼,原來所謂的高層級API就是這么簡單的封裝啊…
調(diào)用的時(shí)候在函數(shù)內(nèi)部又定義了一遍loop
def create_task(coro):
"""Schedule the execution of a coroutine object in a spawn task.
Return a Task object.
"""
loop = events.get_running_loop()
return loop.create_task(coro)
我自己寫的例子
創(chuàng)建4個(gè)異步任務(wù)同時(shí)開始執(zhí)行,每個(gè)任務(wù)執(zhí)行完成后將結(jié)果追加到result_list數(shù)組變量中.
import asyncio
result_list = []
async def fun(var):
return var + 1
def callbackFun(future):
result_list.append(future.result())
task_list = []
for i in range(1, 5):
cor = fun(i)
task = asyncio.ensure_future(cor)
task.add_done_callback(callbackFun)
task_list.append(task)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(task_list))
print(result_list)
當(dāng)然,如果不需要使用回調(diào)函數(shù),而是等所有提交的異步任務(wù)都執(zhí)行完成后獲取它們的結(jié)果,可以這樣寫:
# 前面省略
loop.run_until_complete(asyncio.wait(task_list))
# task_list中的每個(gè)任務(wù)執(zhí)行完成后可以調(diào)用它的.result()方法來獲取結(jié)果
for task in task_list:
print('每個(gè)task執(zhí)行完的結(jié)果:', task.result())
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Python中常用數(shù)據(jù)類型使用示例概括總結(jié)
這篇文章主要為大家介紹了Python中常用數(shù)據(jù)類型使用示例概括總結(jié),有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
python?實(shí)現(xiàn)?pymysql?數(shù)據(jù)庫操作方法
這篇文章主要介紹了python實(shí)現(xiàn)pymysql數(shù)據(jù)庫操作方法,文章基于python的相關(guān)內(nèi)容展開對?pymysql?數(shù)據(jù)庫操作方法的詳細(xì)介紹,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-04-04
Python實(shí)現(xiàn)隨機(jī)分層抽樣的示例詳解
在數(shù)據(jù)分析與機(jī)器學(xué)習(xí)的實(shí)踐中,抽樣是不可或缺的一步,分層抽樣作為一種常用的抽樣方法,能夠確保樣本在不同類別中的比例與總體一致,下面我們看看如何使用Python實(shí)現(xiàn)隨機(jī)分層抽樣吧2024-11-11
Tensorflow 實(shí)現(xiàn)分批量讀取數(shù)據(jù)
今天小編就為大家分享一篇Tensorflow 實(shí)現(xiàn)分批量讀取數(shù)據(jù),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-01-01
python+selenium行為鏈登錄12306(滑動驗(yàn)證碼滑塊)
這篇文章主要介紹了python+selenium行為鏈登錄12306,使用python網(wǎng)絡(luò)爬蟲登錄12306,下面小編為大家分享一段代碼,具有一定的參考價(jià)值,需要的小伙伴可以參考一下2022-02-02
Python實(shí)現(xiàn)SVM支持向量機(jī)的示例代碼
SVM 的目的是在數(shù)據(jù)集中找到一條最佳分隔超平面,使得在這個(gè)超平面兩側(cè)的數(shù)據(jù)分別屬于不同的類別,且該超平面與最近的數(shù)據(jù)點(diǎn)之間的距離最大。本文將通過Python實(shí)現(xiàn)SVM支持向量機(jī),感興趣的可以了解一下2023-02-02

