Files
leaudit-platform-frontend/auth_doc/JWT管理文档.md
T
2025-11-18 11:06:24 +08:00

1317 lines
31 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# JWT 管理文档
DocAuditAI 系统 JWTJSON Web Token)管理完整指南
## 目录
1. [JWT 概述](#jwt-概述)
2. [JWT 生成机制](#jwt-生成机制)
3. [JWT 验证流程](#jwt-验证流程)
4. [JWT 配置说明](#jwt-配置说明)
5. [JWT 刷新机制](#jwt-刷新机制)
6. [JWT 吊销管理](#jwt-吊销管理)
7. [前端对接指南](#前端对接指南)
8. [安全最佳实践](#安全最佳实践)
9. [常见问题](#常见问题)
10. [故障排查](#故障排查)
---
## JWT 概述
### 什么是 JWT
JWTJSON Web Token)是一种开放标准(RFC 7519),用于在各方之间安全地传输信息。在 DocAuditAI 系统中,JWT 用于用户身份认证和授权。
### JWT 结构
JWT 由三部分组成,用 `.` 分隔:
```
Header.Payload.Signature
```
**示例**
```
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo1LCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNzMyMTY0MjA3fQ.Kz8fN3Y8Xp4QmW2vY6Jh9Lk0Rn1Tc3Uv5Wx6Yz7
```
#### 1. Header(头部)
包含 Token 类型和签名算法:
```json
{
"alg": "HS256",
"typ": "JWT"
}
```
#### 2. Payload(载荷)
包含用户信息和权限数据:
```json
{
"user_id": 5,
"username": "admin",
"user_role": "系统管理员",
"ou_id": "000",
"ou_name": "test",
"exp": 1732164207,
"iat": 1732077807,
"aud": "docauditai:client"
}
```
#### 3. Signature(签名)
使用密钥对头部和载荷进行签名,防止篡改:
```
HMACSHA256(
base64UrlEncode(header) + "." +
base64UrlEncode(payload),
JWT_SECRET
)
```
---
## JWT 生成机制
### 生成流程
```
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ 1. 用户登录 │ --> │ 2. 验证凭据 │ --> │ 3. 生成JWT │ --> │ 4. 返回Token │
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
```
### 代码实现
#### 1. 用户登录接口
**接口**: `POST /auth/login`
**请求体**:
```json
{
"username": "admin",
"password": "your_password"
}
```
**响应**:
```json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 86400,
"user_info": {
"user_id": 5,
"username": "admin",
"roles": ["系统管理员"],
"ou_id": "000",
"ou_name": "test"
}
}
```
#### 2. JWT 生成代码
**位置**: `app/auth/jwt_handler.py`
```python
import jwt
from datetime import datetime, timedelta
import os
def create_access_token(user_data: dict) -> str:
"""
生成 JWT Access Token
Args:
user_data: 用户信息字典
Returns:
JWT Token 字符串
"""
# JWT 配置
JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
JWT_EXPIRATION_HOURS = int(os.getenv('JWT_EXPIRATION_HOURS', 24))
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')
# 计算过期时间
now = datetime.utcnow()
exp = now + timedelta(hours=JWT_EXPIRATION_HOURS)
# 构建 Payload
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'user_role': user_data.get('user_role', ''),
'ou_id': user_data.get('ou_id', ''),
'ou_name': user_data.get('ou_name', ''),
'exp': int(exp.timestamp()),
'iat': int(now.timestamp()),
'aud': JWT_AUDIENCE
}
# 生成 JWT
token = jwt.encode(
payload,
JWT_SECRET,
algorithm=JWT_ALGORITHM
)
return token
```
#### 3. 登录接口完整示例
```python
from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel
router = APIRouter()
class LoginRequest(BaseModel):
username: str
password: str
@router.post("/login")
async def login(request: LoginRequest):
"""用户登录接口"""
# 1. 验证用户名密码(示例:从数据库查询)
user = await verify_credentials(request.username, request.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误"
)
# 2. 获取用户角色和权限
user_roles = await get_user_roles(user['id'])
# 3. 构建用户数据
user_data = {
'user_id': user['id'],
'username': user['username'],
'user_role': user_roles[0] if user_roles else '',
'ou_id': user['ou_id'],
'ou_name': user['ou_name']
}
# 4. 生成 JWT
access_token = create_access_token(user_data)
# 5. 返回响应
return {
'access_token': access_token,
'token_type': 'bearer',
'expires_in': 86400, # 24小时(秒)
'user_info': user_data
}
```
---
## JWT 验证流程
### 双层验证机制
DocAuditAI 采用**两层 JWT 验证机制**
#### 1. 中间件层(快速验证)
**位置**: `app/middleware/jwt_auth.py`
**目的**: 快速验证 Token 存在性,解析基本用户信息
**流程**:
```python
async def jwt_auth_middleware(request: Request, call_next):
"""JWT 认证中间件(宽松模式)"""
# 白名单路径(无需认证)
if request.url.path in WHITE_LIST:
return await call_next(request)
# 获取 Token
auth_header = request.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
return JSONResponse(
status_code=401,
content={'detail': '缺少认证令牌'}
)
token = auth_header.replace('Bearer ', '')
try:
# 解析 JWT(不验证签名,提升性能)
payload = jwt.decode(
token,
options={"verify_signature": False}
)
# 设置用户信息到 request.state
request.state.current_user = {
'user_id': payload.get('user_id'),
'username': payload.get('username'),
'user_role': payload.get('user_role'),
'ou_id': payload.get('ou_id')
}
except jwt.DecodeError:
return JSONResponse(
status_code=401,
content={'detail': '无效的令牌格式'}
)
return await call_next(request)
```
**特点**:
- ✅ 高性能:不验证签名,只解析 Token
- ✅ 覆盖所有请求
- ⚠️ 不验证 Token 是否被篡改
- ⚠️ 不验证 Token 是否过期
#### 2. 路由层(完整验证)
**位置**: `app/auth/jwt_handler.py` 中的 `verify_token()` 依赖函数
**目的**: 严格验证 Token 完整性和有效性
**流程**:
```python
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import os
security = HTTPBearer()
async def verify_token(
credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
"""
完整 JWT 验证(路由层)
验证内容:
1. JWT 签名验证
2. 过期时间验证
3. Audience 验证
4. 必需字段验证
Returns:
用户信息字典
Raises:
HTTPException: Token 无效、过期或缺少必需字段
"""
token = credentials.credentials
JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')
try:
# 完整验证(包含签名、过期时间、audience)
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=[JWT_ALGORITHM],
audience=JWT_AUDIENCE,
options={
"verify_signature": True,
"verify_exp": True,
"verify_aud": True
}
)
# 验证必需字段
required_fields = ['user_id', 'username', 'user_role', 'exp']
for field in required_fields:
if field not in payload:
raise HTTPException(
status_code=401,
detail=f'Token 缺少必需字段: {field}'
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=401,
detail='令牌已过期,请重新登录'
)
except jwt.InvalidAudienceError:
raise HTTPException(
status_code=401,
detail='令牌受众验证失败'
)
except jwt.InvalidSignatureError:
raise HTTPException(
status_code=401,
detail='令牌签名验证失败'
)
except jwt.DecodeError:
raise HTTPException(
status_code=401,
detail='令牌格式错误'
)
```
**使用示例**:
```python
from fastapi import APIRouter, Depends
from app.auth.jwt_handler import verify_token
router = APIRouter()
@router.get("/protected/resource")
async def get_protected_resource(
current_user: dict = Depends(verify_token)
):
"""需要完整 JWT 验证的接口"""
return {
'message': '访问成功',
'user_id': current_user['user_id'],
'username': current_user['username']
}
```
**特点**:
- ✅ 严格验证:签名、过期时间、受众、必需字段
- ✅ 防篡改:验证 Token 签名
- ✅ 防重放:验证过期时间
- ⚠️ 性能较低:需要密钥解密和验证
### 验证流程对比表
| 验证内容 | 中间件层 | 路由层 |
|---------|---------|--------|
| Token 存在性 | ✅ | ✅ |
| Token 格式正确 | ✅ | ✅ |
| JWT 签名验证 | ❌ | ✅ |
| 过期时间验证 | ❌ | ✅ |
| Audience 验证 | ❌ | ✅ |
| 必需字段验证 | ❌ | ✅ |
| 性能 | 高 | 中 |
| 使用场景 | 所有请求 | 敏感操作 |
---
## JWT 配置说明
### 环境变量配置
**位置**: `config/env.{port}` (例如 `config/env.8073`
```ini
# JWT 配置
JWT_SECRET=your-256-bit-secret-key-here
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=24
JWT_AUDIENCE=docauditai:client
# PostgREST JWT 配置(不同于用户 JWT)
PGRST_JWT_SECRET=different-secret-for-postgrest
```
### 配置项说明
| 配置项 | 说明 | 默认值 | 推荐值 |
|--------|------|--------|--------|
| `JWT_SECRET` | JWT 签名密钥(256位) | `your-secret-key` | 随机生成的256位字符串 |
| `JWT_ALGORITHM` | JWT 签名算法 | `HS256` | `HS256`(对称)或 `RS256`(非对称) |
| `JWT_EXPIRATION_HOURS` | Token 有效期(小时) | `24` | `24`(生产环境) / `1`(测试环境) |
| `JWT_AUDIENCE` | JWT 受众标识 | `docauditai:client` | `docauditai:client` |
| `PGRST_JWT_SECRET` | PostgREST 内部 JWT 密钥 | - | 与 `JWT_SECRET` 不同的密钥 |
### 密钥生成示例
#### 方法1Python 生成
```python
import secrets
# 生成 256 位(32 字节)密钥
jwt_secret = secrets.token_urlsafe(32)
print(f"JWT_SECRET={jwt_secret}")
```
#### 方法2OpenSSL 生成
```bash
openssl rand -base64 32
```
#### 方法3:在线生成
访问:https://randomkeygen.com/ (选择 256-bit 密钥)
### 安全注意事项
⚠️ **重要**
1. **JWT_SECRET 必须保密**,不要提交到代码仓库
2. **JWT_SECRET 和 PGRST_JWT_SECRET 必须不同**
3. **生产环境使用强随机密钥**(至少 256 位)
4. **定期轮换密钥**(建议每 90 天)
---
## JWT 刷新机制
### 刷新策略
DocAuditAI 支持两种 Token 刷新策略:
#### 策略1:Token 自动续期(推荐)
**原理**: Token 过期前 N 小时内访问任何接口,自动返回新 Token
**配置**:
```ini
JWT_AUTO_REFRESH_ENABLED=true
JWT_AUTO_REFRESH_THRESHOLD_HOURS=6
```
**响应头**:
```
X-New-Access-Token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
```
**前端处理**:
```javascript
// axios 响应拦截器
apiClient.interceptors.response.use(
response => {
// 检查是否有新 Token
const newToken = response.headers['x-new-access-token'];
if (newToken) {
localStorage.setItem('access_token', newToken);
console.log('Token 已自动续期');
}
return response;
},
error => Promise.reject(error)
);
```
#### 策略2Refresh Token 机制
**原理**: 登录时返回 Access Token + Refresh TokenAccess Token 过期后使用 Refresh Token 获取新 Token
**登录响应**:
```json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 86400
}
```
**刷新接口**:
**请求**: `POST /auth/refresh`
**请求体**:
```json
{
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
```
**响应**:
```json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 86400
}
```
**前端实现**:
```javascript
async function refreshAccessToken() {
const refreshToken = localStorage.getItem('refresh_token');
if (!refreshToken) {
// 跳转登录页
window.location.href = '/login';
return null;
}
try {
const response = await fetch('/auth/refresh', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ refresh_token: refreshToken })
});
if (response.ok) {
const data = await response.json();
localStorage.setItem('access_token', data.access_token);
return data.access_token;
} else {
// Refresh Token 也过期,跳转登录页
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
window.location.href = '/login';
return null;
}
} catch (error) {
console.error('Token 刷新失败:', error);
return null;
}
}
// axios 响应拦截器(自动刷新)
apiClient.interceptors.response.use(
response => response,
async error => {
const originalRequest = error.config;
// 如果是 401 错误且未重试过
if (error.response?.status === 401 && !originalRequest._retry) {
originalRequest._retry = true;
// 刷新 Token
const newToken = await refreshAccessToken();
if (newToken) {
// 使用新 Token 重试原请求
originalRequest.headers.Authorization = `Bearer ${newToken}`;
return apiClient(originalRequest);
}
}
return Promise.reject(error);
}
);
```
---
## JWT 吊销管理
### 吊销场景
1. **用户主动登出**
2. **管理员强制下线用户**
3. **密码修改后旧 Token 失效**
4. **检测到异常登录**
### 吊销实现方式
#### 方式1:黑名单(推荐)
**原理**: 将吊销的 Token 存入 Redis 黑名单,验证时检查
**表结构**: `jwt_tokens`
```sql
CREATE TABLE jwt_tokens (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
token_jti VARCHAR(64) UNIQUE NOT NULL, -- JWT ID
token_type VARCHAR(20) DEFAULT 'access',
revoked BOOLEAN DEFAULT false,
revoked_at TIMESTAMP,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX idx_jwt_tokens_jti ON jwt_tokens(token_jti);
CREATE INDEX idx_jwt_tokens_user_id ON jwt_tokens(user_id);
```
**生成 Token 时记录**:
```python
import uuid
def create_access_token(user_data: dict) -> str:
# 生成唯一 JWT ID
jti = str(uuid.uuid4())
# Payload 中包含 jti
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'jti': jti, # 唯一标识
'exp': int(exp.timestamp()),
'iat': int(now.timestamp()),
'aud': JWT_AUDIENCE
}
# 生成 Token
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
# 记录到数据库
await record_token(
user_id=user_data['user_id'],
token_jti=jti,
expires_at=exp
)
return token
```
**验证 Token 时检查黑名单**:
```python
async def verify_token(token: str) -> dict:
# 解码 Token
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
jti = payload.get('jti')
# 检查是否在黑名单
is_revoked = await check_token_revoked(jti)
if is_revoked:
raise HTTPException(
status_code=401,
detail='令牌已被吊销'
)
return payload
async def check_token_revoked(jti: str) -> bool:
"""检查 Token 是否已吊销"""
# 先查 Redis 缓存
cache_key = f"jwt:revoked:{jti}"
is_revoked = await redis_client.get(cache_key)
if is_revoked is not None:
return is_revoked == 'true'
# 查数据库
query = "SELECT revoked FROM jwt_tokens WHERE token_jti = $1"
result = await db_pool.fetchrow(query, jti)
if result and result['revoked']:
# 缓存到 Redis24小时)
await redis_client.setex(cache_key, 86400, 'true')
return True
return False
```
**吊销 Token 接口**:
**登出接口**: `POST /auth/logout`
```python
@router.post("/logout")
async def logout(current_user: dict = Depends(verify_token)):
"""用户登出"""
jti = current_user.get('jti')
# 吊销 Token
await revoke_token(jti)
return {'message': '登出成功'}
async def revoke_token(jti: str):
"""吊销指定 Token"""
query = """
UPDATE jwt_tokens
SET revoked = true, revoked_at = CURRENT_TIMESTAMP
WHERE token_jti = $1
"""
await db_pool.execute(query, jti)
# 同步到 Redis
cache_key = f"jwt:revoked:{jti}"
await redis_client.setex(cache_key, 86400, 'true')
```
**强制下线用户**:
```python
async def revoke_user_all_tokens(user_id: int):
"""吊销用户的所有 Token"""
query = """
UPDATE jwt_tokens
SET revoked = true, revoked_at = CURRENT_TIMESTAMP
WHERE user_id = $1 AND revoked = false
RETURNING token_jti
"""
results = await db_pool.fetch(query, user_id)
# 同步到 Redis
for row in results:
cache_key = f"jwt:revoked:{row['token_jti']}"
await redis_client.setex(cache_key, 86400, 'true')
```
#### 方式2Token 版本号
**原理**: 为每个用户维护 Token 版本号,密码修改/强制下线时递增版本号
**用户表新增字段**:
```sql
ALTER TABLE sso_users ADD COLUMN token_version INTEGER DEFAULT 1;
```
**Payload 中包含版本号**:
```python
payload = {
'user_id': user_data['user_id'],
'username': user_data['username'],
'token_version': user_data['token_version'], # 版本号
'exp': int(exp.timestamp()),
'aud': JWT_AUDIENCE
}
```
**验证时检查版本号**:
```python
async def verify_token(token: str) -> dict:
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
# 获取用户当前版本号
current_version = await get_user_token_version(payload['user_id'])
# 检查版本号是否匹配
if payload.get('token_version', 0) != current_version:
raise HTTPException(
status_code=401,
detail='令牌版本已过期'
)
return payload
```
**密码修改后递增版本号**:
```python
async def change_password(user_id: int, new_password: str):
# 更新密码
await update_user_password(user_id, new_password)
# 递增 Token 版本号(使所有旧 Token 失效)
query = """
UPDATE sso_users
SET token_version = token_version + 1
WHERE id = $1
"""
await db_pool.execute(query, user_id)
```
---
## 前端对接指南
### 1. 登录流程
```javascript
// 登录函数
async function login(username, password) {
try {
const response = await fetch('/auth/login', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ username, password })
});
if (response.ok) {
const data = await response.json();
// 存储 Token
localStorage.setItem('access_token', data.access_token);
localStorage.setItem('user_info', JSON.stringify(data.user_info));
// 可选:存储 Refresh Token
if (data.refresh_token) {
localStorage.setItem('refresh_token', data.refresh_token);
}
// 跳转首页
window.location.href = '/';
} else {
const error = await response.json();
alert(`登录失败: ${error.detail}`);
}
} catch (error) {
console.error('登录错误:', error);
alert('网络错误,请稍后重试');
}
}
```
### 2. 配置 Axios 拦截器
```javascript
import axios from 'axios';
// 创建 Axios 实例
const apiClient = axios.create({
baseURL: 'http://localhost:8000/api/v1',
headers: { 'Content-Type': 'application/json' }
});
// 请求拦截器:自动添加 Token
apiClient.interceptors.request.use(
config => {
const token = localStorage.getItem('access_token');
if (token) {
config.headers.Authorization = `Bearer ${token}`;
}
return config;
},
error => Promise.reject(error)
);
// 响应拦截器:处理 401 错误
apiClient.interceptors.response.use(
response => {
// 检查是否有新 Token(自动续期)
const newToken = response.headers['x-new-access-token'];
if (newToken) {
localStorage.setItem('access_token', newToken);
}
return response;
},
async error => {
const originalRequest = error.config;
// 401 错误处理
if (error.response?.status === 401) {
// 如果有 Refresh Token,尝试刷新
if (!originalRequest._retry && localStorage.getItem('refresh_token')) {
originalRequest._retry = true;
const newToken = await refreshAccessToken();
if (newToken) {
originalRequest.headers.Authorization = `Bearer ${newToken}`;
return apiClient(originalRequest);
}
}
// 否则跳转登录页
localStorage.clear();
window.location.href = '/login';
}
return Promise.reject(error);
}
);
export default apiClient;
```
### 3. 获取用户信息
```javascript
// 从 localStorage 获取
function getCurrentUser() {
const userInfo = localStorage.getItem('user_info');
return userInfo ? JSON.parse(userInfo) : null;
}
// 从服务器获取最新用户信息
async function fetchCurrentUser() {
try {
const response = await apiClient.get('/auth/me');
const userInfo = response.data;
localStorage.setItem('user_info', JSON.stringify(userInfo));
return userInfo;
} catch (error) {
console.error('获取用户信息失败:', error);
return null;
}
}
```
### 4. 登出流程
```javascript
async function logout() {
try {
// 调用登出接口(吊销 Token)
await apiClient.post('/auth/logout');
} catch (error) {
console.error('登出接口调用失败:', error);
} finally {
// 清除本地存储
localStorage.clear();
// 跳转登录页
window.location.href = '/login';
}
}
```
### 5. 路由守卫(Vue.js 示例)
```javascript
import { createRouter, createWebHistory } from 'vue-router';
const router = createRouter({
history: createWebHistory(),
routes: [
{ path: '/login', component: LoginPage },
{ path: '/', component: HomePage, meta: { requiresAuth: true } },
{ path: '/documents', component: DocumentList, meta: { requiresAuth: true } }
]
});
// 全局路由守卫
router.beforeEach((to, from, next) => {
const token = localStorage.getItem('access_token');
// 需要认证的路由
if (to.meta.requiresAuth && !token) {
next('/login');
return;
}
// 已登录用户访问登录页,跳转首页
if (to.path === '/login' && token) {
next('/');
return;
}
next();
});
export default router;
```
---
## 安全最佳实践
### 1. Token 存储
**推荐方式**: `localStorage``sessionStorage`
**不推荐**: Cookie(容易受到 CSRF 攻击)
**示例**:
```javascript
// 存储
localStorage.setItem('access_token', token);
// 读取
const token = localStorage.getItem('access_token');
// 删除
localStorage.removeItem('access_token');
```
### 2. HTTPS 传输
⚠️ **生产环境必须使用 HTTPS**
- Token 通过 HTTPS 加密传输
- 防止中间人攻击(MITM
- 防止 Token 被窃取
### 3. Token 有效期
**推荐配置**:
- **生产环境**: 24 小时
- **测试环境**: 1-2 小时
- **开发环境**: 7 天(方便调试)
### 4. 密钥管理
**正确做法**:
- 使用环境变量存储密钥
- 生产环境使用强随机密钥
- 定期轮换密钥(90 天)
- 使用密钥管理服务(AWS KMS、Azure Key Vault
**错误做法**:
- 硬编码密钥在代码中
- 将密钥提交到 Git
- 使用弱密钥(如 `secret``12345`
### 5. Token 吊销
**必须实现**:
- 用户登出时吊销 Token
- 密码修改后吊销所有旧 Token
- 检测到异常登录时吊销 Token
- 管理员强制下线用户
### 6. 防止 XSS 攻击
**防护措施**:
- 前端对用户输入进行转义
- 使用 CSPContent Security Policy
- 避免在 URL 中传递 Token
- 使用 HttpOnly Cookie(如果使用 Cookie 存储)
### 7. 防止 CSRF 攻击
**防护措施**:
- 使用 Bearer Token 而非 Cookie
- 验证 Referer 头
- 使用 CSRF Token
---
## 常见问题
### Q1: Token 过期后如何处理?
**A**: 使用 Refresh Token 机制或自动续期机制:
```javascript
// 响应拦截器
apiClient.interceptors.response.use(
response => response,
async error => {
if (error.response?.status === 401 && !error.config._retry) {
error.config._retry = true;
const newToken = await refreshAccessToken();
if (newToken) {
error.config.headers.Authorization = `Bearer ${newToken}`;
return apiClient(error.config);
}
}
return Promise.reject(error);
}
);
```
### Q2: 如何在多标签页间同步登录状态?
**A**: 使用 `storage` 事件监听 `localStorage` 变化:
```javascript
window.addEventListener('storage', (event) => {
if (event.key === 'access_token') {
if (!event.newValue) {
// Token 被删除,其他标签页也跳转登录
window.location.href = '/login';
} else {
// Token 被更新,其他标签页也更新
console.log('Token 已在其他标签页更新');
}
}
});
```
### Q3: JWT 和 Session 有什么区别?
**A**:
| 特性 | JWT | Session |
|------|-----|---------|
| 存储位置 | 客户端(localStorage | 服务器(Redis/数据库) |
| 状态 | 无状态 | 有状态 |
| 可扩展性 | 高(无需共享状态) | 低(需要共享 Session) |
| 吊销难度 | 高(需要黑名单) | 低(直接删除 Session) |
| 性能 | 高(无需查询) | 低(需要查询 Session) |
### Q4: 如何实现"记住我"功能?
**A**: 使用长期有效的 Refresh Token
```javascript
// 登录时勾选"记住我"
async function login(username, password, rememberMe) {
const response = await fetch('/auth/login', {
method: 'POST',
body: JSON.stringify({
username,
password,
remember_me: rememberMe // 传递"记住我"标志
})
});
const data = await response.json();
if (rememberMe) {
// 长期有效的 Refresh Token30 天)
localStorage.setItem('refresh_token', data.refresh_token);
}
}
```
### Q5: 如何检测 Token 是否被盗用?
**A**: 记录 Token 使用的 IP 和 User-Agent,检测异常:
```python
@router.post("/login")
async def login(request: Request, login_data: LoginRequest):
# 获取客户端信息
client_ip = request.client.host
user_agent = request.headers.get('User-Agent', '')
# 生成 Token 时记录
await record_token_metadata(
jti=jti,
client_ip=client_ip,
user_agent=user_agent
)
```
```python
async def verify_token(request: Request, token: str):
# 检查 IP 和 User-Agent 是否匹配
current_ip = request.client.host
current_user_agent = request.headers.get('User-Agent', '')
metadata = await get_token_metadata(jti)
if metadata['client_ip'] != current_ip:
# IP 不匹配,可能被盗用
await log_security_event(
'Token used from different IP',
jti=jti,
original_ip=metadata['client_ip'],
current_ip=current_ip
)
# 可选:吊销 Token
await revoke_token(jti)
raise HTTPException(status_code=401, detail='Token 异常使用')
```
---
## 故障排查
### 问题1: Token 验证失败 "InvalidSignatureError"
**原因**: JWT_SECRET 不一致
**解决**:
1. 检查 `config/env.{port}` 文件中的 `JWT_SECRET`
2. 确保前端请求的后端实例使用相同的 `JWT_SECRET`
3. 重新登录获取新 Token
### 问题2: Token 过期 "ExpiredSignatureError"
**原因**: Token 超过有效期
**解决**:
1. 使用 Refresh Token 刷新
2. 重新登录
3. 检查服务器时间是否正确(`exp` 基于 UTC 时间)
### 问题3: Token 解码失败 "DecodeError"
**原因**: Token 格式错误或被篡改
**解决**:
1. 检查 Token 是否完整(三部分用 `.` 分隔)
2. 检查 Token 是否被 URL 编码(不应该编码)
3. 检查前端是否正确存储和传递 Token
### 问题4: 中间件不生效
**原因**: 中间件注册顺序错误或白名单配置错误
**解决**:
1. 检查 `core/app.py` 中间件注册顺序(JWT 中间件应在路由之前)
2. 检查白名单配置 `WHITE_LIST`
3. 检查路由路径是否匹配
### 问题5: PostgREST 返回 401
**原因**: PostgREST 使用的 JWT Secret 不同
**解决**:
1. 确认 `PGRST_JWT_SECRET` 配置
2. PostgREST 内部 JWT 由后端生成,前端不直接使用
3. 检查 PostgREST 代理是否正确转换 Token
### 调试技巧
#### 1. 解码 Token(不验证签名)
```python
import jwt
# 解码 Token(查看内容)
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
payload = jwt.decode(token, options={"verify_signature": False})
print(payload)
```
```javascript
// 前端解码 Token
function decodeToken(token) {
const base64Url = token.split('.')[1];
const base64 = base64Url.replace(/-/g, '+').replace(/_/g, '/');
const jsonPayload = decodeURIComponent(atob(base64).split('').map(function(c) {
return '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2);
}).join(''));
return JSON.parse(jsonPayload);
}
const token = localStorage.getItem('access_token');
console.log(decodeToken(token));
```
#### 2. 检查 Token 是否过期
```javascript
function isTokenExpired(token) {
const payload = decodeToken(token);
const now = Math.floor(Date.now() / 1000); // 当前时间(秒)
return payload.exp < now;
}
if (isTokenExpired(token)) {
console.log('Token 已过期');
}
```
#### 3. 启用详细日志
```ini
# config/env.{port}
LOG_LEVEL=DEBUG
JWT_DEBUG_ENABLED=true
```
---
## 附录
### A. JWT Payload 字段完整说明
| 字段 | 类型 | 必需 | 说明 | 示例 |
|------|------|------|------|------|
| `user_id` | integer | ✅ | 用户ID | `5` |
| `username` | string | ✅ | 用户名 | `"admin"` |
| `user_role` | string | ✅ | 用户角色 | `"系统管理员"` |
| `ou_id` | string | ❌ | 组织单位ID | `"000"` |
| `ou_name` | string | ❌ | 组织单位名称 | `"test"` |
| `exp` | integer | ✅ | 过期时间(Unix 时间戳) | `1732164207` |
| `iat` | integer | ✅ | 签发时间(Unix 时间戳) | `1732077807` |
| `aud` | string | ✅ | 受众 | `"docauditai:client"` |
| `jti` | string | ❌ | JWT ID(用于吊销) | `"550e8400-e29b-41d4-a716-446655440000"` |
| `token_version` | integer | ❌ | Token 版本号 | `1` |
### B. HTTP 状态码说明
| 状态码 | 说明 | 场景 |
|--------|------|------|
| 200 | 成功 | 登录成功、Token 验证成功 |
| 401 | 未授权 | Token 缺失、无效、过期、被吊销 |
| 403 | 禁止访问 | 权限不足 |
| 422 | 验证错误 | 登录参数错误 |
### C. 错误响应示例
```json
{
"detail": "令牌已过期,请重新登录",
"error_code": "TOKEN_EXPIRED",
"timestamp": "2025-11-17T10:30:00Z"
}
```
---
**文档版本**: v1.0
**最后更新**: 2025-11-17
**维护者**: DocAuditAI Team