Files
2025-11-18 11:06:24 +08:00

31 KiB
Raw Permalink Blame History

JWT 管理文档

DocAuditAI 系统 JWTJSON Web Token)管理完整指南

目录

  1. JWT 概述
  2. JWT 生成机制
  3. JWT 验证流程
  4. JWT 配置说明
  5. JWT 刷新机制
  6. JWT 吊销管理
  7. 前端对接指南
  8. 安全最佳实践
  9. 常见问题
  10. 故障排查

JWT 概述

什么是 JWT

JWTJSON Web Token)是一种开放标准(RFC 7519),用于在各方之间安全地传输信息。在 DocAuditAI 系统中,JWT 用于用户身份认证和授权。

JWT 结构

JWT 由三部分组成,用 . 分隔:

Header.Payload.Signature

示例

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjo1LCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNzMyMTY0MjA3fQ.Kz8fN3Y8Xp4QmW2vY6Jh9Lk0Rn1Tc3Uv5Wx6Yz7

1. Header(头部)

包含 Token 类型和签名算法:

{
  "alg": "HS256",
  "typ": "JWT"
}

2. Payload(载荷)

包含用户信息和权限数据:

{
  "user_id": 5,
  "username": "admin",
  "user_role": "系统管理员",
  "ou_id": "000",
  "ou_name": "test",
  "exp": 1732164207,
  "iat": 1732077807,
  "aud": "docauditai:client"
}

3. Signature(签名)

使用密钥对头部和载荷进行签名,防止篡改:

HMACSHA256(
  base64UrlEncode(header) + "." +
  base64UrlEncode(payload),
  JWT_SECRET
)

JWT 生成机制

生成流程

┌─────────────┐      ┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│ 1. 用户登录  │ -->  │ 2. 验证凭据  │ -->  │ 3. 生成JWT   │ -->  │ 4. 返回Token │
└─────────────┘      └─────────────┘      └─────────────┘      └─────────────┘

代码实现

1. 用户登录接口

接口: POST /auth/login

请求体:

{
  "username": "admin",
  "password": "your_password"
}

响应:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 86400,
  "user_info": {
    "user_id": 5,
    "username": "admin",
    "roles": ["系统管理员"],
    "ou_id": "000",
    "ou_name": "test"
  }
}

2. JWT 生成代码

位置: app/auth/jwt_handler.py

import jwt
from datetime import datetime, timedelta
import os

def create_access_token(user_data: dict) -> str:
    """
    生成 JWT Access Token

    Args:
        user_data: 用户信息字典

    Returns:
        JWT Token 字符串
    """
    # JWT 配置
    JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
    JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
    JWT_EXPIRATION_HOURS = int(os.getenv('JWT_EXPIRATION_HOURS', 24))
    JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')

    # 计算过期时间
    now = datetime.utcnow()
    exp = now + timedelta(hours=JWT_EXPIRATION_HOURS)

    # 构建 Payload
    payload = {
        'user_id': user_data['user_id'],
        'username': user_data['username'],
        'user_role': user_data.get('user_role', ''),
        'ou_id': user_data.get('ou_id', ''),
        'ou_name': user_data.get('ou_name', ''),
        'exp': int(exp.timestamp()),
        'iat': int(now.timestamp()),
        'aud': JWT_AUDIENCE
    }

    # 生成 JWT
    token = jwt.encode(
        payload,
        JWT_SECRET,
        algorithm=JWT_ALGORITHM
    )

    return token

3. 登录接口完整示例

from fastapi import APIRouter, HTTPException, status
from pydantic import BaseModel

router = APIRouter()

class LoginRequest(BaseModel):
    username: str
    password: str

@router.post("/login")
async def login(request: LoginRequest):
    """用户登录接口"""

    # 1. 验证用户名密码(示例:从数据库查询)
    user = await verify_credentials(request.username, request.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误"
        )

    # 2. 获取用户角色和权限
    user_roles = await get_user_roles(user['id'])

    # 3. 构建用户数据
    user_data = {
        'user_id': user['id'],
        'username': user['username'],
        'user_role': user_roles[0] if user_roles else '',
        'ou_id': user['ou_id'],
        'ou_name': user['ou_name']
    }

    # 4. 生成 JWT
    access_token = create_access_token(user_data)

    # 5. 返回响应
    return {
        'access_token': access_token,
        'token_type': 'bearer',
        'expires_in': 86400,  # 24小时(秒)
        'user_info': user_data
    }

