Python中實(shí)現(xiàn)JWT認(rèn)證的完整指南(從生成到驗(yàn)證)
引言
JSON Web Tokens (JWT) 是現(xiàn)代 Web 開(kāi)發(fā)中廣泛使用的身份驗(yàn)證機(jī)制。本文將用生動(dòng)的方式帶你全面了解 JWT 在 Python 中的實(shí)現(xiàn),包括生成、驗(yàn)證和各種相關(guān)方法,通過(guò)豐富的比喻、表格和流程圖幫助你徹底掌握 JWT。
一、JWT 是什么?為什么需要它?
想象你去游樂(lè)園,入園時(shí)會(huì)得到一個(gè)手環(huán)。這個(gè)手環(huán):
- 包含信息:顯示你的門票類型(VIP/普通)
- 防偽設(shè)計(jì):有特殊圖案難以偽造
- 有效期:只在當(dāng)天有效
JWT 就是這樣的數(shù)字手環(huán):
| 游樂(lè)園手環(huán) | JWT 令牌 |
|---|---|
| 門票類型 | 用戶角色 |
| 入園時(shí)間 | 簽發(fā)時(shí)間(iat) |
| 閉園時(shí)間 | 過(guò)期時(shí)間(exp) |
| 防偽標(biāo)記 | 數(shù)字簽名 |
| 手環(huán)材質(zhì) | 加密算法 |
傳統(tǒng) session 與 JWT 對(duì)比

