"""JWT 鉴权工具。""" from __future__ import annotations from typing import Any import jwt from fastapi import HTTPException, Request, status from fastapi_common.fastapi_common_security.jwtService import JwtService def verify_access_token(RequestObj: Request) -> dict[str, Any]: """验证 JWT access token 并返回 payload。 认证失败必须直接返回 401,不能静默放行为空 payload, 否则受保护接口会在后续逻辑里变成“假鉴权”。 """ auth = RequestObj.headers.get("Authorization", "") if not auth.startswith("Bearer "): raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="缺少有效的 Authorization Bearer Token", ) token = auth.removeprefix("Bearer ").strip() try: payload = JwtService.verify(token) if payload.get("type") != "access": raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="无效的访问令牌类型", ) return payload except jwt.ExpiredSignatureError as exc: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="访问令牌已过期", ) from exc except jwt.PyJWTError: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="访问令牌无效", )