JWT 验证流程

双层验证机制

DocAuditAI 采用两层 JWT 验证机制

1. 中间件层(快速验证)

位置: app/middleware/jwt_auth.py

目的: 快速验证 Token 存在性,解析基本用户信息

流程:

async def jwt_auth_middleware(request: Request, call_next):
    """JWT 认证中间件(宽松模式)"""

    # 白名单路径(无需认证)
    if request.url.path in WHITE_LIST:
        return await call_next(request)

    # 获取 Token
    auth_header = request.headers.get('Authorization')
    if not auth_header or not auth_header.startswith('Bearer '):
        return JSONResponse(
            status_code=401,
            content={'detail': '缺少认证令牌'}
        )

    token = auth_header.replace('Bearer ', '')

    try:
        # 解析 JWT(不验证签名,提升性能)
        payload = jwt.decode(
            token,
            options={"verify_signature": False}
        )

        # 设置用户信息到 request.state
        request.state.current_user = {
            'user_id': payload.get('user_id'),
            'username': payload.get('username'),
            'user_role': payload.get('user_role'),
            'ou_id': payload.get('ou_id')
        }

    except jwt.DecodeError:
        return JSONResponse(
            status_code=401,
            content={'detail': '无效的令牌格式'}
        )

    return await call_next(request)

特点:

  • 高性能:不验证签名,只解析 Token
  • 覆盖所有请求
  • ⚠️ 不验证 Token 是否被篡改
  • ⚠️ 不验证 Token 是否过期

2. 路由层(完整验证)

位置: app/auth/jwt_handler.py 中的 verify_token() 依赖函数

目的: 严格验证 Token 完整性和有效性

流程:

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import os

security = HTTPBearer()

async def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security)
) -> dict:
    """
    完整 JWT 验证(路由层)

    验证内容:
    1. JWT 签名验证
    2. 过期时间验证
    3. Audience 验证
    4. 必需字段验证

    Returns:
        用户信息字典

    Raises:
        HTTPException: Token 无效、过期或缺少必需字段
    """
    token = credentials.credentials
    JWT_SECRET = os.getenv('JWT_SECRET', 'your-secret-key')
    JWT_ALGORITHM = os.getenv('JWT_ALGORITHM', 'HS256')
    JWT_AUDIENCE = os.getenv('JWT_AUDIENCE', 'docauditai:client')

    try:
        # 完整验证(包含签名、过期时间、audience)
        payload = jwt.decode(
            token,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            audience=JWT_AUDIENCE,
            options={
                "verify_signature": True,
                "verify_exp": True,
                "verify_aud": True
            }
        )

        # 验证必需字段
        required_fields = ['user_id', 'username', 'user_role', 'exp']
        for field in required_fields:
            if field not in payload:
                raise HTTPException(
                    status_code=401,
                    detail=f'Token 缺少必需字段: {field}'
                )

        return payload

    except jwt.ExpiredSignatureError:
        raise HTTPException(
            status_code=401,
            detail='令牌已过期,请重新登录'
        )
    except jwt.InvalidAudienceError:
        raise HTTPException(
            status_code=401,
            detail='令牌受众验证失败'
        )
    except jwt.InvalidSignatureError:
        raise HTTPException(
            status_code=401,
            detail='令牌签名验证失败'
        )
    except jwt.DecodeError:
        raise HTTPException(
            status_code=401,
            detail='令牌格式错误'
        )

使用示例:

from fastapi import APIRouter, Depends
from app.auth.jwt_handler import verify_token

router = APIRouter()

@router.get("/protected/resource")
async def get_protected_resource(
    current_user: dict = Depends(verify_token)
):
    """需要完整 JWT 验证的接口"""
    return {
        'message': '访问成功',
        'user_id': current_user['user_id'],
        'username': current_user['username']
    }

特点:

  • 严格验证:签名、过期时间、受众、必需字段
  • 防篡改:验证 Token 签名
  • 防重放:验证过期时间
  • ⚠️ 性能较低:需要密钥解密和验证

验证流程对比表

