5. 修改统一认证登录和管理员登录是通过接口形式进行,存储返回的accessToken。 6. 修改交叉评查的部分样式
31 KiB
JWT 管理文档
DocAuditAI 系统 JWT(JSON Web Token)管理完整指南
目录
JWT 概述
什么是 JWT?
JWT(JSON Web Token)是一种开放标准(RFC 7519),用于在各方之间安全地传输信息。在 DocAuditAI 系统中,JWT 用于用户身份认证和授权。
JWT 结构
JWT 由三部分组成,用 . 分隔:
Header.Payload.Signature
示例:
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo1LCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNzMyMTY0MjA3fQ.Kz8fN3Y8Xp4QmW2vY6Jh9Lk0Rn1Tc3Uv5Wx6Yz7
1. Header(头部)
包含 Token 类型和签名算法:
{
"alg": "HS256",
"typ": "JWT"
}
2. Payload(载荷)
包含用户信息和权限数据:
{
"user_id": 5,
"username": "admin",
"user_role": "系统管理员",
"ou_id": "000",
"ou_name": "test",
"exp": 1732164207,
"iat": 1732077807,
"aud": "docauditai:client"
}
3. Signature(签名)
使用密钥对头部和载荷进行签名,防止篡改:
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
JWT_SECRET
)
JWT 生成机制
生成流程
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 1. 用户登录 │ --> │ 2. 验证凭据 │ --> │ 3. 生成JWT │ --> │ 4. 返回Token │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
代码实现
1. 用户登录接口
接口: POST /auth/login
请求体:
{
"username": "admin",
"password": "your_password"
}
响应:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 86400,
"user_info": {
"user_id": 5,
"username": "admin",
"roles": ["系统管理员"],
"ou_id": "000",
"ou_name": "test"
}
}
2. JWT 生成代码
位置: app/auth/jwt_handler.py
import jwt
from datetime import datetime, timedelta
import os
def create_access_token(user_data: dict) -> str:
"""
生成 JWT Access Token
Args:
user_data: 用户信息字典
Returns:
JWT Token 字符串
"""
# JWT 配置
JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
JWT_EXPIRATION_HOURS = int(os.getenv('JWT_EXPIRATION_HOURS', 24))
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')
# 计算过期时间
now = datetime.utcnow()
exp = now + timedelta(hours=JWT_EXPIRATION_HOURS)
# 构建 Payload
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'user_role': user_data.get('user_role', ''),
'ou_id': user_data.get('ou_id', ''),
'ou_name': user_data.get('ou_name', ''),
'exp': int(exp.timestamp()),
'iat': int(now.timestamp()),
'aud': JWT_AUDIENCE
}
# 生成 JWT
token = jwt.encode(
payload,
JWT_SECRET,
algorithm=JWT_ALGORITHM
)
return token
3. 登录接口完整示例
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
router = APIRouter()
class LoginRequest(BaseModel):
username: str
password: str
@router.post("/login")
async def login(request: LoginRequest):
"""用户登录接口"""
# 1. 验证用户名密码(示例:从数据库查询)
user = await verify_credentials(request.username, request.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误"
)
# 2. 获取用户角色和权限
user_roles = await get_user_roles(user['id'])
# 3. 构建用户数据
user_data = {
'user_id': user['id'],
'username': user['username'],
'user_role': user_roles[0] if user_roles else '',
'ou_id': user['ou_id'],
'ou_name': user['ou_name']
}
# 4. 生成 JWT
access_token = create_access_token(user_data)
# 5. 返回响应
return {
'access_token': access_token,
'token_type': 'bearer',
'expires_in': 86400, # 24小时(秒)
'user_info': user_data
}
JWT 验证流程
双层验证机制
DocAuditAI 采用两层 JWT 验证机制:
1. 中间件层(快速验证)
位置: app/middleware/jwt_auth.py
目的: 快速验证 Token 存在性,解析基本用户信息
流程:
async def jwt_auth_middleware(request: Request, call_next):
"""JWT 认证中间件(宽松模式)"""
# 白名单路径(无需认证)
if request.url.path in WHITE_LIST:
return await call_next(request)
# 获取 Token
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JSONResponse(
status_code=401,
content={'detail': '缺少认证令牌'}
)
token = auth_header.replace('Bearer ', '')
try:
# 解析 JWT(不验证签名,提升性能)
payload = jwt.decode(
token,
options={"verify_signature": False}
)
# 设置用户信息到 request.state
request.state.current_user = {
'user_id': payload.get('user_id'),
'username': payload.get('username'),
'user_role': payload.get('user_role'),
'ou_id': payload.get('ou_id')
}
except jwt.DecodeError:
return JSONResponse(
status_code=401,
content={'detail': '无效的令牌格式'}
)
return await call_next(request)
特点:
- ✅ 高性能:不验证签名,只解析 Token
- ✅ 覆盖所有请求
- ⚠️ 不验证 Token 是否被篡改
- ⚠️ 不验证 Token 是否过期
2. 路由层(完整验证)
位置: app/auth/jwt_handler.py 中的 verify_token() 依赖函数
目的: 严格验证 Token 完整性和有效性
流程:
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import os
security = HTTPBearer()
async def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""
完整 JWT 验证(路由层)
验证内容:
1. JWT 签名验证
2. 过期时间验证
3. Audience 验证
4. 必需字段验证
Returns:
用户信息字典
Raises:
HTTPException: Token 无效、过期或缺少必需字段
"""
token = credentials.credentials
JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')
try:
# 完整验证(包含签名、过期时间、audience)
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
audience=JWT_AUDIENCE,
options={
"verify_signature": True,
"verify_exp": True,
"verify_aud": True
}
)
# 验证必需字段
required_fields = ['user_id', 'username', 'user_role', 'exp']
for field in required_fields:
if field not in payload:
raise HTTPException(
status_code=401,
detail=f'Token 缺少必需字段: {field}'
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail='令牌已过期,请重新登录'
)
except jwt.InvalidAudienceError:
raise HTTPException(
status_code=401,
detail='令牌受众验证失败'
)
except jwt.InvalidSignatureError:
raise HTTPException(
status_code=401,
detail='令牌签名验证失败'
)
except jwt.DecodeError:
raise HTTPException(
status_code=401,
detail='令牌格式错误'
)
使用示例:
from fastapi import APIRouter, Depends
from app.auth.jwt_handler import verify_token
router = APIRouter()
@router.get("/protected/resource")
async def get_protected_resource(
current_user: dict = Depends(verify_token)
):
"""需要完整 JWT 验证的接口"""
return {
'message': '访问成功',
'user_id': current_user['user_id'],
'username': current_user['username']
}
特点:
- ✅ 严格验证:签名、过期时间、受众、必需字段
- ✅ 防篡改:验证 Token 签名
- ✅ 防重放:验证过期时间
- ⚠️ 性能较低:需要密钥解密和验证
验证流程对比表
| 验证内容 | 中间件层 | 路由层 |
|---|---|---|
| Token 存在性 | ✅ | ✅ |
| Token 格式正确 | ✅ | ✅ |
| JWT 签名验证 | ❌ | ✅ |
| 过期时间验证 | ❌ | ✅ |
| Audience 验证 | ❌ | ✅ |
| 必需字段验证 | ❌ | ✅ |
| 性能 | 高 | 中 |
| 使用场景 | 所有请求 | 敏感操作 |
JWT 配置说明
环境变量配置
位置: config/env.{port} (例如 config/env.8073)
# JWT 配置
JWT_SECRET=your-256-bit-secret-key-here
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=24
JWT_AUDIENCE=docauditai:client
# PostgREST JWT 配置(不同于用户 JWT)
PGRST_JWT_SECRET=different-secret-for-postgrest
配置项说明
| 配置项 | 说明 | 默认值 | 推荐值 |
|---|---|---|---|
JWT_SECRET |
JWT 签名密钥(256位) | your-secret-key |
随机生成的256位字符串 |
JWT_ALGORITHM |
JWT 签名算法 | HS256 |
HS256(对称)或 RS256(非对称) |
JWT_EXPIRATION_HOURS |
Token 有效期(小时) | 24 |
24(生产环境) / 1(测试环境) |
JWT_AUDIENCE |
JWT 受众标识 | docauditai:client |
docauditai:client |
PGRST_JWT_SECRET |
PostgREST 内部 JWT 密钥 | - | 与 JWT_SECRET 不同的密钥 |
密钥生成示例
方法1:Python 生成
import secrets
# 生成 256 位(32 字节)密钥
jwt_secret = secrets.token_urlsafe(32)
print(f"JWT_SECRET={jwt_secret}")
方法2:OpenSSL 生成
openssl rand -base64 32
方法3:在线生成
访问:https://randomkeygen.com/ (选择 256-bit 密钥)
安全注意事项
⚠️ 重要:
- JWT_SECRET 必须保密,不要提交到代码仓库
- JWT_SECRET 和 PGRST_JWT_SECRET 必须不同
- 生产环境使用强随机密钥(至少 256 位)
- 定期轮换密钥(建议每 90 天)
JWT 刷新机制
刷新策略
DocAuditAI 支持两种 Token 刷新策略:
策略1:Token 自动续期(推荐)
原理: Token 过期前 N 小时内访问任何接口,自动返回新 Token
配置:
JWT_AUTO_REFRESH_ENABLED=true
JWT_AUTO_REFRESH_THRESHOLD_HOURS=6
响应头:
X-New-Access-Token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
前端处理:
// axios 响应拦截器
apiClient.interceptors.response.use(
response => {
// 检查是否有新 Token
const newToken = response.headers['x-new-access-token'];
if (newToken) {
localStorage.setItem('access_token', newToken);
console.log('Token 已自动续期');
}
return response;
},
error => Promise.reject(error)
);
策略2:Refresh Token 机制
原理: 登录时返回 Access Token + Refresh Token,Access Token 过期后使用 Refresh Token 获取新 Token
登录响应:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 86400
}
刷新接口:
请求: POST /auth/refresh
请求体:
{
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
响应:
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 86400
}
前端实现:
async function refreshAccessToken() {
const refreshToken = localStorage.getItem('refresh_token');
if (!refreshToken) {
// 跳转登录页
window.location.href = '/login';
return null;
}
try {
const response = await fetch('/auth/refresh', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ refresh_token: refreshToken })
});
if (response.ok) {
const data = await response.json();
localStorage.setItem('access_token', data.access_token);
return data.access_token;
} else {
// Refresh Token 也过期,跳转登录页
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
window.location.href = '/login';
return null;
}
} catch (error) {
console.error('Token 刷新失败:', error);
return null;
}
}
// axios 响应拦截器(自动刷新)
apiClient.interceptors.response.use(
response => response,
async error => {
const originalRequest = error.config;
// 如果是 401 错误且未重试过
if (error.response?.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
// 刷新 Token
const newToken = await refreshAccessToken();
if (newToken) {
// 使用新 Token 重试原请求
originalRequest.headers.Authorization = `Bearer ${newToken}`;
return apiClient(originalRequest);
}
}
return Promise.reject(error);
}
);
JWT 吊销管理
吊销场景
- 用户主动登出
- 管理员强制下线用户
- 密码修改后旧 Token 失效
- 检测到异常登录
吊销实现方式
方式1:黑名单(推荐)
原理: 将吊销的 Token 存入 Redis 黑名单,验证时检查
表结构: jwt_tokens 表
CREATE TABLE jwt_tokens (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
token_jti VARCHAR(64) UNIQUE NOT NULL, -- JWT ID
token_type VARCHAR(20) DEFAULT 'access',
revoked BOOLEAN DEFAULT false,
revoked_at TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_jwt_tokens_jti ON jwt_tokens(token_jti);
CREATE INDEX idx_jwt_tokens_user_id ON jwt_tokens(user_id);
生成 Token 时记录:
import uuid
def create_access_token(user_data: dict) -> str:
# 生成唯一 JWT ID
jti = str(uuid.uuid4())
# Payload 中包含 jti
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'jti': jti, # 唯一标识
'exp': int(exp.timestamp()),
'iat': int(now.timestamp()),
'aud': JWT_AUDIENCE
}
# 生成 Token
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
# 记录到数据库
await record_token(
user_id=user_data['user_id'],
token_jti=jti,
expires_at=exp
)
return token
验证 Token 时检查黑名单:
async def verify_token(token: str) -> dict:
# 解码 Token
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
jti = payload.get('jti')
# 检查是否在黑名单
is_revoked = await check_token_revoked(jti)
if is_revoked:
raise HTTPException(
status_code=401,
detail='令牌已被吊销'
)
return payload
async def check_token_revoked(jti: str) -> bool:
"""检查 Token 是否已吊销"""
# 先查 Redis 缓存
cache_key = f"jwt:revoked:{jti}"
is_revoked = await redis_client.get(cache_key)
if is_revoked is not None:
return is_revoked == 'true'
# 查数据库
query = "SELECT revoked FROM jwt_tokens WHERE token_jti = $1"
result = await db_pool.fetchrow(query, jti)
if result and result['revoked']:
# 缓存到 Redis(24小时)
await redis_client.setex(cache_key, 86400, 'true')
return True
return False
吊销 Token 接口:
登出接口: POST /auth/logout
@router.post("/logout")
async def logout(current_user: dict = Depends(verify_token)):
"""用户登出"""
jti = current_user.get('jti')
# 吊销 Token
await revoke_token(jti)
return {'message': '登出成功'}
async def revoke_token(jti: str):
"""吊销指定 Token"""
query = """
UPDATE jwt_tokens
SET revoked = true, revoked_at = CURRENT_TIMESTAMP
WHERE token_jti = $1
"""
await db_pool.execute(query, jti)
# 同步到 Redis
cache_key = f"jwt:revoked:{jti}"
await redis_client.setex(cache_key, 86400, 'true')
强制下线用户:
async def revoke_user_all_tokens(user_id: int):
"""吊销用户的所有 Token"""
query = """
UPDATE jwt_tokens
SET revoked = true, revoked_at = CURRENT_TIMESTAMP
WHERE user_id = $1 AND revoked = false
RETURNING token_jti
"""
results = await db_pool.fetch(query, user_id)
# 同步到 Redis
for row in results:
cache_key = f"jwt:revoked:{row['token_jti']}"
await redis_client.setex(cache_key, 86400, 'true')
方式2:Token 版本号
原理: 为每个用户维护 Token 版本号,密码修改/强制下线时递增版本号
用户表新增字段:
ALTER TABLE sso_users ADD COLUMN token_version INTEGER DEFAULT 1;
Payload 中包含版本号:
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'token_version': user_data['token_version'], # 版本号
'exp': int(exp.timestamp()),
'aud': JWT_AUDIENCE
}
验证时检查版本号:
async def verify_token(token: str) -> dict:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
# 获取用户当前版本号
current_version = await get_user_token_version(payload['user_id'])
# 检查版本号是否匹配
if payload.get('token_version', 0) != current_version:
raise HTTPException(
status_code=401,
detail='令牌版本已过期'
)
return payload
密码修改后递增版本号:
async def change_password(user_id: int, new_password: str):
# 更新密码
await update_user_password(user_id, new_password)
# 递增 Token 版本号(使所有旧 Token 失效)
query = """
UPDATE sso_users
SET token_version = token_version + 1
WHERE id = $1
"""
await db_pool.execute(query, user_id)
前端对接指南
1. 登录流程
// 登录函数
async function login(username, password) {
try {
const response = await fetch('/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ username, password })
});
if (response.ok) {
const data = await response.json();
// 存储 Token
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('user_info', JSON.stringify(data.user_info));
// 可选:存储 Refresh Token
if (data.refresh_token) {
localStorage.setItem('refresh_token', data.refresh_token);
}
// 跳转首页
window.location.href = '/';
} else {
const error = await response.json();
alert(`登录失败: ${error.detail}`);
}
} catch (error) {
console.error('登录错误:', error);
alert('网络错误,请稍后重试');
}
}
2. 配置 Axios 拦截器
import axios from 'axios';
// 创建 Axios 实例
const apiClient = axios.create({
baseURL: 'http://localhost:8000/api/v1',
headers: { 'Content-Type': 'application/json' }
});
// 请求拦截器:自动添加 Token
apiClient.interceptors.request.use(
config => {
const token = localStorage.getItem('access_token');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
},
error => Promise.reject(error)
);
// 响应拦截器:处理 401 错误
apiClient.interceptors.response.use(
response => {
// 检查是否有新 Token(自动续期)
const newToken = response.headers['x-new-access-token'];
if (newToken) {
localStorage.setItem('access_token', newToken);
}
return response;
},
async error => {
const originalRequest = error.config;
// 401 错误处理
if (error.response?.status === 401) {
// 如果有 Refresh Token,尝试刷新
if (!originalRequest._retry && localStorage.getItem('refresh_token')) {
originalRequest._retry = true;
const newToken = await refreshAccessToken();
if (newToken) {
originalRequest.headers.Authorization = `Bearer ${newToken}`;
return apiClient(originalRequest);
}
}
// 否则跳转登录页
localStorage.clear();
window.location.href = '/login';
}
return Promise.reject(error);
}
);
export default apiClient;
3. 获取用户信息
// 从 localStorage 获取
function getCurrentUser() {
const userInfo = localStorage.getItem('user_info');
return userInfo ? JSON.parse(userInfo) : null;
}
// 从服务器获取最新用户信息
async function fetchCurrentUser() {
try {
const response = await apiClient.get('/auth/me');
const userInfo = response.data;
localStorage.setItem('user_info', JSON.stringify(userInfo));
return userInfo;
} catch (error) {
console.error('获取用户信息失败:', error);
return null;
}
}
4. 登出流程
async function logout() {
try {
// 调用登出接口(吊销 Token)
await apiClient.post('/auth/logout');
} catch (error) {
console.error('登出接口调用失败:', error);
} finally {
// 清除本地存储
localStorage.clear();
// 跳转登录页
window.location.href = '/login';
}
}
5. 路由守卫(Vue.js 示例)
import { createRouter, createWebHistory } from 'vue-router';
const router = createRouter({
history: createWebHistory(),
routes: [
{ path: '/login', component: LoginPage },
{ path: '/', component: HomePage, meta: { requiresAuth: true } },
{ path: '/documents', component: DocumentList, meta: { requiresAuth: true } }
]
});
// 全局路由守卫
router.beforeEach((to, from, next) => {
const token = localStorage.getItem('access_token');
// 需要认证的路由
if (to.meta.requiresAuth && !token) {
next('/login');
return;
}
// 已登录用户访问登录页,跳转首页
if (to.path === '/login' && token) {
next('/');
return;
}
next();
});
export default router;
安全最佳实践
1. Token 存储
推荐方式: localStorage 或 sessionStorage
不推荐: Cookie(容易受到 CSRF 攻击)
示例:
// 存储
localStorage.setItem('access_token', token);
// 读取
const token = localStorage.getItem('access_token');
// 删除
localStorage.removeItem('access_token');
2. HTTPS 传输
⚠️ 生产环境必须使用 HTTPS
- Token 通过 HTTPS 加密传输
- 防止中间人攻击(MITM)
- 防止 Token 被窃取
3. Token 有效期
推荐配置:
- 生产环境: 24 小时
- 测试环境: 1-2 小时
- 开发环境: 7 天(方便调试)
4. 密钥管理
✅ 正确做法:
- 使用环境变量存储密钥
- 生产环境使用强随机密钥
- 定期轮换密钥(90 天)
- 使用密钥管理服务(AWS KMS、Azure Key Vault)
❌ 错误做法:
- 硬编码密钥在代码中
- 将密钥提交到 Git
- 使用弱密钥(如
secret、12345)
5. Token 吊销
✅ 必须实现:
- 用户登出时吊销 Token
- 密码修改后吊销所有旧 Token
- 检测到异常登录时吊销 Token
- 管理员强制下线用户
6. 防止 XSS 攻击
✅ 防护措施:
- ���端对用户输入进行转义
- 使用 CSP(Content Security Policy)
- 避免在 URL 中传递 Token
- 使用 HttpOnly Cookie(如果使用 Cookie 存储)
7. 防止 CSRF 攻击
✅ 防护措施:
- 使用 Bearer Token 而非 Cookie
- 验证 Referer 头
- 使用 CSRF Token
常见问题
Q1: Token 过期后如何处理?
A: 使用 Refresh Token 机制或自动续期机制:
// 响应拦截器
apiClient.interceptors.response.use(
response => response,
async error => {
if (error.response?.status === 401 && !error.config._retry) {
error.config._retry = true;
const newToken = await refreshAccessToken();
if (newToken) {
error.config.headers.Authorization = `Bearer ${newToken}`;
return apiClient(error.config);
}
}
return Promise.reject(error);
}
);
Q2: 如何在多标签页间同步登录状态?
A: 使用 storage 事件监听 localStorage 变化:
window.addEventListener('storage', (event) => {
if (event.key === 'access_token') {
if (!event.newValue) {
// Token 被删除,其他标签页也跳转登录
window.location.href = '/login';
} else {
// Token 被更新,其他标签页也更新
console.log('Token 已在其他标签页更新');
}
}
});
Q3: JWT 和 Session 有什么区别?
A:
| 特性 | JWT | Session |
|---|---|---|
| 存储位置 | 客户端(localStorage) | 服务器(Redis/数据库) |
| 状态 | 无状态 | 有状态 |
| 可扩展性 | 高(无需共享状态) | 低(需要共享 Session) |
| 吊销难度 | 高(需要黑名单) | 低(直接删除 Session) |
| 性能 | 高(无需查询) | 低(需要查询 Session) |
Q4: 如何实现"记住我"功能?
A: 使用长期有效的 Refresh Token:
// 登录时勾选"记住我"
async function login(username, password, rememberMe) {
const response = await fetch('/auth/login', {
method: 'POST',
body: JSON.stringify({
username,
password,
remember_me: rememberMe // 传递"记住我"标志
})
});
const data = await response.json();
if (rememberMe) {
// 长期有效的 Refresh Token(30 天)
localStorage.setItem('refresh_token', data.refresh_token);
}
}
Q5: 如何检测 Token 是否被盗用?
A: 记录 Token 使用的 IP 和 User-Agent,检测异常:
@router.post("/login")
async def login(request: Request, login_data: LoginRequest):
# 获取客户端信息
client_ip = request.client.host
user_agent = request.headers.get('User-Agent', '')
# 生成 Token 时记录
await record_token_metadata(
jti=jti,
client_ip=client_ip,
user_agent=user_agent
)
async def verify_token(request: Request, token: str):
# 检查 IP 和 User-Agent 是否匹配
current_ip = request.client.host
current_user_agent = request.headers.get('User-Agent', '')
metadata = await get_token_metadata(jti)
if metadata['client_ip'] != current_ip:
# IP 不匹配,可能被盗用
await log_security_event(
'Token used from different IP',
jti=jti,
original_ip=metadata['client_ip'],
current_ip=current_ip
)
# 可选:吊销 Token
await revoke_token(jti)
raise HTTPException(status_code=401, detail='Token 异常使用')
故障排查
问题1: Token 验证失败 "InvalidSignatureError"
原因: JWT_SECRET 不一致
解决:
- 检查
config/env.{port}文件中的JWT_SECRET - 确保前端请求的后端实例使用相同的
JWT_SECRET - 重新登录获取新 Token
问题2: Token 过期 "ExpiredSignatureError"
原因: Token 超过有效期
解决:
- 使用 Refresh Token 刷新
- 重新登录
- 检查服务器时间是否正确(
exp基于 UTC 时间)
问题3: Token 解码失败 "DecodeError"
原因: Token 格式错误或被篡改
解决:
- 检查 Token 是否完整(三部分用
.分隔) - 检查 Token 是否被 URL 编码(不应该编码)
- 检查前端是否正确存储和传递 Token
问题4: 中间件不生效
原因: 中间件注册顺序错误或白名单配置错误
解决:
- 检查
core/app.py中间件注册顺序(JWT 中间件应在路由之前) - 检查白名单配置
WHITE_LIST - 检查路由路径是否匹配
问题5: PostgREST 返回 401
原因: PostgREST 使用的 JWT Secret 不同
解决:
- 确认
PGRST_JWT_SECRET配置 - PostgREST 内部 JWT 由后端生成,前端不直接使用
- 检查 PostgREST 代理是否正确转换 Token
调试技巧
1. 解码 Token(不验证签名)
import jwt
# 解码 Token(查看内容)
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
payload = jwt.decode(token, options={"verify_signature": False})
print(payload)
// 前端解码 Token
function decodeToken(token) {
const base64Url = token.split('.')[1];
const base64 = base64Url.replace(/-/g, '+').replace(/_/g, '/');
const jsonPayload = decodeURIComponent(atob(base64).split('').map(function(c) {
return '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2);
}).join(''));
return JSON.parse(jsonPayload);
}
const token = localStorage.getItem('access_token');
console.log(decodeToken(token));
2. 检查 Token 是否过期
function isTokenExpired(token) {
const payload = decodeToken(token);
const now = Math.floor(Date.now() / 1000); // 当前时间(秒)
return payload.exp < now;
}
if (isTokenExpired(token)) {
console.log('Token 已过期');
}
3. 启用详细日志
# config/env.{port}
LOG_LEVEL=DEBUG
JWT_DEBUG_ENABLED=true
附录
A. JWT Payload 字段完整说明
| 字段 | 类型 | 必需 | 说明 | 示例 |
|---|---|---|---|---|
user_id |
integer | ✅ | 用户ID | 5 |
username |
string | ✅ | 用户名 | "admin" |
user_role |
string | ✅ | 用户角色 | "系统管理员" |
ou_id |
string | ❌ | 组织单位ID | "000" |
ou_name |
string | ❌ | 组织单位名称 | "test" |
exp |
integer | ✅ | 过期时间(Unix 时间戳) | 1732164207 |
iat |
integer | ✅ | 签发时间(Unix 时间戳) | 1732077807 |
aud |
string | ✅ | 受众 | "docauditai:client" |
jti |
string | ❌ | JWT ID(用于吊销) | "550e8400-e29b-41d4-a716-446655440000" |
token_version |
integer | ❌ | Token 版本号 | 1 |
B. HTTP 状态码说明
| 状态码 | 说明 | 场景 |
|---|---|---|
| 200 | 成功 | 登录成功、Token 验证成功 |
| 401 | 未授权 | Token 缺失、无效、过期、被吊销 |
| 403 | 禁止访问 | 权限不足 |
| 422 | 验证错误 | 登录参数错误 |
C. 错误响应示例
{
"detail": "令牌已过期,请重新登录",
"error_code": "TOKEN_EXPIRED",
"timestamp": "2025-11-17T10:30:00Z"
}
文档版本: v1.0 最后更新: 2025-11-17 维护者: DocAuditAI Team