"""Home entry service implementation."""

from __future__ import annotations

from collections.abc import Mapping
from typing import Any

from sqlalchemy import text

from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.vo.homeVo import (
    HomeEntryAreaVO,
    HomeEntryDocumentTypeVO,
    HomeEntryModuleVO,
    HomeEntryTenantVO,
)
from fastapi_modules.fastapi_leaudit.services.homeService import IHomeService
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ssoUserCompat import SsoUserCompat
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolution, TenantResolver


class HomeServiceImpl(IHomeService):
    """Home entry service implementation.

    Builds the list of home-page entry modules a user may see, combining the
    tenant scope stored in the database with the user's own route tree.
    """

    # A stored module path is kept as-is only when it equals one of these
    # targets or lives underneath one of them (see ``_normalizeTargetPath``).
    _MINIMAL_ENABLED_TARGETS: tuple[str, ...] = (
        "/home",
        "/files/upload",
        "/documents",
        "/chat-with-llm/chat",
        "/cross-checking",
    )
    # Targets that ``_isAllowedTargetPath`` accepts without consulting the
    # user's route tree.
    _DOCUMENT_ENTRY_TARGETS: tuple[str, ...] = (
        "/files/upload",
        "/documents",
        "/documents/list",
    )
    # Targets reported with ``requiresDocumentTypes=False``.
    _NO_DOCUMENT_TYPE_TARGETS: frozenset[str] = frozenset(
        {"/chat-with-llm/chat", "/cross-checking"}
    )

    def __init__(self) -> None:
        """Create the collaborating services and an empty table-probe cache."""
        self.RbacService = RbacServiceImpl()
        self.TenantResolver = TenantResolver()
        # Tri-state: None means "not probed yet"; otherwise the probe result.
        self._entry_module_tenant_table_exists_cache: bool | None = None

    async def GetEntryModules(self, UserId: int) -> list[HomeEntryModuleVO]:
        """Return the home entry modules visible to the given user.

        Args:
            UserId: Primary key of the user in ``sso_users``.

        Returns:
            The visible modules in ``sort_order``/``id`` order. Modules whose
            path cannot be normalized, or whose normalized path is not allowed
            for the user, are omitted.

        Raises:
            LeauditException: With ``HTTP_404_NOT_FOUND`` when no row matches
                the user query (``deleted_at IS NULL`` and ``status = 0``).
        """
        allowed_paths = await self._loadAllowedPaths(UserId=UserId)
        has_tenant_mapping_table = await self._entry_module_tenant_table_exists()

        async with GetAsyncSession() as session:
            sso_user_columns = await SsoUserCompat.get_columns(session)
            tenant_code_select = SsoUserCompat.optional_coalesce_as(
                sso_user_columns,
                alias="u",
                column="tenant_code",
                fallback_sql="''",
            )
            tenant_name_select = SsoUserCompat.optional_coalesce_as(
                sso_user_columns,
                alias="u",
                column="tenant_name",
                fallback_sql="''",
            )
            # The interpolated fragments are SQL produced by SsoUserCompat,
            # not request data; the user id itself is a bound parameter.
            user_result = await session.execute(
                text(
                    f"""
                    SELECT
                        u.id,
                        COALESCE(u.area, '') AS area,
                        {tenant_code_select},
                        {tenant_name_select},
                        COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS bypass_tenant
                    FROM sso_users u
                    LEFT JOIN user_role ur ON ur.user_id = u.id
                    LEFT JOIN roles r ON r.id = ur.role_id
                    WHERE u.id = :user_id
                      AND u.deleted_at IS NULL
                      AND u.status = 0
                    GROUP BY u.id, u.area
                    """
                ),
                {"user_id": UserId},
            )
            user_row = user_result.mappings().first()
            if not user_row:
                raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用")

            tenant_resolution = await self.TenantResolver.ResolveUserContext(
                Area=str(user_row["area"] or ""),
                TenantCode=str(user_row["tenant_code"] or ""),
                TenantName=str(user_row["tenant_name"] or ""),
                Source="home_entry_user",
            )
            effective_tenant_code = tenant_resolution.tenant_code or ""
            effective_tenant_name = str(
                tenant_resolution.tenant_name
                or tenant_resolution.normalized_value
                or user_row["area"]
                or ""
            ).strip()
            # Order-preserving de-duplication keeps the bound array stable
            # between calls (a set would not guarantee any order).
            legacy_area_candidates = list(
                dict.fromkeys(
                    candidate
                    for candidate in (
                        str(user_row["area"] or "").strip(),
                        effective_tenant_name,
                    )
                    if candidate
                )
            )

            tenant_select_sql, tenant_scope_filter_sql = self._buildTenantSql(
                has_tenant_mapping_table
            )
            result = await session.execute(
                text(
                    f"""
                    SELECT
                        em.id,
                        em.name,
                        em.description,
                        em.path,
                        em.icon_path,
                        em.areas,
                        em.sort_order,
                        {tenant_select_sql},
                        COALESCE(
                            json_agg(
                                json_build_object(
                                    'id', dt.id,
                                    'name', dt.name,
                                    'code', dt.code
                                )
                                ORDER BY dt.sort_order ASC, dt.id ASC
                            ) FILTER (
                                WHERE dt.id IS NOT NULL
                                  AND dt.deleted_at IS NULL
                                  AND dt.is_enabled = TRUE
                            ),
                            '[]'::json
                        ) AS document_types
                    FROM leaudit_entry_modules em
                    LEFT JOIN leaudit_document_types dt ON dt.entry_module_id = em.id
                    WHERE em.deleted_at IS NULL
                      AND em.is_enabled = TRUE
                      AND {tenant_scope_filter_sql}
                    GROUP BY
                        em.id, em.name, em.description, em.path,
                        em.icon_path, em.areas, em.sort_order
                    ORDER BY em.sort_order ASC, em.id ASC
                    """
                ),
                {
                    "user_tenant_code": effective_tenant_code,
                    "legacy_area_candidates": legacy_area_candidates,
                    "bypass_tenant": bool(user_row["bypass_tenant"]),
                },
            )

            modules: list[HomeEntryModuleVO] = []
            for row in result.mappings().all():
                document_types = self._buildDocumentTypes(row["document_types"])
                target_path = self._normalizeTargetPath(
                    RawPath=str(row["path"] or ""),
                    HasDocumentTypes=len(document_types) > 0,
                )
                # Decide visibility before building tenants so that modules
                # which are dropped anyway never trigger tenant resolution.
                if not target_path:
                    continue
                if not self._isAllowedTargetPath(target_path, allowed_paths):
                    continue

                tenants = await self._buildTenants(row)
                # ``areas`` is derived from the tenants list, one entry each.
                areas: list[HomeEntryAreaVO] = [
                    HomeEntryAreaVO(
                        area=tenant.tenantName or tenant.tenantCode,
                        enabled=tenant.enabled,
                        sortOrder=tenant.sortOrder,
                    )
                    for tenant in tenants
                ]
                modules.append(
                    HomeEntryModuleVO(
                        id=int(row["id"]),
                        name=str(row["name"]),
                        description=row["description"],
                        targetPath=target_path,
                        routePath=target_path,
                        iconPath=row["icon_path"],
                        sortOrder=int(row["sort_order"] or 0),
                        requiresDocumentTypes=target_path not in self._NO_DOCUMENT_TYPE_TARGETS,
                        areas=areas,
                        tenants=tenants,
                        documentTypes=document_types,
                    )
                )
        return modules

    @staticmethod
    def _buildTenantSql(HasTenantMappingTable: bool) -> tuple[str, str]:
        """Return the ``tenants`` select expression and the tenant scope filter.

        Both fragments are static SQL; they only reference the bind parameters
        ``:bypass_tenant``, ``:user_tenant_code`` and
        ``:legacy_area_candidates``.

        Args:
            HasTenantMappingTable: Whether ``leaudit_entry_module_tenants``
                exists. When it does not, neither fragment references it.

        Returns:
            ``(tenant_select_sql, tenant_scope_filter_sql)``.
        """
        if not HasTenantMappingTable:
            # Scope falls back to the ``em.areas`` JSON alone.
            legacy_only_filter_sql = """
                (
                    :bypass_tenant = TRUE
                    OR EXISTS (
                        SELECT 1
                        FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
                        WHERE area_item->>'area' = ANY(:legacy_area_candidates)
                          AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
                    )
                    OR EXISTS (
                        SELECT 1
                        FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
                        WHERE area_item->>'area' IN ('default', '公共')
                          AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
                    )
                )
            """
            return "'[]'::jsonb AS tenants", legacy_only_filter_sql

        tenant_select_sql = """
            COALESCE(
                (
                    SELECT jsonb_agg(
                        jsonb_build_object(
                            'tenant_code', emt.tenant_code,
                            'tenant_name', emt.tenant_name,
                            'enabled', emt.is_enabled,
                            'sort_order', emt.sort_order
                        )
                        ORDER BY emt.sort_order ASC, emt.id ASC
                    )
                    FROM leaudit_entry_module_tenants emt
                    WHERE emt.entry_module_id = em.id
                      AND emt.deleted_at IS NULL
                ),
                '[]'::jsonb
            ) AS tenants
        """
        # A module is in scope when the caller bypasses tenancy, when an
        # enabled mapping row matches the user's tenant or 'PUBLIC', or when
        # the module has no mapping rows at all and its ``areas`` JSON matches.
        tenant_scope_filter_sql = """
            (
                :bypass_tenant = TRUE
                OR EXISTS (
                    SELECT 1
                    FROM leaudit_entry_module_tenants emt
                    WHERE emt.entry_module_id = em.id
                      AND emt.deleted_at IS NULL
                      AND emt.is_enabled = TRUE
                      AND (
                          emt.tenant_code = :user_tenant_code
                          OR emt.tenant_code = 'PUBLIC'
                      )
                )
                OR (
                    NOT EXISTS (
                        SELECT 1
                        FROM leaudit_entry_module_tenants emt0
                        WHERE emt0.entry_module_id = em.id
                          AND emt0.deleted_at IS NULL
                    )
                    AND (
                        EXISTS (
                            SELECT 1
                            FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
                            WHERE area_item->>'area' = ANY(:legacy_area_candidates)
                              AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
                        )
                        OR EXISTS (
                            SELECT 1
                            FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
                            WHERE area_item->>'area' IN ('default', '公共')
                              AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
                        )
                    )
                )
            )
        """
        return tenant_select_sql, tenant_scope_filter_sql

    @staticmethod
    def _toIntOrDefault(Value: Any, Default: int) -> int:
        """Return ``int(Value)``, or ``Default`` when ``Value`` is ``None``.

        ``dict.get(key, default)`` only applies its default when the key is
        missing; a key that is present with a JSON ``null`` yields ``None``,
        which ``int()`` rejects with ``TypeError``.
        """
        return Default if Value is None else int(Value)

    @staticmethod
    def _buildDocumentTypes(RawDocumentTypes: Any) -> list[HomeEntryDocumentTypeVO]:
        """Convert the aggregated ``document_types`` value into VOs.

        Anything that is not a list yields an empty list; list items that are
        not dicts, or whose ``id`` is ``None``, are skipped.
        """
        if not isinstance(RawDocumentTypes, list):
            return []
        return [
            HomeEntryDocumentTypeVO(
                id=int(document_type["id"]),
                name=str(document_type["name"]),
                code=document_type.get("code"),
            )
            for document_type in RawDocumentTypes
            if isinstance(document_type, dict) and document_type.get("id") is not None
        ]

    async def _buildTenants(self, Row: Mapping[str, Any]) -> list[HomeEntryTenantVO]:
        """Build the tenant list for one module row.

        Entries from the ``tenants`` column are used when at least one has a
        ``tenant_code``; otherwise the ``areas`` column is resolved entry by
        entry through ``_resolveLegacyTenantValue``. The result is sorted by
        ``(sortOrder, tenantCode)``.

        Args:
            Row: A result row exposing the ``tenants`` and ``areas`` columns.
        """
        tenants: list[HomeEntryTenantVO] = []

        raw_tenants = Row["tenants"]
        if isinstance(raw_tenants, list):
            for item in raw_tenants:
                if not isinstance(item, dict) or not item.get("tenant_code"):
                    continue
                tenants.append(
                    HomeEntryTenantVO(
                        tenantCode=str(item["tenant_code"]),
                        tenantName=item.get("tenant_name"),
                        enabled=bool(item.get("enabled", False)),
                        # jsonb_build_object always emits the key, so a NULL
                        # column arrives as None rather than as a missing key.
                        sortOrder=self._toIntOrDefault(item.get("sort_order"), 0),
                    )
                )

        if not tenants:
            raw_areas = Row["areas"]
            if isinstance(raw_areas, list):
                for index, area_item in enumerate(raw_areas, start=1):
                    if not isinstance(area_item, dict) or not area_item.get("area"):
                        continue
                    resolution = await self._resolveLegacyTenantValue(
                        RawValue=str(area_item.get("area") or ""),
                        Source="home_entry_legacy_area",
                    )
                    tenants.append(
                        HomeEntryTenantVO(
                            tenantCode=resolution.tenant_code,
                            tenantName=resolution.tenant_name,
                            enabled=bool(area_item.get("enabled", False)),
                            # Fall back to the 1-based position when the entry
                            # has no (or a null) sort_order.
                            sortOrder=self._toIntOrDefault(area_item.get("sort_order"), index),
                        )
                    )

        # ``or ""`` keeps the key comparable should a code ever be None.
        tenants.sort(key=lambda item: (item.sortOrder, item.tenantCode or ""))
        return tenants

    async def _entry_module_tenant_table_exists(self) -> bool:
        """Report whether ``leaudit_entry_module_tenants`` exists.

        Looks the table up in ``information_schema.tables`` for the current
        schema. The answer is stored on the instance after the first probe and
        returned from there on later calls.
        """
        if self._entry_module_tenant_table_exists_cache is not None:
            return self._entry_module_tenant_table_exists_cache

        async with GetAsyncSession() as session:
            probe = await session.execute(
                text(
                    """
                    SELECT EXISTS (
                        SELECT 1
                        FROM information_schema.tables
                        WHERE table_schema = current_schema()
                          AND table_name = 'leaudit_entry_module_tenants'
                    )
                    """
                )
            )
            exists = bool(probe.scalar_one())

        self._entry_module_tenant_table_exists_cache = exists
        return exists

    async def _resolveLegacyTenantValue(self, *, RawValue: str, Source: str) -> TenantResolution:
        """Resolve a legacy area value to a tenant, with hard-coded fallbacks.

        The tenant resolver is asked first. When it yields no tenant code:

        * an empty value, ``default``, ``省级`` or ``公共`` is resolved as the
          empty value, falling back to a synthetic ``PUBLIC`` tenant;
        * ``省局`` is resolved again, falling back to a synthetic
          ``PROVINCIAL`` tenant;
        * any other value becomes a ``LEGACY_AREA`` tenant whose code and name
          are the stripped value itself.

        Args:
            RawValue: The area value as stored.
            Source: Passed through to the resolver and recorded on any
                synthetic resolution.
        """
        resolution = await self.TenantResolver.Resolve(
            RawValue=RawValue,
            Source=Source,
        )
        if resolution.tenant_code:
            return resolution

        normalized = str(RawValue or "").strip()

        if normalized in {"", "default", "省级", "公共"}:
            public_resolution = await self.TenantResolver.Resolve(
                RawValue="",
                Source=Source,
            )
            if public_resolution.tenant_code:
                return public_resolution
            return TenantResolution(
                tenant_code="PUBLIC",
                tenant_name="公共",
                tenant_type="PUBLIC",
                raw_value=RawValue,
                normalized_value=normalized,
                source=Source,
                is_public=True,
            )

        if normalized == "省局":
            provincial_resolution = await self.TenantResolver.Resolve(RawValue="省局", Source=Source)
            if provincial_resolution.tenant_code:
                return provincial_resolution
            return TenantResolution(
                tenant_code="PROVINCIAL",
                tenant_name="省局",
                tenant_type="PROVINCIAL",
                raw_value=RawValue,
                normalized_value=normalized,
                source=Source,
                is_public=False,
            )

        # ``normalized`` is non-empty here (the empty case returned above);
        # the ``or None`` guards are kept for safety.
        return TenantResolution(
            tenant_code=normalized or None,
            tenant_name=normalized or None,
            tenant_type="LEGACY_AREA" if normalized else None,
            raw_value=RawValue,
            normalized_value=normalized,
            source=Source,
            is_public=False,
        )

    async def _loadAllowedPaths(self, UserId: int) -> set[str]:
        """Collect every non-empty route path in the user's route tree.

        Args:
            UserId: The user whose routes are loaded from the RBAC service.

        Returns:
            The set of ``route_path`` values found on any node of the tree,
            including nested children.
        """
        routes_vo = await self.RbacService.GetCurrentUserRoutes(UserId=UserId)

        allowed_paths: set[str] = set()
        # Explicit stack instead of recursion; also tolerates a missing list.
        pending = list(routes_vo.routes or ())
        while pending:
            item = pending.pop()
            route_path = str(item.route_path or "")
            if route_path:
                allowed_paths.add(route_path)
            if item.children:
                pending.extend(item.children)
        return allowed_paths

    def _isAllowedTargetPath(self, TargetPath: str, AllowedPaths: set[str]) -> bool:
        """Tell whether a home target path is covered by the user's routes.

        Document entry targets are always allowed. Otherwise the path must be
        one of ``AllowedPaths`` or sit underneath one of them.
        """
        if TargetPath in self._DOCUMENT_ENTRY_TARGETS:
            return True
        if TargetPath in AllowedPaths:
            return True
        return any(TargetPath.startswith(f"{path}/") for path in AllowedPaths)

    def _normalizeTargetPath(self, RawPath: str, HasDocumentTypes: bool) -> str | None:
        """Map a stored module path onto the currently enabled targets.

        Args:
            RawPath: The module's stored path; may be empty.
            HasDocumentTypes: Whether the module has any document types.

        Returns:
            ``/files/upload`` for modules with document types whose path is
            empty, ``/home``, not absolute, ``/contract-template/search`` or
            outside the enabled targets; ``RawPath`` itself when it is an
            enabled target or below one; otherwise ``None``.
        """
        if HasDocumentTypes and (not RawPath or RawPath == "/home" or not RawPath.startswith("/")):
            return "/files/upload"
        if RawPath == "/contract-template/search" and HasDocumentTypes:
            return "/files/upload"
        if any(
            RawPath == enabled_path or RawPath.startswith(f"{enabled_path}/")
            for enabled_path in self._MINIMAL_ENABLED_TARGETS
        ):
            return RawPath
        if HasDocumentTypes:
            return "/files/upload"
        return None