验证内容 中间件层 路由层
Token 存在性
Token 格式正确
JWT 签名验证
过期时间验证
Audience 验证
必需字段验证
性能
使用场景 所有请求 敏感操作

JWT 配置说明

环境变量配置

位置: config/env.{port} (例如 config/env.8073

# JWT 配置
JWT_SECRET=your-256-bit-secret-key-here
JWT_ALGORITHM=HS256
JWT_EXPIRATION_HOURS=24
JWT_AUDIENCE=docauditai:client

# PostgREST JWT 配置(不同于用户 JWT)
PGRST_JWT_SECRET=different-secret-for-postgrest

配置项说明

配置项 说明 默认值 推荐值
JWT_SECRET JWT 签名密钥(256位) your-secret-key 随机生成的256位字符串
JWT_ALGORITHM JWT 签名算法 HS256 HS256(对称)或 RS256(非对称)
JWT_EXPIRATION_HOURS Token 有效期(小时) 24 24(生产环境) / 1(测试环境)
JWT_AUDIENCE JWT 受众标识 docauditai:client docauditai:client
PGRST_JWT_SECRET PostgREST 内部 JWT 密钥 - JWT_SECRET 不同的密钥

密钥生成示例

方法1Python 生成

import secrets

# 生成 256 位(32 字节)密钥
jwt_secret = secrets.token_urlsafe(32)
print(f"JWT_SECRET={jwt_secret}")

方法2OpenSSL 生成

openssl rand -base64 32

方法3:在线生成

访问:https://randomkeygen.com/ (选择 256-bit 密钥)

安全注意事项

⚠️ 重要

  1. JWT_SECRET 必须保密,不要提交到代码仓库
  2. JWT_SECRET 和 PGRST_JWT_SECRET 必须不同
  3. 生产环境使用强随机密钥(至少 256 位)
  4. 定期轮换密钥(建议每 90 天)

JWT 刷新机制

刷新策略

DocAuditAI 支持两种 Token 刷新策略:

策略1Token 自动续期(推荐)

原理: Token 过期前 N 小时内访问任何接口,自动返回新 Token

配置:

JWT_AUTO_REFRESH_ENABLED=true
JWT_AUTO_REFRESH_THRESHOLD_HOURS=6

响应头:

X-New-Access-Token: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...

前端处理:

// axios 响应拦截器
apiClient.interceptors.response.use(
  response => {
    // 检查是否有新 Token
    const newToken = response.headers['x-new-access-token'];
    if (newToken) {
      localStorage.setItem('access_token', newToken);
      console.log('Token 已自动续期');
    }
    return response;
  },
  error => Promise.reject(error)
);

策略2Refresh Token 机制

原理: 登录时返回 Access Token + Refresh TokenAccess Token 过期后使用 Refresh Token 获取新 Token

登录响应:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 86400
}

刷新接口:

请求: POST /auth/refresh

请求体:

{
  "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}

响应:

{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
  "token_type": "bearer",
  "expires_in": 86400
}

前端实现:

async function refreshAccessToken() {
  const refreshToken = localStorage.getItem('refresh_token');
  if (!refreshToken) {
    // 跳转登录页
    window.location.href = '/login';
    return null;
  }

  try {
    const response = await fetch('/auth/refresh', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ refresh_token: refreshToken })
    });

    if (response.ok) {
      const data = await response.json();
      localStorage.setItem('access_token', data.access_token);
      return data.access_token;
    } else {
      // Refresh Token 也过期,跳转登录页
      localStorage.removeItem('access_token');
      localStorage.removeItem('refresh_token');
      window.location.href = '/login';
      return null;
    }
  } catch (error) {
    console.error('Token 刷新失败:', error);
    return null;
  }
}

// axios 响应拦截器(自动刷新)
apiClient.interceptors.response.use(
  response => response,
  async error => {
    const originalRequest = error.config;

    // 如果是 401 错误且未重试过
    if (error.response?.status === 401 && !originalRequest._retry) {
      originalRequest._retry = true;

      // 刷新 Token
      const newToken = await refreshAccessToken();
      if (newToken) {
        // 使用新 Token 重试原请求
        originalRequest.headers.Authorization = `Bearer ${newToken}`;
        return apiClient(originalRequest);
      }
    }

    return Promise.reject(error);
  }
);

