Files
leaudit-platform-backend/fastapi_common/fastapi_common_security/jwtService.py
T

154 lines
4.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
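"""JWT service: token issuing, verification, refresh, and revocation.
Merged from the legacy project's auth.py (create_jwt_token / decode_jwt_token) and
jwt_manager.py (generate_tokens / verify_token / refresh_token / revoke_token).
The logic is unchanged; it is only reorganized into a service class that uses the project's unified configuration.
"""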
from __future__ import annotations
import hashlib
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from fastapi_common.fastapi_common_logger import logger
from fastapi_admin.config import JWT_SECRET_KEY, JWT_ALGORITHM
# Access-token lifetime in minutes; drives the access token's "exp" claim and the
# "expires_in" value (converted to seconds) returned by JwtService.generate.
ACCESS_TOKEN_EXPIRE_MINUTES = 60
# Refresh-token lifetime in days; drives the refresh token's "exp" claim.
REFRESH_TOKEN_EXPIRE_DAYS = 7
# Written to the "aud" claim of every issued token and required by JwtService.verify.
JWT_AUDIENCE = "leaudit-platform"
# Written to the "iss" claim of every issued token and required by JwtService.verify.
JWT_ISSUER = "leaudit-platform"
class JwtService:
    """JWT token management: issuing, verification, and unverified inspection.

    Every method is static. Signing and verification use the module-level
    ``JWT_SECRET_KEY`` / ``JWT_ALGORITHM`` configuration together with the
    ``JWT_AUDIENCE`` / ``JWT_ISSUER`` constants.
    """

    # ------------------------------------------------------------------
    # Token issuing
    # ------------------------------------------------------------------
    @staticmethod
    def generate(
        userId: int,
        username: str,
        nickName: str = "",
        ouId: str = "",
        ouName: str = "",
        roles: list[str] | None = None,
        permissions: list[str] | None = None,
        area: str | None = None,
        userRole: str | None = None,
        deviceId: str | None = None,
        deviceName: str | None = None,
        userAgent: str | None = None,
        ipAddress: str | None = None,
    ) -> dict[str, Any]:
        """Issue an access token and a refresh token for one user.

        Args:
            userId: Stored as the ``user_id`` claim of both tokens.
            username: Stored as the ``username`` claim of both tokens.
            nickName: Stored as ``nick_name`` (access token only).
            ouId: Stored as ``ou_id`` (access token only).
            ouName: Stored as ``ou_name`` (access token only).
            area: Stored as ``area`` (access token only).
            userRole: Stored as ``user_role`` (access token only).
            roles, permissions, deviceId, deviceName, userAgent, ipAddress:
                Accepted for call-site compatibility but not used by this
                method; none of them is embedded in either token.

        Returns:
            A dict with these keys:

            * ``access_token`` / ``refresh_token``: the encoded JWTs.
            * ``token_type``: always ``"Bearer"``.
            * ``expires_in``: access-token lifetime in seconds
              (``ACCESS_TOKEN_EXPIRE_MINUTES * 60``).
            * ``issued_time``: issue time (UTC) formatted as
              ``"%Y-%m-%d %H:%M:%S"``, intended for display.
            * ``jti`` / ``refresh_jti``: the unique IDs of the access and
              refresh token respectively.
        """
        now = datetime.now(timezone.utc)
        jti = str(uuid.uuid4())
        refreshJti = str(uuid.uuid4())

        # Access token.
        # Only the minimal fields the auth chain really needs are embedded.
        # Putting every role / permission into the JWT made the front-end
        # cookie session exceed 4KB for accounts with many permissions.
        accessPayload = {
            "jti": jti,
            "user_id": userId,
            "username": username,
            "nick_name": nickName,
            "ou_id": ouId,
            "ou_name": ouName,
            "area": area,
            "user_role": userRole,
            "iat": now,
            "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            "aud": JWT_AUDIENCE,
            "iss": JWT_ISSUER,
            "type": "access",
        }
        accessToken = jwt.encode(accessPayload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

        # Refresh token: identity claims only, with the longer lifetime.
        refreshPayload = {
            "jti": refreshJti,
            "user_id": userId,
            "username": username,
            "iat": now,
            "exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
            "aud": JWT_AUDIENCE,
            "iss": JWT_ISSUER,
            "type": "refresh",
        }
        refreshToken = jwt.encode(refreshPayload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

        logger.info(f"JWT issued: user={userId}, jti={jti}")

        return {
            "access_token": accessToken,
            "refresh_token": refreshToken,
            "token_type": "Bearer",
            "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            # Formatted issue time (UTC), provided for front-end display.
            "issued_time": now.strftime("%Y-%m-%d %H:%M:%S"),
            "jti": jti,
            "refresh_jti": refreshJti,
        }

    # ------------------------------------------------------------------
    # Token verification
    # ------------------------------------------------------------------
    @staticmethod
    def verify(token: str, expectedType: str | None = None) -> dict[str, Any]:
        """Verify a JWT and return its decoded payload.

        The token is checked against the configured key and algorithm and
        must carry the expected audience and issuer.

        Args:
            token: The encoded JWT.
            expectedType: When given (``"access"`` or ``"refresh"``), the
                token's ``type`` claim must equal it. Access and refresh
                tokens share the same key, audience and issuer, so without
                this check a refresh token is accepted wherever an access
                token is. ``None`` (the default) skips the check and keeps
                the previous behaviour.

        Returns:
            The decoded payload.

        Raises:
            jwt.ExpiredSignatureError: The token has expired.
            jwt.InvalidTokenError: The token is invalid, or its ``type``
                claim does not match ``expectedType``.
        """
        payload = jwt.decode(
            token,
            JWT_SECRET_KEY,
            algorithms=[JWT_ALGORITHM],
            audience=JWT_AUDIENCE,
            issuer=JWT_ISSUER,
        )
        if expectedType is not None:
            actualType = payload.get("type")
            if actualType != expectedType:
                raise jwt.InvalidTokenError(
                    f"Unexpected token type: expected {expectedType!r}, got {actualType!r}"
                )
        return payload

    @staticmethod
    def decodeUnsafe(token: str) -> dict[str, Any] | None:
        """Decode a token WITHOUT verifying its signature.

        Intended only for debugging or for reading the claims of an expired
        token. The result is unauthenticated and must never be used for an
        authorization decision.

        Returns:
            The decoded payload, or ``None`` if decoding fails for any reason.
        """
        try:
            return jwt.decode(token, options={"verify_signature": False})
        except Exception:
            # Deliberate best-effort: any decoding failure means "no payload".
            return None

    # ------------------------------------------------------------------
    # Token revocation
    # ------------------------------------------------------------------
    @staticmethod
    def _hashToken(token: str) -> str:
        """Return the SHA-256 hex digest of the token string."""
        # NOTE(review): presumably the key for a revocation list; no caller is visible here.
        return hashlib.sha256(token.encode()).hexdigest()

    @staticmethod
    def getJti(token: str) -> str | None:
        """Extract the ``jti`` claim from a token without verifying its signature.

        Returns:
            The ``jti`` value, or ``None`` when the token cannot be decoded,
            decodes to an empty payload, or has no ``jti`` claim.
        """
        payload = JwtService.decodeUnsafe(token)
        return payload.get("jti") if payload else None