"""首页入口服务实现。""" from __future__ import annotations from sqlalchemy import text from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.domian.vo.homeVo import HomeEntryAreaVO, HomeEntryDocumentTypeVO, HomeEntryModuleVO from fastapi_modules.fastapi_leaudit.services.homeService import IHomeService from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl class HomeServiceImpl(IHomeService): """首页入口服务实现。""" _MINIMAL_ENABLED_TARGETS: tuple[str, ...] = ( "/home", "/files/upload", "/documents", "/chat-with-llm/chat", ) def __init__(self) -> None: self.RbacService = RbacServiceImpl() async def GetEntryModules(self, UserId: int) -> list[HomeEntryModuleVO]: """获取当前用户可见的首页入口模块。""" allowedPaths = await self._loadAllowedPaths(UserId=UserId) async with GetAsyncSession() as Session: userResult = await Session.execute( text( """ SELECT u.id, COALESCE(u.area, '') AS area, COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS bypass_area FROM sso_users u LEFT JOIN user_role ur ON ur.user_id = u.id LEFT JOIN roles r ON r.id = ur.role_id WHERE u.id = :user_id AND u.deleted_at IS NULL AND u.status = 0 GROUP BY u.id, u.area """ ), {"user_id": UserId}, ) userRow = userResult.mappings().first() if not userRow: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用") result = await Session.execute( text( """ WITH user_roles AS ( SELECT ur.role_id FROM user_role ur WHERE ur.user_id = :user_id ) SELECT em.id, em.name, em.description, em.path, em.icon_path, em.areas, em.sort_order, COALESCE( json_agg( json_build_object( 'id', dt.id, 'name', dt.name, 'code', dt.code ) ORDER BY dt.sort_order ASC, dt.id ASC ) FILTER ( WHERE dt.id IS NOT NULL AND dt.deleted_at IS NULL AND dt.is_enabled = TRUE ), '[]'::json ) AS document_types FROM leaudit_entry_modules em LEFT JOIN leaudit_document_types dt ON dt.entry_module_id = em.id WHERE em.deleted_at IS NULL AND em.is_enabled = TRUE AND ( :bypass_area = TRUE OR COALESCE(:user_area, '') = '' OR em.areas IS NULL OR jsonb_typeof(em.areas) <> 'array' OR EXISTS ( SELECT 1 FROM jsonb_array_elements(em.areas) AS area_item WHERE area_item->>'area' = :user_area AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE ) OR EXISTS ( SELECT 1 FROM jsonb_array_elements(em.areas) AS area_item WHERE area_item->>'area' = 'default' AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE ) ) GROUP BY em.id, em.name, em.description, em.path, em.icon_path, em.areas, em.sort_order ORDER BY em.sort_order ASC, em.id ASC """ ), { "user_id": UserId, "user_area": str(userRow["area"] or ""), "bypass_area": bool(userRow["bypass_area"]), }, ) modules: list[HomeEntryModuleVO] = [] for row in result.mappings().all(): areas: list[HomeEntryAreaVO] = [] rawAreas = row["areas"] if isinstance(rawAreas, list): for areaItem in rawAreas: if isinstance(areaItem, dict) and areaItem.get("area"): areas.append( HomeEntryAreaVO( area=str(areaItem["area"]), enabled=bool(areaItem.get("enabled", False)), sortOrder=int(areaItem.get("sort_order", 0)), ) ) documentTypes: list[HomeEntryDocumentTypeVO] = [] rawDocumentTypes = row["document_types"] if isinstance(rawDocumentTypes, list): for documentType in rawDocumentTypes: if isinstance(documentType, dict) and documentType.get("id") is not None: documentTypes.append( HomeEntryDocumentTypeVO( id=int(documentType["id"]), name=str(documentType["name"]), code=documentType.get("code"), ) ) targetPath = self._normalizeTargetPath( RawPath=str(row["path"] or ""), HasDocumentTypes=len(documentTypes) > 0, ) if not targetPath: continue if not self._isAllowedTargetPath(targetPath, allowedPaths): continue requiresDocumentTypes = targetPath not in {"/chat-with-llm/chat"} modules.append( HomeEntryModuleVO( id=int(row["id"]), name=str(row["name"]), description=row["description"], targetPath=targetPath, routePath=targetPath, iconPath=row["icon_path"], sortOrder=int(row["sort_order"] or 0), requiresDocumentTypes=requiresDocumentTypes, areas=areas, documentTypes=documentTypes, ) ) return modules async def _loadAllowedPaths(self, UserId: int) -> set[str]: """加载当前用户在首页可点击的目标路径集合。""" routesVo = await self.RbacService.GetCurrentUserRoutes(UserId=UserId) allowedPaths: set[str] = set() def collect(items) -> None: for item in items: allowedPaths.add(str(item.route_path or "")) if item.children: collect(item.children) collect(routesVo.routes) return {path for path in allowedPaths if path} def _isAllowedTargetPath(self, TargetPath: str, AllowedPaths: set[str]) -> bool: """判断首页目标路径是否被当前用户路由树覆盖。""" if TargetPath in AllowedPaths: return True return any(TargetPath.startswith(f"{path}/") for path in AllowedPaths) def _normalizeTargetPath(self, RawPath: str, HasDocumentTypes: bool) -> str | None: """将首页入口跳转归一到当前最小可用路径集合。""" if HasDocumentTypes and (not RawPath or RawPath == "/home" or not RawPath.startswith("/")): return "/files/upload" if RawPath == "/contract-template/search" and HasDocumentTypes: return "/files/upload" if RawPath == "/cross-checking": return None if any( RawPath == enabledPath or RawPath.startswith(f"{enabledPath}/") for enabledPath in self._MINIMAL_ENABLED_TARGETS ): return RawPath if HasDocumentTypes: return "/files/upload" return None