JWT 吊销管理

吊销场景

  1. 用户主动登出
  2. 管理员强制下线用户
  3. 密码修改后旧 Token 失效
  4. 检测到异常登录

吊销实现方式

方式1:黑名单(推荐)

原理: 将吊销的 Token 存入 Redis 黑名单,验证时检查

表结构: jwt_tokens

CREATE TABLE jwt_tokens (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    token_jti VARCHAR(64) UNIQUE NOT NULL,  -- JWT ID
    token_type VARCHAR(20) DEFAULT 'access',
    revoked BOOLEAN DEFAULT false,
    revoked_at TIMESTAMP,
    expires_at TIMESTAMP NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

CREATE INDEX idx_jwt_tokens_jti ON jwt_tokens(token_jti);
CREATE INDEX idx_jwt_tokens_user_id ON jwt_tokens(user_id);

生成 Token 时记录:

import uuid

def create_access_token(user_data: dict) -> str:
    # 生成唯一 JWT ID
    jti = str(uuid.uuid4())

    # Payload 中包含 jti
    payload = {
        'user_id': user_data['user_id'],
        'username': user_data['username'],
        'jti': jti,  # 唯一标识
        'exp': int(exp.timestamp()),
        'iat': int(now.timestamp()),
        'aud': JWT_AUDIENCE
    }

    # 生成 Token
    token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)

    # 记录到数据库
    await record_token(
        user_id=user_data['user_id'],
        token_jti=jti,
        expires_at=exp
    )

    return token

验证 Token 时检查黑名单:

async def verify_token(token: str) -> dict:
    # 解码 Token
    payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])
    jti = payload.get('jti')

    # 检查是否在黑名单
    is_revoked = await check_token_revoked(jti)
    if is_revoked:
        raise HTTPException(
            status_code=401,
            detail='令牌已被吊销'
        )

    return payload

async def check_token_revoked(jti: str) -> bool:
    """检查 Token 是否已吊销"""
    # 先查 Redis 缓存
    cache_key = f"jwt:revoked:{jti}"
    is_revoked = await redis_client.get(cache_key)
    if is_revoked is not None:
        return is_revoked == 'true'

    # 查数据库
    query = "SELECT revoked FROM jwt_tokens WHERE token_jti = $1"
    result = await db_pool.fetchrow(query, jti)

    if result and result['revoked']:
        # 缓存到 Redis24小时)
        await redis_client.setex(cache_key, 86400, 'true')
        return True

    return False

吊销 Token 接口:

登出接口: POST /auth/logout

@router.post("/logout")
async def logout(current_user: dict = Depends(verify_token)):
    """用户登出"""
    jti = current_user.get('jti')

    # 吊销 Token
    await revoke_token(jti)

    return {'message': '登出成功'}

async def revoke_token(jti: str):
    """吊销指定 Token"""
    query = """
        UPDATE jwt_tokens
        SET revoked = true, revoked_at = CURRENT_TIMESTAMP
        WHERE token_jti = $1
    """
    await db_pool.execute(query, jti)

    # 同步到 Redis
    cache_key = f"jwt:revoked:{jti}"
    await redis_client.setex(cache_key, 86400, 'true')

强制下线用户:

async def revoke_user_all_tokens(user_id: int):
    """吊销用户的所有 Token"""
    query = """
        UPDATE jwt_tokens
        SET revoked = true, revoked_at = CURRENT_TIMESTAMP
        WHERE user_id = $1 AND revoked = false
        RETURNING token_jti
    """
    results = await db_pool.fetch(query, user_id)

    # 同步到 Redis
    for row in results:
        cache_key = f"jwt:revoked:{row['token_jti']}"
        await redis_client.setex(cache_key, 86400, 'true')

方式2Token 版本号

原理: 为每个用户维护 Token 版本号,密码修改/强制下线时递增版本号

用户表新增字段:

ALTER TABLE sso_users ADD COLUMN token_version INTEGER DEFAULT 1;

Payload 中包含版本号:

payload = {
    'user_id': user_data['user_id'],
    'username': user_data['username'],
    'token_version': user_data['token_version'],  # 版本号
    'exp': int(exp.timestamp()),
    'aud': JWT_AUDIENCE
}

验证时检查版本号:

