"""Authentication controller.

Route paths are identical to the legacy project:

    POST /auth/login          -- unified login (auto-detects OAuth vs. password)
    POST /auth/password_login -- username/password login

The frontend currently parses login results as ``success + message + data``.
The login responses built here follow that contract explicitly so that a
successful login is not misread as a failure by the frontend.
"""

from typing import Any

from fastapi import Depends, Request
from fastapi.responses import JSONResponse

from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_common.fastapi_common_web.domain.responses import Result
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_common.fastapi_common_logger import logger
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_modules.fastapi_leaudit.domian.Dto.auth.loginDto import PasswordLoginDTO
from fastapi_modules.fastapi_leaudit.services import IAuthService
from fastapi_modules.fastapi_leaudit.services.impl.authServiceImpl import AuthServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.usageStatsServiceImpl import UsageStatsServiceImpl


def _extract_client_ip(request_obj: Request) -> str | None:
    """Return the best-effort client IP for audit logging, or ``None``.

    Lookup order: first entry of ``X-Forwarded-For``, then ``X-Real-IP``,
    then the peer address of the connection.
    """
    # NOTE(review): both headers are client-controlled unless a trusted proxy
    # overwrites them -- confirm the deployment strips/sets them upstream.
    forwarded_for = request_obj.headers.get("x-forwarded-for", "").strip()
    if forwarded_for:
        # Only the first hop is considered; an empty first hop yields None.
        return forwarded_for.split(",")[0].strip() or None

    real_ip = request_obj.headers.get("x-real-ip", "").strip()
    if real_ip:
        return real_ip

    if request_obj.client and request_obj.client.host:
        return request_obj.client.host
    return None


def _extract_user_agent(request_obj: Request) -> str | None:
    """Return the stripped ``User-Agent`` header, or ``None`` when empty."""
    user_agent = request_obj.headers.get("user-agent", "").strip()
    return user_agent or None


def _identify_login_attempt(request_data: Any) -> tuple[str | None, str]:
    """Derive ``(sub, login_type)`` of a failed ``/auth/login`` attempt.

    ``request_data`` is whatever the JSON body decoded to (or ``None`` when
    decoding failed). Anything that is not a JSON object is reported as
    ``(None, "unknown")`` instead of raising, so the error handlers can
    always record the audit event and answer with the agreed contract.
    """
    if not isinstance(request_data, dict):
        return None, "unknown"

    user_info = request_data.get("userInfo")
    if isinstance(user_info, dict) and user_info.get("sub"):
        return str(user_info["sub"]), "oauth"
    if request_data.get("username"):
        return str(request_data["username"]), "password"
    return None, "unknown"


def _identify_password_login_sub(request_data: Any) -> str | None:
    """Return the ``sub`` (or ``username``) of a failed password login."""
    if not isinstance(request_data, dict):
        return None
    candidate = request_data.get("sub") or request_data.get("username")
    return str(candidate) if candidate else None


def _failure_response(status_code: int, message: str) -> JSONResponse:
    """Build a failure response in the ``success/message/data`` shape."""
    return JSONResponse(
        status_code=status_code,
        content={"success": False, "message": message, "data": None},
    )


