Python 異步編程操作示例詳解
Python 異步編程是一種基于非阻塞 IO 模型的并發(fā)編程范式,核心目標是在處理 IO 密集型任務(如網(wǎng)絡請求、文件讀寫、數(shù)據(jù)庫交互)時,通過高效的任務調(diào)度減少等待時間,最大化 CPU 利用率。
異步編程通過事件循環(huán)實現(xiàn)任務調(diào)度:當一個任務因 IO 操作需要等待時,事件循環(huán)會暫停該任務,切換到其他就緒任務;當 IO 操作完成(如響應到達),事件循環(huán)再恢復原任務的執(zhí)行。
核心優(yōu)勢:
- 單線程內(nèi)實現(xiàn)并發(fā),避免多線程的上下文切換開銷。
- 針對 IO 密集型任務,性能提升顯著(通常是同步方式的數(shù)倍到數(shù)十倍)。
1、核心組件
1.1 事件循環(huán)
事件循環(huán)是異步編程的 “心臟”,負責任務調(diào)度、IO 事件監(jiān)聽、狀態(tài)管理。
# 偽代碼
任務列表 = [ 任務1,任務2,任務3,...]
while True:
可執(zhí)行的任務列表,已完成的任務列表 = 去任務列表中檢查所有的任務,將'可執(zhí)行'和'已完成'的任務返回
for 就緒任務 in 已準備就緒的任務列表:
執(zhí)行已就緒的任務
for 已完成的任務 in 已完成的任務列表:
在任務列表中移除 已完成的任務
如果 任務列表 中的任務都已完成,則終止循環(huán)關(guān)鍵細節(jié):
- 事件循環(huán)在單線程中運行,所有任務都在這個線程內(nèi)切換執(zhí)行。
- 僅當任務遇到
await(表示需要等待 IO)時才會切換,純計算任務不會觸發(fā)切換。
1.2 協(xié)程
協(xié)程是異步任務的基本單元,是一種用戶態(tài)的上下文切換技術(shù),其實就是通過一個線程實現(xiàn)代碼塊相互切換執(zhí)行,本質(zhì)是可暫停 / 恢復的函數(shù),通過 async def 定義。與普通函數(shù)的區(qū)別在于:
- 調(diào)用協(xié)程不會立即執(zhí)行,而是返回一個協(xié)程對象。
- 必須通過事件循環(huán)調(diào)度(如
await、create_task)才能執(zhí)行。
# 協(xié)程的定義與狀態(tài)
import asyncio
async def my_coroutine():
print("協(xié)程開始")
await asyncio.sleep(1) # 暫停點:釋放CPU,允許切換
print("協(xié)程結(jié)束")
return "結(jié)果"
# 協(xié)程對象(未執(zhí)行)
coro = my_coroutine()
print(type(coro)) # <class 'coroutine'>
# 必須通過事件循環(huán)執(zhí)行
async def main():
result = await coro # 調(diào)度執(zhí)行,等待結(jié)果
print(result) # 輸出:結(jié)果
asyncio.run(main())協(xié)程的生命周期:
- 創(chuàng)建:
coro = my_coroutine()→ 未執(zhí)行狀態(tài)。 - 運行:
await coro或create_task(coro)→ 進入事件循環(huán)。 - 暫停:執(zhí)行到
await語句 → 等待 IO 時掛起。 - 恢復:IO 完成 → 從暫停點繼續(xù)執(zhí)行。
- 完成:執(zhí)行到函數(shù)末尾 → 返回結(jié)果或拋出異常。
1.3 任務
任務是協(xié)程的包裝器,由事件循環(huán)直接調(diào)度,用于實現(xiàn)并發(fā)。任務會將協(xié)程注冊到事件循環(huán),并跟蹤其狀態(tài)(運行中 / 已完成 / 已取消)。
async def task_func(name, delay):
print(f"任務 name={name}, delay={delay} === 111")
await asyncio.sleep(delay)
print(f"任務 name={name}, delay={delay} === 222")
return f"任務 {name} 完成"
async def main():
# 創(chuàng)建任務(立即加入事件循環(huán),開始調(diào)度)
task1 = asyncio.create_task(task_func("A", 1))
task2 = asyncio.create_task(task_func("B", 2))
print("任務狀態(tài):", task1.done()) # False(未完成)
# 等待任務完成并獲取結(jié)果
result1 = await task1
result2 = await task2
print("結(jié)果:", result1, result2) # 任務 A 完成 任務 B 完成
print("任務狀態(tài):", task1.done()) # True(已完成)
asyncio.run(main())任務的核心方法:
task.done():判斷任務是否完成。task.result():獲取任務返回值(任務未完成時調(diào)用會報錯)。task.cancel():取消任務(觸發(fā)CancelledError)。task.add_done_callback(func):注冊回調(diào)函數(shù)(任務完成后執(zhí)行)。
更常用寫法:
async def task_func(name, delay):
print(f"任務 name={name}, delay={delay} === 111")
await asyncio.sleep(delay)
print(f"任務 name={name}, delay={delay} === 222")
return f"任務 {name} 完成"
async def main():
# 創(chuàng)建任務(立即加入事件循環(huán),開始調(diào)度)
task_list = [
asyncio.create_task(task_func("A", 1), name="task_A"),
asyncio.create_task(task_func("B", 2), name="task_B"),
]
done, pending = await asyncio.wait(task_list, timeout=None)
# 等待任務完成并獲取結(jié)果
print(done)
asyncio.run(main())1.4 Future 對象
Future 是異步操作結(jié)果的容器,表示 “未來可能完成的操作”。任務(Task)是 Future 的子類,因此具備 Future 的所有特性,task對象內(nèi)部await結(jié)果的處理是基于future的:
- 存儲異步操作的狀態(tài)(
PENDING/FINISHED/CANCELLED)。 - 提供結(jié)果設(shè)置(
set_result())和異常設(shè)置(set_exception())方法。 - 支持通過
await獲取結(jié)果,或注冊回調(diào)函數(shù)。
async def main():
# 創(chuàng)建一個空的Future對象
future = asyncio.Future()
# 定義一個設(shè)置Future結(jié)果的協(xié)程
async def set_future_result():
await asyncio.sleep(1)
future.set_result("Future 結(jié)果") # 設(shè)置結(jié)果,標記為完成
# 并發(fā)執(zhí)行:設(shè)置結(jié)果的協(xié)程 + 等待結(jié)果的操作
asyncio.create_task(set_future_result())
result = await future # 等待Future完成
print(result) # 輸出:Future 結(jié)果
asyncio.run(main())Task 與 Future 的關(guān)系:
- Task 繼承自 Future,是 “可執(zhí)行的 Future”(綁定了協(xié)程)。
- Future 更底層,通常用于手動管理異步操作結(jié)果(如包裝回調(diào)式 API)。
2、基礎(chǔ)語法和核心API
2.1async/await語法
async/await 是 Python 3.5+ 引入的異步語法糖,用于定義協(xié)程和暫停執(zhí)行:
async def:定義協(xié)程函數(shù)(返回協(xié)程對象)。await:暫停協(xié)程,等待另一個協(xié)程 / Future/Task 完成,只能在協(xié)程內(nèi)部使用。
async def nested():
return 42
async def main():
# 直接調(diào)用協(xié)程不會執(zhí)行,必須用await
result = await nested() # 等待nested完成,獲取結(jié)果
print(result) # 42
asyncio.run(main())注意:
- 普通函數(shù)中不能使用
await(會報SyntaxError)。 await后面必須是 “可等待對象”(協(xié)程、Task、Future)
2.2 事件循環(huán)的啟動與管理
Python 3.7+ 推薦用 asyncio.run() 啟動事件循環(huán)(自動創(chuàng)建、運行、關(guān)閉循環(huán)),低版本需手動管理:
# Python 3.7+ 推薦方式
async def main():
await asyncio.sleep(1)
print("完成")
asyncio.run(main()) # 自動處理事件循環(huán)的生命周期
# 低版本手動管理方式(3.6及以下)
loop = asyncio.get_event_loop() # 獲取事件循環(huán)
try:
loop.run_until_complete(main()) # 運行直到協(xié)程完成
finally:
loop.close() # 關(guān)閉循環(huán)2.3 并發(fā)任務管理
2.3.1asyncio.gather()
批量并發(fā)與結(jié)果聚合
gather() 用于同時運行多個可等待對象,按輸入順序返回結(jié)果,適合需要統(tǒng)一收集結(jié)果的場景。
async def task(i):
await asyncio.sleep(i)
return i
async def main():
# 并發(fā)執(zhí)行3個任務
results = await asyncio.gather(
task(1),
task(2),
task(0.5)
)
print(results) # [1, 2, 0.5](按輸入順序,而非完成順序)
asyncio.run(main())高級參數(shù):
return_exceptions=True:將異常作為結(jié)果返回,不中斷其他任務。
async def faulty_task():
raise ValueError("出錯了")
async def main():
results = await asyncio.gather(
faulty_task(),
task(1),
return_exceptions=True # 異常會被包裝到結(jié)果中
)
print(results) # [ValueError('出錯了'), 1]2.3.2asyncio.wait()
靈活控制任務完成條件
wait() 比 gather() 更靈活,支持按 “第一個完成”“所有完成” 等條件返回,返回值是已完成和未完成的任務集合。
async def main():
tasks = [task(1), task(2), task(0.5)]
# 等待所有任務完成(默認)
done, pending = await asyncio.wait(tasks)
print("已完成任務數(shù):", len(done)) # 3
print("未完成任務數(shù):", len(pending)) # 0
# 等待第一個任務完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print("第一個完成的任務結(jié)果:", [t.result() for t in done]) # [0.5]return_when 可選值:
FIRST_COMPLETED:第一個任務完成時返回。FIRST_EXCEPTION:第一個任務拋出異常時返回(無異常則等所有完成)。ALL_COMPLETED:所有任務完成時返回(默認)。
3、同步代碼的異步化:兼容舊庫
實際開發(fā)中常需在異步程序中調(diào)用同步阻塞庫(如 requests、pymysql),直接調(diào)用會阻塞事件循環(huán),需通過線程池異步執(zhí)行。
3.1 核心方法:loop.run_in_executor()
該方法將同步函數(shù)提交到線程池執(zhí)行,返回 Future 對象,可通過 await 獲取結(jié)果。
import asyncio
import requests # 同步阻塞庫
# 同步函數(shù)(阻塞)
def sync_get(url):
return requests.get(url).status_code
async def async_get(url):
# 獲取事件循環(huán)
loop = asyncio.get_event_loop()
# 提交到線程池執(zhí)行(None 表示使用默認線程池)
future = loop.run_in_executor(
None, # 線程池執(zhí)行器(可選自定義)
sync_get, # 同步函數(shù)
url # 函數(shù)參數(shù)
)
return await future # 等待線程池結(jié)果
async def main():
urls = ["https://www.baidu.com", "https://www.github.com"]
# 并發(fā)執(zhí)行同步函數(shù)的異步包裝
results = await asyncio.gather(*[async_get(url) for url in urls])
print("結(jié)果:", results) # [200, 200]
asyncio.run(main())3.2 自定義線程池
默認線程池大小有限(通常為 CPU 核心數(shù) * 5),高并發(fā)場景可自定義線程池:
from concurrent.futures import ThreadPoolExecutor
async def main():
# 自定義線程池(最大10個線程)
executor = ThreadPoolExecutor(max_workers=10)
loop = asyncio.get_event_loop()
# 使用自定義線程池
future = loop.run_in_executor(executor, sync_get, "https://www.baidu.com")
print(await future) # 2004、異步 IO 實戰(zhàn)
4.1 異步網(wǎng)絡請求(aiohttp)
aiohttp 是異步 HTTP 客戶端 / 服務器庫,支持異步請求、連接池、超時控制等,是替代同步 requests 的最佳選擇。
并發(fā)爬取網(wǎng)頁(帶超時與重試)
import asyncio
import aiohttp
from aiohttp import ClientTimeout
async def fetch(session, url, retry=3):
"""帶重試機制的異步請求"""
timeout = ClientTimeout(total=10) # 超時控制(10秒)
try:
async with session.get(url, timeout=timeout) as response:
return {
"url": url,
"status": response.status,
"length": len(await response.text())
}
except Exception as e:
if retry > 0:
print(f"請求 {url} 失敗,重試 {retry-1} 次: {e}")
await asyncio.sleep(1) # 重試前等待1秒
return await fetch(session, url, retry-1)
return {"url": url, "error": str(e)}
async def main():
urls = [
"https://www.baidu.com",
"https://www.github.com",
"https://www.python.org",
"https://invalid.url"
]
# 創(chuàng)建會話(復用連接,提高效率)
async with aiohttp.ClientSession() as session:
# 生成任務列表
tasks = [fetch(session, url) for url in urls]
# 并發(fā)執(zhí)行
results = await asyncio.gather(*tasks)
# 輸出結(jié)果
for res in results:
if "error" in res:
print(f"{res['url']}: {res['error']}")
else:
print(f"{res['url']} | 狀態(tài): {res['status']} | 長度: {res['length']}")
asyncio.run(main())4.2 異步文件操作(aiofiles)
傳統(tǒng) open() 是同步阻塞的,aiofiles 提供異步文件讀寫,支持 async with 和 await 語法。
異步讀寫多文件
import asyncio
import aiofiles
async def write_file(filename, content):
"""異步寫入文件"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(content) # 異步寫入
print(f"已寫入: {filename}")
async def read_file(filename):
"""異步讀取文件"""
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read() # 異步讀取
return filename, content
async def main():
# 并發(fā)寫入3個文件
await asyncio.gather(
write_file("file1.txt", "異步文件1"),
write_file("file2.txt", "異步文件2"),
write_file("file3.txt", "異步文件3")
)
# 并發(fā)讀取文件
files = ["file1.txt", "file2.txt", "file3.txt"]
results = await asyncio.gather(*[read_file(f) for f in files])
# 打印內(nèi)容
for name, content in results:
print(f"{name} 內(nèi)容: {content}")
asyncio.run(main())4.3 異步數(shù)據(jù)庫操作(aiomysql)
aiomysql 是 MySQL 的異步驅(qū)動,支持異步連接、查詢、事務,避免同步 pymysql 的阻塞問題。
異步查詢 MySQL
import asyncio
import aiomysql
async def query_db():
# 建立異步連接
connection = await aiomysql.connect(
host='localhost',
port=3306,
user='root',
password='password',
db='test',
autocommit=True
)
try:
# 創(chuàng)建游標
async with connection.cursor(aiomysql.DictCursor) as cursor:
# 異步執(zhí)行查詢
await cursor.execute("SELECT * FROM users LIMIT 3")
# 異步獲取結(jié)果
results = await cursor.fetchall()
print("查詢結(jié)果:", results)
finally:
# 關(guān)閉連接
connection.close()
asyncio.run(query_db())5、總結(jié)
Python 異步編程通過事件循環(huán)驅(qū)動的任務切換,實現(xiàn)了 IO 密集型任務的高效并發(fā)。核心組件包括協(xié)程(任務單元)、事件循環(huán)(調(diào)度中心)、任務(并發(fā)單元)和 Future(結(jié)果容器)。
到此這篇關(guān)于Python 異步編程操作示例詳解的文章就介紹到這了,更多相關(guān)Python 異步編程內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
pytorch中forwod函數(shù)在父類中的調(diào)用方式解讀
這篇文章主要介紹了pytorch中forwod函數(shù)在父類中的調(diào)用方式解讀,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02
Python自動化辦公之圖片轉(zhuǎn)PDF的實現(xiàn)
實現(xiàn)圖片轉(zhuǎn)換成PDF文檔的操作方法有很多,綜合對比以后感覺fpdf這個模塊用起來比較方便而且代碼量相當少。所以本文將利用Python語言實現(xiàn)圖片轉(zhuǎn)PDF,感興趣的可以了解一下2022-04-04
python實現(xiàn)去除下載電影和電視劇文件名中的多余字符的方法
這篇文章主要介紹了python實現(xiàn)去除下載電影和電視劇文件名中的多余字符的方法,可以批量修改視頻文件名稱,非常具有實用價值,需要的朋友可以參考下2014-09-09