async def verify_token(token: str) -> dict:
    payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALGORITHM])

    # 获取用户当前版本号
    current_version = await get_user_token_version(payload['user_id'])

    # 检查版本号是否匹配
    if payload.get('token_version', 0) != current_version:
        raise HTTPException(
            status_code=401,
            detail='令牌版本已过期'
        )

    return payload

密码修改后递增版本号:

async def change_password(user_id: int, new_password: str):
    # 更新密码
    await update_user_password(user_id, new_password)

    # 递增 Token 版本号(使所有旧 Token 失效)
    query = """
        UPDATE sso_users
        SET token_version = token_version + 1
        WHERE id = $1
    """
    await db_pool.execute(query, user_id)

前端对接指南

1. 登录流程

// 登录函数
async function login(username, password) {
  try {
    const response = await fetch('/auth/login', {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ username, password })
    });

    if (response.ok) {
      const data = await response.json();

      // 存储 Token
      localStorage.setItem('access_token', data.access_token);
      localStorage.setItem('user_info', JSON.stringify(data.user_info));

      // 可选:存储 Refresh Token
      if (data.refresh_token) {
        localStorage.setItem('refresh_token', data.refresh_token);
      }

      // 跳转首页
      window.location.href = '/';
    } else {
      const error = await response.json();
      alert(`登录失败: ${error.detail}`);
    }
  } catch (error) {
    console.error('登录错误:', error);
    alert('网络错误,请稍后重试');
  }
}

2. 配置 Axios 拦截器

import axios from 'axios';

// 创建 Axios 实例
const apiClient = axios.create({
  baseURL: 'http://localhost:8000/api/v1',
  headers: { 'Content-Type': 'application/json' }
});

// 请求拦截器:自动添加 Token
apiClient.interceptors.request.use(
  config => {
    const token = localStorage.getItem('access_token');
    if (token) {
      config.headers.Authorization = `Bearer ${token}`;
    }
    return config;
  },
  error => Promise.reject(error)
);

// 响应拦截器:处理 401 错误
apiClient.interceptors.response.use(
  response => {
    // 检查是否有新 Token(自动续期)
    const newToken = response.headers['x-new-access-token'];
    if (newToken) {
      localStorage.setItem('access_token', newToken);
    }
    return response;
  },
  async error => {
    const originalRequest = error.config;

    // 401 错误处理
    if (error.response?.status === 401) {
      // 如果有 Refresh Token,尝试刷新
      if (!originalRequest._retry && localStorage.getItem('refresh_token')) {
        originalRequest._retry = true;
        const newToken = await refreshAccessToken();
        if (newToken) {
          originalRequest.headers.Authorization = `Bearer ${newToken}`;
          return apiClient(originalRequest);
        }
      }

      // 否则跳转登录页
      localStorage.clear();
      window.location.href = '/login';
    }

    return Promise.reject(error);
  }
);

export default apiClient;

3. 获取用户信息

// 从 localStorage 获取
function getCurrentUser() {
  const userInfo = localStorage.getItem('user_info');
  return userInfo ? JSON.parse(userInfo) : null;
}

// 从服务器获取最新用户信息
async function fetchCurrentUser() {
  try {
    const response = await apiClient.get('/auth/me');
    const userInfo = response.data;
    localStorage.setItem('user_info', JSON.stringify(userInfo));
    return userInfo;
  } catch (error) {
    console.error('获取用户信息失败:', error);
    return null;
  }
}

4. 登出流程

async function logout() {
  try {
    // 调用登出接口(吊销 Token)
    await apiClient.post('/auth/logout');
  } catch (error) {
    console.error('登出接口调用失败:', error);
  } finally {
    // 清除本地存储
    localStorage.clear();

    // 跳转登录页
    window.location.href = '/login';
  }
}

5. 路由守卫(Vue.js 示例)

import { createRouter, createWebHistory } from 'vue-router';

const router = createRouter({
  history: createWebHistory(),
  routes: [
    { path: '/login', component: LoginPage },
    { path: '/', component: HomePage, meta: { requiresAuth: true } },
    { path: '/documents', component: DocumentList, meta: { requiresAuth: true } }
  ]
});

