45 lines
1.4 KiB
Python
45 lines
1.4 KiB
Python
"""JWT 鉴权工具。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any
|
|
|
|
import jwt
|
|
from fastapi import HTTPException, Request, status
|
|
|
|
from fastapi_common.fastapi_common_security.jwtService import JwtService
|
|
|
|
|
|
def verify_access_token(RequestObj: Request) -> dict[str, Any]:
    """Verify the JWT access token on a request and return its payload.

    Authentication failures must surface as an HTTP 401 rather than silently
    passing through with an empty payload; otherwise protected endpoints
    would end up running their downstream logic with "fake authentication".

    Args:
        RequestObj: Incoming request whose ``Authorization`` header is read.

    Returns:
        The payload returned by ``JwtService.verify`` for a token whose
        ``type`` claim equals ``"access"``.

    Raises:
        HTTPException: Always with status 401, when the ``Authorization``
            header does not start with ``"Bearer "``, when
            ``JwtService.verify`` raises ``jwt.ExpiredSignatureError`` or
            any other ``jwt.PyJWTError``, or when the payload's ``type``
            claim is not ``"access"``.
    """
    auth = RequestObj.headers.get("Authorization", "")
    if not auth.startswith("Bearer "):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="缺少有效的 Authorization Bearer Token",
        )

    token = auth.removeprefix("Bearer ").strip()

    # Keep the try body limited to the call that can raise JWT errors.
    try:
        payload = JwtService.verify(token)
    except jwt.ExpiredSignatureError as exc:
        # Handled before the generic PyJWTError clause so that expiry gets
        # its own, more specific message.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="访问令牌已过期",
        ) from exc
    except jwt.PyJWTError as exc:
        # Chain the cause so the underlying JWT failure stays visible in
        # tracebacks and logs, matching the expiry handler above.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="访问令牌无效",
        ) from exc

    # Reject tokens that verified successfully but are not access tokens.
    if payload.get("type") != "access":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的访问令牌类型",
        )
    return payload
|