| 特性 | Session | JWT |
|---|---|---|
| 存儲(chǔ)位置 | 服務(wù)器 | 客戶端 |
| 擴(kuò)展性 | 需要共享session | 天然無(wú)狀態(tài) |
| 跨域支持 | 需要配置 | 原生支持 |
| 移動(dòng)端友好度 | 需處理cookie | 直接使用header |
| 安全性 | 依賴cookie安全 | 依賴token存儲(chǔ)方式 |
| 典型場(chǎng)景 | 傳統(tǒng)Web應(yīng)用 | API/移動(dòng)應(yīng)用/微服務(wù) |
二、JWT 的結(jié)構(gòu)解析
一個(gè) JWT 看起來(lái)像這樣:xxxxx.yyyyy.zzzzz
就像三明治分三層:
Header (面包上層)
{
"alg": "HS256", // 簽名算法(HMAC SHA256)
"typ": "JWT" // 類型標(biāo)識(shí)
}
Payload (餡料)
{
"sub": "user123", // 主題(用戶ID)
"name": "張三",
"admin": true,
"iat": 1516239022 // 簽發(fā)時(shí)間
}
Signature (面包下層+防偽標(biāo)記)
HMACSHA256( base64UrlEncode(header) + "." + base64UrlEncode(payload), 密鑰 )
生成流程:
[Header] → base64編碼 → xxxxx [Payload] → base64編碼 → yyyyy [xxxxx.yyyyy + 密鑰] → 簽名算法 → zzzzz 最終令牌:xxxxx.yyyyy.zzzzz
三、Python 中實(shí)現(xiàn) JWT
1. 安裝 PyJWT 包
pip install pyjwt
2. 生成 JWT
import jwt
import datetime
from datetime import timezone
# 建議從環(huán)境變量讀取
SECRET_KEY = "your_super_secret_key"
def generate_jwt(user_id: str, username: str, role: str) -> str:
"""生成JWT令牌"""
payload = {
"sub": user_id, # 標(biāo)準(zhǔn)字段:主題
"name": username,
"role": role,
"iat": datetime.datetime.now(tz=timezone.utc), # 簽發(fā)時(shí)間
"exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(hours=1) # 過(guò)期時(shí)間
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
參數(shù)詳解表:
| 參數(shù) | 類型 | 必填 | 說(shuō)明 | 示例 |
|---|---|---|---|---|
| payload | dict | 是 | 負(fù)載數(shù)據(jù) | {“sub”: “user123”} |
| key | str | 是 | 簽名密鑰 | “secret” |
| algorithm | str | 否 | 簽名算法 | “HS256” |
| headers | dict | 否 | 額外頭部 | {“kid”: “key1”} |
標(biāo)準(zhǔn)聲明字段(建議):
| 字段 | 全稱 | 說(shuō)明 | 示例 |
|---|---|---|---|
| sub | Subject | 主題(用戶ID) | “user123” |
| exp | Expiration | 過(guò)期時(shí)間 | 1735689600 |
| iat | Issued At | 簽發(fā)時(shí)間 | 1735686000 |
| aud | Audience | 接收方 | “mobile-app” |
| iss | Issuer | 簽發(fā)者 | “auth-server” |
3. 驗(yàn)證 JWT
def verify_jwt(token: str) -> dict:
"""驗(yàn)證JWT令牌"""
try:
payload = jwt.decode(
token,
SECRET_KEY,
algorithms=["HS256"],
audience="your-app", # 驗(yàn)證接收方
issuer="auth-server" # 驗(yàn)證簽發(fā)方
)
return payload
except jwt.PyJWTError as e:
print(f"Token驗(yàn)證失敗: {type(e).__name__}: {e}")
return None
驗(yàn)證流程示意圖:

4. 錯(cuò)誤處理大全
from fastapi import HTTPException, status
def validate_token(token: str):
try:
return jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token已過(guò)期",
headers={"WWW-Authenticate": "Bearer"}
)
except jwt.InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"無(wú)效Token: {str(e)}",
headers={"WWW-Authenticate": "Bearer"}
)
異常類型表:
| 異常類 | 觸發(fā)條件 | HTTP狀態(tài)碼 | 處理建議 |
|---|---|---|---|
| ExpiredSignatureError | Token過(guò)期 | 401 | 提示重新登錄 |
| InvalidSignatureError | 簽名無(wú)效 | 401 | 拒絕訪問(wèn) |
| InvalidTokenError | 通用錯(cuò)誤 | 401 | 記錄日志 |
| MissingRequiredClaimError | 缺少必要聲明 | 400 | 返回錯(cuò)誤詳情 |
| InvalidIssuerError | 簽發(fā)者不匹配 | 403 | 審計(jì)日志 |
四、高級(jí)應(yīng)用場(chǎng)景
1. 雙令牌系統(tǒng)(Access + Refresh)
以下是基于你的 JWT 認(rèn)證流程整理的表格表示:
| 步驟 | 流程節(jié)點(diǎn) | 條件/判斷 | 動(dòng)作/響應(yīng) | 備注 |
|---|---|---|---|---|
| 1 | 用戶登錄 | 提交賬號(hào)密碼 | 系統(tǒng)驗(yàn)證憑證 | |
| 2 | 驗(yàn)證結(jié)果 | 驗(yàn)證成功 | 生成: - access_token(15分鐘) - refresh_token(7天) | |
| 驗(yàn)證失敗 | 返回錯(cuò)誤信息 | HTTP 401 | ||
| 3 | 返回響應(yīng) | 返回雙token給客戶端 | ||
| 4 | API訪問(wèn) | 攜帶access_token | 驗(yàn)證token有效性 | |
| 5 | 驗(yàn)證結(jié)果 | access_token有效 | 正常返回請(qǐng)求數(shù)據(jù) | HTTP 200 |
| access_token無(wú)效 | 檢查是否存在refresh_token | |||
| 6 | 刷新檢查 | 存在有效refresh_token | 發(fā)放新access_token | HTTP 200 + 新token |
| 無(wú)有效refresh_token | 要求重新登錄 | HTTP 401 |
詳細(xì)說(shuō)明表格:
| 階段 | 條件分支 | 系統(tǒng)行為 | 客戶端響應(yīng) | HTTP狀態(tài)碼 |
|---|---|---|---|---|
| 登錄階段 | ||||
| 1.1 | 憑證正確 | 生成雙token | 接收: - access_token - refresh_token | 200 |
| 1.2 | 憑證錯(cuò)誤 | 終止流程 | 收到錯(cuò)誤提示 | 401 |
| API訪問(wèn)階段 | ||||
| 2.1 | access_token有效 | 處理請(qǐng)求 | 獲取正常數(shù)據(jù) | 200 |
| 2.2 | access_token過(guò)期 | 檢查refresh_token | ||
| Token刷新階段 | ||||
| 3.1 | refresh_token有效 | 發(fā)放新access_token | 獲取新token | 200 |
| 3.2 | refresh_token無(wú)效 | 終止流程 | 要求重新登錄 | 401 |
異常處理補(bǔ)充表:
| 異常情況 | 系統(tǒng)處理 | 客戶端表現(xiàn) |
|---|---|---|
| access_token格式錯(cuò)誤 | 直接拒絕請(qǐng)求 | 收到"無(wú)效token"錯(cuò)誤 |
| refresh_token過(guò)期 | 清除客戶端存儲(chǔ) | 跳轉(zhuǎn)登錄頁(yè)面 |
| 連續(xù)使用過(guò)期refresh_token | 標(biāo)記為安全事件 | 強(qiáng)制登出所有設(shè)備 |
實(shí)現(xiàn)代碼:
def generate_token_pair(user_id: str):
"""生成令牌對(duì)"""
access_payload = {
"sub": user_id,
"type": "access",
"exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15)
}
refresh_payload = {
"sub": user_id,
"type": "refresh",
"exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(days=7)
}
access_token = jwt.encode(access_payload, SECRET_KEY, algorithm="HS256")
refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm="HS256")
return {
"access_token": access_token,
"refresh_token": refresh_token
}
def refresh_access_token(refresh_token: str):
"""使用refresh_token獲取新access_token"""
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=["HS256"])
if payload.get("type") != "refresh":
raise HTTPException(status_code=400, detail="無(wú)效的refresh token類型")
new_payload = {
"sub": payload["sub"],
"type": "access",
"exp": datetime.datetime.now(tz=timezone.utc) + datetime.timedelta(minutes=15)
}
return jwt.encode(new_payload, SECRET_KEY, algorithm="HS256")
except jwt.PyJWTError as e:
raise HTTPException(status_code=401, detail=f"refresh token無(wú)效: {str(e)}")
2. 與 FastAPI/Django 集成
FastAPI 示例:
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
security = HTTPBearer()
app = FastAPI()
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.PyJWTError:
raise HTTPException(status_code=401, detail="無(wú)效或過(guò)期的token")
@app.get("/protected")
async def protected_route(user: dict = Depends(get_current_user)):
return {"message": f"你好, {user['sub']}!", "user_data": user}
Django 示例:
from django.http import JsonResponse
from functools import wraps
def jwt_required(view_func):
@wraps(view_func)
def wrapper(request, *args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JsonResponse({"error": "未提供token"}, status=401)
token = auth_header.split(' ')[1]
try:
request.user = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return view_func(request, *args, **kwargs)
except jwt.ExpiredSignatureError:
return JsonResponse({"error": "token已過(guò)期"}, status=401)
except jwt.InvalidTokenError:
return JsonResponse({"error": "無(wú)效token"}, status=401)
return wrapper
@jwt_required
def protected_view(request):
return JsonResponse({"data": "受保護(hù)的內(nèi)容", "user": request.user})
五、安全最佳實(shí)踐
密鑰管理:
- 使用環(huán)境變量存儲(chǔ)密鑰
import os
SECRET_KEY = os.getenv("JWT_SECRET_KEY", "fallback-secret")
- 定期輪換密鑰(密鑰版本控制)
keys = {
"2023": "old-secret",
"2024": "current-secret"
}
增強(qiáng)安全措施:
def generate_secure_token(user, ip):
"""生成帶IP綁定的token"""
payload = {
"sub": user.id,
"ip": ip, # 綁定客戶端IP
"jti": str(uuid.uuid4()) # 唯一標(biāo)識(shí)防止重放
}
return jwt.encode(payload, SECRET_KEY, algorithm="HS256")
def verify_secure_token(token, ip):
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
if payload.get("ip") != ip:
raise ValueError("IP地址不匹配")
return payload
黑名單實(shí)現(xiàn):
from redis import Redis
redis = Redis(host='localhost', port=6379)
def revoke_token(jti: str, expire_in: int):
"""將token加入黑名單"""
redis.setex(f"blacklist:{jti}", expire_in, "revoked")
def is_revoked(jti: str) -> bool:
"""檢查是否在黑名單"""
return bool(redis.exists(f"blacklist:{jti}"))
六、性能優(yōu)化技巧
選擇更快的算法:
算法性能比較表(執(zhí)行時(shí)間,越小越好)
| 算法類型 | 性能指標(biāo)(單位:ms) | 備注 |
|---|---|---|
| HS256 | 1.2 | 對(duì)稱加密算法 |
| RS256 | 3.8 | RSA 非對(duì)稱加密 |
| ES256 | 4.1 | ECDSA 非對(duì)稱加密 |
詳細(xì)對(duì)比表格:
| 特性對(duì)比 | HS256 | RS256 | ES256 |
|---|---|---|---|
| 算法類型 | 對(duì)稱加密 | 非對(duì)稱加密 | 非對(duì)稱加密 |
| 密鑰管理 | 共享密鑰 | 公鑰/私鑰 | 公鑰/私鑰 |
| 簽名速度 | ?? 1.2ms | ?? 3.8ms | ?? 4.1ms |
| 驗(yàn)證速度 | ?? 最快 | ?? 慢 | ?? 最慢 |
| 安全性 | 中等 | 高 | 最高 |
| 適用場(chǎng)景 | 內(nèi)部服務(wù) | 公開(kāi)API | 金融級(jí)應(yīng)用 |
減少payload大小:
# 不推薦 - payload過(guò)大
payload = {**user.__dict__, "iat": ..., "exp": ...}
# 推薦 - 只存儲(chǔ)必要信息
payload = {
"sub": user.id,
"role": user.role,
"iat": ...,
"exp": ...
}
異步簽名驗(yàn)證:
import asyncio
from jwt import PyJWT
async def async_verify(token: str):
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
lambda: jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
)
七、完整示例:FastAPI 實(shí)現(xiàn)
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import jwt
import datetime
from datetime import timezone
from typing import Optional
app = FastAPI()
security = HTTPBearer()
# 配置
SECRET_KEY = "your-secret-key-here" # 生產(chǎn)環(huán)境應(yīng)從環(huán)境變量獲取
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE = datetime.timedelta(minutes=15)
REFRESH_TOKEN_EXPIRE = datetime.timedelta(days=7)
# 模擬數(shù)據(jù)庫(kù)
fake_users_db = {
"johndoe": {
"username": "johndoe",
"password": "secret123",
"role": "user"
},
"admin": {
"username": "admin",
"password": "admin123",
"role": "admin"
}
}
class Token(BaseModel):
access_token: str
refresh_token: str
token_type: str
class User(BaseModel):
username: str
role: Optional[str] = None
def create_tokens(username: str):
"""創(chuàng)建access和refresh令牌對(duì)"""
access_payload = {
"sub": username,
"role": fake_users_db[username]["role"],
"type": "access",
"exp": datetime.datetime.now(tz=timezone.utc) + ACCESS_TOKEN_EXPIRE
}
refresh_payload = {
"sub": username,
"type": "refresh",
"exp": datetime.datetime.now(tz=timezone.utc) + REFRESH_TOKEN_EXPIRE
}
access_token = jwt.encode(access_payload, SECRET_KEY, algorithm=ALGORITHM)
refresh_token = jwt.encode(refresh_payload, SECRET_KEY, algorithm=ALGORITHM)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}
async def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""依賴項(xiàng):驗(yàn)證JWT并返回當(dāng)前用戶"""
token = credentials.credentials
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "access":
raise HTTPException(status_code=400, detail="需要access token")
username = payload.get("sub")
if username not in fake_users_db:
raise HTTPException(status_code=404, detail="用戶不存在")
return User(username=username, role=payload.get("role"))
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token已過(guò)期",
headers={"WWW-Authenticate": "Bearer"}
)
except jwt.InvalidTokenError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="無(wú)效Token",
headers={"WWW-Authenticate": "Bearer"}
)
@app.post("/login")
async def login(username: str, password: str):
"""登錄接口"""
if username not in fake_users_db or fake_users_db[username]["password"] != password:
raise HTTPException(status_code=400, detail="用戶名或密碼錯(cuò)誤")
return create_tokens(username)
@app.post("/refresh")
async def refresh_token(refresh_token: str):
"""刷新access token"""
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh":
raise HTTPException(status_code=400, detail="需要refresh token")
return create_tokens(payload.get("sub"))
except jwt.PyJWTError as e:
raise HTTPException(status_code=401, detail=str(e))
@app.get("/me")
async def read_users_me(current_user: User = Depends(get_current_user)):
"""獲取當(dāng)前用戶信息"""
return current_user
@app.get("/admin")
async def admin_route(current_user: User = Depends(get_current_user)):
"""管理員專屬路由"""
if current_user.role != "admin":
raise HTTPException(status_code=403, detail="無(wú)權(quán)訪問(wèn)")
return {"message": "歡迎管理員"}
結(jié)語(yǔ)
通過(guò)本文,你已經(jīng)掌握了:
- JWT 的核心原理和結(jié)構(gòu)
- Python 中生成和驗(yàn)證 JWT 的完整方法
- 各種高級(jí)應(yīng)用場(chǎng)景和安全實(shí)踐
- 與流行框架的集成方式
記住這些黃金法則:
- 最小化payload:只存儲(chǔ)必要信息
- 保護(hù)密鑰:像保護(hù)銀行密碼一樣保護(hù)你的密鑰
- 合理設(shè)置有效期:平衡安全性和用戶體驗(yàn)
- 考慮刷新機(jī)制:提升用戶體驗(yàn)同時(shí)保持安全
現(xiàn)在,你已經(jīng)準(zhǔn)備好為你的 Python 應(yīng)用實(shí)現(xiàn)強(qiáng)大的 JWT 認(rèn)證系統(tǒng)了!
以上就是Python中JWT認(rèn)證的完整指南(從生成到驗(yàn)證)的詳細(xì)內(nèi)容,更多關(guān)于Python JWT認(rèn)證指南的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Python通過(guò)內(nèi)置函數(shù)和自寫算法DFS實(shí)現(xiàn)排列組合
這篇文章主要介紹了Python通過(guò)內(nèi)置函數(shù)和自寫算法DFS實(shí)現(xiàn)排列組合,排列組合是數(shù)學(xué)中的一種常見(jiàn)的計(jì)算方法,用于求出從給定的元素中選取若干個(gè)元素的所有可能的排列或組合。在Python中,有多種方式可以實(shí)現(xiàn)排列組合的計(jì)算,需要的朋友可以參考下2023-05-05
python 基于opencv實(shí)現(xiàn)高斯平滑
這篇文章主要介紹了python 基于opencv實(shí)現(xiàn)高斯平滑,幫助大家更好的理解和使用python處理圖片,感興趣的朋友可以了解下2020-12-12
python文件編譯為pyc后運(yùn)行的實(shí)現(xiàn)步驟
本文主要介紹了python文件編譯為pyc后運(yùn)行的實(shí)現(xiàn)步驟,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2023-02-02
解決pycharm導(dǎo)入本地py文件時(shí),模塊下方出現(xiàn)紅色波浪線的問(wèn)題
這篇文章主要介紹了解決pycharm導(dǎo)入本地py文件時(shí),模塊下方出現(xiàn)紅色波浪線的問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-06-06
python利用跳板機(jī)ssh遠(yuǎn)程連接redis的方法
今天小編就為大家分享一篇python利用跳板機(jī)ssh遠(yuǎn)程連接redis的方法,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2019-02-02
Python基于opencv的簡(jiǎn)單圖像輪廓形狀識(shí)別(全網(wǎng)最簡(jiǎn)單最少代碼)
這篇文章主要介紹了基于opencv的簡(jiǎn)單圖像輪廓形狀識(shí)別(全網(wǎng)最簡(jiǎn)單最少代碼),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01