// 全局路由守卫
router.beforeEach((to, from, next) => {
  const token = localStorage.getItem('access_token');

  // 需要认证的路由
  if (to.meta.requiresAuth && !token) {
    next('/login');
    return;
  }

  // 已登录用户访问登录页,跳转首页
  if (to.path === '/login' && token) {
    next('/');
    return;
  }

  next();
});

export default router;

安全最佳实践

1. Token 存储

推荐方式: localStoragesessionStorage

不推荐: Cookie(容易受到 CSRF 攻击)

示例:

// 存储
localStorage.setItem('access_token', token);

// 读取
const token = localStorage.getItem('access_token');

// 删除
localStorage.removeItem('access_token');

2. HTTPS 传输

⚠️ 生产环境必须使用 HTTPS

  • Token 通过 HTTPS 加密传输
  • 防止中间人攻击(MITM
  • 防止 Token 被窃取

3. Token 有效期

推荐配置:

  • 生产环境: 24 小时
  • 测试环境: 1-2 小时
  • 开发环境: 7 天(方便调试)

4. 密钥管理

正确做法:

  • 使用环境变量存储密钥
  • 生产环境使用强随机密钥
  • 定期轮换密钥(90 天)
  • 使用密钥管理服务(AWS KMS、Azure Key Vault

错误做法:

  • 硬编码密钥在代码中
  • 将密钥提交到 Git
  • 使用弱密钥(如 secret12345

5. Token 吊销

必须实现:

  • 用户登出时吊销 Token
  • 密码修改后吊销所有旧 Token
  • 检测到异常登录时吊销 Token
  • 管理员强制下线用户

6. 防止 XSS 攻击

防护措施:

  • 端对用户输入进行转义
  • 使用 CSPContent Security Policy
  • 避免在 URL 中传递 Token
  • 使用 HttpOnly Cookie(如果使用 Cookie 存储)

7. 防止 CSRF 攻击

防护措施:

  • 使用 Bearer Token 而非 Cookie
  • 验证 Referer 头
  • 使用 CSRF Token

常见问题

Q1: Token 过期后如何处理?

A: 使用 Refresh Token 机制或自动续期机制:

// 响应拦截器
apiClient.interceptors.response.use(
  response => response,
  async error => {
    if (error.response?.status === 401 && !error.config._retry) {
      error.config._retry = true;
      const newToken = await refreshAccessToken();
      if (newToken) {
        error.config.headers.Authorization = `Bearer ${newToken}`;
        return apiClient(error.config);
      }
    }
    return Promise.reject(error);
  }
);

Q2: 如何在多标签页间同步登录状态?

A: 使用 storage 事件监听 localStorage 变化:

window.addEventListener('storage', (event) => {
  if (event.key === 'access_token') {
    if (!event.newValue) {
      // Token 被删除,其他标签页也跳转登录
      window.location.href = '/login';
    } else {
      // Token 被更新,其他标签页也更新
      console.log('Token 已在其他标签页更新');
    }
  }
});

Q3: JWT 和 Session 有什么区别?

A:

特性 JWT Session
存储位置 客户端(localStorage 服务器(Redis/数据库)
状态 无状态 有状态
可扩展性 高(无需共享状态) 低(需要共享 Session
吊销难度 高(需要黑名单) 低(直接删除 Session
性能 高(无需查询) 低(需要查询 Session

Q4: 如何实现"记住我"功能?

A: 使用长期有效的 Refresh Token

// 登录时勾选"记住我"
async function login(username, password, rememberMe) {
  const response = await fetch('/auth/login', {
    method: 'POST',
    body: JSON.stringify({
      username,
      password,
      remember_me: rememberMe  // 传递"记住我"标志
    })
  });

  const data = await response.json();

  if (rememberMe) {
    // 长期有效的 Refresh Token30 天)
    localStorage.setItem('refresh_token', data.refresh_token);
  }
}

Q5: 如何检测 Token 是否被盗用?

A: 记录 Token 使用的 IP 和 User-Agent,检测异常:

@router.post("/login")
async def login(request: Request, login_data: LoginRequest):
    # 获取客户端信息
    client_ip = request.client.host
    user_agent = request.headers.get('User-Agent', '')

    # 生成 Token 时记录
    await record_token_metadata(
        jti=jti,
        client_ip=client_ip,
        user_agent=user_agent
    )
async def verify_token(request: Request, token: str):
    # 检查 IP 和 User-Agent 是否匹配
    current_ip = request.client.host
    current_user_agent = request.headers.get('User-Agent', '')

    metadata = await get_token_metadata(jti)

    if metadata['client_ip'] != current_ip:
        # IP 不匹配,可能被盗用
        await log_security_event(
            'Token used from different IP',
            jti=jti,
            original_ip=metadata['client_ip'],
            current_ip=current_ip
        )
        # 可选:吊销 Token
        await revoke_token(jti)
        raise HTTPException(status_code=401, detail='Token 异常使用')

故障排查

问题1: Token 验证失败 "InvalidSignatureError"

原因: JWT_SECRET 不一致

解决:

  1. 检查 config/env.{port} 文件中的 JWT_SECRET
  2. 确保前端请求的后端实例使用相同的 JWT_SECRET
  3. 重新登录获取新 Token

问题2: Token 过期 "ExpiredSignatureError"

原因: Token 超过有效期

解决:

  1. 使用 Refresh Token 刷新
  2. 重新登录
  3. 检查服务器时间是否正确(exp 基于 UTC 时间)

问题3: Token 解码失败 "DecodeError"

原因: Token 格式错误或被篡改

解决:

  1. 检查 Token 是否完整(三部分用 . 分隔)
  2. 检查 Token 是否被 URL 编码(不应该编码)
  3. 检查前端是否正确存储和传递 Token

问题4: 中间件不生效

原因: 中间件注册顺序错误或白名单配置错误

解决:

  1. 检查 core/app.py 中间件注册顺序(JWT 中间件应在路由之前)
  2. 检查白名单配置 WHITE_LIST
  3. 检查路由路径是否匹配

问题5: PostgREST 返回 401

原因: PostgREST 使用的 JWT Secret 不同

解决:

  1. 确认 PGRST_JWT_SECRET 配置
  2. PostgREST 内部 JWT 由后端生成,前端不直接使用
  3. 检查 PostgREST 代理是否正确转换 Token

调试技巧

1. 解码 Token(不验证签名)

import jwt

# 解码 Token(查看内容)
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
payload = jwt.decode(token, options={"verify_signature": False})
print(payload)
// 前端解码 Token
function decodeToken(token) {
  const base64Url = token.split('.')[1];
  const base64 = base64Url.replace(/-/g, '+').replace(/_/g, '/');
  const jsonPayload = decodeURIComponent(atob(base64).split('').map(function(c) {
    return '%' + ('00' + c.charCodeAt(0).toString(16)).slice(-2);
  }).join(''));

  return JSON.parse(jsonPayload);
}

const token = localStorage.getItem('access_token');
console.log(decodeToken(token));

2. 检查 Token 是否过期

function isTokenExpired(token) {
  const payload = decodeToken(token);
  const now = Math.floor(Date.now() / 1000);  // 当前时间(秒)
  return payload.exp < now;
}

if (isTokenExpired(token)) {
  console.log('Token 已过期');
}

3. 启用详细日志

# config/env.{port}
LOG_LEVEL=DEBUG
JWT_DEBUG_ENABLED=true

附录

A. JWT Payload 字段完整说明

字段 类型 必需 说明 示例
user_id integer 用户ID 5
username string 用户名 "admin"
user_role string 用户角色 "系统管理员"
ou_id string 组织单位ID "000"
ou_name string 组织单位名称 "test"
exp integer 过期时间(Unix 时间戳) 1732164207
iat integer 签发时间(Unix 时间戳) 1732077807
aud string 受众 "docauditai:client"
jti string JWT ID(用于吊销) "550e8400-e29b-41d4-a716-446655440000"
token_version integer Token 版本号 1

B. HTTP 状态码说明

状态码 说明 场景
200 成功 登录成功、Token 验证成功
401 未授权 Token 缺失、无效、过期、被吊销
403 禁止访问 权限不足
422 验证错误 登录参数错误

C. 错误响应示例

{
  "detail": "令牌已过期,请重新登录",
  "error_code": "TOKEN_EXPIRED",
  "timestamp": "2025-11-17T10:30:00Z"
}

文档版本: v1.0 最后更新: 2025-11-17 维护者: DocAuditAI Team