Python異步編程之a(chǎn)syncio.create_task()用法示例解析
前言
asyncio.create_task() 是Python 3.7+引入的函數(shù),用于創(chuàng)建一個(gè)Task對(duì)象。Task是對(duì)協(xié)程的封裝,可以讓協(xié)程在事件循環(huán)中并發(fā)執(zhí)行。
1. 基本語(yǔ)法
task = asyncio.create_task(coro)
coro: 要包裝的協(xié)程對(duì)象- 返回值: Task對(duì)象
2. 基礎(chǔ)用法示例
2.1 簡(jiǎn)單的任務(wù)創(chuàng)建
import asyncio
import time
async def say_hello(name, delay):
print(f"開(kāi)始執(zhí)行 {name}")
await asyncio.sleep(delay)
print(f"Hello, {name}!")
return f"完成 {name}"
async def main():
# 創(chuàng)建任務(wù)
task1 = asyncio.create_task(say_hello("Alice", 2))
task2 = asyncio.create_task(say_hello("Bob", 1))
# 等待任務(wù)完成
result1 = await task1
result2 = await task2
print(result1)
print(result2)
# 運(yùn)行主函數(shù)
asyncio.run(main())2.2 多個(gè)任務(wù)并發(fā)執(zhí)行
import asyncio
async def fetch_data(url, delay):
print(f"開(kāi)始獲取 {url}")
await asyncio.sleep(delay)
return f"數(shù)據(jù)來(lái)自 {url}"
async def main():
# 創(chuàng)建多個(gè)任務(wù)
tasks = [
asyncio.create_task(fetch_data("https://api1.com", 1)),
asyncio.create_task(fetch_data("https://api2.com", 2)),
asyncio.create_task(fetch_data("https://api3.com", 1.5))
]
# 等待所有任務(wù)完成
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(main())3. 高級(jí)用法
3.1 任務(wù)的取消
import asyncio
async def long_running_task():
try:
for i in range(10):
print(f"任務(wù)執(zhí)行中... {i}")
await asyncio.sleep(1)
return "任務(wù)完成"
except asyncio.CancelledError:
print("任務(wù)被取消")
raise
async def main():
task = asyncio.create_task(long_running_task())
# 3秒后取消任務(wù)
await asyncio.sleep(3)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("捕獲到取消異常")
asyncio.run(main())3.2 任務(wù)狀態(tài)檢查
import asyncio
async def sample_task():
await asyncio.sleep(2)
return "完成"
async def main():
task = asyncio.create_task(sample_task())
print(f"任務(wù)創(chuàng)建后 - 完成: {task.done()}")
await asyncio.sleep(1)
print(f"等待1秒后 - 完成: {task.done()}")
await task
print(f"任務(wù)完成后 - 完成: {task.done()}")
print(f"任務(wù)結(jié)果: {task.result()}")
asyncio.run(main())3.3 異常處理
import asyncio
async def task_with_exception():
await asyncio.sleep(1)
raise ValueError("這是一個(gè)錯(cuò)誤")
async def normal_task():
await asyncio.sleep(2)
return "正常完成"
async def main():
task1 = asyncio.create_task(task_with_exception())
task2 = asyncio.create_task(normal_task())
try:
# 使用 gather 并設(shè)置 return_exceptions=True 來(lái)處理異常
results = await asyncio.gather(task1, task2, return_exceptions=True)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任務(wù) {i} 發(fā)生異常: {result}")
else:
print(f"任務(wù) {i} 結(jié)果: {result}")
except Exception as e:
print(f"捕獲異常: {e}")
asyncio.run(main())4. 實(shí)際應(yīng)用場(chǎng)景
4.1 并發(fā)HTTP請(qǐng)求
示例一:
import asyncio
import aiohttp
async def fetch_url(session, url):
try:
async with session.get(url) as response:
return await response.text()
except Exception as e:
return f"錯(cuò)誤: {e}"
async def fetch_multiple_urls():
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/2',
'https://httpbin.org/delay/1'
]
async with aiohttp.ClientSession() as session:
# 創(chuàng)建任務(wù)列表
tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
# 并發(fā)執(zhí)行所有任務(wù)
results = await asyncio.gather(*tasks)
for i, result in enumerate(results):
print(f"URL {i+1}: {len(result) if isinstance(result, str) else result} 字符")
# 注意:需要安裝 aiohttp: pip install aiohttp
# asyncio.run(fetch_multiple_urls())示例二:
import asyncio
import aiohttp
async def process_http_request():
# 使用aiohttp進(jìn)行異步HTTP請(qǐng)求
timeout = aiohttp.ClientTimeout(total=1000)
async with aiohttp.ClientSession(timeout=timeout) as session:
headers = {}
data = {}
url = ""
is_error = False
try:
print(f"HTTP異步請(qǐng)求開(kāi)始,data: {data} ")
async with session.post(
url=url,
headers=headers,
json=data,
timeout=timeout # 為單個(gè)請(qǐng)求設(shè)置超時(shí)
) as response:
if response.status == 200:
# 獲取響應(yīng)數(shù)據(jù)
response_data = await response.json()
if response_data['data']['status'] != 200:
print(f"HTTP異步請(qǐng)求處理失敗: {response_data} ")
is_error = True
else:
print(f"HTTP異步請(qǐng)求處理失敗,狀態(tài)碼: {response.status}")
is_error = True
except asyncio.TimeoutError:
print(f"HTTP異步請(qǐng)求超時(shí)!")
is_error = True
print(f"HTTP異步請(qǐng)求完成")
return is_error
4.2 數(shù)據(jù)庫(kù)操作并發(fā)
import asyncio
# import asyncpg # 示例使用 asyncpg
async def query_database(query_id):
# 模擬數(shù)據(jù)庫(kù)查詢
await asyncio.sleep(1) # 模擬查詢時(shí)間
return f"查詢結(jié)果 {query_id}"
async def concurrent_queries():
# 創(chuàng)建多個(gè)查詢?nèi)蝿?wù)
tasks = [
asyncio.create_task(query_database(i))
for i in range(5)
]
# 并發(fā)執(zhí)行查詢
results = await asyncio.gather(*tasks)
for result in results:
print(result)
asyncio.run(concurrent_queries())create_task() vs ensure_future()
在Python 3.7之前,我們使用 asyncio.ensure_future() 來(lái)創(chuàng)建任務(wù):
# 舊方式(仍然可用) task = asyncio.ensure_future(coro) # 新方式(推薦) task = asyncio.create_task(coro)
兩者的主要區(qū)別:
create_task()更明確地表示創(chuàng)建一個(gè)任務(wù)ensure_future()功能更廣泛,可以處理多種類型的awaitable對(duì)象- 在現(xiàn)代Python中,推薦使用
create_task()
5. 最佳實(shí)踐
5.1 合理使用任務(wù)
import asyncio
async def good_practice():
# 好的做法:明確創(chuàng)建任務(wù)
task1 = asyncio.create_task(some_coroutine())
task2 = asyncio.create_task(another_coroutine())
# 等待結(jié)果
result1 = await task1
result2 = await task2
return result1, result2
async def avoid_this():
# 避免這樣做:直接await協(xié)程不會(huì)并發(fā)執(zhí)行
result1 = await some_coroutine()
result2 = await another_coroutine()
return result1, result25.2 使用超時(shí)控制
import asyncio
async def slow_operation():
await asyncio.sleep(5)
return "完成"
async def main():
try:
task = asyncio.create_task(slow_operation())
# 設(shè)置3秒超時(shí)
result = await asyncio.wait_for(task, timeout=3.0)
print(result)
except asyncio.TimeoutError:
print("操作超時(shí)")
# 取消任務(wù)
task.cancel()
asyncio.run(main())6.常見(jiàn)問(wèn)題
在Python的asyncio中,asyncio.create_task()創(chuàng)建的任務(wù)不需要被await也會(huì)運(yùn)行。
asyncio任務(wù)執(zhí)行機(jī)制
當(dāng)你使用asyncio.create_task()時(shí),任務(wù)會(huì)被立即調(diào)度執(zhí)行,而不需要顯式地await它。這是因?yàn)椋?/p>
asyncio.create_task()會(huì)將任務(wù)添加到事件循環(huán)中進(jìn)行調(diào)度
事件循環(huán)會(huì)自動(dòng)運(yùn)行這些任務(wù),不需要顯式await
await的作用是等待任務(wù)完成并獲取結(jié)果,而不是觸發(fā)任務(wù)執(zhí)行
1. 任務(wù)創(chuàng)建與調(diào)度
當(dāng)調(diào)用asyncio.create_task()時(shí),會(huì)發(fā)生以下事情:
(1)任務(wù)被調(diào)度:任務(wù)被添加到事件循環(huán)(event loop)中進(jìn)行調(diào)度
(2)立即執(zhí)行:任務(wù)會(huì)在事件循環(huán)的下一個(gè)周期開(kāi)始執(zhí)行
(3)并發(fā)運(yùn)行:任務(wù)與當(dāng)前代碼以及其他任務(wù)并發(fā)運(yùn)行
2. await的作用
await關(guān)鍵字的作用是:
(1)等待任務(wù)完成:暫停當(dāng)前協(xié)程直到任務(wù)完成
(2)獲取結(jié)果:獲取任務(wù)的返回值
(3)異常處理:傳播任務(wù)中發(fā)生的異常
但是await不是觸發(fā)任務(wù)執(zhí)行的必要條件。
3. 實(shí)際示例
import asyncio
async def background_task(name):
print(f"任務(wù) {name} 開(kāi)始")
await asyncio.sleep(2)
print(f"任務(wù) {name} 完成")
return f"結(jié)果 {name}"
async def main():
# 創(chuàng)建任務(wù)但不await - 任務(wù)會(huì)自動(dòng)運(yùn)行
task1 = asyncio.create_task(background_task("1"))
task2 = asyncio.create_task(background_task("2"))
print("任務(wù)已創(chuàng)建")
# 做一些其他工作
await asyncio.sleep(1)
print("主協(xié)程繼續(xù)執(zhí)行")
# 等待任務(wù)完成獲取結(jié)果
result1 = await task1
result2 = await task2
print(f"結(jié)果: {result1}, {result2}")
# 運(yùn)行主函數(shù)
asyncio.run(main())
從輸出結(jié)果可以看出,即使沒(méi)有立即await任務(wù),它們也已經(jīng)開(kāi)始運(yùn)行了。
7. 總結(jié)
asyncio.create_task() 是Python異步編程中的核心函數(shù),學(xué)會(huì)使用能夠:
- 并發(fā)執(zhí)行協(xié)程:通過(guò)創(chuàng)建Task對(duì)象實(shí)現(xiàn)真正的并發(fā)
- 靈活控制任務(wù):可以取消、檢查狀態(tài)、處理異常
- 提高程序性能:特別適合I/O密集型操作
- 簡(jiǎn)化異步代碼:讓異步編程更加直觀和易用
到此這篇關(guān)于Python異步編程之a(chǎn)syncio.create_task()用法的文章就介紹到這了,更多相關(guān)Python asyncio.create_task()用法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python通過(guò)PyQt5實(shí)現(xiàn)登錄界面的示例代碼
本文主要介紹了python通過(guò)PyQt5實(shí)現(xiàn)登錄界面的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
Python中的多行注釋文檔編寫(xiě)風(fēng)格匯總
在Python中利用多行注釋編寫(xiě)小型的程序文檔說(shuō)明非常方便,而約定俗成的格式也多種多樣,這里我們就進(jìn)行一下最常見(jiàn)的Python中的多行注釋文檔編寫(xiě)風(fēng)格匯總:2016-06-06
NumPy中np.random.rand函數(shù)的實(shí)現(xiàn)
np.random.rand是NumPy庫(kù)中的一個(gè)函數(shù),用于生成隨機(jī)數(shù),本文主要介紹了NumPy中np.random.rand函數(shù)的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-07-07
Python使用Apache Kafka時(shí)Poll拉取速度慢的解決方法
在使用Apache Kafka時(shí),poll方法拉取消息速度慢常見(jiàn)于網(wǎng)絡(luò)延遲、消息大小過(guò)大、消費(fèi)者配置不當(dāng)或高負(fù)載情況,本文提供了優(yōu)化消費(fèi)者配置、并行消費(fèi)、優(yōu)化消息處理邏輯和監(jiān)控調(diào)試的解決方案,并附有Python代碼示例和相關(guān)類圖、序列圖以幫助理解和實(shí)現(xiàn)2024-09-09
使用python將請(qǐng)求的requests headers參數(shù)格式化方法
今天小編就為大家分享一篇使用python將請(qǐng)求的requests headers參數(shù)格式化方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-01-01
Python一句代碼實(shí)現(xiàn)找出所有水仙花數(shù)的方法
今天小編就為大家分享一篇Python一句代碼實(shí)現(xiàn)找出所有水仙花數(shù)的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2018-11-11
在Python中使用Protocol?Buffers的詳細(xì)介紹
本文詳細(xì)介紹了協(xié)議緩沖區(qū)(Protocol Buffers)在Python中的應(yīng)用,包括其定義、序列化和解析過(guò)程,協(xié)議緩沖區(qū)是一種靈活且高效的自動(dòng)化解決方案,本文包括了如何將地址簿應(yīng)用程序的個(gè)人詳細(xì)信息寫(xiě)入文件的示例代碼,并提供了相應(yīng)的下載和安裝指導(dǎo),感興趣的朋友一起看看吧2024-10-10
pytorch加載語(yǔ)音類自定義數(shù)據(jù)集的方法教程
這篇文章主要給大家介紹了關(guān)于pytorch加載語(yǔ)音類自定義數(shù)據(jù)集的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11

