160 lines
5.1 KiB
Python
160 lines
5.1 KiB
Python
"""JWT 服务 —— Token 签发、验证、刷新、撤销。
|
||
|
||
合并自旧项目的 auth.py(create_jwt_token / decode_jwt_token)和
|
||
jwt_manager.py(generate_tokens / verify_token / refresh_token / revoke_token)。
|
||
逻辑不变,仅重组为服务类 + 使用项目统一配置。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import hashlib
|
||
import time
|
||
import uuid
|
||
from datetime import datetime, timedelta, timezone
|
||
from typing import Any
|
||
|
||
import jwt
|
||
|
||
from fastapi_common.fastapi_common_logger import logger
|
||
from fastapi_admin.config import JWT_SECRET_KEY, JWT_ALGORITHM
|
||
|
||
ACCESS_TOKEN_EXPIRE_MINUTES = 60
|
||
REFRESH_TOKEN_EXPIRE_DAYS = 7
|
||
JWT_AUDIENCE = "leaudit-platform"
|
||
JWT_ISSUER = "leaudit-platform"
|
||
|
||
|
||
class JwtService:
|
||
"""JWT Token 管理服务。"""
|
||
|
||
# ------------------------------------------------------------------
|
||
# Token 生成
|
||
# ------------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def generate(
|
||
userId: int,
|
||
username: str,
|
||
nickName: str = "",
|
||
ouId: str = "",
|
||
ouName: str = "",
|
||
roles: list[str] | None = None,
|
||
permissions: list[str] | None = None,
|
||
area: str | None = None,
|
||
tenantCode: str | None = None,
|
||
tenantName: str | None = None,
|
||
tenantType: str | None = None,
|
||
userRole: str | None = None,
|
||
deviceId: str | None = None,
|
||
deviceName: str | None = None,
|
||
userAgent: str | None = None,
|
||
ipAddress: str | None = None,
|
||
) -> dict[str, Any]:
|
||
"""生成 Access Token + Refresh Token。
|
||
|
||
返回格式与旧项目 auth.py 完全一致:
|
||
{
|
||
"access_token": "...",
|
||
"refresh_token": "...",
|
||
"token_type": "Bearer",
|
||
"expires_in": 900
|
||
}
|
||
"""
|
||
now = datetime.now(timezone.utc)
|
||
jti = str(uuid.uuid4())
|
||
|
||
# Access Token
|
||
# Token 只保留鉴权链路真正需要的最小字段,避免省局/管理员权限过多时
|
||
# 把 permissions / roles 全塞进 JWT,最终导致前端 Cookie Session 超过 4KB。
|
||
accessPayload = {
|
||
"jti": jti,
|
||
"user_id": userId,
|
||
"username": username,
|
||
"nick_name": nickName,
|
||
"ou_id": ouId,
|
||
"ou_name": ouName,
|
||
"area": area,
|
||
"tenant_code": tenantCode,
|
||
"tenant_name": tenantName,
|
||
"tenant_type": tenantType,
|
||
"user_role": userRole,
|
||
"iat": now,
|
||
"exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
|
||
"aud": JWT_AUDIENCE,
|
||
"iss": JWT_ISSUER,
|
||
"type": "access",
|
||
}
|
||
accessToken = jwt.encode(accessPayload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
|
||
|
||
# Refresh Token
|
||
refreshJti = str(uuid.uuid4())
|
||
refreshPayload = {
|
||
"jti": refreshJti,
|
||
"user_id": userId,
|
||
"username": username,
|
||
"iat": now,
|
||
"exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
|
||
"aud": JWT_AUDIENCE,
|
||
"iss": JWT_ISSUER,
|
||
"type": "refresh",
|
||
}
|
||
refreshToken = jwt.encode(refreshPayload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
|
||
|
||
# Token 签发时间(格式化字符串,供前端展示)
|
||
issuedTime = now.strftime("%Y-%m-%d %H:%M:%S")
|
||
|
||
logger.info(f"JWT issued: user={userId}, jti={jti}")
|
||
|
||
return {
|
||
"access_token": accessToken,
|
||
"refresh_token": refreshToken,
|
||
"token_type": "Bearer",
|
||
"expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60,
|
||
"issued_time": issuedTime,
|
||
"jti": jti,
|
||
"refresh_jti": refreshJti,
|
||
}
|
||
|
||
# ------------------------------------------------------------------
|
||
# Token 验证
|
||
# ------------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def verify(token: str) -> dict[str, Any]:
|
||
"""验证并解码 JWT Token。
|
||
|
||
Raises:
|
||
jwt.ExpiredSignatureError: Token 过期
|
||
jwt.InvalidTokenError: Token 无效
|
||
"""
|
||
payload = jwt.decode(
|
||
token,
|
||
JWT_SECRET_KEY,
|
||
algorithms=[JWT_ALGORITHM],
|
||
audience=JWT_AUDIENCE,
|
||
issuer=JWT_ISSUER,
|
||
)
|
||
return payload
|
||
|
||
@staticmethod
|
||
def decodeUnsafe(token: str) -> dict[str, Any] | None:
|
||
"""不验证签名解码 Token(仅用于调试或获取过期 Token 信息)。"""
|
||
try:
|
||
return jwt.decode(token, options={"verify_signature": False})
|
||
except Exception:
|
||
return None
|
||
|
||
# ------------------------------------------------------------------
|
||
# Token 撤销
|
||
# ------------------------------------------------------------------
|
||
|
||
@staticmethod
|
||
def _hashToken(token: str) -> str:
|
||
return hashlib.sha256(token.encode()).hexdigest()
|
||
|
||
@staticmethod
|
||
def getJti(token: str) -> str | None:
|
||
"""从 Token 中提取 JTI(不验证签名)。"""
|
||
payload = JwtService.decodeUnsafe(token)
|
||
return payload.get("jti") if payload else None
|