bfe39e45a9
5. 修改统一认证登录和管理员登录是通过接口形式进行,存储返回的accessToken。 6. 修改交叉评查的部分样式
1317 lines
31 KiB
Markdown
1317 lines
31 KiB
Markdown
# JWT 管理文档
|
||
|
||
DocAuditAI 系统 JWT(JSON Web Token)管理完整指南
|
||
|
||
## 目录
|
||
|
||
1. [JWT 概述](#jwt-概述)
|
||
2. [JWT 生成机制](#jwt-生成机制)
|
||
3. [JWT 验证流程](#jwt-验证流程)
|
||
4. [JWT 配置说明](#jwt-配置说明)
|
||
5. [JWT 刷新机制](#jwt-刷新机制)
|
||
6. [JWT 吊销管理](#jwt-吊销管理)
|
||
7. [前端对接指南](#前端对接指南)
|
||
8. [安全最佳实践](#安全最佳实践)
|
||
9. [常见问题](#常见问题)
|
||
10. [故障排查](#故障排查)
|
||
|
||
---
|
||
|
||
## JWT 概述
|
||
|
||
### 什么是 JWT?
|
||
|
||
JWT(JSON Web Token)是一种开放标准(RFC 7519),用于在各方之间安全地传输信息。在 DocAuditAI 系统中,JWT 用于用户身份认证和授权。
|
||
|
||
### JWT 结构
|
||
|
||
JWT 由三部分组成,用 `.` 分隔:
|
||
|
||
```
|
||
Header.Payload.Signature
|
||
```
|
||
|
||
**示例**:
|
||
```
|
||
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo1LCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNzMyMTY0MjA3fQ.Kz8fN3Y8Xp4QmW2vY6Jh9Lk0Rn1Tc3Uv5Wx6Yz7
|
||
```
|
||
|
||
#### 1. Header(头部)
|
||
|
||
包含 Token 类型和签名算法:
|
||
|
||
```json
|
||
{
|
||
"alg": "HS256",
|
||
"typ": "JWT"
|
||
}
|
||
```
|
||
|
||
#### 2. Payload(载荷)
|
||
|
||
包含用户信息和权限数据:
|
||
|
||
```json
|
||
{
|
||
"user_id": 5,
|
||
"username": "admin",
|
||
"user_role": "系统管理员",
|
||
"ou_id": "000",
|
||
"ou_name": "test",
|
||
"exp": 1732164207,
|
||
"iat": 1732077807,
|
||
"aud": "docauditai:client"
|
||
}
|
||
```
|
||
|
||
#### 3. Signature(签名)
|
||
|
||
使用密钥对头部和载荷进行签名,防止篡改:
|
||
|
||
```
|
||
HMACSHA256(
|
||
base64UrlEncode(header) + "." +
|
||
base64UrlEncode(payload),
|
||
JWT_SECRET
|
||
)
|
||
```
|
||
|
||
---
|
||
|
||
## JWT 生成机制
|
||
|
||
### 生成流程
|
||
|
||
```
|
||
┌─────────────┐ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐
|
||
│ 1. 用户登录 │ --> │ 2. 验证凭据 │ --> │ 3. 生成JWT │ --> │ 4. 返回Token │
|
||
└─────────────┘ └─────────────┘ └─────────────┘ └─────────────┘
|
||
```
|
||
|
||
### 代码实现
|
||
|
||
#### 1. 用户登录接口
|
||
|
||
**接口**: `POST /auth/login`
|
||
|
||
**请求体**:
|
||
```json
|
||
{
|
||
"username": "admin",
|
||
"password": "your_password"
|
||
}
|
||
```
|
||
|
||
**响应**:
|
||
```json
|
||
{
|
||
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
|
||
"token_type": "bearer",
|
||
"expires_in": 86400,
|
||
"user_info": {
|
||
"user_id": 5,
|
||
"username": "admin",
|
||
"roles": ["系统管理员"],
|
||
"ou_id": "000",
|
||
"ou_name": "test"
|
||
}
|
||
}
|
||
```
|
||
|
||
#### 2. JWT 生成代码
|
||
|
||
**位置**: `app/auth/jwt_handler.py`
|
||
|
||
```python
|
||
import jwt
|
||
from datetime import datetime, timedelta
|
||
import os
|
||
|
||
def create_access_token(user_data: dict) -> str:
|
||
"""
|
||
生成 JWT Access Token
|
||
|
||
Args:
|
||
user_data: 用户信息字典
|
||
|
||
Returns:
|
||
JWT Token 字符串
|
||
"""
|
||
# JWT 配置
|
||
JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
|
||
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
|
||
JWT_EXPIRATION_HOURS = int(os.getenv('JWT_EXPIRATION_HOURS', 24))
|
||
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')
|
||
|
||
# 计算过期时间
|
||
now = datetime.utcnow()
|
||
exp = now + timedelta(hours=JWT_EXPIRATION_HOURS)
|
||
|
||
# 构建 Payload
|
||
payload = {
|
||
'user_id': user_data['user_id'],
|
||
'username': user_data['username'],
|
||
'user_role': user_data.get('user_role', ''),
|
||
'ou_id': user_data.get('ou_id', ''),
|
||
'ou_name': user_data.get('ou_name', ''),
|
||
'exp': int(exp.timestamp()),
|
||
'iat': int(now.timestamp()),
|
||
'aud': JWT_AUDIENCE
|
||
}
|
||
|
||
# 生成 JWT
|
||
token = jwt.encode(
|
||
payload,
|
||
JWT_SECRET,
|
||
algorithm=JWT_ALGORITHM
|
||
)
|
||
|
||
return token
|
||
```
|
||
|
||
#### 3. 登录接口完整示例
|
||
|
||
```python
|
||
from fastapi import APIRouter, HTTPException, status
|
||
from pydantic import BaseModel
|
||
|
||
router = APIRouter()
|
||
|
||
class LoginRequest(BaseModel):
|
||
username: str
|
||
password: str
|
||
|
||
@router.post("/login")
|
||
async def login(request: LoginRequest):
|
||
"""用户登录接口"""
|
||
|
||
# 1. 验证用户名密码(示例:从数据库查询)
|
||
user = await verify_credentials(request.username, request.password)
|
||
if not user:
|
||
raise HTTPException(
|
||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||
detail="用户名或密码错误"
|
||
)
|
||
|
||
# 2. 获取用户角色和权限
|
||
user_roles = await get_user_roles(user['id'])
|
||
|
||
# 3. 构建用户数据
|
||
user_data = {
|
||
'user_id': user['id'],
|
||
'username': user['username'],
|
||
'user_role': user_roles[0] if user_roles else '',
|
||
'ou_id': user['ou_id'],
|
||
'ou_name': user['ou_name']
|
||
}
|
||
|
||
# 4. 生成 JWT
|
||
access_token = create_access_token(user_data)
|
||
|
||
# 5. 返回响应
|
||
return {
|
||
'access_token': access_token,
|
||
'token_type': 'bearer',
|
||
'expires_in': 86400, # 24小时(秒)
|
||
'user_info': user_data
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
## JWT 验证流程
|
||
|
||
### 双层验证机制
|
||
|
||
DocAuditAI 采用**两层 JWT 验证机制**:
|
||
|
||
#### 1. 中间件层(快速验证)
|
||
|
||
**位置**: `app/middleware/jwt_auth.py`
|
||
|
||
**目的**: 快速验证 Token 存在性,解析基本用户信息
|
||
|
||
**流程**:
|
||
```python
|
||
async def jwt_auth_middleware(request: Request, call_next):
|
||
"""JWT 认证中间件(宽松模式)"""
|
||
|
||
# 白名单路径(无需认证)
|
||
if request.url.path in WHITE_LIST:
|
||
return await call_next(request)
|
||
|
||
# 获取 Token
|
||
auth_header = request.headers.get('Authorization')
|
||
if not auth_header or not auth_header.startswith('Bearer '):
|
||
return JSONResponse(
|
||
status_code=401,
|
||
content={'detail': '缺少认证令牌'}
|
||
)
|
||
|
||
token = auth_header.replace('Bearer ', '')
|
||
|
||
try:
|
||
# 解析 JWT(不验证签名,提升性能)
|
||
payload = jwt.decode(
|
||
token,
|
||
options={"verify_signature": False}
|
||
)
|
||
|
||
# 设置用户信息到 request.state
|
||
request.state.current_user = {
|
||
'user_id': payload.get('user_id'),
|
||
'username': payload.get('username'),
|
||
'user_role': payload.get('user_role'),
|
||
'ou_id': payload.get('ou_id')
|
||
}
|
||
|
||
except jwt.DecodeError:
|
||
return JSONResponse(
|
||
status_code=401,
|
||
content={'detail': '无效的令牌格式'}
|
||
)
|
||
|
||
return await call_next(request)
|
||
```
|
||
|
||
**特点**:
|
||
- ✅ 高性能:不验证签名,只解析 Token
|
||
- ✅ 覆盖所有请求
|
||
- ⚠️ 不验证 Token 是否被篡改
|
||
- ⚠️ 不验证 Token 是否过期
|
||
|
||
#### 2. 路由层(完整验证)
|
||
|
||
**位置**: `app/auth/jwt_handler.py` 中的 `verify_token()` 依赖函数
|
||
|
||
**目的**: 严格验证 Token 完整性和有效性
|
||
|
||
**流程**:
|
||
```python
|
||
from fastapi import Depends, HTTPException, status
|
||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||
import jwt
|
||
import os
|
||
|
||
security = HTTPBearer()
|
||
|
||
async def verify_token(
|
||
credentials: HTTPAuthorizationCredentials = Depends(security)
|
||
) -> dict:
|
||
"""
|
||
完整 JWT 验证(路由层)
|
||
|
||
验证内容:
|
||
1. JWT 签名验证
|
||
2. 过期时间验证
|
||
3. Audience 验证
|
||
4. 必需字段验证
|
||
|
||
Returns:
|
||
用户信息字典
|
||
|
||
Raises:
|
||
HTTPException: Token 无效、过期或缺少必需字段
|
||
"""
|
||
token = credentials.credentials
|
||
JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
|
||
JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
|
||
JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')
|
||
|
||
try:
|
||
# 完整验证(包含签名、过期时间、audience)
|
||
payload = jwt.decode(
|
||
token,
|
||
JWT_SECRET,
|
||
algorithms=[JWT_ALGORITHM],
|
||
audience=JWT_AUDIENCE,
|
||
options={
|
||
"verify_signature": True,
|
||
"verify_exp": True,
|
||
"verify_aud": True
|
||
}
|
||
)
|
||
|
||
# 验证必需字段
|
||
required_fields = ['user_id', 'username', 'user_role', 'exp']
|
||
for field in required_fields:
|
||
if field not in payload:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail=f'Token 缺少必需字段: {field}'
|
||
)
|
||
|
||
return payload
|
||
|
||
except jwt.ExpiredSignatureError:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail='令牌已过期,请重新登录'
|
||
)
|
||
except jwt.InvalidAudienceError:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail='令牌受众验证失败'
|
||
)
|
||
except jwt.InvalidSignatureError:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail='令牌签名验证失败'
|
||
)
|
||
except jwt.DecodeError:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail='令牌格式错误'
|
||
)
|
||
```
|
||
|
||
**使用示例**:
|
||
```python
|
||
from fastapi import APIRouter, Depends
|
||
from app.auth.jwt_handler import verify_token
|
||
|
||
router = APIRouter()
|
||
|
||
@router.get("/protected/resource")
|
||
async def get_protected_resource(
|
||
current_user: dict = Depends(verify_token)
|
||
):
|
||
"""需要完整 JWT 验证的接口"""
|
||
return {
|
||
'message': '访问成功',
|
||
'user_id': current_user['user_id'],
|
||
'username': current_user['username']
|
||
}
|
||
```
|
||
|
||
**特点**:
|
||
- ✅ 严格验证:签名、过期时间、受众、必需字段
|
||
- ✅ 防篡改:验证 Token 签名
|
||
- ✅ 防重放:验证过期时间
|
||
- ⚠️ 性能较低:需要密钥解密和验证
|
||
|
||
### 验证流程对比表
|
||
|
||
| 验证内容 | 中间件层 | 路由层 |
|
||
|---------|---------|--------|
|
||
| Token 存在性 | ✅ | ✅ |
|
||
| Token 格式正确 | ✅ | ✅ |
|
||
| JWT 签名验证 | ❌ | ✅ |
|
||
| 过期时间验证 | ❌ | ✅ |
|
||
| Audience 验证 | ❌ | ✅ |
|
||
| 必需字段验证 | ❌ | ✅ |
|
||
| 性能 | 高 | 中 |
|
||
| 使用场景 | 所有请求 | 敏感操作 |
|
||
|
||
---
|
||
|
||
## JWT 配置说明
|
||
|
||
### 环境变量配置
|
||
|
||
**位置**: `config/env.{port}` (例如 `config/env.8073`)
|
||
|
||
```ini
|
||
# JWT 配置
|
||
JWT_SECRET=your-256-bit-secret-key-here
|
||
JWT_ALGORITHM=HS256
|
||
JWT_EXPIRATION_HOURS=24
|
||
JWT_AUDIENCE=docauditai:client
|
||
|
||
# PostgREST JWT 配置(不同于用户 JWT)
|
||
PGRST_JWT_SECRET=different-secret-for-postgrest
|
||
```
|
||
|
||
### 配置项说明
|
||
|
||
| 配置项 | 说明 | 默认值 | 推荐值 |
|
||
|--------|------|--------|--------|
|
||
| `JWT_SECRET` | JWT 签名密钥(256位) | `your-secret-key` | 随机生成的256位字符串 |
|
||
| `JWT_ALGORITHM` | JWT 签名算法 | `HS256` | `HS256`(对称)或 `RS256`(非对称) |
|
||
| `JWT_EXPIRATION_HOURS` | Token 有效期(小时) | `24` | `24`(生产环境) / `1`(测试环境) |
|
||
| `JWT_AUDIENCE` | JWT 受众标识 | `docauditai:client` | `docauditai:client` |
|
||
| `PGRST_JWT_SECRET` | PostgREST 内部 JWT 密钥 | - | 与 `JWT_SECRET` 不同的密钥 |
|
||
|
||
### 密钥生成示例
|
||
|
||
#### 方法1:Python 生成
|
||
|
||
```python
|
||
import secrets
|
||
|
||
# 生成 256 位(32 字节)密钥
|
||
jwt_secret = secrets.token_urlsafe(32)
|
||
print(f"JWT_SECRET={jwt_secret}")
|
||
```
|
||
|
||
#### 方法2:OpenSSL 生成
|
||
|
||
```bash
|
||
openssl rand -base64 32
|
||
```
|
||
|
||
#### 方法3:在线生成
|
||
|
||
访问:https://randomkeygen.com/ (选择 256-bit 密钥)
|
||
|
||
### 安全注意事项
|
||
|
||
⚠️ **重要**:
|
||
1. **JWT_SECRET 必须保密**,不要提交到代码仓库
|
||
2. **JWT_SECRET 和 PGRST_JWT_SECRET 必须不同**
|
||
3. **生产环境使用强随机密钥**(至少 256 位)
|
||
4. **定期轮换密钥**(建议每 90 天)
|
||
|
||
---
|
||
|
||
## JWT 刷新机制
|
||
|
||
### 刷新策略
|
||
|
||
DocAuditAI 支持两种 Token 刷新策略:
|
||
|
||
#### 策略1:Token 自动续期(推荐)
|
||
|
||
**原理**: Token 过期前 N 小时内访问任何接口,自动返回新 Token
|
||
|
||
**配置**:
|
||
```ini
|
||
JWT_AUTO_REFRESH_ENABLED=true
|
||
JWT_AUTO_REFRESH_THRESHOLD_HOURS=6
|
||
```
|
||
|
||
**响应头**:
|
||
```
|
||
X-New-Access-Token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...
|
||
```
|
||
|
||
**前端处理**:
|
||
```javascript
|
||
// axios 响应拦截器
|
||
apiClient.interceptors.response.use(
|
||
response => {
|
||
// 检查是否有新 Token
|
||
const newToken = response.headers['x-new-access-token'];
|
||
if (newToken) {
|
||
localStorage.setItem('access_token', newToken);
|
||
console.log('Token 已自动续期');
|
||
}
|
||
return response;
|
||
},
|
||
error => Promise.reject(error)
|
||
);
|
||
```
|
||
|
||
#### 策略2:Refresh Token 机制
|
||
|
||
**原理**: 登录时返回 Access Token + Refresh Token,Access Token 过期后使用 Refresh Token 获取新 Token
|
||
|
||
**登录响应**:
|
||
```json
|
||
{
|
||
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
|
||
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
|
||
"token_type": "bearer",
|
||
"expires_in": 86400
|
||
}
|
||
```
|
||
|
||
**刷新接口**:
|
||
|
||
**请求**: `POST /auth/refresh`
|
||
|
||
**请求体**:
|
||
```json
|
||
{
|
||
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
|
||
}
|
||
```
|
||
|
||
**响应**:
|
||
```json
|
||
{
|
||
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
|
||
"token_type": "bearer",
|
||
"expires_in": 86400
|
||
}
|
||
```
|
||
|
||
**前端实现**:
|
||
```javascript
|
||
async function refreshAccessToken() {
|
||
const refreshToken = localStorage.getItem('refresh_token');
|
||
if (!refreshToken) {
|
||
// 跳转登录页
|
||
window.location.href = '/login';
|
||
return null;
|
||
}
|
||
|
||
try {
|
||
const response = await fetch('/auth/refresh', {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json' },
|
||
body: JSON.stringify({ refresh_token: refreshToken })
|
||
});
|
||
|
||
if (response.ok) {
|
||
const data = await response.json();
|
||
localStorage.setItem('access_token', data.access_token);
|
||
return data.access_token;
|
||
} else {
|
||
// Refresh Token 也过期,跳转登录页
|
||
localStorage.removeItem('access_token');
|
||
localStorage.removeItem('refresh_token');
|
||
window.location.href = '/login';
|
||
return null;
|
||
}
|
||
} catch (error) {
|
||
console.error('Token 刷新失败:', error);
|
||
return null;
|
||
}
|
||
}
|
||
|
||
// axios 响应拦截器(自动刷新)
|
||
apiClient.interceptors.response.use(
|
||
response => response,
|
||
async error => {
|
||
const originalRequest = error.config;
|
||
|
||
// 如果是 401 错误且未重试过
|
||
if (error.response?.status === 401 && !originalRequest._retry) {
|
||
originalRequest._retry = true;
|
||
|
||
// 刷新 Token
|
||
const newToken = await refreshAccessToken();
|
||
if (newToken) {
|
||
// 使用新 Token 重试原请求
|
||
originalRequest.headers.Authorization = `Bearer ${newToken}`;
|
||
return apiClient(originalRequest);
|
||
}
|
||
}
|
||
|
||
return Promise.reject(error);
|
||
}
|
||
);
|
||
```
|
||
|
||
---
|
||
|
||
## JWT 吊销管理
|
||
|
||
### 吊销场景
|
||
|
||
1. **用户主动登出**
|
||
2. **管理员强制下线用户**
|
||
3. **密码修改后旧 Token 失效**
|
||
4. **检测到异常登录**
|
||
|
||
### 吊销实现方式
|
||
|
||
#### 方式1:黑名单(推荐)
|
||
|
||
**原理**: 将吊销的 Token 存入 Redis 黑名单,验证时检查
|
||
|
||
**表结构**: `jwt_tokens` 表
|
||
|
||
```sql
|
||
CREATE TABLE jwt_tokens (
|
||
id SERIAL PRIMARY KEY,
|
||
user_id INTEGER NOT NULL,
|
||
token_jti VARCHAR(64) UNIQUE NOT NULL, -- JWT ID
|
||
token_type VARCHAR(20) DEFAULT 'access',
|
||
revoked BOOLEAN DEFAULT false,
|
||
revoked_at TIMESTAMP,
|
||
expires_at TIMESTAMP NOT NULL,
|
||
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
||
);
|
||
|
||
CREATE INDEX idx_jwt_tokens_jti ON jwt_tokens(token_jti);
|
||
CREATE INDEX idx_jwt_tokens_user_id ON jwt_tokens(user_id);
|
||
```
|
||
|
||
**生成 Token 时记录**:
|
||
```python
|
||
import uuid
|
||
|
||
def create_access_token(user_data: dict) -> str:
|
||
# 生成唯一 JWT ID
|
||
jti = str(uuid.uuid4())
|
||
|
||
# Payload 中包含 jti
|
||
payload = {
|
||
'user_id': user_data['user_id'],
|
||
'username': user_data['username'],
|
||
'jti': jti, # 唯一标识
|
||
'exp': int(exp.timestamp()),
|
||
'iat': int(now.timestamp()),
|
||
'aud': JWT_AUDIENCE
|
||
}
|
||
|
||
# 生成 Token
|
||
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
||
|
||
# 记录到数据库
|
||
await record_token(
|
||
user_id=user_data['user_id'],
|
||
token_jti=jti,
|
||
expires_at=exp
|
||
)
|
||
|
||
return token
|
||
```
|
||
|
||
**验证 Token 时检查黑名单**:
|
||
```python
|
||
async def verify_token(token: str) -> dict:
|
||
# 解码 Token
|
||
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
||
jti = payload.get('jti')
|
||
|
||
# 检查是否在黑名单
|
||
is_revoked = await check_token_revoked(jti)
|
||
if is_revoked:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail='令牌已被吊销'
|
||
)
|
||
|
||
return payload
|
||
|
||
async def check_token_revoked(jti: str) -> bool:
|
||
"""检查 Token 是否已吊销"""
|
||
# 先查 Redis 缓存
|
||
cache_key = f"jwt:revoked:{jti}"
|
||
is_revoked = await redis_client.get(cache_key)
|
||
if is_revoked is not None:
|
||
return is_revoked == 'true'
|
||
|
||
# 查数据库
|
||
query = "SELECT revoked FROM jwt_tokens WHERE token_jti = $1"
|
||
result = await db_pool.fetchrow(query, jti)
|
||
|
||
if result and result['revoked']:
|
||
# 缓存到 Redis(24小时)
|
||
await redis_client.setex(cache_key, 86400, 'true')
|
||
return True
|
||
|
||
return False
|
||
```
|
||
|
||
**吊销 Token 接口**:
|
||
|
||
**登出接口**: `POST /auth/logout`
|
||
|
||
```python
|
||
@router.post("/logout")
|
||
async def logout(current_user: dict = Depends(verify_token)):
|
||
"""用户登出"""
|
||
jti = current_user.get('jti')
|
||
|
||
# 吊销 Token
|
||
await revoke_token(jti)
|
||
|
||
return {'message': '登出成功'}
|
||
|
||
async def revoke_token(jti: str):
|
||
"""吊销指定 Token"""
|
||
query = """
|
||
UPDATE jwt_tokens
|
||
SET revoked = true, revoked_at = CURRENT_TIMESTAMP
|
||
WHERE token_jti = $1
|
||
"""
|
||
await db_pool.execute(query, jti)
|
||
|
||
# 同步到 Redis
|
||
cache_key = f"jwt:revoked:{jti}"
|
||
await redis_client.setex(cache_key, 86400, 'true')
|
||
```
|
||
|
||
**强制下线用户**:
|
||
|
||
```python
|
||
async def revoke_user_all_tokens(user_id: int):
|
||
"""吊销用户的所有 Token"""
|
||
query = """
|
||
UPDATE jwt_tokens
|
||
SET revoked = true, revoked_at = CURRENT_TIMESTAMP
|
||
WHERE user_id = $1 AND revoked = false
|
||
RETURNING token_jti
|
||
"""
|
||
results = await db_pool.fetch(query, user_id)
|
||
|
||
# 同步到 Redis
|
||
for row in results:
|
||
cache_key = f"jwt:revoked:{row['token_jti']}"
|
||
await redis_client.setex(cache_key, 86400, 'true')
|
||
```
|
||
|
||
#### 方式2:Token 版本号
|
||
|
||
**原理**: 为每个用户维护 Token 版本号,密码修改/强制下线时递增版本号
|
||
|
||
**用户表新增字段**:
|
||
```sql
|
||
ALTER TABLE sso_users ADD COLUMN token_version INTEGER DEFAULT 1;
|
||
```
|
||
|
||
**Payload 中包含版本号**:
|
||
```python
|
||
payload = {
|
||
'user_id': user_data['user_id'],
|
||
'username': user_data['username'],
|
||
'token_version': user_data['token_version'], # 版本号
|
||
'exp': int(exp.timestamp()),
|
||
'aud': JWT_AUDIENCE
|
||
}
|
||
```
|
||
|
||
**验证时检查版本号**:
|
||
```python
|
||
async def verify_token(token: str) -> dict:
|
||
payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
||
|
||
# 获取用户当前版本号
|
||
current_version = await get_user_token_version(payload['user_id'])
|
||
|
||
# 检查版本号是否匹配
|
||
if payload.get('token_version', 0) != current_version:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail='令牌版本已过期'
|
||
)
|
||
|
||
return payload
|
||
```
|
||
|
||
**密码修改后递增版本号**:
|
||
```python
|
||
async def change_password(user_id: int, new_password: str):
|
||
# 更新密码
|
||
await update_user_password(user_id, new_password)
|
||
|
||
# 递增 Token 版本号(使所有旧 Token 失效)
|
||
query = """
|
||
UPDATE sso_users
|
||
SET token_version = token_version + 1
|
||
WHERE id = $1
|
||
"""
|
||
await db_pool.execute(query, user_id)
|
||
```
|
||
|
||
---
|
||
|
||
## 前端对接指南
|
||
|
||
### 1. 登录流程
|
||
|
||
```javascript
|
||
// 登录函数
|
||
async function login(username, password) {
|
||
try {
|
||
const response = await fetch('/auth/login', {
|
||
method: 'POST',
|
||
headers: { 'Content-Type': 'application/json' },
|
||
body: JSON.stringify({ username, password })
|
||
});
|
||
|
||
if (response.ok) {
|
||
const data = await response.json();
|
||
|
||
// 存储 Token
|
||
localStorage.setItem('access_token', data.access_token);
|
||
localStorage.setItem('user_info', JSON.stringify(data.user_info));
|
||
|
||
// 可选:存储 Refresh Token
|
||
if (data.refresh_token) {
|
||
localStorage.setItem('refresh_token', data.refresh_token);
|
||
}
|
||
|
||
// 跳转首页
|
||
window.location.href = '/';
|
||
} else {
|
||
const error = await response.json();
|
||
alert(`登录失败: ${error.detail}`);
|
||
}
|
||
} catch (error) {
|
||
console.error('登录错误:', error);
|
||
alert('网络错误,请稍后重试');
|
||
}
|
||
}
|
||
```
|
||
|
||
### 2. 配置 Axios 拦截器
|
||
|
||
```javascript
|
||
import axios from 'axios';
|
||
|
||
// 创建 Axios 实例
|
||
const apiClient = axios.create({
|
||
baseURL: 'http://localhost:8000/api/v1',
|
||
headers: { 'Content-Type': 'application/json' }
|
||
});
|
||
|
||
// 请求拦截器:自动添加 Token
|
||
apiClient.interceptors.request.use(
|
||
config => {
|
||
const token = localStorage.getItem('access_token');
|
||
if (token) {
|
||
config.headers.Authorization = `Bearer ${token}`;
|
||
}
|
||
return config;
|
||
},
|
||
error => Promise.reject(error)
|
||
);
|
||
|
||
// 响应拦截器:处理 401 错误
|
||
apiClient.interceptors.response.use(
|
||
response => {
|
||
// 检查是否有新 Token(自动续期)
|
||
const newToken = response.headers['x-new-access-token'];
|
||
if (newToken) {
|
||
localStorage.setItem('access_token', newToken);
|
||
}
|
||
return response;
|
||
},
|
||
async error => {
|
||
const originalRequest = error.config;
|
||
|
||
// 401 错误处理
|
||
if (error.response?.status === 401) {
|
||
// 如果有 Refresh Token,尝试刷新
|
||
if (!originalRequest._retry && localStorage.getItem('refresh_token')) {
|
||
originalRequest._retry = true;
|
||
const newToken = await refreshAccessToken();
|
||
if (newToken) {
|
||
originalRequest.headers.Authorization = `Bearer ${newToken}`;
|
||
return apiClient(originalRequest);
|
||
}
|
||
}
|
||
|
||
// 否则跳转登录页
|
||
localStorage.clear();
|
||
window.location.href = '/login';
|
||
}
|
||
|
||
return Promise.reject(error);
|
||
}
|
||
);
|
||
|
||
export default apiClient;
|
||
```
|
||
|
||
### 3. 获取用户信息
|
||
|
||
```javascript
|
||
// 从 localStorage 获取
|
||
function getCurrentUser() {
|
||
const userInfo = localStorage.getItem('user_info');
|
||
return userInfo ? JSON.parse(userInfo) : null;
|
||
}
|
||
|
||
// 从服务器获取最新用户信息
|
||
async function fetchCurrentUser() {
|
||
try {
|
||
const response = await apiClient.get('/auth/me');
|
||
const userInfo = response.data;
|
||
localStorage.setItem('user_info', JSON.stringify(userInfo));
|
||
return userInfo;
|
||
} catch (error) {
|
||
console.error('获取用户信息失败:', error);
|
||
return null;
|
||
}
|
||
}
|
||
```
|
||
|
||
### 4. 登出流程
|
||
|
||
```javascript
|
||
async function logout() {
|
||
try {
|
||
// 调用登出接口(吊销 Token)
|
||
await apiClient.post('/auth/logout');
|
||
} catch (error) {
|
||
console.error('登出接口调用失败:', error);
|
||
} finally {
|
||
// 清除本地存储
|
||
localStorage.clear();
|
||
|
||
// 跳转登录页
|
||
window.location.href = '/login';
|
||
}
|
||
}
|
||
```
|
||
|
||
### 5. 路由守卫(Vue.js 示例)
|
||
|
||
```javascript
|
||
import { createRouter, createWebHistory } from 'vue-router';
|
||
|
||
const router = createRouter({
|
||
history: createWebHistory(),
|
||
routes: [
|
||
{ path: '/login', component: LoginPage },
|
||
{ path: '/', component: HomePage, meta: { requiresAuth: true } },
|
||
{ path: '/documents', component: DocumentList, meta: { requiresAuth: true } }
|
||
]
|
||
});
|
||
|
||
// 全局路由守卫
|
||
router.beforeEach((to, from, next) => {
|
||
const token = localStorage.getItem('access_token');
|
||
|
||
// 需要认证的路由
|
||
if (to.meta.requiresAuth && !token) {
|
||
next('/login');
|
||
return;
|
||
}
|
||
|
||
// 已登录用户访问登录页,跳转首页
|
||
if (to.path === '/login' && token) {
|
||
next('/');
|
||
return;
|
||
}
|
||
|
||
next();
|
||
});
|
||
|
||
export default router;
|
||
```
|
||
|
||
---
|
||
|
||
## 安全最佳实践
|
||
|
||
### 1. Token 存储
|
||
|
||
**推荐方式**: `localStorage` 或 `sessionStorage`
|
||
|
||
**不推荐**: Cookie(容易受到 CSRF 攻击)
|
||
|
||
**示例**:
|
||
```javascript
|
||
// 存储
|
||
localStorage.setItem('access_token', token);
|
||
|
||
// 读取
|
||
const token = localStorage.getItem('access_token');
|
||
|
||
// 删除
|
||
localStorage.removeItem('access_token');
|
||
```
|
||
|
||
### 2. HTTPS 传输
|
||
|
||
⚠️ **生产环境必须使用 HTTPS**
|
||
|
||
- Token 通过 HTTPS 加密传输
|
||
- 防止中间人攻击(MITM)
|
||
- 防止 Token 被窃取
|
||
|
||
### 3. Token 有效期
|
||
|
||
**推荐配置**:
|
||
- **生产环境**: 24 小时
|
||
- **测试环境**: 1-2 小时
|
||
- **开发环境**: 7 天(方便调试)
|
||
|
||
### 4. 密钥管理
|
||
|
||
✅ **正确做法**:
|
||
- 使用环境变量存储密钥
|
||
- 生产环境使用强随机密钥
|
||
- 定期轮换密钥(90 天)
|
||
- 使用密钥管理服务(AWS KMS、Azure Key Vault)
|
||
|
||
❌ **错误做法**:
|
||
- 硬编码密钥在代码中
|
||
- 将密钥提交到 Git
|
||
- 使用弱密钥(如 `secret`、`12345`)
|
||
|
||
### 5. Token 吊销
|
||
|
||
✅ **必须实现**:
|
||
- 用户登出时吊销 Token
|
||
- 密码修改后吊销所有旧 Token
|
||
- 检测到异常登录时吊销 Token
|
||
- 管理员强制下线用户
|
||
|
||
### 6. 防止 XSS 攻击
|
||
|
||
✅ **防护措施**:
|
||
- 前端对用户输入进行转义
|
||
- 使用 CSP(Content Security Policy)
|
||
- 避免在 URL 中传递 Token
|
||
- 使用 HttpOnly Cookie(如果使用 Cookie 存储)
|
||
|
||
### 7. 防止 CSRF 攻击
|
||
|
||
✅ **防护措施**:
|
||
- 使用 Bearer Token 而非 Cookie
|
||
- 验证 Referer 头
|
||
- 使用 CSRF Token
|
||
|
||
---
|
||
|
||
## 常见问题
|
||
|
||
### Q1: Token 过期后如何处理?
|
||
|
||
**A**: 使用 Refresh Token 机制或自动续期机制:
|
||
|
||
```javascript
|
||
// 响应拦截器
|
||
apiClient.interceptors.response.use(
|
||
response => response,
|
||
async error => {
|
||
if (error.response?.status === 401 && !error.config._retry) {
|
||
error.config._retry = true;
|
||
const newToken = await refreshAccessToken();
|
||
if (newToken) {
|
||
error.config.headers.Authorization = `Bearer ${newToken}`;
|
||
return apiClient(error.config);
|
||
}
|
||
}
|
||
return Promise.reject(error);
|
||
}
|
||
);
|
||
```
|
||
|
||
### Q2: 如何在多标签页间同步登录状态?
|
||
|
||
**A**: 使用 `storage` 事件监听 `localStorage` 变化:
|
||
|
||
```javascript
|
||
window.addEventListener('storage', (event) => {
|
||
if (event.key === 'access_token') {
|
||
if (!event.newValue) {
|
||
// Token 被删除,其他标签页也跳转登录
|
||
window.location.href = '/login';
|
||
} else {
|
||
// Token 被更新,其他标签页也更新
|
||
console.log('Token 已在其他标签页更新');
|
||
}
|
||
}
|
||
});
|
||
```
|
||
|
||
### Q3: JWT 和 Session 有什么区别?
|
||
|
||
**A**:
|
||
|
||
| 特性 | JWT | Session |
|
||
|------|-----|---------|
|
||
| 存储位置 | 客户端(localStorage) | 服务器(Redis/数据库) |
|
||
| 状态 | 无状态 | 有状态 |
|
||
| 可扩展性 | 高(无需共享状态) | 低(需要共享 Session) |
|
||
| 吊销难度 | 高(需要黑名单) | 低(直接删除 Session) |
|
||
| 性能 | 高(无需查询) | 低(需要查询 Session) |
|
||
|
||
### Q4: 如何实现"记住我"功能?
|
||
|
||
**A**: 使用长期有效的 Refresh Token:
|
||
|
||
```javascript
|
||
// 登录时勾选"记住我"
|
||
async function login(username, password, rememberMe) {
|
||
const response = await fetch('/auth/login', {
|
||
method: 'POST',
|
||
body: JSON.stringify({
|
||
username,
|
||
password,
|
||
remember_me: rememberMe // 传递"记住我"标志
|
||
})
|
||
});
|
||
|
||
const data = await response.json();
|
||
|
||
if (rememberMe) {
|
||
// 长期有效的 Refresh Token(30 天)
|
||
localStorage.setItem('refresh_token', data.refresh_token);
|
||
}
|
||
}
|
||
```
|
||
|
||
### Q5: 如何检测 Token 是否被盗用?
|
||
|
||
**A**: 记录 Token 使用的 IP 和 User-Agent,检测异常:
|
||
|
||
```python
|
||
@router.post("/login")
|
||
async def login(request: Request, login_data: LoginRequest):
|
||
# 获取客户端信息
|
||
client_ip = request.client.host
|
||
user_agent = request.headers.get('User-Agent', '')
|
||
|
||
# 生成 Token 时记录
|
||
await record_token_metadata(
|
||
jti=jti,
|
||
client_ip=client_ip,
|
||
user_agent=user_agent
|
||
)
|
||
```
|
||
|
||
```python
|
||
async def verify_token(request: Request, token: str):
|
||
# 检查 IP 和 User-Agent 是否匹配
|
||
current_ip = request.client.host
|
||
current_user_agent = request.headers.get('User-Agent', '')
|
||
|
||
metadata = await get_token_metadata(jti)
|
||
|
||
if metadata['client_ip'] != current_ip:
|
||
# IP 不匹配,可能被盗用
|
||
await log_security_event(
|
||
'Token used from different IP',
|
||
jti=jti,
|
||
original_ip=metadata['client_ip'],
|
||
current_ip=current_ip
|
||
)
|
||
# 可选:吊销 Token
|
||
await revoke_token(jti)
|
||
raise HTTPException(status_code=401, detail='Token 异常使用')
|
||
```
|
||
|
||
---
|
||
|
||
## 故障排查
|
||
|
||
### 问题1: Token 验证失败 "InvalidSignatureError"
|
||
|
||
**原因**: JWT_SECRET 不一致
|
||
|
||
**解决**:
|
||
1. 检查 `config/env.{port}` 文件中的 `JWT_SECRET`
|
||
2. 确保前端请求的后端实例使用相同的 `JWT_SECRET`
|
||
3. 重新登录获取新 Token
|
||
|
||
### 问题2: Token 过期 "ExpiredSignatureError"
|
||
|
||
**原因**: Token 超过有效期
|
||
|
||
**解决**:
|
||
1. 使用 Refresh Token 刷新
|
||
2. 重新登录
|
||
3. 检查服务器时间是否正确(`exp` 基于 UTC 时间)
|
||
|
||
### 问题3: Token 解码失败 "DecodeError"
|
||
|
||
**原因**: Token 格式错误或被篡改
|
||
|
||
**解决**:
|
||
1. 检查 Token 是否完整(三部分用 `.` 分隔)
|
||
2. 检查 Token 是否被 URL 编码(不应该编码)
|
||
3. 检查前端是否正确存储和传递 Token
|
||
|
||
### 问题4: 中间件不生效
|
||
|
||
**原因**: 中间件注册顺序错误或白名单配置错误
|
||
|
||
**解决**:
|
||
1. 检查 `core/app.py` 中间件注册顺序(JWT 中间件应在路由之前)
|
||
2. 检查白名单配置 `WHITE_LIST`
|
||
3. 检查路由路径是否匹配
|
||
|
||
### 问题5: PostgREST 返回 401
|
||
|
||
**原因**: PostgREST 使用的 JWT Secret 不同
|
||
|
||
**解决**:
|
||
1. 确认 `PGRST_JWT_SECRET` 配置
|
||
2. PostgREST 内部 JWT 由后端生成,前端不直接使用
|
||
3. 检查 PostgREST 代理是否正确转换 Token
|
||
|
||
### 调试技巧
|
||
|
||
#### 1. 解码 Token(不验证签名)
|
||
|
||
```python
|
||
import jwt
|
||
|
||
# 解码 Token(查看内容)
|
||
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
|
||
payload = jwt.decode(token, options={"verify_signature": False})
|
||
print(payload)
|
||
```
|
||
|
||
```javascript
|
||
// 前端解码 Token
|
||
function decodeToken(token) {
|
||
const base64Url = token.split('.')[1];
|
||
const base64 = base64Url.replace(/-/g, '+').replace(/_/g, '/');
|
||
const jsonPayload = decodeURIComponent(atob(base64).split('').map(function(c) {
|
||
return '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2);
|
||
}).join(''));
|
||
|
||
return JSON.parse(jsonPayload);
|
||
}
|
||
|
||
const token = localStorage.getItem('access_token');
|
||
console.log(decodeToken(token));
|
||
```
|
||
|
||
#### 2. 检查 Token 是否过期
|
||
|
||
```javascript
|
||
function isTokenExpired(token) {
|
||
const payload = decodeToken(token);
|
||
const now = Math.floor(Date.now() / 1000); // 当前时间(秒)
|
||
return payload.exp < now;
|
||
}
|
||
|
||
if (isTokenExpired(token)) {
|
||
console.log('Token 已过期');
|
||
}
|
||
```
|
||
|
||
#### 3. 启用详细日志
|
||
|
||
```ini
|
||
# config/env.{port}
|
||
LOG_LEVEL=DEBUG
|
||
JWT_DEBUG_ENABLED=true
|
||
```
|
||
|
||
---
|
||
|
||
## 附录
|
||
|
||
### A. JWT Payload 字段完整说明
|
||
|
||
| 字段 | 类型 | 必需 | 说明 | 示例 |
|
||
|------|------|------|------|------|
|
||
| `user_id` | integer | ✅ | 用户ID | `5` |
|
||
| `username` | string | ✅ | 用户名 | `"admin"` |
|
||
| `user_role` | string | ✅ | 用户角色 | `"系统管理员"` |
|
||
| `ou_id` | string | ❌ | 组织单位ID | `"000"` |
|
||
| `ou_name` | string | ❌ | 组织单位名称 | `"test"` |
|
||
| `exp` | integer | ✅ | 过期时间(Unix 时间戳) | `1732164207` |
|
||
| `iat` | integer | ✅ | 签发时间(Unix 时间戳) | `1732077807` |
|
||
| `aud` | string | ✅ | 受众 | `"docauditai:client"` |
|
||
| `jti` | string | ❌ | JWT ID(用于吊销) | `"550e8400-e29b-41d4-a716-446655440000"` |
|
||
| `token_version` | integer | ❌ | Token 版本号 | `1` |
|
||
|
||
### B. HTTP 状态码说明
|
||
|
||
| 状态码 | 说明 | 场景 |
|
||
|--------|------|------|
|
||
| 200 | 成功 | 登录成功、Token 验证成功 |
|
||
| 401 | 未授权 | Token 缺失、无效、过期、被吊销 |
|
||
| 403 | 禁止访问 | 权限不足 |
|
||
| 422 | 验证错误 | 登录参数错误 |
|
||
|
||
### C. 错误响应示例
|
||
|
||
```json
|
||
{
|
||
"detail": "令牌已过期,请重新登录",
|
||
"error_code": "TOKEN_EXPIRED",
|
||
"timestamp": "2025-11-17T10:30:00Z"
|
||
}
|
||
```
|
||
|
||
---
|
||
|
||
**文档版本**: v1.0
|
||
**最后更新**: 2025-11-17
|
||
**维护者**: DocAuditAI Team
|