"""Authentication controller.

Route paths are kept identical to the legacy project:
    POST /auth/login          -- unified login (auto-detects OAuth vs. password)
    POST /auth/password_login -- username/password login

Every handler answers with a ``JSONResponse`` whose body is the envelope
``{"code": ..., "message": ..., "data": ...}``; the envelope is assembled by
hand in this module.
"""

from fastapi import Request
from fastapi.responses import JSONResponse

from fastapi_common.fastapi_common_web.controller import BaseController
from fastapi_common.fastapi_common_web.domain.responses import Result
from fastapi_common.fastapi_common_logger import logger
from fastapi_modules.fastapi_leaudit.domian.Dto.auth.loginDto import PasswordLoginDTO, OAuthLoginDTO
from fastapi_modules.fastapi_leaudit.domian.vo.auth.loginTokenVo import LoginTokenVO
from fastapi_modules.fastapi_leaudit.services import IAuthService
from fastapi_modules.fastapi_leaudit.services.impl.authServiceImpl import AuthServiceImpl

# Message returned with HTTP 400 whenever the request body has no usable shape.
_INVALID_FORMAT_MESSAGE = "无效的登录请求格式"


def _envelope(status, message, data=None):
    """Build the JSON envelope; the HTTP status doubles as the ``code`` field."""
    return JSONResponse(
        status_code=status,
        content={"code": status, "message": message, "data": data},
    )


async def _read_json_object(request):
    """Return the request body as a ``dict``, or ``None`` if it is not a JSON object.

    Only parse/decoding failures (``ValueError`` and its subclasses, which
    include ``json.JSONDecodeError``) are absorbed here; any other error is
    left to the caller's own handler.
    """
    try:
        payload = await request.json()
    except ValueError:
        return None
    return payload if isinstance(payload, dict) else None


class AuthController(BaseController):
    """Controller that registers the login routes under the ``/auth`` prefix."""

    def __init__(self):
        """Initialise the base controller, create the auth service and register routes."""
        super().__init__(prefix="/auth", tags=["认证"])
        self.AuthService: IAuthService = AuthServiceImpl()

        @self.router.post("/login")
        async def Login(RequestObj: Request):
            """Unified login endpoint.

            The login method is detected from the JSON body:
            - ``userInfo`` is an object containing ``sub`` -> OAuth login
            - ``username`` and ``password`` are both present -> password login

            Returns:
                200 with the token data on success, 400 when the body is not a
                JSON object or matches neither shape, 401 with the exception
                text when anything else fails.
            """
            try:
                payload = await _read_json_object(RequestObj)
                if payload is None:
                    # Malformed JSON / non-object bodies are a client format
                    # error, not an authentication failure.
                    return _envelope(400, _INVALID_FORMAT_MESSAGE)

                user_info = payload.get("userInfo")
                if isinstance(user_info, dict) and "sub" in user_info:
                    logger.info("检测到 OAuth 登录请求")
                    vo = await self.AuthService.OAuthLogin(
                        Sub=user_info["sub"],
                        Username=user_info.get("username"),
                        Nickname=user_info.get("nickname"),
                        Email=user_info.get("email"),
                        PhoneNumber=user_info.get("phone_number"),
                        OuId=user_info.get("ou_id"),
                        OuName=user_info.get("ou_name"),
                        IsLeader=user_info.get("is_leader"),
                        Area=payload.get("area"),
                        # Falls back to 3600 when the caller sends no expiresIn.
                        ExpiresIn=payload.get("expiresIn", 3600),
                    )
                elif "username" in payload and "password" in payload:
                    logger.info(f"检测到密码登录请求 - username={payload['username']}")
                    # The submitted username is passed to the service as Sub.
                    vo = await self.AuthService.PasswordLogin(
                        Sub=payload["username"],
                        Password=payload["password"],
                    )
                else:
                    return _envelope(400, _INVALID_FORMAT_MESSAGE)

                return _envelope(
                    200,
                    "ok",
                    {
                        "access_token": vo.access_token,
                        "token_type": vo.token_type,
                        "expires_in": vo.expires_in,
                        "issued_time": vo.issued_time,
                        "user_info": vo.user_info,
                    },
                )
            except Exception as e:
                # Boundary handler: every remaining failure is reported as 401.
                logger.error(f"登录失败: {e}")
                return _envelope(401, str(e))

        @self.router.post("/password_login")
        async def PasswordLogin(RequestObj: Request):
            """Username/password login.

            The JSON body is loaded into ``PasswordLoginDTO`` and its ``sub`` and
            ``password`` are handed to the auth service.

            Returns:
                200 with ``vo.model_dump()`` on success, 400 when the body is
                not a JSON object, 401 with the exception text when anything
                else fails (DTO validation errors included).
            """
            try:
                payload = await _read_json_object(RequestObj)
                if payload is None:
                    # ``**payload`` below requires a mapping; reject other
                    # bodies as a format error instead of a 401.
                    return _envelope(400, _INVALID_FORMAT_MESSAGE)

                dto = PasswordLoginDTO(**payload)
                vo = await self.AuthService.PasswordLogin(Sub=dto.sub, Password=dto.password)
                return _envelope(200, "ok", vo.model_dump())
            except Exception as e:
                # Boundary handler: every remaining failure is reported as 401.
                logger.error(f"密码登录失败: {e}")
                return _envelope(401, str(e))