class AuthController(BaseController):
    """Authentication controller exposing login and current-user routes."""

    def __init__(self):
        """Create the services and register the routes on ``self.router``."""
        super().__init__(prefix="/auth", tags=["认证"])
        self.AuthService: IAuthService = AuthServiceImpl()
        self.UsageStatsService = UsageStatsServiceImpl()

        async def _record_login_event(
            *,
            request_obj: Request,
            user_info: dict[str, Any] | None,
            sub: str | None,
            login_result: str,
            login_type: str,
            failure_reason: str | None = None,
        ) -> None:
            """Record a login audit event; never raises.

            Auditing is best-effort: any error from the usage-stats service
            is logged as a warning so that it cannot break the login flow.
            """
            try:
                await self.UsageStatsService.RecordLoginEvent(
                    UserInfo=user_info,
                    Sub=sub,
                    LoginResult=login_result,
                    LoginType=login_type,
                    IpAddress=_extract_client_ip(request_obj),
                    UserAgent=_extract_user_agent(request_obj),
                    FailureReason=failure_reason,
                )
            except Exception as record_error:
                logger.warning("记录登录审计事件失败: %s", record_error)

        @self.router.post("/login")
        async def Login(RequestObj: Request):
            """Unified login endpoint.

            The login method is detected automatically:
            - body contains ``userInfo.sub`` -> OAuth login
            - body contains ``username`` + ``password`` -> password login
            """
            # Kept outside the ``try`` so the error handlers can inspect the
            # already-parsed body instead of re-reading the request.
            request_data: Any = None
            try:
                request_data = await RequestObj.json()
                # A JSON body that is not an object (null, number, string,
                # list) cannot be a login request; handle it as an invalid
                # request rather than letting key access raise.
                body = request_data if isinstance(request_data, dict) else {}
                user_info = body.get("userInfo")

                if isinstance(user_info, dict) and "sub" in user_info:
                    logger.info("检测到 OAuth 登录请求")
                    sub = user_info["sub"]
                    login_type = "oauth"
                    # NOTE(review): ``expiresIn`` comes straight from the
                    # client -- confirm the service bounds the token lifetime.
                    vo = await self.AuthService.OAuthLogin(
                        Sub=sub,
                        Username=user_info.get("username"),
                        Nickname=user_info.get("nickname"),
                        Email=user_info.get("email"),
                        PhoneNumber=user_info.get("phone_number"),
                        OuId=user_info.get("ou_id"),
                        OuName=user_info.get("ou_name"),
                        IsLeader=user_info.get("is_leader"),
                        Area=body.get("area"),
                        ExpiresIn=body.get("expiresIn", 3600),
                    )
                elif "username" in body and "password" in body:
                    logger.info(f"检测到密码登录请求 - username={body['username']}")
                    sub = str(body["username"])
                    login_type = "password"
                    vo = await self.AuthService.PasswordLogin(
                        Sub=sub,
                        Password=body["password"],
                    )
                else:
                    await _record_login_event(
                        request_obj=RequestObj,
                        user_info=None,
                        sub=None,
                        login_result="failed",
                        login_type="unknown",
                        failure_reason="invalid_request",
                    )
                    return _failure_response(400, "无效的登录请求格式")

                await _record_login_event(
                    request_obj=RequestObj,
                    user_info=vo.user_info if isinstance(vo.user_info, dict) else None,
                    sub=sub,
                    login_result="success",
                    login_type=login_type,
                )
                return JSONResponse(status_code=200, content={
                    "success": True,
                    "message": "ok",
                    "data": {
                        "access_token": vo.access_token,
                        "token_type": vo.token_type,
                        "expires_in": vo.expires_in,
                        "issued_time": vo.issued_time,
                        "user_info": vo.user_info,
                    },
                })
            except LeauditException as e:
                logger.error(f"登录失败: {e.message}")
                failed_sub, failed_type = _identify_login_attempt(request_data)
                await _record_login_event(
                    request_obj=RequestObj,
                    user_info=None,
                    sub=failed_sub,
                    login_result="failed",
                    login_type=failed_type,
                    failure_reason=e.message,
                )
                return _failure_response(e.status.value, e.message)
            except Exception as e:
                logger.error(f"登录失败: {e}")
                failed_sub, failed_type = _identify_login_attempt(request_data)
                await _record_login_event(
                    request_obj=RequestObj,
                    user_info=None,
                    sub=failed_sub,
                    login_result="failed",
                    login_type=failed_type,
                    failure_reason=str(e),
                )
                # The raw error is only logged/audited; the client receives a
                # generic message.
                return _failure_response(401, "登录失败,请稍后重试")

        @self.router.post("/password_login")
        async def PasswordLogin(RequestObj: Request):
            """Username/password login validated through ``PasswordLoginDTO``."""
            # Kept outside the ``try`` so the error handlers can reuse it.
            request_data: Any = None
            try:
                request_data = await RequestObj.json()
                dto = PasswordLoginDTO(**request_data)
                vo = await self.AuthService.PasswordLogin(Sub=dto.sub, Password=dto.password)
                await _record_login_event(
                    request_obj=RequestObj,
                    user_info=vo.user_info if isinstance(vo.user_info, dict) else None,
                    sub=dto.sub,
                    login_result="success",
                    login_type="password",
                )
                return JSONResponse(status_code=200, content={
                    "success": True,
                    "message": "ok",
                    "data": vo.model_dump(),
                })
            except LeauditException as e:
                logger.error(f"密码登录失败: {e.message}")
                await _record_login_event(
                    request_obj=RequestObj,
                    user_info=None,
                    sub=_identify_password_login_sub(request_data),
                    login_result="failed",
                    login_type="password",
                    failure_reason=e.message,
                )
                return _failure_response(e.status.value, e.message)
            except Exception as e:
                logger.error(f"密码登录失败: {e}")
                await _record_login_event(
                    request_obj=RequestObj,
                    user_info=None,
                    sub=_identify_password_login_sub(request_data),
                    login_result="failed",
                    login_type="password",
                    failure_reason=str(e),
                )
                return _failure_response(401, "登录失败,请稍后重试")

        @self.router.get("/me", response_model=Result[dict[str, Any]])
        async def GetCurrentUser(payload: dict[str, Any] = Depends(verify_access_token)):
            """Return the profile of the currently authenticated user."""
            Data = await self.AuthService.GetCurrentUser(UserId=int(payload["user_id"]))
            return Result.success(data=Data)