Python標(biāo)準(zhǔn)庫(kù)asyncio用法完全指南
什么是 asyncio
asyncio 是 Python 3.4+ 引入的標(biāo)準(zhǔn)庫(kù),用于編寫并發(fā)代碼的異步 I/O 框架。它使用事件循環(huán)和協(xié)程實(shí)現(xiàn)單線程并發(fā),特別適合處理 I/O 密集型任務(wù)。
核心優(yōu)勢(shì):
- 單線程內(nèi)實(shí)現(xiàn)高并發(fā)
- 避免線程切換開銷
- 更少的內(nèi)存占用
- 代碼更易于理解和調(diào)試
核心概念
協(xié)程 (Coroutine)
使用 async def 定義的特殊函數(shù),可以在執(zhí)行過程中暫停和恢復(fù)。
async def my_coroutine():
print("開始執(zhí)行")
await asyncio.sleep(1)
print("執(zhí)行完成")
事件循環(huán) (Event Loop)
asyncio 的核心,負(fù)責(zé)調(diào)度和執(zhí)行協(xié)程。
await 關(guān)鍵字
用于等待異步操作完成,只能在 async 函數(shù)內(nèi)使用。
Task
對(duì)協(xié)程的封裝,可以并發(fā)執(zhí)行多個(gè)協(xié)程。
Future
表示一個(gè)異步操作的最終結(jié)果。
基礎(chǔ)用法
1. 運(yùn)行協(xié)程的三種方式
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
# 方式1: Python 3.7+ 推薦
asyncio.run(hello())
# 方式2: 手動(dòng)管理事件循環(huán)
loop = asyncio.get_event_loop()
loop.run_until_complete(hello())
loop.close()
# 方式3: 在已有事件循環(huán)中
# await hello() # 只能在異步函數(shù)內(nèi)使用
2. 創(chuàng)建和管理任務(wù)
async def task1():
await asyncio.sleep(2)
return "任務(wù)1完成"
async def task2():
await asyncio.sleep(1)
return "任務(wù)2完成"
async def main():
# 創(chuàng)建任務(wù)
t1 = asyncio.create_task(task1())
t2 = asyncio.create_task(task2())
# 等待所有任務(wù)完成
results = await asyncio.gather(t1, t2)
print(results)
asyncio.run(main())
3. 并發(fā)執(zhí)行
async def fetch_data(n):
print(f"開始獲取數(shù)據(jù) {n}")
await asyncio.sleep(1)
print(f"完成獲取數(shù)據(jù) {n}")
return f"數(shù)據(jù) {n}"
async def main():
# gather: 并發(fā)執(zhí)行,按順序返回結(jié)果
results = await asyncio.gather(
fetch_data(1),
fetch_data(2),
fetch_data(3)
)
print(results)
asyncio.run(main())
內(nèi)部機(jī)理
事件循環(huán)的工作原理
┌─────────────────────────────────┐ │ 事件循環(huán) (Event Loop) │ │ │ │ ┌──────────────────────────┐ │ │ │ 就緒隊(duì)列 (Ready Queue) │ │ │ │ [task1, task2, task3] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ 等待隊(duì)列 (Wait Queue) │ │ │ │ [task4, task5] │ │ │ └──────────────────────────┘ │ │ │ │ ┌──────────────────────────┐ │ │ │ I/O 選擇器 │ │ │ │ (epoll/kqueue/select) │ │ │ └──────────────────────────┘ │ └─────────────────────────────────┘
執(zhí)行流程
- 初始化: 創(chuàng)建事件循環(huán)
- 注冊(cè)協(xié)程: 將協(xié)程包裝成 Task 對(duì)象
- 調(diào)度執(zhí)行:
- 從就緒隊(duì)列取出 Task
- 執(zhí)行到 await 處暫停
- 注冊(cè)到相應(yīng)的等待隊(duì)列
- I/O 多路復(fù)用: 監(jiān)聽 I/O 事件
- 喚醒協(xié)程: I/O 完成后,將 Task 移回就緒隊(duì)列
- 循環(huán)往復(fù): 直到所有任務(wù)完成
協(xié)程狀態(tài)轉(zhuǎn)換
async def example():
print("1. 開始執(zhí)行") # RUNNING
await asyncio.sleep(1) # WAITING (掛起)
print("2. 繼續(xù)執(zhí)行") # RUNNING (恢復(fù))
return "完成" # FINISHED
底層實(shí)現(xiàn)關(guān)鍵點(diǎn)
# 簡(jiǎn)化的事件循環(huán)偽代碼
class EventLoop:
def __init__(self):
self._ready = deque() # 就緒隊(duì)列
self._selector = select.epoll() # I/O 選擇器
def run_until_complete(self, coro):
task = Task(coro)
self._ready.append(task)
while self._ready or self._has_pending_io():
# 執(zhí)行就緒的任務(wù)
if self._ready:
task = self._ready.popleft()
task.step()
# 等待 I/O 事件
events = self._selector.select(timeout=0)
for event in events:
self._ready.append(event.callback)
與多線程/多進(jìn)程的區(qū)別
對(duì)比表格
| 特性 | asyncio | 多線程 (threading) | 多進(jìn)程 (multiprocessing) |
|---|---|---|---|
| 并發(fā)模型 | 協(xié)作式并發(fā) | 搶占式并發(fā) | 真正并行 |
| 運(yùn)行環(huán)境 | 單線程 | 多線程 | 多進(jìn)程 |
| GIL 影響 | 無(wú)影響 | 受 GIL 限制 | 不受 GIL 限制 |
| 切換開銷 | 極小(用戶態(tài)) | 較大(內(nèi)核態(tài)) | 最大(進(jìn)程切換) |
| 內(nèi)存占用 | 低 | 中等 | 高 |
| 適用場(chǎng)景 | I/O 密集型 | I/O 密集型 | CPU 密集型 |
| 數(shù)據(jù)共享 | 簡(jiǎn)單(同一線程) | 需要鎖 | 需要 IPC |
| 調(diào)試難度 | 容易 | 困難 | 中等 |
使用場(chǎng)景詳解
asyncio 適合的場(chǎng)景
- 網(wǎng)絡(luò)請(qǐng)求(爬蟲、API 調(diào)用)
- 文件 I/O
- 數(shù)據(jù)庫(kù)查詢
- WebSocket 連接
- 異步消息隊(duì)列
# 典型的 asyncio 場(chǎng)景: 并發(fā)網(wǎng)絡(luò)請(qǐng)求
import aiohttp
import asyncio
async def fetch_url(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = ['http://example.com'] * 100
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 個(gè)請(qǐng)求")
asyncio.run(main())
多線程適合的場(chǎng)景
- 有阻塞 I/O 操作且無(wú)異步替代
- 需要并發(fā)執(zhí)行的庫(kù)不支持 asyncio
- 中等規(guī)模并發(fā)
import threading
import requests
def fetch_url(url):
response = requests.get(url)
print(f"完成: {url}")
urls = ['http://example.com'] * 10
threads = [threading.Thread(target=fetch_url, args=(url,)) for url in urls]
for t in threads:
t.start()
for t in threads:
t.join()
多進(jìn)程適合的場(chǎng)景
- CPU 密集型計(jì)算
- 需要繞過 GIL
- 數(shù)據(jù)并行處理
from multiprocessing import Pool
def cpu_intensive(n):
return sum(i*i for i in range(n))
with Pool(4) as pool:
results = pool.map(cpu_intensive, [10000000] * 4)
print(results)
性能對(duì)比示例
import asyncio
import threading
import multiprocessing
import time
# I/O 密集型任務(wù)
def io_bound_sync():
time.sleep(1)
async def io_bound_async():
await asyncio.sleep(1)
# 測(cè)試 asyncio
async def test_asyncio():
start = time.time()
await asyncio.gather(*[io_bound_async() for _ in range(100)])
print(f"asyncio: {time.time() - start:.2f}s") # 約 1 秒
# 測(cè)試多線程
def test_threading():
start = time.time()
threads = [threading.Thread(target=io_bound_sync) for _ in range(100)]
for t in threads: t.start()
for t in threads: t.join()
print(f"threading: {time.time() - start:.2f}s") # 約 1-2 秒
# asyncio 在 I/O 密集型任務(wù)中表現(xiàn)最優(yōu)
高級(jí)特性
1. 異步上下文管理器
class AsyncResource:
async def __aenter__(self):
print("獲取資源")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("釋放資源")
await asyncio.sleep(1)
async def main():
async with AsyncResource() as resource:
print("使用資源")
asyncio.run(main())
2. 異步迭代器
class AsyncRange:
def __init__(self, start, end):
self.current = start
self.end = end
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.end:
raise StopAsyncIteration
await asyncio.sleep(0.1)
self.current += 1
return self.current - 1
async def main():
async for i in AsyncRange(0, 5):
print(i)
asyncio.run(main())
3. 異步生成器
async def async_generator():
for i in range(5):
await asyncio.sleep(1)
yield i
async def main():
async for value in async_generator():
print(value)
asyncio.run(main())
4. 超時(shí)控制
async def slow_operation():
await asyncio.sleep(5)
return "完成"
async def main():
try:
# 設(shè)置 2 秒超時(shí)
result = await asyncio.wait_for(slow_operation(), timeout=2)
except asyncio.TimeoutError:
print("操作超時(shí)")
asyncio.run(main())
5. 信號(hào)量和鎖
# 限制并發(fā)數(shù)量
async def limited_task(sem, n):
async with sem:
print(f"任務(wù) {n} 開始")
await asyncio.sleep(1)
print(f"任務(wù) {n} 完成")
async def main():
sem = asyncio.Semaphore(3) # 最多 3 個(gè)并發(fā)
await asyncio.gather(*[limited_task(sem, i) for i in range(10)])
asyncio.run(main())
6. 隊(duì)列
async def producer(queue, n):
for i in range(n):
await queue.put(i)
print(f"生產(chǎn): {i}")
await asyncio.sleep(0.5)
async def consumer(queue, name):
while True:
item = await queue.get()
print(f"{name} 消費(fèi): {item}")
await asyncio.sleep(1)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 創(chuàng)建生產(chǎn)者和消費(fèi)者
producers = [asyncio.create_task(producer(queue, 5))]
consumers = [asyncio.create_task(consumer(queue, f"消費(fèi)者{i}"))
for i in range(2)]
await asyncio.gather(*producers)
await queue.join() # 等待所有任務(wù)處理完成
for c in consumers:
c.cancel()
asyncio.run(main())
實(shí)戰(zhàn)示例
示例1: 異步網(wǎng)絡(luò)爬蟲
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch_page(session, url):
try:
async with session.get(url, timeout=10) as response:
return await response.text()
except Exception as e:
print(f"錯(cuò)誤 {url}: {e}")
return None
async def parse_page(html):
if html:
soup = BeautifulSoup(html, 'html.parser')
return soup.title.string if soup.title else "無(wú)標(biāo)題"
return None
async def crawl_urls(urls):
async with aiohttp.ClientSession() as session:
tasks = [fetch_page(session, url) for url in urls]
pages = await asyncio.gather(*tasks)
titles = await asyncio.gather(*[parse_page(page) for page in pages])
return titles
urls = [
'http://example.com',
'http://example.org',
'http://example.net',
]
# 執(zhí)行爬蟲
# titles = asyncio.run(crawl_urls(urls))
# print(titles)
示例2: 異步數(shù)據(jù)庫(kù)操作
import asyncio
import aiosqlite
async def create_table(db):
await db.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY,
name TEXT,
email TEXT
)
''')
await db.commit()
async def insert_user(db, name, email):
await db.execute(
'INSERT INTO users (name, email) VALUES (?, ?)',
(name, email)
)
await db.commit()
async def fetch_users(db):
async with db.execute('SELECT * FROM users') as cursor:
return await cursor.fetchall()
async def main():
async with aiosqlite.connect('test.db') as db:
await create_table(db)
# 并發(fā)插入
await asyncio.gather(
insert_user(db, 'Alice', 'alice@example.com'),
insert_user(db, 'Bob', 'bob@example.com'),
insert_user(db, 'Charlie', 'charlie@example.com')
)
users = await fetch_users(db)
for user in users:
print(user)
# asyncio.run(main())
示例3: 異步 Web 服務(wù)器
import asyncio
async def handle_client(reader, writer):
data = await reader.read(1024)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"收到來(lái)自 {addr} 的消息: {message}")
response = f"Echo: {message}"
writer.write(response.encode())
await writer.drain()
writer.close()
await writer.wait_closed()
async def start_server():
server = await asyncio.start_server(
handle_client, '127.0.0.1', 8888
)
addr = server.sockets[0].getsockname()
print(f"服務(wù)器啟動(dòng)在 {addr}")
async with server:
await server.serve_forever()
# asyncio.run(start_server())
示例4: 實(shí)時(shí)數(shù)據(jù)流處理
import asyncio
import random
async def data_stream():
"""模擬數(shù)據(jù)流"""
while True:
yield random.randint(1, 100)
await asyncio.sleep(0.5)
async def process_data(value):
"""處理數(shù)據(jù)"""
await asyncio.sleep(0.1)
return value * 2
async def monitor_stream():
"""監(jiān)控和處理數(shù)據(jù)流"""
buffer = []
async for data in data_stream():
print(f"接收數(shù)據(jù): {data}")
# 異步處理數(shù)據(jù)
task = asyncio.create_task(process_data(data))
buffer.append(task)
# 每 5 個(gè)數(shù)據(jù)批量處理
if len(buffer) >= 5:
results = await asyncio.gather(*buffer)
print(f"處理結(jié)果: {results}")
buffer = []
# 演示用,處理 20 個(gè)數(shù)據(jù)后停止
if data > 20:
break
# asyncio.run(monitor_stream())
示例5: 多任務(wù)協(xié)調(diào)
import asyncio
async def task_with_priority(name, priority, duration):
print(f"[優(yōu)先級(jí) {priority}] {name} 開始")
await asyncio.sleep(duration)
print(f"[優(yōu)先級(jí) {priority}] {name} 完成")
return f"{name} 結(jié)果"
async def coordinator():
# 創(chuàng)建不同優(yōu)先級(jí)的任務(wù)
high_priority = [
task_with_priority(f"高優(yōu)先級(jí)-{i}", 1, 1)
for i in range(3)
]
low_priority = [
task_with_priority(f"低優(yōu)先級(jí)-{i}", 3, 2)
for i in range(3)
]
# 先執(zhí)行高優(yōu)先級(jí)任務(wù)
high_results = await asyncio.gather(*high_priority)
print(f"高優(yōu)先級(jí)完成: {high_results}")
# 再執(zhí)行低優(yōu)先級(jí)任務(wù)
low_results = await asyncio.gather(*low_priority)
print(f"低優(yōu)先級(jí)完成: {low_results}")
asyncio.run(coordinator())
最佳實(shí)踐
1. 錯(cuò)誤處理
async def safe_task(n):
try:
if n == 3:
raise ValueError("錯(cuò)誤的值")
await asyncio.sleep(1)
return f"任務(wù) {n} 成功"
except Exception as e:
print(f"任務(wù) {n} 失敗: {e}")
return None
async def main():
results = await asyncio.gather(
safe_task(1),
safe_task(2),
safe_task(3),
return_exceptions=True # 不會(huì)因?yàn)橐粋€(gè)任務(wù)失敗而停止
)
print(results)
asyncio.run(main())
2. 資源清理
class AsyncConnection:
async def __aenter__(self):
self.conn = await self.connect()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def connect(self):
print("建立連接")
await asyncio.sleep(1)
return "connection"
async def close(self):
print("關(guān)閉連接")
await asyncio.sleep(1)
async def main():
async with AsyncConnection() as conn:
print(f"使用連接: {conn.conn}")
asyncio.run(main())
3. 避免阻塞
import asyncio
from concurrent.futures import ThreadPoolExecutor
# 錯(cuò)誤: 阻塞操作
async def bad_example():
time.sleep(5) # 這會(huì)阻塞整個(gè)事件循環(huán)!
# 正確: 使用 executor 運(yùn)行阻塞操作
async def good_example():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
result = await loop.run_in_executor(executor, blocking_function)
return result
def blocking_function():
import time
time.sleep(5)
return "完成"
4. 合理設(shè)置超時(shí)
async def with_timeout():
try:
result = await asyncio.wait_for(
long_running_task(),
timeout=5.0
)
except asyncio.TimeoutError:
print("任務(wù)超時(shí),執(zhí)行回退邏輯")
result = "默認(rèn)值"
return result
5. 使用 TaskGroup (Python 3.11+)
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro())
task2 = tg.create_task(another_coro())
# 所有任務(wù)完成或某個(gè)任務(wù)失敗時(shí)退出
print("所有任務(wù)完成")
總結(jié)
asyncio 的核心優(yōu)勢(shì)
- 高性能: 單線程處理大量并發(fā),避免線程切換開銷
- 低資源消耗: 協(xié)程比線程更輕量,內(nèi)存占用少
- 代碼可讀性: async/await 語(yǔ)法讓異步代碼像同步代碼一樣易讀
- 豐富的生態(tài): aiohttp、aiopg、aiomysql 等大量異步庫(kù)支持
何時(shí)使用 asyncio
? 適合使用:
- 大量網(wǎng)絡(luò) I/O 操作(爬蟲、API 服務(wù))
- WebSocket 長(zhǎng)連接
- 異步數(shù)據(jù)庫(kù)操作
- 實(shí)時(shí)數(shù)據(jù)處理
- 微服務(wù)間通信
? 不適合使用:
- CPU 密集型計(jì)算(使用多進(jìn)程)
- 簡(jiǎn)單的小型腳本
- 依賴大量同步庫(kù)的項(xiàng)目
- 團(tuán)隊(duì)不熟悉異步編程
學(xué)習(xí)路徑建議
- 基礎(chǔ)階段: 掌握 async/await、事件循環(huán)、Task 基本概念
- 進(jìn)階階段: 學(xué)習(xí)異步上下文管理器、生成器、同步原語(yǔ)
- 實(shí)戰(zhàn)階段: 使用 aiohttp、aiopg 等庫(kù)構(gòu)建實(shí)際項(xiàng)目
- 優(yōu)化階段: 性能分析、錯(cuò)誤處理、最佳實(shí)踐
常見陷阱
- 在異步函數(shù)中使用阻塞調(diào)用
- 忘記使用 await 導(dǎo)致協(xié)程未執(zhí)行
- 過度使用 asyncio(簡(jiǎn)單任務(wù)反而降低效率)
- 忽略異常處理導(dǎo)致任務(wù)靜默失敗
- 不理解事件循環(huán)的單線程特性
參考資源
結(jié)語(yǔ): asyncio 是 Python 異步編程的強(qiáng)大工具,掌握它可以顯著提升 I/O 密集型應(yīng)用的性能。理解其內(nèi)部機(jī)制和最佳實(shí)踐,能幫助你寫出高效、可維護(hù)的異步代碼。
總結(jié)
到此這篇關(guān)于Python標(biāo)準(zhǔn)庫(kù)asyncio用法完全指南的文章就介紹到這了,更多相關(guān)Python asyncio完全指南內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python實(shí)現(xiàn)一行代碼自動(dòng)繪制藝術(shù)畫
DiscoArt?是一個(gè)很牛的開源模塊,它能根據(jù)你給定的關(guān)鍵詞自動(dòng)繪畫。本文就將利用這一模塊實(shí)現(xiàn)一行代碼自動(dòng)繪制藝術(shù)畫,需要的可以參考一下2022-12-12
Python?input輸入超時(shí)選擇默認(rèn)值自動(dòng)跳過問題
這篇文章主要介紹了Python?input輸入超時(shí)選擇默認(rèn)值自動(dòng)跳過問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-02-02
Python接收Gmail新郵件并發(fā)送到gtalk的方法
這篇文章主要介紹了Python接收Gmail新郵件并發(fā)送到gtalk的方法,實(shí)例分析了Python操作郵件的技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-03-03
簡(jiǎn)單且有用的Python數(shù)據(jù)分析和機(jī)器學(xué)習(xí)代碼
Python編程是一種通用的編程語(yǔ)言,開源、靈活、功能強(qiáng)大且易于使用,python最重要的特性之一是其用于數(shù)據(jù)處理和分析任務(wù)的豐富實(shí)用程序和庫(kù)集,這篇文章主要給大家介紹了一些簡(jiǎn)單且有用的Python數(shù)據(jù)分析和機(jī)器學(xué)習(xí)代碼,需要的朋友可以參考下2021-07-07
Pandas?DataFrame添加一行數(shù)據(jù)的幾種方法
在處理數(shù)據(jù)分析和數(shù)據(jù)科學(xué)項(xiàng)目時(shí),經(jīng)常會(huì)使用到Python中的pandas庫(kù)來(lái)進(jìn)行數(shù)據(jù)操作和分析,其中DataFrame是pandas庫(kù)中最重要的數(shù)據(jù)結(jié)構(gòu)之一,這篇文章主要給大家介紹了關(guān)于Pandas?DataFrame添加一行數(shù)據(jù)的幾種方法,需要的朋友可以參考下2024-08-08
python 用正則表達(dá)式篩選文本信息的實(shí)例
今天小編就為大家分享一篇python 用正則表達(dá)式篩選文本信息的實(shí)例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2018-06-06
python開發(fā)微信服務(wù)號(hào)消息推送示例
這篇文章主要為大家介紹了python開發(fā)微信服務(wù)號(hào)消息推送示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-10-10
python簡(jiǎn)單實(shí)現(xiàn)最大似然估計(jì)&scipy庫(kù)的使用詳解
這篇文章主要介紹了python簡(jiǎn)單實(shí)現(xiàn)最大似然估計(jì)&scipy庫(kù)的使用詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來(lái)看看吧2020-04-04

