224 lines
9.1 KiB
Python
224 lines
9.1 KiB
Python
"""首页入口服务实现。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from sqlalchemy import text
|
|
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
|
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
|
|
|
from fastapi_modules.fastapi_leaudit.domian.vo.homeVo import HomeEntryAreaVO, HomeEntryDocumentTypeVO, HomeEntryModuleVO
|
|
from fastapi_modules.fastapi_leaudit.services.homeService import IHomeService
|
|
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
|
|
|
|
|
|
class HomeServiceImpl(IHomeService):
    """Home page entry service implementation."""

    # Paths (and their sub-paths) that ``_normalizeTargetPath`` passes through
    # unchanged; every other path is redirected or dropped.
    _MINIMAL_ENABLED_TARGETS: tuple[str, ...] = (
        "/home",
        "/files/upload",
        "/documents",
        "/chat-with-llm/chat",
        "/cross-checking",
    )

    def __init__(self) -> None:
        """Create the RBAC service used to resolve the user's route tree."""
        self.RbacService = RbacServiceImpl()

    async def GetEntryModules(self, UserId: int) -> list[HomeEntryModuleVO]:
        """Return the home page entry modules visible to the given user.

        The user's allowed route paths are loaded first, then the user row and
        the enabled, non-deleted entry modules are queried.  Each module path
        is normalized with ``_normalizeTargetPath``; modules whose normalized
        path is empty or not covered by the user's routes are skipped.

        Args:
            UserId: Primary key of the user in ``sso_users``.

        Returns:
            Entry modules ordered by ``sort_order`` then ``id``, each carrying
            its area configuration and enabled document types.

        Raises:
            LeauditException: With ``HTTP_404_NOT_FOUND`` when no row matches
                the user id with ``deleted_at IS NULL`` and ``status = 0``.
        """
        allowedPaths = await self._loadAllowedPaths(UserId=UserId)

        async with GetAsyncSession() as Session:
            # Resolve the user's area and whether any of their roles is
            # 'super_admin' (which bypasses the per-area module filter).
            userResult = await Session.execute(
                text(
                    """
                    SELECT
                        u.id,
                        COALESCE(u.area, '') AS area,
                        COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS bypass_area
                    FROM sso_users u
                    LEFT JOIN user_role ur ON ur.user_id = u.id
                    LEFT JOIN roles r ON r.id = ur.role_id
                    WHERE u.id = :user_id
                      AND u.deleted_at IS NULL
                      AND u.status = 0
                    GROUP BY u.id, u.area
                    """
                ),
                {"user_id": UserId},
            )
            userRow = userResult.mappings().first()
            if not userRow:
                raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用")

            # A module passes the area filter when the user bypasses it, the
            # user has no area, the module's ``areas`` is NULL or not a JSON
            # array, or ``areas`` holds an enabled entry for the user's area
            # or for 'default'.
            # NOTE(review): the ``user_roles`` CTE is not referenced by the
            # main SELECT; it is kept as-is so the statement stays unchanged.
            result = await Session.execute(
                text(
                    """
                    WITH user_roles AS (
                        SELECT ur.role_id
                        FROM user_role ur
                        WHERE ur.user_id = :user_id
                    )
                    SELECT
                        em.id,
                        em.name,
                        em.description,
                        em.path,
                        em.icon_path,
                        em.areas,
                        em.sort_order,
                        COALESCE(
                            json_agg(
                                json_build_object(
                                    'id', dt.id,
                                    'name', dt.name,
                                    'code', dt.code
                                )
                                ORDER BY dt.sort_order ASC, dt.id ASC
                            ) FILTER (
                                WHERE dt.id IS NOT NULL
                                  AND dt.deleted_at IS NULL
                                  AND dt.is_enabled = TRUE
                            ),
                            '[]'::json
                        ) AS document_types
                    FROM leaudit_entry_modules em
                    LEFT JOIN leaudit_document_types dt
                      ON dt.entry_module_id = em.id
                    WHERE em.deleted_at IS NULL
                      AND em.is_enabled = TRUE
                      AND (
                        :bypass_area = TRUE
                        OR COALESCE(:user_area, '') = ''
                        OR em.areas IS NULL
                        OR jsonb_typeof(em.areas) <> 'array'
                        OR EXISTS (
                            SELECT 1
                            FROM jsonb_array_elements(em.areas) AS area_item
                            WHERE area_item->>'area' = :user_area
                              AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
                        )
                        OR EXISTS (
                            SELECT 1
                            FROM jsonb_array_elements(em.areas) AS area_item
                            WHERE area_item->>'area' = 'default'
                              AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
                        )
                      )
                    GROUP BY
                        em.id,
                        em.name,
                        em.description,
                        em.path,
                        em.icon_path,
                        em.areas,
                        em.sort_order
                    ORDER BY em.sort_order ASC, em.id ASC
                    """
                ),
                {
                    "user_id": UserId,
                    "user_area": str(userRow["area"] or ""),
                    "bypass_area": bool(userRow["bypass_area"]),
                },
            )

            modules: list[HomeEntryModuleVO] = []
            for row in result.mappings().all():
                documentTypes = self._buildDocumentTypes(row["document_types"])

                targetPath = self._normalizeTargetPath(
                    RawPath=str(row["path"] or ""),
                    HasDocumentTypes=len(documentTypes) > 0,
                )
                if not targetPath:
                    continue
                if not self._isAllowedTargetPath(targetPath, allowedPaths):
                    continue

                # The chat and cross-checking entries are the only targets
                # flagged as not requiring a document type.
                requiresDocumentTypes = targetPath not in {"/chat-with-llm/chat", "/cross-checking"}

                modules.append(
                    HomeEntryModuleVO(
                        id=int(row["id"]),
                        name=str(row["name"]),
                        description=row["description"],
                        targetPath=targetPath,
                        routePath=targetPath,
                        iconPath=row["icon_path"],
                        sortOrder=int(row["sort_order"] or 0),
                        requiresDocumentTypes=requiresDocumentTypes,
                        # Areas are only materialized for modules that survive
                        # the path filters above.
                        areas=self._buildAreas(row["areas"]),
                        documentTypes=documentTypes,
                    )
                )

            return modules

    @staticmethod
    def _buildAreas(RawAreas: object) -> list[HomeEntryAreaVO]:
        """Convert a module row's raw ``areas`` value into area VOs.

        Anything that is not a list yields an empty result; list items that
        are not dicts or have no truthy ``area`` key are ignored.
        """
        if not isinstance(RawAreas, list):
            return []
        return [
            HomeEntryAreaVO(
                area=str(areaItem["area"]),
                enabled=bool(areaItem.get("enabled", False)),
                # ``or 0`` also covers an explicit JSON null, which a plain
                # ``.get("sort_order", 0)`` would pass to int() and crash on.
                sortOrder=int(areaItem.get("sort_order") or 0),
            )
            for areaItem in RawAreas
            if isinstance(areaItem, dict) and areaItem.get("area")
        ]

    @staticmethod
    def _buildDocumentTypes(RawDocumentTypes: object) -> list[HomeEntryDocumentTypeVO]:
        """Convert a module row's aggregated ``document_types`` into VOs.

        Anything that is not a list yields an empty result; list items that
        are not dicts or whose ``id`` is ``None`` are ignored.
        """
        if not isinstance(RawDocumentTypes, list):
            return []
        return [
            HomeEntryDocumentTypeVO(
                id=int(documentType["id"]),
                name=str(documentType["name"]),
                code=documentType.get("code"),
            )
            for documentType in RawDocumentTypes
            if isinstance(documentType, dict) and documentType.get("id") is not None
        ]

    async def _loadAllowedPaths(self, UserId: int) -> set[str]:
        """Collect every non-empty ``route_path`` in the user's route tree.

        Args:
            UserId: User whose routes are fetched from the RBAC service.

        Returns:
            The set of route paths found on the route nodes and all of their
            descendants.
        """
        routesVo = await self.RbacService.GetCurrentUserRoutes(UserId=UserId)
        allowedPaths: set[str] = set()

        # Walk the tree with an explicit stack; visiting order is irrelevant
        # because the result is a set.
        pending = list(routesVo.routes)
        while pending:
            item = pending.pop()
            routePath = str(item.route_path or "")
            if routePath:
                allowedPaths.add(routePath)
            if item.children:
                pending.extend(item.children)

        return allowedPaths

    def _isAllowedTargetPath(self, TargetPath: str, AllowedPaths: set[str]) -> bool:
        """Return whether the target path is covered by the allowed paths.

        A path is covered when it equals an allowed path or lies beneath one,
        i.e. starts with ``"<allowed>/"``.
        """
        if TargetPath in AllowedPaths:
            return True
        return any(TargetPath.startswith(f"{path}/") for path in AllowedPaths)

    def _normalizeTargetPath(self, RawPath: str, HasDocumentTypes: bool) -> str | None:
        """Map a module's stored path onto the minimal enabled target set.

        Args:
            RawPath: Path stored on the entry module (may be empty).
            HasDocumentTypes: Whether the module has enabled document types.

        Returns:
            ``RawPath`` when it equals or lies beneath an entry of
            ``_MINIMAL_ENABLED_TARGETS``; ``"/files/upload"`` for modules with
            document types whose path is empty, ``"/home"``, not absolute, or
            outside the enabled set; otherwise ``None``.
        """
        uploadPath = "/files/upload"

        # Modules with document types never land on the home page itself or
        # on a missing/relative path.
        if HasDocumentTypes and (not RawPath or RawPath == "/home" or not RawPath.startswith("/")):
            return uploadPath

        # This specific path is redirected to upload when document types exist.
        if RawPath == "/contract-template/search" and HasDocumentTypes:
            return uploadPath

        if any(
            RawPath == enabledPath or RawPath.startswith(f"{enabledPath}/")
            for enabledPath in self._MINIMAL_ENABLED_TARGETS
        ):
            return RawPath

        if HasDocumentTypes:
            return uploadPath

        return None
|