Files
leaudit-platform-backend/fastapi_common/fastapi_common_security/jwtService.py
T
wren 535d97a70c chore: initial commit — leaudit-platform project skeleton
17-table PostgreSQL schema with full Chinese column comments,
FastAPI project structure (admin/common/modules),
DSL rule files, and schema migration scripts.
2026-04-27 16:48:22 +08:00

152 lines
4.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""JWT 服务 —— Token 签发、验证、刷新、撤销。
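Merged from the legacy project's auth.py (create_jwt_token / decode_jwt_token)
and jwt_manager.py (generate_tokens / verify_token / refresh_token / revoke_token).
The logic is unchanged; it is only reorganised into a service class that uses the project-wide configuration.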
"""
from __future__ import annotations
import hashlib
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any
import jwt
from fastapi_common.fastapi_common_logger import logger
from fastapi_admin.config import JWT_SECRET_KEY, JWT_ALGORITHM
# Token lifetimes: access tokens are counted in minutes, refresh tokens in days.
ACCESS_TOKEN_EXPIRE_MINUTES: int = 15
REFRESH_TOKEN_EXPIRE_DAYS: int = 7

# Values written to the "iss" / "aud" claims when signing and required again
# when a token is verified.
JWT_ISSUER: str = "leaudit-platform"
JWT_AUDIENCE: str = "leaudit-platform"
class JwtService:
    """JWT token management service.

    A collection of static helpers that issue, verify and inspect the
    access/refresh token pair. Signing and verification use the module-level
    ``JWT_SECRET_KEY`` / ``JWT_ALGORITHM`` configuration together with the
    ``JWT_AUDIENCE`` / ``JWT_ISSUER`` constants.
    """

    # ------------------------------------------------------------------
    # Token issuance
    # ------------------------------------------------------------------
    @staticmethod
    def generate(
        userId: int,
        username: str,
        nickName: str = "",
        ouId: str = "",
        ouName: str = "",
        roles: list[str] | None = None,
        area: str | None = None,
        userRole: str | None = None,
        deviceId: str | None = None,
        deviceName: str | None = None,
        userAgent: str | None = None,
        ipAddress: str | None = None,
    ) -> dict[str, Any]:
        """Issue an access token and a refresh token for one user.

        Both tokens carry the same ``iat``, ``aud`` and ``iss`` claims and
        each receives its own random UUID4 ``jti``. The access token also
        carries the profile claims (nick name, organisation, roles, area,
        user role); the refresh token only identifies the user.

        Args:
            userId: Stored in the ``user_id`` claim of both tokens.
            username: Stored in the ``username`` claim of both tokens.
            nickName: Stored in the access token's ``nick_name`` claim.
            ouId: Stored in the access token's ``ou_id`` claim.
            ouName: Stored in the access token's ``ou_name`` claim.
            roles: Stored in the access token's ``roles`` claim; ``None`` or
                an empty list is stored as ``[]``.
            area: Stored in the access token's ``area`` claim.
            userRole: Stored in the access token's ``user_role`` claim.
            deviceId: Accepted for interface compatibility; not used here.
            deviceName: Accepted for interface compatibility; not used here.
            userAgent: Accepted for interface compatibility; not used here.
            ipAddress: Accepted for interface compatibility; not used here.

        Returns:
            A dict of the form::

                {
                    "access_token": "...",
                    "refresh_token": "...",
                    "token_type": "Bearer",
                    "expires_in": 900,            # access lifetime, seconds
                    "issued_time": "YYYY-MM-DD HH:MM:SS",  # UTC, no tz suffix
                    "jti": "...",                 # access token id
                    "refresh_jti": "...",         # refresh token id
                }
        """
        issuedAt = datetime.now(timezone.utc)

        # Access token: identity plus profile/authorisation claims.
        # Key order is kept stable because it determines the encoded bytes.
        accessJti = str(uuid.uuid4())
        accessClaims = {
            "jti": accessJti,
            "user_id": userId,
            "username": username,
            "nick_name": nickName,
            "ou_id": ouId,
            "ou_name": ouName,
            "roles": roles or [],
            "area": area,
            "user_role": userRole,
            "iat": issuedAt,
            "exp": issuedAt + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            "aud": JWT_AUDIENCE,
            "iss": JWT_ISSUER,
            "type": "access",
        }
        accessToken = jwt.encode(accessClaims, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

        # Refresh token: identity only, with the longer lifetime.
        refreshJti = str(uuid.uuid4())
        refreshClaims = {
            "jti": refreshJti,
            "user_id": userId,
            "username": username,
            "iat": issuedAt,
            "exp": issuedAt + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
            "aud": JWT_AUDIENCE,
            "iss": JWT_ISSUER,
            "type": "refresh",
        }
        refreshToken = jwt.encode(refreshClaims, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)

        logger.info(f"JWT issued: user={userId}, jti={accessJti}")

        return {
            "access_token": accessToken,
            "refresh_token": refreshToken,
            "token_type": "Bearer",
            "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            # Formatted issue time, intended for display by the front end.
            "issued_time": issuedAt.strftime("%Y-%m-%d %H:%M:%S"),
            "jti": accessJti,
            "refresh_jti": refreshJti,
        }

    # ------------------------------------------------------------------
    # Token verification
    # ------------------------------------------------------------------
    @staticmethod
    def verify(token: str, expectedType: str | None = None) -> dict[str, Any]:
        """Verify a token and return its decoded payload.

        The signature, audience and issuer are checked by ``jwt.decode``.

        Args:
            token: Encoded JWT string.
            expectedType: When given (e.g. ``"access"`` or ``"refresh"``),
                the payload's ``type`` claim must equal it; this prevents a
                refresh token from being accepted where an access token is
                required. ``None`` (the default) skips the check.

        Returns:
            The decoded payload.

        Raises:
            jwt.ExpiredSignatureError: The token has expired.
            jwt.InvalidTokenError: The token is invalid, or its ``type``
                claim does not match ``expectedType``.
        """
        payload = jwt.decode(
            token,
            JWT_SECRET_KEY,
            algorithms=[JWT_ALGORITHM],
            audience=JWT_AUDIENCE,
            issuer=JWT_ISSUER,
        )
        if expectedType is not None and payload.get("type") != expectedType:
            raise jwt.InvalidTokenError(
                f"Unexpected token type: expected {expectedType!r}, "
                f"got {payload.get('type')!r}"
            )
        return payload

    @staticmethod
    def decodeUnsafe(token: str) -> dict[str, Any] | None:
        """Decode a token WITHOUT verifying its signature.

        Only for debugging or for reading the claims of an expired token.
        The result is unauthenticated data and must never be used for an
        authorisation decision.

        Returns:
            The decoded payload, or ``None`` when the token cannot be decoded.
        """
        try:
            return jwt.decode(token, options={"verify_signature": False})
        except jwt.InvalidTokenError:
            # Only JWT decoding failures mean "no payload"; anything else is
            # a programming or environment error and should surface.
            return None

    # ------------------------------------------------------------------
    # Token revocation helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _hashToken(token: str) -> str:
        """Return the hex-encoded SHA-256 digest of the UTF-8 encoded token."""
        return hashlib.sha256(token.encode("utf-8")).hexdigest()

    @staticmethod
    def getJti(token: str) -> str | None:
        """Extract the ``jti`` claim from a token without verifying its signature.

        Returns:
            The ``jti`` value, or ``None`` when the token cannot be decoded,
            has an empty payload, or carries no ``jti`` claim.
        """
        payload = JwtService.decodeUnsafe(token)
        if not payload:
            return None
        return payload.get("jti")