Python實現(xiàn)定時任務(wù)調(diào)度器的示例詳解
功能介紹
這是一個功能強大的定時任務(wù)調(diào)度器,用于自動化執(zhí)行各種重復(fù)性任務(wù)。該工具具備以下核心功能:
靈活的調(diào)度配置:
- 支持Cron表達(dá)式定時調(diào)度
- 支持間隔執(zhí)行(每X秒/分鐘/小時)
- 支持一次性執(zhí)行任務(wù)
- 支持任務(wù)依賴關(guān)系
多任務(wù)管理:
- 并行執(zhí)行多個任務(wù)
- 任務(wù)優(yōu)先級設(shè)置
- 任務(wù)執(zhí)行狀態(tài)監(jiān)控
- 任務(wù)執(zhí)行歷史記錄
任務(wù)類型支持:
- Shell命令執(zhí)行
- Python腳本執(zhí)行
- HTTP請求任務(wù)
- 文件操作任務(wù)
- 自定義函數(shù)任務(wù)
監(jiān)控和告警:
- 任務(wù)執(zhí)行狀態(tài)實時監(jiān)控
- 執(zhí)行失敗自動告警
- 執(zhí)行日志詳細(xì)記錄
- 性能指標(biāo)統(tǒng)計
配置管理:
- YAML/JSON配置文件支持
- 動態(tài)任務(wù)加載和卸載
- 配置熱更新
- 任務(wù)模板支持
場景應(yīng)用
1. 系統(tǒng)運維自動化
- 定時備份數(shù)據(jù)庫和重要文件
- 定期清理系統(tǒng)日志和臨時文件
- 監(jiān)控系統(tǒng)資源使用情況
- 自動化部署和更新應(yīng)用
2. 數(shù)據(jù)處理自動化
- 定時數(shù)據(jù)同步和ETL處理
- 定期生成報表和統(tǒng)計數(shù)據(jù)
- 自動化數(shù)據(jù)清洗和驗證
- 批量處理文件和數(shù)據(jù)
3. 監(jiān)控告警自動化
- 定時檢查服務(wù)狀態(tài)和可用性
- 監(jiān)控網(wǎng)站和API響應(yīng)時間
- 檢測系統(tǒng)性能指標(biāo)異常
- 自動發(fā)送監(jiān)控報告
4. 業(yè)務(wù)流程自動化
- 定時發(fā)送郵件和通知
- 自動化處理用戶請求
- 定期執(zhí)行業(yè)務(wù)邏輯
- 批量處理訂單和交易
報錯處理
1. 任務(wù)執(zhí)行異常
try:
result = task.execute()
if not result.success:
logger.error(f"任務(wù)執(zhí)行失敗: {result.error_message}")
send_alert(f"任務(wù) {task.name} 執(zhí)行失敗: {result.error_message}")
except Exception as e:
logger.error(f"任務(wù)執(zhí)行異常: {str(e)}")
send_alert(f"任務(wù) {task.name} 執(zhí)行異常: {str(e)}")
2. 配置文件異常
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = yaml.safe_load(f)
except yaml.YAMLError as e:
logger.error(f"配置文件格式錯誤: {str(e)}")
raise TaskSchedulerError(f"配置文件無效: {str(e)}")
except FileNotFoundError:
logger.error(f"配置文件不存在: {config_file}")
raise TaskSchedulerError(f"配置文件未找到: {config_file}")
3. 調(diào)度器異常
try:
scheduler.start()
except SchedulerAlreadyRunningError:
logger.warning("調(diào)度器已在運行")
except Exception as e:
logger.error(f"調(diào)度器啟動失敗: {str(e)}")
raise TaskSchedulerError(f"調(diào)度器啟動失敗: {str(e)}")
4. 資源限制異常
try:
if len(running_tasks) >= max_concurrent_tasks:
raise ResourceLimitError(f"超過最大并發(fā)任務(wù)數(shù): {max_concurrent_tasks}")
except ResourceLimitError as e:
logger.warning(f"資源限制: {str(e)}")
# 將任務(wù)加入等待隊列
task_queue.put(task)
代碼實現(xiàn)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
定時任務(wù)調(diào)度器
功能:自動化執(zhí)行定時任務(wù)
作者:Cline
版本:1.0
"""
import argparse
import sys
import json
import yaml
import logging
import os
import time
import threading
import subprocess
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Callable, Any, Optional
import schedule
import signal
from concurrent.futures import ThreadPoolExecutor, as_completed
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sqlite3
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('task_scheduler.log'),
logging.StreamHandler(sys.stdout)
]
)
logger = logging.getLogger(__name__)
class TaskSchedulerError(Exception):
"""任務(wù)調(diào)度器異常類"""
pass
class TaskExecutionError(Exception):
"""任務(wù)執(zhí)行異常類"""
pass
class Task:
def __init__(self, config: Dict[str, Any]):
self.id = config.get('id')
self.name = config.get('name', 'Unnamed Task')
self.description = config.get('description', '')
self.type = config.get('type', 'shell')
self.command = config.get('command', '')
self.schedule = config.get('schedule', '* * * * *')
self.enabled = config.get('enabled', True)
self.timeout = config.get('timeout', 300) # 5分鐘超時
self.max_retries = config.get('max_retries', 3)
self.retry_delay = config.get('retry_delay', 5) # 重試延遲秒數(shù)
self.priority = config.get('priority', 0)
self.dependencies = config.get('dependencies', [])
self.alert_on_failure = config.get('alert_on_failure', True)
self.alert_emails = config.get('alert_emails', [])
# 執(zhí)行統(tǒng)計
self.stats = {
'executed_count': 0,
'success_count': 0,
'failed_count': 0,
'last_execution': None,
'last_success': None,
'last_failure': None,
'average_duration': 0.0
}
def execute(self) -> Dict[str, Any]:
"""執(zhí)行任務(wù)"""
if not self.enabled:
logger.info(f"任務(wù) {self.name} 已禁用,跳過執(zhí)行")
return {'success': True, 'message': '任務(wù)已禁用'}
start_time = time.time()
logger.info(f"開始執(zhí)行任務(wù): {self.name}")
try:
result = None
for attempt in range(self.max_retries + 1):
try:
if self.type == 'shell':
result = self._execute_shell_command()
elif self.type == 'python':
result = self._execute_python_script()
elif self.type == 'http':
result = self._execute_http_request()
elif self.type == 'function':
result = self._execute_function()
else:
raise TaskExecutionError(f"不支持的任務(wù)類型: {self.type}")
# 執(zhí)行成功
duration = time.time() - start_time
self._update_stats(True, duration)
logger.info(f"任務(wù) {self.name} 執(zhí)行成功,耗時 {duration:.2f} 秒")
return {'success': True, 'duration': duration, 'result': result}
except Exception as e:
if attempt < self.max_retries:
logger.warning(f"任務(wù) {self.name} 第 {attempt + 1} 次執(zhí)行失敗: {str(e)},{self.retry_delay} 秒后重試")
time.sleep(self.retry_delay)
else:
# 最后一次嘗試也失敗了
duration = time.time() - start_time
self._update_stats(False, duration)
logger.error(f"任務(wù) {self.name} 執(zhí)行失敗: {str(e)}")
return {'success': False, 'error': str(e), 'duration': duration}
except Exception as e:
duration = time.time() - start_time
self._update_stats(False, duration)
logger.error(f"任務(wù) {self.name} 執(zhí)行異常: {str(e)}")
return {'success': False, 'error': str(e), 'duration': duration}
def _execute_shell_command(self) -> str:
"""執(zhí)行Shell命令"""
logger.debug(f"執(zhí)行Shell命令: {self.command}")
result = subprocess.run(
self.command,
shell=True,
capture_output=True,
text=True,
timeout=self.timeout
)
if result.returncode != 0:
raise TaskExecutionError(f"命令執(zhí)行失敗: {result.stderr}")
return result.stdout
def _execute_python_script(self) -> Any:
"""執(zhí)行Python腳本"""
logger.debug(f"執(zhí)行Python腳本: {self.command}")
# 這里可以使用exec或subprocess執(zhí)行Python代碼
# 為了安全起見,建議使用subprocess
script_path = self.command
if not os.path.exists(script_path):
raise TaskExecutionError(f"Python腳本不存在: {script_path}")
result = subprocess.run(
[sys.executable, script_path],
capture_output=True,
text=True,
timeout=self.timeout
)
if result.returncode != 0:
raise TaskExecutionError(f"Python腳本執(zhí)行失敗: {result.stderr}")
return result.stdout
def _execute_http_request(self) -> Dict[str, Any]:
"""執(zhí)行HTTP請求"""
logger.debug(f"執(zhí)行HTTP請求: {self.command}")
# 解析HTTP配置
http_config = self.command if isinstance(self.command, dict) else json.loads(self.command)
method = http_config.get('method', 'GET').upper()
url = http_config.get('url')
headers = http_config.get('headers', {})
data = http_config.get('data')
params = http_config.get('params')
if not url:
raise TaskExecutionError("HTTP請求缺少URL")
response = requests.request(
method=method,
url=url,
headers=headers,
data=data,
params=params,
timeout=self.timeout
)
response.raise_for_status()
return {
'status_code': response.status_code,
'headers': dict(response.headers),
'content': response.text
}
def _execute_function(self) -> Any:
"""執(zhí)行自定義函數(shù)"""
logger.debug(f"執(zhí)行自定義函數(shù): {self.command}")
# 這里需要根據(jù)具體實現(xiàn)來調(diào)用函數(shù)
# 可以使用getattr或eval等方式,但要注意安全性
raise NotImplementedError("自定義函數(shù)執(zhí)行暫未實現(xiàn)")
def _update_stats(self, success: bool, duration: float):
"""更新執(zhí)行統(tǒng)計"""
self.stats['executed_count'] += 1
self.stats['last_execution'] = datetime.now().isoformat()
if success:
self.stats['success_count'] += 1
self.stats['last_success'] = datetime.now().isoformat()
# 更新平均執(zhí)行時間
current_avg = self.stats['average_duration']
count = self.stats['success_count']
self.stats['average_duration'] = (current_avg * (count - 1) + duration) / count
else:
self.stats['failed_count'] += 1
self.stats['last_failure'] = datetime.now().isoformat()
class TaskScheduler:
def __init__(self, config_file: str = None):
self.config_file = config_file
self.tasks = {}
self.running = False
self.executor = None
self.max_workers = 10
self.alert_config = {}
self.database_path = 'task_scheduler.db'
# 加載配置
self.load_config()
# 初始化數(shù)據(jù)庫
self.init_database()
# 注冊信號處理器
signal.signal(signal.SIGINT, self._signal_handler)
signal.signal(signal.SIGTERM, self._signal_handler)
def load_config(self):
"""加載配置文件"""
if not self.config_file or not os.path.exists(self.config_file):
logger.info("未指定配置文件或文件不存在,使用默認(rèn)配置")
self._create_default_config()
return
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
if self.config_file.endswith('.yaml') or self.config_file.endswith('.yml'):
config = yaml.safe_load(f)
else:
config = json.load(f)
# 加載調(diào)度器配置
scheduler_config = config.get('scheduler', {})
self.max_workers = scheduler_config.get('max_workers', 10)
self.alert_config = scheduler_config.get('alerts', {})
# 加載任務(wù)配置
tasks_config = config.get('tasks', [])
for task_config in tasks_config:
task = Task(task_config)
self.tasks[task.id] = task
logger.info(f"加載任務(wù): {task.name} (ID: {task.id})")
logger.info(f"成功加載 {len(self.tasks)} 個任務(wù)")
except Exception as e:
logger.error(f"加載配置文件失敗: {str(e)}")
raise TaskSchedulerError(f"配置加載失敗: {str(e)}")
def _create_default_config(self):
"""創(chuàng)建默認(rèn)配置"""
default_tasks = [
{
"id": "system_monitor",
"name": "系統(tǒng)監(jiān)控",
"description": "監(jiān)控系統(tǒng)資源使用情況",
"type": "shell",
"command": "python system_monitor.py",
"schedule": "*/5 * * * *", # 每5分鐘執(zhí)行一次
"enabled": True,
"timeout": 300,
"max_retries": 3
},
{
"id": "log_cleanup",
"name": "日志清理",
"description": "清理過期日志文件",
"type": "shell",
"command": "find /var/log -name '*.log' -mtime +30 -delete",
"schedule": "0 2 * * *", # 每天凌晨2點執(zhí)行
"enabled": True,
"timeout": 600
}
]
default_config = {
"scheduler": {
"max_workers": 10,
"alerts": {
"smtp_server": "smtp.example.com",
"smtp_port": 587,
"sender": "scheduler@example.com",
"password": "your_password"
}
},
"tasks": default_tasks
}
# 保存默認(rèn)配置
config_path = self.config_file or 'scheduler_config.json'
with open(config_path, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2, ensure_ascii=False)
logger.info(f"創(chuàng)建默認(rèn)配置文件: {config_path}")
def init_database(self):
"""初始化數(shù)據(jù)庫"""
try:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
# 創(chuàng)建任務(wù)執(zhí)行記錄表
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_executions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_id TEXT NOT NULL,
task_name TEXT NOT NULL,
start_time TEXT NOT NULL,
end_time TEXT,
duration REAL,
success BOOLEAN,
error_message TEXT,
output TEXT
)
''')
# 創(chuàng)建任務(wù)統(tǒng)計表
cursor.execute('''
CREATE TABLE IF NOT EXISTS task_stats (
task_id TEXT PRIMARY KEY,
executed_count INTEGER DEFAULT 0,
success_count INTEGER DEFAULT 0,
failed_count INTEGER DEFAULT 0,
last_execution TEXT,
last_success TEXT,
last_failure TEXT,
average_duration REAL DEFAULT 0.0
)
''')
conn.commit()
conn.close()
logger.info("數(shù)據(jù)庫初始化完成")
except Exception as e:
logger.error(f"數(shù)據(jù)庫初始化失敗: {str(e)}")
raise TaskSchedulerError(f"數(shù)據(jù)庫初始化失敗: {str(e)}")
def save_task_execution(self, task_id: str, task_name: str, start_time: datetime,
end_time: datetime, duration: float, success: bool,
error_message: str = None, output: str = None):
"""保存任務(wù)執(zhí)行記錄"""
try:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO task_executions
(task_id, task_name, start_time, end_time, duration, success, error_message, output)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
task_id, task_name, start_time.isoformat(), end_time.isoformat(),
duration, success, error_message, output
))
conn.commit()
conn.close()
except Exception as e:
logger.error(f"保存任務(wù)執(zhí)行記錄失敗: {str(e)}")
def update_task_stats(self, task: Task):
"""更新任務(wù)統(tǒng)計信息到數(shù)據(jù)庫"""
try:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO task_stats
(task_id, executed_count, success_count, failed_count,
last_execution, last_success, last_failure, average_duration)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (
task.id,
task.stats['executed_count'],
task.stats['success_count'],
task.stats['failed_count'],
task.stats['last_execution'],
task.stats['last_success'],
task.stats['last_failure'],
task.stats['average_duration']
))
conn.commit()
conn.close()
except Exception as e:
logger.error(f"更新任務(wù)統(tǒng)計信息失敗: {str(e)}")
def schedule_task(self, task: Task):
"""調(diào)度任務(wù)"""
if not task.enabled:
logger.info(f"任務(wù) {task.name} 已禁用,跳過調(diào)度")
return
try:
# 解析Cron表達(dá)式
cron_parts = task.schedule.split()
if len(cron_parts) != 5:
raise TaskSchedulerError(f"無效的Cron表達(dá)式: {task.schedule}")
minute, hour, day, month, weekday = cron_parts
# 使用schedule庫進(jìn)行調(diào)度
job = schedule.every()
# 設(shè)置調(diào)度時間
if minute != '*':
job = job.minute.at(minute)
if hour != '*':
job = job.hour.at(hour)
if day != '*':
job = job.day.at(day)
# 設(shè)置作業(yè)
job.do(self._execute_task, task_id=task.id)
logger.info(f"任務(wù) {task.name} 已調(diào)度: {task.schedule}")
except Exception as e:
logger.error(f"調(diào)度任務(wù) {task.name} 失敗: {str(e)}")
def _execute_task(self, task_id: str):
"""執(zhí)行任務(wù)(內(nèi)部方法)"""
if task_id not in self.tasks:
logger.error(f"任務(wù)不存在: {task_id}")
return
task = self.tasks[task_id]
start_time = datetime.now()
try:
# 執(zhí)行任務(wù)
result = task.execute()
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# 保存執(zhí)行記錄
self.save_task_execution(
task_id=task.id,
task_name=task.name,
start_time=start_time,
end_time=end_time,
duration=duration,
success=result['success'],
error_message=result.get('error'),
output=result.get('result')
)
# 更新統(tǒng)計信息
self.update_task_stats(task)
# 發(fā)送告警(如果需要)
if not result['success'] and task.alert_on_failure:
self.send_alert(task, result.get('error', '未知錯誤'))
except Exception as e:
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# 保存失敗記錄
self.save_task_execution(
task_id=task.id,
task_name=task.name,
start_time=start_time,
end_time=end_time,
duration=duration,
success=False,
error_message=str(e)
)
logger.error(f"執(zhí)行任務(wù) {task.name} 時發(fā)生異常: {str(e)}")
def send_alert(self, task: Task, error_message: str):
"""發(fā)送告警"""
if not self.alert_config or not task.alert_emails:
return
try:
# 創(chuàng)建郵件內(nèi)容
msg = MIMEMultipart()
msg['From'] = self.alert_config['sender']
msg['To'] = ', '.join(task.alert_emails)
msg['Subject'] = f"任務(wù)調(diào)度器告警 - {task.name}"
body = f"""
任務(wù)調(diào)度器告警通知
任務(wù)名稱: {task.name}
任務(wù)ID: {task.id}
執(zhí)行時間: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
錯誤信息: {error_message}
請及時處理!
---
任務(wù)調(diào)度器
"""
msg.attach(MIMEText(body, 'plain'))
# 發(fā)送郵件
server = smtplib.SMTP(self.alert_config['smtp_server'], self.alert_config['smtp_port'])
server.starttls()
server.login(self.alert_config['sender'], self.alert_config['password'])
server.send_message(msg)
server.quit()
logger.info(f"告警郵件已發(fā)送給: {', '.join(task.alert_emails)}")
except Exception as e:
logger.error(f"發(fā)送告警郵件失敗: {str(e)}")
def start(self):
"""啟動調(diào)度器"""
if self.running:
logger.warning("調(diào)度器已在運行")
return
logger.info("啟動任務(wù)調(diào)度器...")
self.running = True
self.executor = ThreadPoolExecutor(max_workers=self.max_workers)
# 調(diào)度所有任務(wù)
for task in self.tasks.values():
self.schedule_task(task)
# 主循環(huán)
try:
while self.running:
schedule.run_pending()
time.sleep(1)
except KeyboardInterrupt:
logger.info("收到中斷信號,正在停止調(diào)度器...")
finally:
self.stop()
def stop(self):
"""停止調(diào)度器"""
logger.info("停止任務(wù)調(diào)度器...")
self.running = False
if self.executor:
self.executor.shutdown(wait=True)
schedule.clear()
logger.info("任務(wù)調(diào)度器已停止")
def _signal_handler(self, signum, frame):
"""信號處理器"""
logger.info(f"收到信號 {signum},準(zhǔn)備停止調(diào)度器...")
self.stop()
sys.exit(0)
def add_task(self, task_config: Dict[str, Any]):
"""添加任務(wù)"""
task = Task(task_config)
self.tasks[task.id] = task
if self.running:
self.schedule_task(task)
logger.info(f"添加任務(wù): {task.name}")
def remove_task(self, task_id: str):
"""移除任務(wù)"""
if task_id in self.tasks:
task = self.tasks[task_id]
del self.tasks[task_id]
logger.info(f"移除任務(wù): {task.name}")
def get_task_stats(self, task_id: str = None) -> Dict[str, Any]:
"""獲取任務(wù)統(tǒng)計信息"""
if task_id:
if task_id in self.tasks:
return self.tasks[task_id].stats
else:
return None
else:
return {task_id: task.stats for task_id, task in self.tasks.items()}
def get_execution_history(self, task_id: str = None, limit: int = 100) -> List[Dict[str, Any]]:
"""獲取執(zhí)行歷史"""
try:
conn = sqlite3.connect(self.database_path)
cursor = conn.cursor()
if task_id:
cursor.execute('''
SELECT * FROM task_executions
WHERE task_id = ?
ORDER BY start_time DESC
LIMIT ?
''', (task_id, limit))
else:
cursor.execute('''
SELECT * FROM task_executions
ORDER BY start_time DESC
LIMIT ?
''', (limit,))
rows = cursor.fetchall()
conn.close()
# 轉(zhuǎn)換為字典列表
columns = [description[0] for description in cursor.description]
return [dict(zip(columns, row)) for row in rows]
except Exception as e:
logger.error(f"獲取執(zhí)行歷史失敗: {str(e)}")
return []
def create_sample_config():
"""創(chuàng)建示例配置文件"""
sample_config = {
"scheduler": {
"max_workers": 5,
"alerts": {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender": "your_email@gmail.com",
"password": "your_app_password"
}
},
"tasks": [
{
"id": "backup_task",
"name": "數(shù)據(jù)庫備份",
"description": "每日備份數(shù)據(jù)庫",
"type": "shell",
"command": "mysqldump -u root -p mydb > /backups/mydb_$(date +%Y%m%d).sql",
"schedule": "0 2 * * *",
"enabled": True,
"timeout": 3600,
"max_retries": 2,
"alert_on_failure": True,
"alert_emails": ["admin@example.com"]
},
{
"id": "cleanup_task",
"name": "臨時文件清理",
"description": "清理7天前的臨時文件",
"type": "shell",
"command": "find /tmp -name '*.tmp' -mtime +7 -delete",
"schedule": "0 3 * * 0",
"enabled": True,
"timeout": 300
},
{
"id": "api_monitor",
"name": "API監(jiān)控",
"description": "監(jiān)控關(guān)鍵API的可用性",
"type": "http",
"command": json.dumps({
"method": "GET",
"url": "https://api.example.com/health",
"timeout": 30
}),
"schedule": "*/10 * * * *",
"enabled": True,
"timeout": 60,
"alert_on_failure": True,
"alert_emails": ["ops@example.com"]
}
]
}
with open('scheduler_sample_config.json', 'w', encoding='utf-8') as f:
json.dump(sample_config, f, indent=2, ensure_ascii=False)
logger.info("示例配置文件已創(chuàng)建: scheduler_sample_config.json")
def main():
parser = argparse.ArgumentParser(description='定時任務(wù)調(diào)度器')
parser.add_argument('-c', '--config', help='配置文件路徑')
parser.add_argument('--start', action='store_true', help='啟動調(diào)度器')
parser.add_argument('--sample-config', action='store_true', help='創(chuàng)建示例配置文件')
parser.add_argument('--list-tasks', action='store_true', help='列出所有任務(wù)')
parser.add_argument('--task-stats', help='查看指定任務(wù)的統(tǒng)計信息')
parser.add_argument('--history', help='查看指定任務(wù)的執(zhí)行歷史')
args = parser.parse_args()
if args.sample_config:
create_sample_config()
return
scheduler = TaskScheduler(args.config)
if args.list_tasks:
print("任務(wù)列表:")
for task_id, task in scheduler.tasks.items():
status = "啟用" if task.enabled else "禁用"
print(f" - {task.name} ({task_id}) [{status}]")
return
if args.task_stats:
stats = scheduler.get_task_stats(args.task_stats)
if stats:
print(f"任務(wù) {args.task_stats} 的統(tǒng)計信息:")
for key, value in stats.items():
print(f" {key}: {value}")
else:
print(f"任務(wù) {args.task_stats} 不存在")
return
if args.history:
history = scheduler.get_execution_history(args.history)
print(f"任務(wù) {args.history} 的執(zhí)行歷史:")
for record in history:
print(f" 時間: {record['start_time']}, 成功: {record['success']}, 耗時: {record['duration']:.2f}s")
return
if args.start:
scheduler.start()
else:
parser.print_help()
if __name__ == '__main__':
main()
使用說明
1. 安裝依賴
pip install schedule requests pyyaml
2. 創(chuàng)建配置文件
python task_scheduler.py --sample-config
3. 啟動調(diào)度器
python task_scheduler.py --config scheduler_config.json --start
4. 查看任務(wù)列表
python task_scheduler.py --config scheduler_config.json --list-tasks
5. 查看任務(wù)統(tǒng)計
python task_scheduler.py --config scheduler_config.json --task-stats backup_task
6. 查看執(zhí)行歷史
python task_scheduler.py --config scheduler_config.json --history backup_task
配置文件示例
JSON配置文件
{
"scheduler": {
"max_workers": 5,
"alerts": {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"sender": "your_email@gmail.com",
"password": "your_app_password"
}
},
"tasks": [
{
"id": "backup_task",
"name": "數(shù)據(jù)庫備份",
"description": "每日備份數(shù)據(jù)庫",
"type": "shell",
"command": "mysqldump -u root -p mydb > /backups/mydb_$(date +%Y%m%d).sql",
"schedule": "0 2 * * *",
"enabled": true,
"timeout": 3600,
"max_retries": 2,
"alert_on_failure": true,
"alert_emails": ["admin@example.com"]
},
{
"id": "cleanup_task",
"name": "臨時文件清理",
"description": "清理7天前的臨時文件",
"type": "shell",
"command": "find /tmp -name '*.tmp' -mtime +7 -delete",
"schedule": "0 3 * * 0",
"enabled": true,
"timeout": 300
}
]
}
YAML配置文件
scheduler:
max_workers: 5
alerts:
smtp_server: smtp.gmail.com
smtp_port: 587
sender: your_email@gmail.com
password: your_app_password
tasks:
- id: backup_task
name: 數(shù)據(jù)庫備份
description: 每日備份數(shù)據(jù)庫
type: shell
command: mysqldump -u root -p mydb > /backups/mydb_$(date +%Y%m%d).sql
schedule: "0 2 * * *"
enabled: true
timeout: 3600
max_retries: 2
alert_on_failure: true
alert_emails:
- admin@example.com
- id: cleanup_task
name: 臨時文件清理
description: 清理7天前的臨時文件
type: shell
command: find /tmp -name '*.tmp' -mtime +7 -delete
schedule: "0 3 * * 0"
enabled: true
timeout: 300
高級特性
1. 任務(wù)依賴管理
支持任務(wù)間的依賴關(guān)系,確保任務(wù)按正確的順序執(zhí)行:
tasks:
- id: task_a
name: 任務(wù)A
# ... 其他配置
- id: task_b
name: 任務(wù)B
dependencies:
- task_a # 任務(wù)B依賴于任務(wù)A
# ... 其他配置
2. 動態(tài)任務(wù)管理
支持在運行時動態(tài)添加、移除和修改任務(wù),無需重啟調(diào)度器。
3. 執(zhí)行歷史和統(tǒng)計
內(nèi)置SQLite數(shù)據(jù)庫存儲任務(wù)執(zhí)行歷史和統(tǒng)計信息,便于分析和監(jiān)控。
4. 告警通知
支持通過郵件發(fā)送告警通知,及時發(fā)現(xiàn)任務(wù)執(zhí)行異常。
5. 資源限制
支持并發(fā)任務(wù)數(shù)量限制,防止系統(tǒng)資源被過度占用。
最佳實踐
1. 安全性考慮
- 不要在配置文件中明文存儲敏感信息
- 使用環(huán)境變量或加密存儲敏感配置
- 限制腳本執(zhí)行權(quán)限
2. 性能優(yōu)化
- 合理設(shè)置任務(wù)并發(fā)數(shù)
- 為長時間運行的任務(wù)設(shè)置適當(dāng)?shù)某瑫r時間
- 定期清理執(zhí)行歷史數(shù)據(jù)
3. 監(jiān)控和維護(hù)
- 定期檢查任務(wù)執(zhí)行日志
- 監(jiān)控系統(tǒng)資源使用情況
- 及時處理執(zhí)行失敗的任務(wù)
總結(jié)
這個定時任務(wù)調(diào)度器提供了一個功能完整、易于使用的任務(wù)自動化解決方案。通過靈活的配置和豐富的功能,可以滿足各種自動化需求。無論是系統(tǒng)運維、數(shù)據(jù)處理還是業(yè)務(wù)流程自動化,都能通過這個工具大大提高工作效率。
到此這篇關(guān)于Python實現(xiàn)定時任務(wù)調(diào)度器的示例詳解的文章就介紹到這了,更多相關(guān)Python定時任務(wù)調(diào)度內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python測試開發(fā)django之使用supervisord?后臺啟動celery?服務(wù)(worker/beat)
Supervisor是用Python開發(fā)的一個client/server服務(wù),是Linux/Unix系統(tǒng)下的一個進(jìn)程管理工具,不支持Windows系統(tǒng),這篇文章主要介紹了python測試開發(fā)django之使用supervisord?后臺啟動celery?服務(wù)(worker/beat),需要的朋友可以參考下2022-07-07
TensorFlow的環(huán)境配置與安裝教程詳解(win10+GeForce GTX1060+CUDA 9.0+cuDNN7
這篇文章主要介紹了TensorFlow的環(huán)境配置與安裝(win10+GeForce GTX1060+CUDA 9.0+cuDNN7.3+tensorflow-gpu 1.12.0+python3.5.5),本文通過圖文并茂的形式給大家介紹的非常詳細(xì),需要的朋友可以參考下2020-06-06
Python實現(xiàn)樸素貝葉斯的學(xué)習(xí)與分類過程解析
這篇文章主要介紹了Python實現(xiàn)樸素貝葉斯的學(xué)習(xí)與分類過程解析,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-08-08

