從入門到避坑解析Python FastAPI定時任務(wù)全攻略
先說事實案例:有個促銷活動需要定時上線。結(jié)果呢?依賴的云函數(shù)服務(wù)突然抖動,那個“簡單可靠”的crontab腳本愣是沒觸發(fā)。凌晨三點,運營的電話直接把你的美夢干碎。事后復(fù)盤,才意識到:把定時任務(wù)寄生于操作系統(tǒng)或者外部黑盒服務(wù),在微服務(wù)架構(gòu)里,就是給自己埋雷。
痛定思痛,最后把定時任務(wù)“請”回了應(yīng)用內(nèi)部,用APScheduler在FastAPI里搞了個自治的小鬧鐘。今天,咱們就來聊聊這套實戰(zhàn)經(jīng)驗,連同那些半夜爬起來填的坑……
本文你能得到什么
1. 為什么說FastAPI自帶的BackgroundTasks不適合做定時任務(wù)。
2. APScheduler的核心概念,用“鬧鐘”和“餐廳”的比喻讓你秒懂。
3. 手把手集成,提供可直接復(fù)制粘貼的代碼塊。
4. 最重要的:多進程部署(比如用Uvicorn workers)時,定時任務(wù)重復(fù)執(zhí)行的“鬼故事”與解決之道。
第一部分:問題與背景 —— 為什么另起爐灶?
FastAPI 的 BackgroundTasks 是個好同志,但它只是個“跑腿小哥”。你API請求來了,它幫你異步處理些雜事,比如發(fā)郵件、寫日志。但它有個硬傷:它沒有記憶,也不會看表。 服務(wù)一重啟,所有計劃內(nèi)的“跑腿”任務(wù)全忘光光。
定時任務(wù)呢?它需要的是“忠誠的管家”。不管服務(wù)是否重啟,都要記得每天上午10點要發(fā)報表,每周一凌晨要清緩存。這需要持久化和時間調(diào)度能力,這正是 APScheduler 的絕活。
你可能會問,用Celery行不行?行,但殺雞用牛刀了。APScheduler更輕量,與你FastAPI應(yīng)用同生共死,管理起來簡單直接,特別適合業(yè)務(wù)邏輯清晰、不需要分布式協(xié)調(diào)的定時場景。
第二部分:核心原理 —— APScheduler的三板斧
別被它的名字嚇到,把它想象成一個高度可定制的智能鬧鐘系統(tǒng)。它主要由三部分組成:
觸發(fā)器 (Trigger): 決定“什么時候響”。是每天固定時間(date),還是間隔固定時間(interval),或者是像crontab那樣的復(fù)雜周期(cron)?
作業(yè)存儲器 (Job Store): 記住“有哪些鬧鐘要響”。默認存在內(nèi)存里,重啟就忘。我們可以讓它記在數(shù)據(jù)庫里(比如SQLite、PostgreSQL),實現(xiàn)持久化。
執(zhí)行器 (Executor): 負責(zé)“鬧鐘響了以后具體做什么”。是用線程池還是進程池來執(zhí)行我們的任務(wù)函數(shù)?
而調(diào)度器 (Scheduler) 就是總控臺,把上面三個部件組裝起來,并啟動這個鬧鐘系統(tǒng)。
第三部分:實戰(zhàn)演示 —— 手把手集成到FastAPI
好,咱們先來安裝。這步最簡單:
pip install apscheduler
接下來重點來了,初始化并集成到FastAPI的生命周期。這里有個關(guān)鍵技巧:一定要把scheduler的啟動和關(guān)閉掛在FastAPI的應(yīng)用事件上,保證應(yīng)用啟動時它啟動,應(yīng)用優(yōu)雅關(guān)閉時它也停下。千萬別學(xué)我當(dāng)初,直接在模塊層面scheduler.start(),導(dǎo)致測試時腳本跑完不退出。
# 項目結(jié)構(gòu)建議
# app/
# __init__.py
# main.py # FastAPI 應(yīng)用創(chuàng)建和事件處理
# scheduler.py # 調(diào)度器配置和任務(wù)定義
# tasks.py # 具體的任務(wù)函數(shù)
# app/scheduler.py
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
# 1. 配置組件
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite') # 使用SQLite持久化
}
executors = {
'default': ThreadPoolExecutor(20) # 線程池執(zhí)行
}
job_defaults = {
'coalesce': False, # 錯過的任務(wù)是否合并執(zhí)行(一般False)
'max_instances': 3 # 同一個任務(wù)同時運行的最大實例數(shù)
}
# 2. 創(chuàng)建調(diào)度器實例
scheduler = AsyncIOScheduler(
jobstores=jobstores,
executors=executors,
job_defaults=job_defaults,
timezone="Asia/Shanghai" # 時區(qū)!時區(qū)!時區(qū)!重要的事說三遍
)
# 3. 定義任務(wù)函數(shù) (可以放在同文件,也可從其他模塊導(dǎo)入)
def my_sync_job():
print("同步任務(wù)執(zhí)行了!")
async def my_async_job():
print("異步任務(wù)執(zhí)行了!")
# 這里可以愉快地調(diào)用其他async函數(shù)
# 4. 添加任務(wù)的函數(shù) (通常在應(yīng)用啟動時調(diào)用)
def add_jobs():
# 間隔任務(wù):每30秒執(zhí)行一次
scheduler.add_job(my_sync_job, 'interval', seconds=30, id='sync_interval_job')
# Cron任務(wù):每分鐘的第30秒執(zhí)行
scheduler.add_job(my_async_job, 'cron', second=30, id='async_cron_job')
# 單次任務(wù):2023年10月1日執(zhí)行
# scheduler.add_job(xxx, 'date', run_date='2023-10-01 00:00:00') 然后在你的main.py里,把它和FastAPI綁在一起:
# app/main.py
from fastapi import FastAPI
from .scheduler import scheduler, add_jobs
app = FastAPI(title="定時任務(wù)演示")
@app.on_event("startup")
async def startup_event():
# 應(yīng)用啟動時,添加任務(wù)并啟動調(diào)度器
if not scheduler.running:
add_jobs()
scheduler.start()
print("APScheduler 已啟動")
@app.on_event("shutdown")
async def shutdown_event():
# 應(yīng)用關(guān)閉時,優(yōu)雅地關(guān)閉調(diào)度器
if scheduler.running:
scheduler.shutdown()
print("APScheduler 已關(guān)閉")
#from contextlib import asynccontextmanager
#@asynccontextmanager
#async def lifespan(app: FastAPI):
# # 應(yīng)用啟動時,添加任務(wù)并啟動調(diào)度器
# if not scheduler.running:
# add_jobs()
# scheduler.start()
# print("APScheduler 已啟動")
# yield
# # 應(yīng)用關(guān)閉時,優(yōu)雅地關(guān)閉調(diào)度器
# if scheduler.running:
# scheduler.shutdown()
# print("APScheduler 已關(guān)閉")
#app = FastAPI(title="定時任務(wù)演示", lifespan=lifespan)
@app.get("/")
async def root():
return {"message": "Hello World"}
# 可選:提供一個API來手動觸發(fā)或查看任務(wù)狀態(tài)
@app.get("/jobs")
async def list_jobs():
jobs = scheduler.get_jobs()
return {"jobs": [{"id": j.id, "next_run": str(j.next_run_time)} for j in jobs]} 這里保留了舊式的on_event生命周期管理函數(shù),方便理解scheduler的開啟與關(guān)閉邏輯,開發(fā)時改為lifespan進行更優(yōu)雅的生命周期管理。
跑起來試試吧!你會看到控制臺每隔30秒和每分鐘的第30秒都有輸出。到數(shù)據(jù)庫里看看,jobs.sqlite里已經(jīng)存下了我們的任務(wù)配置,重啟應(yīng)用任務(wù)也不會丟失。
第四部分:天坑預(yù)警 —— 多進程部署與重復(fù)執(zhí)行
是不是以為這樣就萬事大吉了?最大的坑才剛剛浮出水面。
當(dāng)你用生產(chǎn)模式啟動FastAPI,比如:
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
這--workers 4意味著啟動了4個獨立的進程。那么,app.on_event("startup")會在這4個進程里各執(zhí)行一次!結(jié)果就是,你的定時任務(wù)被添加了4次,會被重復(fù)執(zhí)行4次!想象一下,每小時發(fā)一次的報表郵件,突然變成了每小時發(fā)四封,老板和用戶都會瘋掉。
解決方案:文件鎖與領(lǐng)導(dǎo)者選舉
核心思路很簡單:確保在多個進程中,只有一個進程能真正啟動和添加定時任務(wù)。 這里分享兩種我們線上在用的方法。
方案一:簡單粗暴的文件鎖(適合大部分場景)
利用fcntl(Linux)或msvcrt(Windows)給一個文件加鎖,只有拿到鎖的進程才能初始化調(diào)度器。
# 在 scheduler.py 或 startup 事件中
import os
import sys
def try_acquire_lock(lock_file):
try:
import fcntl
f = open(lock_file, 'w')
# 嘗試獲取非阻塞的獨占鎖
fcntl.flock(f, fcntl.LOCK_EX | fcntl.LOCK_NB)
return f # 返回文件對象,保持打開狀態(tài)以持有鎖
except (BlockingIOError, ImportError):
# 獲取失?。ㄆ渌M程已持有鎖)或不支持的系統(tǒng)
return None
lock_file = "/tmp/fastapi_scheduler.lock"
lock_fd = try_acquire_lock(lock_file)
@app.on_event("startup")
async def startup_event():
if lock_fd is not None:
# 只有拿到鎖的進程才啟動調(diào)度器
if not scheduler.running:
add_jobs()
scheduler.start()
print(f"進程 {os.getpid()} 成功啟動 APScheduler")
else:
print(f"進程 {os.getpid()} 未獲得鎖,跳過調(diào)度器啟動") 方案二:利用數(shù)據(jù)庫原子操作(更分布式)
在數(shù)據(jù)庫里建一張表,用原子性的“插入或競爭”操作來選舉一個“領(lǐng)導(dǎo)者”進程。
# 假設(shè)使用SQLAlchemy ORM
from sqlalchemy.ext.asyncio import AsyncSession
from your_app.models import SchedulerLock
import datetime
async def acquire_db_lock(session: AsyncSession, timeout_minutes=10):
try:
# 嘗試插入一條鎖記錄,host和pid標識當(dāng)前進程
lock = SchedulerLock(
id=1, # 固定ID
host="my_host",
pid=os.getpid(),
last_heartbeat=datetime.datetime.utcnow()
)
session.add(lock)
await session.commit()
return True # 插入成功,獲得鎖
except IntegrityError: # 唯一約束沖突,記錄已存在
await session.rollback()
# 檢查已有的鎖是否已過期
existing_lock = await session.get(SchedulerLock, 1)
if existing_lock and (datetime.datetime.utcnow() - existing_lock.last_heartbeat).seconds > timeout_minutes * 60:
# 鎖已過期,更新為當(dāng)前進程
existing_lock.host = "my_host"
existing_lock.pid = os.getpid()
existing_lock.last_heartbeat = datetime.datetime.utcnow()
await session.commit()
return True
return False # 未能獲得鎖
# 在 startup 事件中調(diào)用 acquire_db_lock 判斷 記住,多進程部署下定時任務(wù)初始化,不加鎖等于制造線上事故。 我個人更推薦方案一,足夠簡單可靠,除非你已經(jīng)是跨機器的分布式部署了。
最后啰嗦一句
定時任務(wù)看似是小功能,但把它做可靠卻需要處處留心。從選擇APScheduler,到正確集成到應(yīng)用生命周期,再到最后用文件鎖避開多進程的坑,每一步都是我們踩過的雷。
技術(shù)棧沒有銀彈,但有了這套組合拳,你的FastAPI后臺定時任務(wù),基本可以高枕無憂了。至少,能讓你睡個安穩(wěn)覺,不用再擔(dān)心凌晨三點的電話。
到此這篇關(guān)于從入門到避坑解析Python FastAPI定時任務(wù)全攻略的文章就介紹到這了,更多相關(guān)Python FastAPI定時任務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
python實現(xiàn)合并多個list及合并多個django QuerySet的方法示例
這篇文章主要介紹了python實現(xiàn)合并多個list及合并多個django QuerySet的方法,結(jié)合實例形式分析了Python使用chain合并多個list以及合并Django中多個QuerySet的相關(guān)操作技巧,需要的朋友可以參考下2019-06-06
詳解Python 實現(xiàn)元胞自動機中的生命游戲(Game of life)
本篇文章主要介紹了詳解Python 實現(xiàn)元胞自動機中的生命游戲(Game of life),具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-01-01
Python爬蟲獲取國外大橋排行榜數(shù)據(jù)清單
這篇文章主要介紹了Python爬蟲獲取國外大橋排行榜數(shù)據(jù)清單,文章通過PyQuery?解析框架展開全文詳細內(nèi)容,需要的小伙伴可以參考一下2022-05-05
在django中form的label和verbose name的區(qū)別說明
這篇文章主要介紹了在django中form的label和verbose name的區(qū)別說明,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-05-05

