Python異步多進(jìn)程調(diào)度系統(tǒng)的完整實(shí)現(xiàn)與實(shí)戰(zhàn)指南
1. 引言:Python多進(jìn)程編程的價(jià)值
在當(dāng)今數(shù)據(jù)密集型的應(yīng)用場景中,高效處理并行任務(wù)是提升程序性能的關(guān)鍵。Python的全局解釋器鎖(GIL)限制了線程的并行執(zhí)行能力,使得多進(jìn)程編程成為CPU密集型任務(wù)的首選方案。本文將深入探討如何構(gòu)建一個(gè)功能完整的異步多進(jìn)程調(diào)度系統(tǒng),實(shí)現(xiàn)任務(wù)分發(fā)、實(shí)時(shí)監(jiān)控和結(jié)果管理的一體化解決方案。
傳統(tǒng)的同步編程模型在處理大量I/O操作或計(jì)算任務(wù)時(shí)效率低下,而多進(jìn)程技術(shù)可以充分利用多核CPU優(yōu)勢,顯著提升吞吐量。與多線程相比,多進(jìn)程避免了GIL的限制,每個(gè)進(jìn)程擁有獨(dú)立的Python解釋器和內(nèi)存空間,能夠?qū)崿F(xiàn)真正的并行計(jì)算。
本文將基于Python內(nèi)置的multiprocessing模塊,展示一個(gè)生產(chǎn)級別的多進(jìn)程調(diào)度系統(tǒng)實(shí)現(xiàn),涵蓋進(jìn)程池管理、進(jìn)程間通信、實(shí)時(shí)監(jiān)控和結(jié)果持久化等核心功能。
2. 系統(tǒng)架構(gòu)與設(shè)計(jì)原理
2.1 核心組件概述
我們的異步多進(jìn)程調(diào)度系統(tǒng)采用模塊化設(shè)計(jì),每個(gè)組件負(fù)責(zé)特定的功能領(lǐng)域:
- 任務(wù)執(zhí)行器:負(fù)責(zé)具體業(yè)務(wù)的處理,支持異步執(zhí)行和超時(shí)控制
- 結(jié)果存儲器:提供統(tǒng)一的結(jié)果存儲接口,支持內(nèi)存共享和文件持久化
- 監(jiān)控器:實(shí)時(shí)跟蹤任務(wù)狀態(tài),提供進(jìn)度統(tǒng)計(jì)和可視化反饋
- 進(jìn)程池管理器:優(yōu)化資源分配,控制并發(fā)進(jìn)程數(shù)量
2.2 多進(jìn)程基礎(chǔ)
Python的multiprocessing模塊通過創(chuàng)建獨(dú)立的子進(jìn)程來規(guī)避GIL限制,每個(gè)子進(jìn)程擁有自己的Python解釋器和內(nèi)存空間。與多線程相比,多進(jìn)程更適合CPU密集型任務(wù),但進(jìn)程間通信成本較高,需要特殊機(jī)制實(shí)現(xiàn)數(shù)據(jù)交換。
# 基本的多進(jìn)程創(chuàng)建示例
from multiprocessing import Process
def worker(task_id):
print(f"處理任務(wù) {task_id}")
if __name__ == '__main__':
processes = []
for i in range(3):
p = Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()
系統(tǒng)采用Manager模式實(shí)現(xiàn)進(jìn)程間數(shù)據(jù)共享,通過BaseManager創(chuàng)建的共享對象可以在多個(gè)進(jìn)程間安全訪問。
3. 核心實(shí)現(xiàn)詳解
3.1 異步結(jié)果存儲機(jī)制
AsyncResultStorage類是系統(tǒng)的數(shù)據(jù)持久化核心,它采用雙存儲策略:內(nèi)存共享用于實(shí)時(shí)訪問,文件系統(tǒng)用于長期持久化。
class AsyncResultStorage:
"""異步結(jié)果存儲器"""
def __init__(self, storage_file="async_results.json", manager=None):
self.storage_file = storage_file
# 使用傳入的manager創(chuàng)建共享列表和鎖
if manager:
self.results = manager.list()
self.lock = manager.Lock()
else:
self.results = []
self.lock = multiprocessing.Lock()
存儲機(jī)制的關(guān)鍵優(yōu)勢在于:
- 線程安全:通過Lock確保并發(fā)寫入的數(shù)據(jù)一致性
- 異步持久化:文件寫入操作不影響主任務(wù)執(zhí)行流程
- 靈活查詢:支持按狀態(tài)篩選和統(tǒng)計(jì)信息生成
共享數(shù)據(jù)的管理需要特別注意同步問題,使用Lock可以避免競爭條件,確保數(shù)據(jù)完整性。
3.2 進(jìn)程池管理與任務(wù)調(diào)度
系統(tǒng)采用ProcessPoolExecutor作為進(jìn)程池管理引擎,相比原生的multiprocessing.Pool,它提供了更簡潔的API和更好的未來對象支持。
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交所有任務(wù)
future_to_task = {
executor.submit(send_request, task_id, status_queue, storage): task_id
for task_id in range(num_tasks)
}
# 使用as_completed獲取完成的任務(wù)
for future in as_completed(future_to_task):
task_id = future_to_task[future]
try:
result = future.result(timeout=30)
except Exception as e:
logger.error(f"任務(wù) {task_id} 執(zhí)行異常: {e}")
進(jìn)程池的大小配置是性能調(diào)優(yōu)的關(guān)鍵因素。過多的進(jìn)程會導(dǎo)致資源競爭,過少的進(jìn)程無法充分利用CPU。實(shí)踐中,通常將進(jìn)程數(shù)設(shè)置為CPU核心數(shù)或稍多。
3.3 實(shí)時(shí)監(jiān)控與進(jìn)度跟蹤
監(jiān)控進(jìn)程是系統(tǒng)的可視化核心,它通過多進(jìn)程隊(duì)列實(shí)現(xiàn)與工作進(jìn)程的通信。
def monitor_status(status_queue, total_tasks, storage):
"""增強(qiáng)的監(jiān)控函數(shù),支持實(shí)時(shí)統(tǒng)計(jì)和結(jié)果查詢"""
completed = failed = 0
start_time = time.time()
while completed + failed < total_tasks:
if not status_queue.empty():
status_info = status_queue.get(timeout=1)
# 更新進(jìn)度狀態(tài)...
# 進(jìn)度條顯示邏輯
progress = (completed + failed) / total_tasks
bar_length = 30
filled_length = int(bar_length * progress)
bar = '█' * filled_length + '?' * (bar_length - filled_length)
監(jiān)控器提供以下關(guān)鍵功能:
- 實(shí)時(shí)進(jìn)度更新:每秒刷新進(jìn)度條和統(tǒng)計(jì)信息
- ETA預(yù)估:基于平均處理時(shí)間預(yù)測剩余時(shí)間
- 異常警報(bào):即時(shí)發(fā)現(xiàn)并報(bào)告失敗任務(wù)
進(jìn)度可視化不僅提升用戶體驗(yàn),還能幫助開發(fā)者快速識別系統(tǒng)瓶頸,優(yōu)化任務(wù)分配策略。
4. 進(jìn)程間通信與數(shù)據(jù)共享
4.1 隊(duì)列通信機(jī)制
系統(tǒng)使用Queue作為進(jìn)程間通信橋梁,實(shí)現(xiàn)工作進(jìn)程與監(jiān)控進(jìn)程的解耦。
# 創(chuàng)建管理器共享隊(duì)列
manager = multiprocessing.Manager()
status_queue = manager.Queue()
# 工作進(jìn)程發(fā)送狀態(tài)
status_queue.put({
'task_id': task_id,
'status': scenario['status'],
'message': scenario['message'],
'timestamp': datetime.now().isoformat()
})
Queue內(nèi)部使用管道和鎖機(jī)制,保證消息的順序傳遞和線程安全。與Pipe相比,Queue支持多生產(chǎn)者和多消費(fèi)者模式,更適合本系統(tǒng)的架構(gòu)。
4.2 共享數(shù)據(jù)管理
Manager對象提供了一種高級共享數(shù)據(jù)解決方案,支持列表、字典、鎖等復(fù)雜數(shù)據(jù)結(jié)構(gòu)的跨進(jìn)程訪問。
# 使用Manager創(chuàng)建共享存儲
manager = multiprocessing.Manager()
storage = AsyncResultStorage(manager=manager)
# 多個(gè)進(jìn)程可以安全訪問共享數(shù)據(jù)
with storage.lock:
storage.results.append(new_result)
共享數(shù)據(jù)的訪問需要通過鎖機(jī)制進(jìn)行同步,防止競態(tài)條件導(dǎo)致的數(shù)據(jù)不一致。本系統(tǒng)在AsyncResultStorage內(nèi)部封裝了鎖邏輯,簡化了使用復(fù)雜度。
5. 錯(cuò)誤處理與容錯(cuò)機(jī)制
5.1 異常捕獲與恢復(fù)
系統(tǒng)的容錯(cuò)性能直接影響穩(wěn)定性,我們在多個(gè)層面實(shí)現(xiàn)異常處理:
def send_request(task_id, status_queue, storage):
try:
# 任務(wù)邏輯...
except Exception as e:
error_msg = f"任務(wù) {task_id} 執(zhí)行異常: {str(e)}"
logger.error(error_msg)
# 存儲異常結(jié)果,確保不會丟失失敗記錄
storage.add_result(
task_id=task_id,
status='Failed',
message=error_msg,
response_data=None
)
異常處理策略包括:
- 任務(wù)級隔離:單個(gè)任務(wù)失敗不影響整體進(jìn)程池
- 異常記錄:詳細(xì)記錄錯(cuò)誤上下文便于調(diào)試
- 優(yōu)雅降級:失敗任務(wù)跳過不影響后續(xù)處理
5.2 超時(shí)控制與資源管理
為避免任務(wù)無限阻塞,系統(tǒng)實(shí)現(xiàn)超時(shí)控制機(jī)制:
# 任務(wù)執(zhí)行超時(shí)控制
for future in as_completed(future_to_task):
try:
result = future.result(timeout=30) # 30秒超時(shí)
except TimeoutError:
logger.error("任務(wù)執(zhí)行超時(shí)")
# 標(biāo)記任務(wù)為失敗,釋放資源
超時(shí)機(jī)制配合進(jìn)程池的自動(dòng)清理功能,防止僵尸進(jìn)程積累,確保系統(tǒng)長期穩(wěn)定運(yùn)行。
6. 性能優(yōu)化與實(shí)踐建議
6.1 進(jìn)程池配置優(yōu)化
根據(jù)任務(wù)特性調(diào)整進(jìn)程池參數(shù)可以顯著提升性能:
- CPU密集型任務(wù):進(jìn)程數(shù)≈CPU核心數(shù)
- I/O密集型任務(wù):可適當(dāng)增加進(jìn)程數(shù)
- 內(nèi)存限制:控制總進(jìn)程數(shù)避免內(nèi)存溢出
# 根據(jù)任務(wù)類型動(dòng)態(tài)調(diào)整進(jìn)程數(shù)
if task_type == "cpu_intensive":
workers = multiprocessing.cpu_count()
elif task_type == "io_intensive":
workers = multiprocessing.cpu_count() * 2
else:
workers = 4
6.2 內(nèi)存與資源管理
多進(jìn)程環(huán)境下的資源管理尤為重要:
- 數(shù)據(jù)序列化:進(jìn)程間通信需要序列化數(shù)據(jù),控制傳輸量
- 內(nèi)存共享:大型數(shù)據(jù)使用共享內(nèi)存減少拷貝開銷
- 及時(shí)清理:顯式關(guān)閉進(jìn)程釋放資源
系統(tǒng)使用with語句確保資源正確釋放,避免資源泄漏:
with ProcessPoolExecutor(max_workers=4) as executor:
# 執(zhí)行任務(wù)...
# 池自動(dòng)清理,無需手動(dòng)調(diào)用shutdown
7. 實(shí)際應(yīng)用場景擴(kuò)展
本文的異步多進(jìn)程調(diào)度系統(tǒng)可應(yīng)用于多種場景:
7.1 大規(guī)模數(shù)據(jù)處理的實(shí)踐應(yīng)用
- 數(shù)據(jù)清洗與轉(zhuǎn)換:并行處理多個(gè)數(shù)據(jù)文件
- Web爬蟲:并發(fā)請求提高采集效率
- 模型推理:并行執(zhí)行多個(gè)預(yù)測任務(wù)
7.2 與分布式任務(wù)隊(duì)列的集成
對于更大規(guī)模的應(yīng)用,可將系統(tǒng)與分布式任務(wù)隊(duì)列(如Celery)集成:
# 與Celery集成的示例
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def distributed_task(task_id):
# 在分布式環(huán)境中執(zhí)行任務(wù)
return send_request(task_id, status_queue, storage)
這種架構(gòu)結(jié)合了多進(jìn)程的高性能和分布式系統(tǒng)的可擴(kuò)展性,適合超大規(guī)模任務(wù)處理。
8. 總結(jié)
本文詳細(xì)介紹了一個(gè)基于Python multiprocessing模塊的異步多進(jìn)程調(diào)度系統(tǒng),涵蓋了從基礎(chǔ)概念到高級優(yōu)化的全方位內(nèi)容。系統(tǒng)具備以下特點(diǎn):
- 功能完整:集任務(wù)執(zhí)行、監(jiān)控、存儲于一體的解決方案
- 穩(wěn)定可靠:全面的錯(cuò)誤處理和資源管理機(jī)制
- 易于擴(kuò)展:模塊化設(shè)計(jì)支持功能定制和規(guī)模擴(kuò)展
- 用戶友好:實(shí)時(shí)進(jìn)度反饋和詳細(xì)統(tǒng)計(jì)信息
多進(jìn)程編程是Python高性能計(jì)算的重要技術(shù),掌握它能夠讓開發(fā)者應(yīng)對更復(fù)雜的計(jì)算場景。本文提供的實(shí)現(xiàn)方案既可直接用于生產(chǎn)環(huán)境,也可作為進(jìn)一步學(xué)習(xí)多進(jìn)程編程的基礎(chǔ)框架。
希望本文能幫助讀者深入理解Python多進(jìn)程編程,并在實(shí)際項(xiàng)目中應(yīng)用這些技術(shù),提升應(yīng)用性能和用戶體驗(yàn)。
import multiprocessing
import requests
import time
import random
import json
import logging
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor, as_completed
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
class AsyncResultStorage:
"""異步結(jié)果存儲器"""
def __init__(self, storage_file="async_results.json", manager=None):
self.storage_file = storage_file
# 使用傳入的manager創(chuàng)建共享列表和鎖,如果沒有提供manager則創(chuàng)建普通對象
if manager:
self.results = manager.list()
self.lock = manager.Lock()
else:
self.results = []
self.lock = multiprocessing.Lock()
def add_result(self, task_id, status, message, response_data=None, timestamp=None):
"""添加結(jié)果到存儲(線程安全)"""
if timestamp is None:
timestamp = datetime.now().isoformat()
result = {
'task_id': task_id,
'status': status,
'message': message,
'response_data': response_data,
'timestamp': timestamp,
'process_id': multiprocessing.current_process().pid
}
# 使用鎖確保線程安全
with self.lock:
self.results.append(result)
# 異步寫入文件(實(shí)際應(yīng)用中可替換為數(shù)據(jù)庫存儲)
self._async_save_to_file(result)
return result
def _async_save_to_file(self, result):
"""異步保存結(jié)果到文件"""
try:
with open(self.storage_file, 'a', encoding='utf-8') as f:
f.write(json.dumps(result, ensure_ascii=False) + '\n')
except Exception as e:
logger.error(f"保存結(jié)果到文件失敗: {e}")
def get_results_by_status(self, status):
"""根據(jù)狀態(tài)篩選結(jié)果"""
with self.lock:
return [r for r in self.results if r['status'] == status]
def get_statistics(self):
"""獲取統(tǒng)計(jì)信息"""
with self.lock:
total = len(self.results)
completed = len([r for r in self.results if r['status'] == "Completed"])
failed = len([r for r in self.results if r['status'] == "Failed"])
return {
'total_tasks': total,
'completed': completed,
'failed': failed,
'success_rate': completed / total if total > 0 else 0
}
def send_request(task_id, status_queue, storage):
"""改進(jìn)的請求發(fā)送函數(shù),支持結(jié)果存儲"""
logger.info(f"任務(wù) {task_id} 開始執(zhí)行,進(jìn)程ID: {multiprocessing.current_process().pid}")
try:
# 模擬不同的請求類型和參數(shù)
request_types = ['GET', 'POST']
request_type = random.choice(request_types)
# 模擬請求延遲
delay = random.uniform(0.5, 3.0)
time.sleep(delay)
# 模擬不同的響應(yīng)情況
response_scenarios = [
{'status': 'Completed', 'message': '請求成功', 'data': {'score': random.randint(1, 100)}},
{'status': 'Completed', 'message': '請求成功', 'data': {'result': 'ok'}},
{'status': 'Failed', 'message': '請求超時(shí)', 'data': None},
{'status': 'Failed', 'message': '服務(wù)器錯(cuò)誤', 'data': None}
]
scenario = random.choice(response_scenarios)
# 存儲詳細(xì)結(jié)果
storage.add_result(
task_id=task_id,
status=scenario['status'],
message=scenario['message'],
response_data=scenario['data']
)
# 通知狀態(tài)隊(duì)列(用于實(shí)時(shí)監(jiān)控)
status_queue.put({
'task_id': task_id,
'status': scenario['status'],
'message': scenario['message'],
'timestamp': datetime.now().isoformat()
})
logger.info(f"任務(wù) {task_id} 完成,狀態(tài): {scenario['status']}")
return scenario['status']
except Exception as e:
error_msg = f"任務(wù) {task_id} 執(zhí)行異常: {str(e)}"
logger.error(error_msg)
# 存儲異常結(jié)果
storage.add_result(
task_id=task_id,
status='Failed',
message=error_msg,
response_data=None
)
status_queue.put({
'task_id': task_id,
'status': 'Failed',
'message': error_msg,
'timestamp': datetime.now().isoformat()
})
return 'Failed'
def format_time_delta(seconds):
"""格式化時(shí)間差"""
if seconds < 60:
return f"{seconds:.1f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
else:
hours = seconds / 3600
return f"{hours:.1f}h"
def monitor_status(status_queue, total_tasks, storage):
"""增強(qiáng)的監(jiān)控函數(shù),支持實(shí)時(shí)統(tǒng)計(jì)和結(jié)果查詢"""
completed = failed = 0
last_report_time = time.time()
start_time = time.time()
# 進(jìn)度條相關(guān)變量
last_progress_update = time.time()
logger.info(f"監(jiān)控進(jìn)程啟動(dòng),共監(jiān)控 {total_tasks} 個(gè)任務(wù)")
while completed + failed < total_tasks:
try:
# 非阻塞獲取狀態(tài)信息
if not status_queue.empty():
status_info = status_queue.get(timeout=1)
task_id = status_info['task_id']
print(
f"[{status_info['timestamp']}] 任務(wù) {task_id} 狀態(tài): {status_info['status']} - {status_info['message']}")
if status_info['status'] == "Completed":
completed += 1
elif status_info['status'] == "Failed":
failed += 1
# 更新進(jìn)度條顯示
current_time = time.time()
if current_time - last_progress_update >= 1: # 每秒更新一次進(jìn)度條
processed_tasks = completed + failed
progress = processed_tasks / total_tasks
percentage = progress * 100
# 計(jì)算 ETA
elapsed_time = current_time - start_time
if processed_tasks > 0:
avg_time_per_task = elapsed_time / processed_tasks
remaining_tasks = total_tasks - processed_tasks
eta_seconds = avg_time_per_task * remaining_tasks
eta_formatted = format_time_delta(eta_seconds)
avg_time_formatted = format_time_delta(avg_time_per_task)
else:
eta_formatted = "未知"
avg_time_formatted = "未知"
# 顯示進(jìn)度條
bar_length = 30
filled_length = int(bar_length * progress)
bar = '█' * filled_length + '?' * (bar_length - filled_length)
print(f"\r進(jìn)度: |{bar}| {percentage:.1f}% 完成 ({processed_tasks}/{total_tasks}), "
f"平均耗時(shí): {avg_time_formatted}, ETA: {eta_formatted}", end='', flush=True)
last_progress_update = current_time
else:
time.sleep(0.1) # 短暫休眠避免忙等待
# 每5秒報(bào)告一次統(tǒng)計(jì)信息
current_time = time.time()
if current_time - last_report_time >= 5:
stats = storage.get_statistics()
logger.info(
f"任務(wù)進(jìn)度: {completed + failed}/{total_tasks} "
f"(成功: {stats['completed']}, 失敗: {stats['failed']}, "
f"成功率: {stats['success_rate']:.2%})"
)
last_report_time = current_time
except Exception as e:
logger.error(f"監(jiān)控進(jìn)程異常: {e}")
time.sleep(1) # 異常時(shí)休眠1秒
# 最終統(tǒng)計(jì)報(bào)告
final_stats = storage.get_statistics()
total_elapsed_time = time.time() - start_time
print() # 換行
logger.info(f"所有任務(wù)執(zhí)行完成!總耗時(shí): {total_elapsed_time:.2f} 秒, 最終統(tǒng)計(jì): {final_stats}")
def result_query_handler(storage):
"""結(jié)果查詢處理器(可擴(kuò)展為API接口)"""
def get_results(status=None, limit=None):
with storage.lock:
results = list(storage.results) # 轉(zhuǎn)換為普通列表
if status:
results = [r for r in results if r['status'] == status]
if limit:
results = results[:limit]
return results
return get_results
def main_enhanced():
"""增強(qiáng)的主調(diào)度函數(shù)"""
num_tasks = 10 # 增加任務(wù)數(shù)量以更好測試異步性能
# 使用Manager創(chuàng)建可以在進(jìn)程間共享的隊(duì)列和鎖
manager = multiprocessing.Manager()
status_queue = manager.Queue()
# 創(chuàng)建共享存儲實(shí)例,傳入manager以便創(chuàng)建可共享的對象
storage = AsyncResultStorage(manager=manager)
logger.info("啟動(dòng)異步多進(jìn)程調(diào)度系統(tǒng)...")
logger.info(f"總共 {num_tasks} 個(gè)任務(wù)需要執(zhí)行")
start_time = time.time()
# 啟動(dòng)監(jiān)控進(jìn)程
monitor_process = multiprocessing.Process(
target=monitor_status,
args=(status_queue, num_tasks, storage)
)
monitor_process.start()
# 使用ProcessPoolExecutor以獲得更好的進(jìn)程管理
with ProcessPoolExecutor(max_workers=4) as executor:
# 提交所有任務(wù)
future_to_task = {
executor.submit(send_request, task_id, status_queue, storage): task_id
for task_id in range(num_tasks)
}
# 等待所有任務(wù)完成(可選,因?yàn)楸O(jiān)控進(jìn)程會跟蹤完成狀態(tài))
completed_futures = 0
for future in as_completed(future_to_task):
task_id = future_to_task[future]
try:
result = future.result(timeout=30) # 設(shè)置超時(shí)時(shí)間
completed_futures += 1
except Exception as e:
logger.error(f"任務(wù) {task_id} 執(zhí)行出現(xiàn)異常: {e}")
# 等待監(jiān)控進(jìn)程結(jié)束
monitor_process.join(timeout=10)
if monitor_process.is_alive():
logger.warning("監(jiān)控進(jìn)程未正常結(jié)束,強(qiáng)制終止")
monitor_process.terminate()
# 計(jì)算執(zhí)行時(shí)間
total_time = time.time() - start_time
# 輸出最終結(jié)果和統(tǒng)計(jì)信息
stats = storage.get_statistics()
logger.info(f"系統(tǒng)執(zhí)行完成,總耗時(shí): {total_time:.2f} 秒")
logger.info(f"最終統(tǒng)計(jì): 共{stats['total_tasks']}個(gè)任務(wù), "
f"成功{stats['completed']}個(gè), 失敗{stats['failed']}個(gè), "
f"成功率: {stats['success_rate']:.2%}")
# 返回結(jié)果查詢接口
return result_query_handler(storage)
if __name__ == "__main__":
# 使用Manager創(chuàng)建可以在進(jìn)程間共享的對象
manager = multiprocessing.Manager()
# 創(chuàng)建測試存儲文件
test_storage_file = f"async_results_{int(time.time())}.json"
storage = AsyncResultStorage(test_storage_file, manager)
# 運(yùn)行增強(qiáng)版系統(tǒng)
query_handler = main_enhanced()
# 示例:查詢執(zhí)行結(jié)果
print("\n=== 執(zhí)行結(jié)果查詢 ===")
all_results = query_handler()
completed_results = query_handler(status="Completed")
failed_results = query_handler(status="Failed")
print(f"總?cè)蝿?wù)數(shù): {len(all_results)}")
print(f"成功任務(wù): {len(completed_results)}")
print(f"失敗任務(wù): {len(failed_results)}")
# 顯示最近3個(gè)任務(wù)的結(jié)果
print("\n最近3個(gè)任務(wù)的結(jié)果:")
for result in all_results[-3:]:
print(f" 任務(wù) {result['task_id']}: {result['status']} - {result['message']}")
到此這篇關(guān)于Python異步多進(jìn)程調(diào)度系統(tǒng)的完整實(shí)現(xiàn)與實(shí)戰(zhàn)指南的文章就介紹到這了,更多相關(guān)Python異步多進(jìn)程調(diào)度內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python數(shù)據(jù)結(jié)構(gòu)與算法的雙端隊(duì)列詳解
這篇文章主要為大家詳細(xì)介紹了Python的雙端隊(duì)列,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03
GDAL 矢量屬性數(shù)據(jù)修改方式(python)
這篇文章主要介紹了GDAL 矢量屬性數(shù)據(jù)修改方式(python),具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-03-03
查看已經(jīng)安裝的python版本和相關(guān)路徑信息的三種方法
Python是一門計(jì)算機(jī)程序編程語言,更是一種面向?qū)ο蟮膭?dòng)態(tài)類型語言,隨著版本的不斷更新和語言新功能的添加,越來越多被用于獨(dú)立的、大型項(xiàng)目的開發(fā),那么如何查看已安裝Python版本和路徑呢?我們通過這篇文章來了解一下2025-03-03
?python中pandas讀取csv文件?時(shí)如何省去csv.reader()操作指定列步驟
這篇文章主要介紹了?python中pandas讀取csv文件?時(shí)如何省去csv.reader()操作指定列步驟,對正在工作的你可能有一定的幫助,需要的朋友可以參考一下2022-01-01
python用tkinter實(shí)現(xiàn)一個(gè)gui的翻譯工具
這篇文章主要介紹了python用tkinter實(shí)現(xiàn)一個(gè)gui的翻譯工具,幫助大家更好的理解和使用python,感興趣的朋友可以了解下 +2020-10-10

