629 lines
27 KiB
Python
629 lines
27 KiB
Python
"""首页入口服务实现。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import date, datetime
|
|
import json
|
|
from typing import Any
|
|
|
|
from sqlalchemy import text
|
|
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
|
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
|
|
|
from fastapi_modules.fastapi_leaudit.domian.vo.homeVo import (
|
|
HomeDashboardGrowthVO,
|
|
HomeDashboardStatisticsVO,
|
|
HomeEntryAreaVO,
|
|
HomeEntryDocumentTypeVO,
|
|
HomeEntryModuleVO,
|
|
HomeEntryTenantVO,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.services import IDocumentService
|
|
from fastapi_modules.fastapi_leaudit.services.homeService import IHomeService
|
|
from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl
|
|
from fastapi_modules.fastapi_leaudit.services.impl.rbacServiceImpl import RbacServiceImpl
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ssoUserCompat import SsoUserCompat
|
|
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolution, TenantResolver
|
|
|
|
|
|
class HomeServiceImpl(IHomeService):
|
|
"""首页入口服务实现。"""
|
|
|
|
_MINIMAL_ENABLED_TARGETS: tuple[str, ...] = (
|
|
"/home",
|
|
"/files/upload",
|
|
"/documents",
|
|
"/chat-with-llm/chat",
|
|
"/cross-checking",
|
|
"/govdoc",
|
|
"/govdoc/home",
|
|
"/govdoc/audits",
|
|
"/govdoc/upload",
|
|
"/govdoc-audit",
|
|
"/govdoc-audit/home",
|
|
"/govdoc-audit/audits",
|
|
"/govdoc-audit/upload",
|
|
)
|
|
_DOCUMENT_ENTRY_TARGETS: tuple[str, ...] = (
|
|
"/files/upload",
|
|
"/documents",
|
|
"/documents/list",
|
|
)
|
|
_DEFAULT_FEATURES_BY_PROFILE: dict[str, list[str]] = {
|
|
"document_review": ["home", "documents", "upload", "rules", "rule_groups"],
|
|
"contract": ["home", "documents", "upload", "rules", "contract_template_search", "contract_template_list"],
|
|
"govdoc": ["home", "govdoc_audits", "govdoc_upload", "rule_groups"],
|
|
"cross_checking": ["cross_checking", "cross_checking_upload", "cross_checking_list"],
|
|
"custom": ["home", "documents"],
|
|
}
|
|
|
|
def __init__(self, DocumentService: IDocumentService | None = None) -> None:
    """Wire up collaborators and reset the per-instance schema-probe caches.

    Args:
        DocumentService: Optional document service to use. When a falsy
            value is passed (the default ``None``), a fresh
            ``DocumentServiceImpl`` is created instead.
    """
    rbac_service = RbacServiceImpl()
    tenant_resolver = TenantResolver()
    document_service = DocumentService if DocumentService else DocumentServiceImpl()

    self.RbacService = rbac_service
    self.TenantResolver = tenant_resolver
    self.DocumentService = document_service

    # Tri-state caches: None = not probed yet, otherwise the probed answer.
    self._entry_module_tenant_table_exists_cache: bool | None = None
    self._entry_module_menu_columns_exist_cache: bool | None = None
|
|
|
|
async def GetEntryModules(self, UserId: int) -> list[HomeEntryModuleVO]:
    """Return the home-page entry modules visible to the given user.

    The user row is read from ``sso_users`` (not soft-deleted, ``status = 0``),
    its tenant context is resolved through ``self.TenantResolver``, and the
    enabled rows of ``leaudit_entry_modules`` are filtered by tenant scope in
    SQL. Each remaining row is mapped to a ``HomeEntryModuleVO``; rows whose
    normalized target path is empty, or is not covered by the user's route
    tree, are dropped.

    Args:
        UserId: Primary key of the current user in ``sso_users``.

    Returns:
        The visible modules, ordered by ``sort_order`` and then ``id``.

    Raises:
        LeauditException: ``HTTP_404_NOT_FOUND`` when no matching active
            user row exists.
    """

    def to_int(value: Any, default: int) -> int:
        # A JSON null / SQL NULL arrives as None; dict.get's default only
        # covers a *missing* key, and int(None) raises TypeError.
        return default if value is None else int(value)

    allowed_paths = await self._loadAllowedPaths(UserId=UserId)

    # Both schema probes open their own session, so run them before the main
    # session is opened instead of nesting a second session inside it.
    has_tenant_mapping_table = await self._entry_module_tenant_table_exists()
    has_menu_columns = await self._entry_module_menu_columns_exist()

    menu_select_sql = self._entry_module_menu_select_sql(has_menu_columns)
    menu_group_by_sql = ",\n em.menu_profile,\n em.features" if has_menu_columns else ""

    # Per-module tenant list; degrades to an empty array when the mapping
    # table has not been created in this schema.
    tenant_select_sql = (
        """
        COALESCE(
            (
                SELECT jsonb_agg(
                    jsonb_build_object(
                        'tenant_code', emt.tenant_code,
                        'tenant_name', emt.tenant_name,
                        'enabled', emt.is_enabled,
                        'sort_order', emt.sort_order
                    )
                    ORDER BY emt.sort_order ASC, emt.id ASC
                )
                FROM leaudit_entry_module_tenants emt
                WHERE emt.entry_module_id = em.id
                  AND emt.deleted_at IS NULL
            ),
            '[]'::jsonb
        ) AS tenants
        """
        if has_tenant_mapping_table
        else "'[]'::jsonb AS tenants"
    )

    # Legacy visibility rule based on the JSON ``areas`` column: the module is
    # visible when one of the user's area names, or a public marker, is enabled.
    legacy_area_filter_sql = """
        EXISTS (
            SELECT 1
            FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
            WHERE area_item->>'area' = ANY(:legacy_area_candidates)
              AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
        )
        OR EXISTS (
            SELECT 1
            FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
            WHERE area_item->>'area' IN ('default', '公共')
              AND COALESCE((area_item->>'enabled')::boolean, FALSE) = TRUE
        )
    """

    if has_tenant_mapping_table:
        # Mapping rows win when present; the legacy rule only applies to
        # modules that have no (non-deleted) mapping rows at all.
        tenant_scope_filter_sql = f"""
        (
            :bypass_tenant = TRUE
            OR EXISTS (
                SELECT 1
                FROM leaudit_entry_module_tenants emt
                WHERE emt.entry_module_id = em.id
                  AND emt.deleted_at IS NULL
                  AND emt.is_enabled = TRUE
                  AND (
                      emt.tenant_code = :user_tenant_code
                      OR emt.tenant_code = 'PUBLIC'
                  )
            )
            OR (
                NOT EXISTS (
                    SELECT 1
                    FROM leaudit_entry_module_tenants emt0
                    WHERE emt0.entry_module_id = em.id
                      AND emt0.deleted_at IS NULL
                )
                AND (
                    {legacy_area_filter_sql}
                )
            )
        )
        """
    else:
        tenant_scope_filter_sql = f"""
        (
            :bypass_tenant = TRUE
            OR {legacy_area_filter_sql}
        )
        """

    async with GetAsyncSession() as session:
        # tenant_code / tenant_name may be absent from sso_users in some
        # deployments; SsoUserCompat supplies the select expression
        # (presumably a '' fallback when the column is missing - confirm
        # against SsoUserCompat).
        sso_user_columns = await SsoUserCompat.get_columns(session)
        tenant_code_select = SsoUserCompat.optional_coalesce_as(
            sso_user_columns,
            alias="u",
            column="tenant_code",
            fallback_sql="''",
        )
        tenant_name_select = SsoUserCompat.optional_coalesce_as(
            sso_user_columns,
            alias="u",
            column="tenant_name",
            fallback_sql="''",
        )
        user_result = await session.execute(
            text(
                f"""
                SELECT
                    u.id,
                    COALESCE(u.area, '') AS area,
                    {tenant_code_select},
                    {tenant_name_select},
                    COALESCE(bool_or(r.role_key = 'super_admin'), FALSE) AS bypass_tenant
                FROM sso_users u
                LEFT JOIN user_role ur ON ur.user_id = u.id
                LEFT JOIN roles r ON r.id = ur.role_id
                WHERE u.id = :user_id
                  AND u.deleted_at IS NULL
                  AND u.status = 0
                GROUP BY u.id, u.area
                """
            ),
            {"user_id": UserId},
        )
        user_row = user_result.mappings().first()
        if not user_row:
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用")

        user_area = str(user_row["area"] or "")
        tenant_resolution = await self.TenantResolver.ResolveUserContext(
            Area=user_area,
            TenantCode=str(user_row["tenant_code"] or ""),
            TenantName=str(user_row["tenant_name"] or ""),
            Source="home_entry_user",
        )
        effective_tenant_code = tenant_resolution.tenant_code or ""
        effective_tenant_name = str(
            tenant_resolution.tenant_name
            or tenant_resolution.normalized_value
            or user_row["area"]
            or ""
        ).strip()

        # De-duplicated, non-empty area names used by the legacy filter.
        legacy_area_candidates = list(
            dict.fromkeys(
                candidate
                for candidate in (user_area.strip(), effective_tenant_name)
                if candidate
            )
        )

        result = await session.execute(
            text(
                f"""
                SELECT
                    em.id,
                    em.name,
                    em.description,
                    em.path,
                    em.icon_path,
                    em.areas,
                    em.sort_order,
                    {menu_select_sql},
                    {tenant_select_sql},
                    COALESCE(
                        json_agg(
                            json_build_object(
                                'id', dt.id,
                                'name', dt.name,
                                'code', dt.code
                            )
                            ORDER BY dt.sort_order ASC, dt.id ASC
                        ) FILTER (
                            WHERE dt.id IS NOT NULL
                              AND dt.deleted_at IS NULL
                              AND dt.is_enabled = TRUE
                        ),
                        '[]'::json
                    ) AS document_types
                FROM leaudit_entry_modules em
                LEFT JOIN leaudit_document_types dt
                    ON dt.entry_module_id = em.id
                WHERE em.deleted_at IS NULL
                  AND em.is_enabled = TRUE
                  AND {tenant_scope_filter_sql}
                GROUP BY
                    em.id,
                    em.name,
                    em.description,
                    em.path,
                    em.icon_path,
                    em.areas,
                    em.sort_order
                    {menu_group_by_sql}
                ORDER BY em.sort_order ASC, em.id ASC
                """
            ),
            {
                "user_tenant_code": effective_tenant_code,
                "legacy_area_candidates": legacy_area_candidates,
                "bypass_tenant": bool(user_row["bypass_tenant"]),
            },
        )
        # Materialize the rows; the mapping below needs no database access.
        rows = result.mappings().all()

    modules: list[HomeEntryModuleVO] = []
    for row in rows:
        tenants: list[HomeEntryTenantVO] = []
        raw_tenants = row["tenants"]
        if isinstance(raw_tenants, list):
            for item in raw_tenants:
                if isinstance(item, dict) and item.get("tenant_code"):
                    tenants.append(
                        HomeEntryTenantVO(
                            tenantCode=str(item["tenant_code"]),
                            tenantName=item.get("tenant_name"),
                            enabled=bool(item.get("enabled", False)),
                            sortOrder=to_int(item.get("sort_order"), 0),
                        )
                    )

        if not tenants:
            # No mapping rows: derive the tenant list from the legacy
            # ``areas`` JSON, using the 1-based position as the default order.
            raw_areas = row["areas"]
            if isinstance(raw_areas, list):
                for index, area_item in enumerate(raw_areas, start=1):
                    if not isinstance(area_item, dict) or not area_item.get("area"):
                        continue
                    resolution = await self._resolveLegacyTenantValue(
                        RawValue=str(area_item.get("area") or ""),
                        Source="home_entry_legacy_area",
                    )
                    tenants.append(
                        HomeEntryTenantVO(
                            tenantCode=resolution.tenant_code,
                            tenantName=resolution.tenant_name,
                            enabled=bool(area_item.get("enabled", False)),
                            sortOrder=to_int(area_item.get("sort_order"), index),
                        )
                    )
            # NOTE(review): the reviewed source had lost its indentation; this
            # sort is assumed to apply to the legacy fallback only (mapping
            # rows already arrive ordered by sort_order, id) - confirm.
            tenants.sort(key=lambda item: (item.sortOrder, item.tenantCode or ""))

        # ``areas`` mirrors ``tenants`` using the display name where available.
        areas: list[HomeEntryAreaVO] = [
            HomeEntryAreaVO(
                area=item.tenantName or item.tenantCode,
                enabled=item.enabled,
                sortOrder=item.sortOrder,
            )
            for item in tenants
        ]

        document_types: list[HomeEntryDocumentTypeVO] = []
        raw_document_types = row["document_types"]
        if isinstance(raw_document_types, list):
            for document_type in raw_document_types:
                if isinstance(document_type, dict) and document_type.get("id") is not None:
                    document_types.append(
                        HomeEntryDocumentTypeVO(
                            id=int(document_type["id"]),
                            name=str(document_type["name"]),
                            code=document_type.get("code"),
                        )
                    )

        target_path = self._normalizeTargetPath(
            RawPath=str(row["path"] or ""),
            HasDocumentTypes=len(document_types) > 0,
        )
        if not target_path:
            continue
        if not self._isAllowedTargetPath(target_path, allowed_paths):
            continue

        requires_document_types = target_path not in {"/chat-with-llm/chat", "/cross-checking"}

        modules.append(
            HomeEntryModuleVO(
                id=int(row["id"]),
                name=str(row["name"]),
                description=row["description"],
                targetPath=target_path,
                routePath=target_path,
                menuProfile=self._normalizeMenuProfile(row.get("menu_profile")),
                features=self._parseFeatures(row.get("features"), row.get("menu_profile")),
                tenantCode=effective_tenant_code or None,
                iconPath=row["icon_path"],
                sortOrder=int(row["sort_order"] or 0),
                requiresDocumentTypes=requires_document_types,
                areas=areas,
                tenants=tenants,
                documentTypes=document_types,
            )
        )

    return modules
|
|
|
|
async def GetDashboardStatistics(
    self,
    UserId: int,
    Today: str | None = None,
    TypeIds: list[int] | None = None,
    EntryModuleId: int | None = None,
) -> HomeDashboardStatisticsVO:
    """Build the home-page statistic cards for the current business entry.

    All documents visible through ``self.DocumentService.ListDocuments`` are
    fetched page by page (100 per page) and bucketed by the date part of
    their ``updatedAt`` value into "today", "current month to date" and
    "previous month". A document counts as reviewed when ``auditStatus == 1``
    and as passed when its ``failedCount`` is zero.

    Args:
        UserId: Current user id, forwarded to the document service.
        Today: Optional ISO date (``YYYY-MM-DD``) used as the reference day;
            defaults to ``date.today()``.
        TypeIds: Optional document type ids; non-positive ids are discarded.
        EntryModuleId: Optional entry module id; non-positive values are
            treated as "no filter".

    Returns:
        The populated ``HomeDashboardStatisticsVO``.

    Raises:
        ValueError: If ``Today`` is given but is not a valid ISO date.
    """
    reference_day = date.fromisoformat(Today) if Today else date.today()
    month_start = reference_day.replace(day=1)
    # The day before the first of this month is the last day of the previous one.
    last_month_end = date.fromordinal(month_start.toordinal() - 1)
    last_month_start = last_month_end.replace(day=1)

    type_filter = [int(type_id) for type_id in (TypeIds or []) if int(type_id) > 0]
    module_filter = None
    if EntryModuleId and int(EntryModuleId) > 0:
        module_filter = int(EntryModuleId)

    async def fetch_all_documents() -> list[Any]:
        # Walk every page; the page count is re-read from each response.
        collected: list[Any] = []
        page_number, last_page = 1, 1
        while page_number <= last_page:
            page_result = await self.DocumentService.ListDocuments(
                CurrentUserId=UserId,
                Page=page_number,
                PageSize=100,
                TypeIds=type_filter or None,
                EntryModuleId=module_filter,
            )
            collected.extend(page_result.documents)
            last_page = max(1, int(page_result.totalPages or 1))
            page_number += 1
        return collected

    def is_reviewed(entry: Any) -> bool:
        return int(entry.auditStatus or 0) == 1

    def total_issues(entries: list[Any]) -> int:
        # Negative failure counts are clamped to zero.
        return sum(max(0, int(entry.failedCount or 0)) for entry in entries)

    def pass_percentage(entries: list[Any]) -> int:
        if not entries:
            return 0
        clean = sum(1 for entry in entries if int(entry.failedCount or 0) == 0)
        return round((clean / len(entries)) * 100)

    def make_growth(Current: int, Previous: int) -> HomeDashboardGrowthVO:
        rising = Current >= Previous
        if Previous <= 0:
            # No baseline: report 100% when there is any current value.
            return HomeDashboardGrowthVO(value=100 if Current > 0 else 0, isUp=rising)
        change = round(abs(((Current - Previous) / Previous) * 100))
        return HomeDashboardGrowthVO(value=change, isUp=rising)

    def updated_on(entry: Any) -> date | None:
        stamp = str(getattr(entry, "updatedAt", "") or "").strip()
        if not stamp:
            return None
        try:
            # A trailing "Z" is rewritten to an explicit UTC offset first.
            return datetime.fromisoformat(stamp.replace("Z", "+00:00")).date()
        except ValueError:
            pass
        try:
            return datetime.strptime(stamp, "%Y-%m-%d %H:%M:%S").date()
        except ValueError:
            return None

    def between(entry: Any, first_day: date, last_day: date) -> bool:
        day = updated_on(entry)
        return day is not None and first_day <= day <= last_day

    everything = await fetch_all_documents()
    todays = [entry for entry in everything if between(entry, reference_day, reference_day)]
    this_month = [entry for entry in everything if between(entry, month_start, reference_day)]
    last_month = [entry for entry in everything if between(entry, last_month_start, last_month_end)]

    reviewed_now = [entry for entry in this_month if is_reviewed(entry)]
    reviewed_before = [entry for entry in last_month if is_reviewed(entry)]

    rate_now = pass_percentage(reviewed_now)
    rate_before = pass_percentage(reviewed_before)
    issues_now = total_issues(reviewed_now)
    issues_before = total_issues(reviewed_before)

    pending_today = len([entry for entry in todays if not is_reviewed(entry)])

    return HomeDashboardStatisticsVO(
        todayPendingFiles=pending_today,
        monthlyReviewedFiles=len(reviewed_now),
        monthlyReviewGrowth=make_growth(len(reviewed_now), len(reviewed_before)),
        monthlyPassRate=rate_now,
        passRateGrowth=make_growth(rate_now, rate_before),
        issuesDetected=issues_now,
        issuesGrowth=make_growth(issues_now, issues_before),
    )
|
|
|
|
async def _entry_module_tenant_table_exists(self) -> bool:
    """Report whether ``leaudit_entry_module_tenants`` exists in the current schema.

    The answer is looked up once in ``information_schema.tables`` and then
    served from ``self._entry_module_tenant_table_exists_cache``.
    """
    cached = self._entry_module_tenant_table_exists_cache
    if cached is not None:
        return cached

    probe = text(
        """
        SELECT EXISTS (
            SELECT 1
            FROM information_schema.tables
            WHERE table_schema = current_schema()
              AND table_name = 'leaudit_entry_module_tenants'
        )
        """
    )
    async with GetAsyncSession() as session:
        outcome = await session.execute(probe)
        table_present = bool(outcome.scalar_one())

    self._entry_module_tenant_table_exists_cache = table_present
    return table_present
|
|
|
|
async def _entry_module_menu_columns_exist(self) -> bool:
    """Report whether ``leaudit_entry_modules`` has both menu columns.

    Counts the ``menu_profile`` and ``features`` columns in
    ``information_schema.columns`` for the current schema; the result is
    ``True`` only when both are present. The answer is cached on the
    instance after the first probe.
    """
    cached = self._entry_module_menu_columns_exist_cache
    if cached is not None:
        return cached

    probe = text(
        """
        SELECT COUNT(*)
        FROM information_schema.columns
        WHERE table_schema = current_schema()
          AND table_name = 'leaudit_entry_modules'
          AND column_name IN ('menu_profile', 'features')
        """
    )
    async with GetAsyncSession() as session:
        outcome = await session.execute(probe)
        matching_columns = int(outcome.scalar_one())

    both_present = matching_columns == 2
    self._entry_module_menu_columns_exist_cache = both_present
    return both_present
|
|
|
|
@staticmethod
def _entry_module_menu_select_sql(HasMenuColumns: bool) -> str:
    """Return the select-list fragment for ``menu_profile`` and ``features``.

    When the columns do not exist, constant placeholders with the same
    aliases are selected instead so the result rows keep the same keys.
    """
    real_columns = "em.menu_profile, em.features"
    placeholders = "'document_review'::varchar AS menu_profile, '[]'::jsonb AS features"
    return real_columns if HasMenuColumns else placeholders
|
|
|
|
@classmethod
|
|
def _normalizeMenuProfile(cls, MenuProfile: str | None) -> str:
|
|
value = str(MenuProfile or "document_review").strip() or "document_review"
|
|
if value not in cls._DEFAULT_FEATURES_BY_PROFILE:
|
|
return "document_review"
|
|
return value
|
|
|
|
@classmethod
|
|
def _parseFeatures(cls, RawFeatures: Any, MenuProfile: str | None) -> list[str]:
|
|
menu_profile = cls._normalizeMenuProfile(MenuProfile)
|
|
allowed_features = {
|
|
"home",
|
|
"documents",
|
|
"upload",
|
|
"rules",
|
|
"rule_groups",
|
|
"contract_template_search",
|
|
"contract_template_list",
|
|
"govdoc_audits",
|
|
"govdoc_upload",
|
|
"cross_checking",
|
|
"cross_checking_upload",
|
|
"cross_checking_list",
|
|
"usage_stats",
|
|
}
|
|
if isinstance(RawFeatures, list):
|
|
parsed = RawFeatures
|
|
elif isinstance(RawFeatures, str) and RawFeatures.strip():
|
|
try:
|
|
parsed = json.loads(RawFeatures)
|
|
except json.JSONDecodeError:
|
|
parsed = []
|
|
else:
|
|
parsed = []
|
|
normalized: list[str] = []
|
|
for item in parsed:
|
|
feature = str(item or "").strip()
|
|
if feature in allowed_features and feature not in normalized:
|
|
normalized.append(feature)
|
|
return normalized or list(cls._DEFAULT_FEATURES_BY_PROFILE[menu_profile])
|
|
|
|
async def _resolveLegacyTenantValue(self, *, RawValue: str, Source: str) -> TenantResolution:
    """Resolve a legacy ``area`` value to a tenant, with built-in fallbacks.

    The value is first handed to ``self.TenantResolver.Resolve``. If that
    yields no tenant code, well-known legacy markers are handled locally:

    * ``""``, ``"default"``, ``"省级"``, ``"公共"``: resolve the empty value,
      falling back to a synthetic ``PUBLIC`` tenant.
    * ``"省局"``: resolve ``"省局"``, falling back to a synthetic
      ``PROVINCIAL`` tenant.
    * anything else: the stripped value itself is used as both tenant code
      and tenant name with type ``LEGACY_AREA``.

    Args:
        RawValue: The legacy area string as stored.
        Source: Label forwarded to the resolver and recorded in the result.

    Returns:
        A ``TenantResolution`` describing the tenant.
    """
    primary = await self.TenantResolver.Resolve(RawValue=RawValue, Source=Source)
    if primary.tenant_code:
        return primary

    normalized = str(RawValue or "").strip()

    def synthetic(code: str | None, name: str | None, kind: str | None, public: bool) -> TenantResolution:
        # Locally built resolution that carries the original input through.
        return TenantResolution(
            tenant_code=code,
            tenant_name=name,
            tenant_type=kind,
            raw_value=RawValue,
            normalized_value=normalized,
            source=Source,
            is_public=public,
        )

    if normalized in {"", "default", "省级", "公共"}:
        public_match = await self.TenantResolver.Resolve(RawValue="", Source=Source)
        if public_match.tenant_code:
            return public_match
        return synthetic("PUBLIC", "公共", "PUBLIC", True)

    if normalized == "省局":
        provincial_match = await self.TenantResolver.Resolve(RawValue="省局", Source=Source)
        if provincial_match.tenant_code:
            return provincial_match
        return synthetic("PROVINCIAL", "省局", "PROVINCIAL", False)

    return synthetic(
        normalized or None,
        normalized or None,
        "LEGACY_AREA" if normalized else None,
        False,
    )
|
|
|
|
async def _loadAllowedPaths(self, UserId: int) -> set[str]:
    """Collect every non-empty route path from the user's route tree.

    The tree comes from ``self.RbacService.GetCurrentUserRoutes``; each node
    contributes its ``route_path`` and its ``children`` are visited as well.
    """
    routes_vo = await self.RbacService.GetCurrentUserRoutes(UserId=UserId)

    found: set[str] = set()
    pending = list(routes_vo.routes)
    # Explicit stack instead of recursion; visiting order is irrelevant for a set.
    while pending:
        node = pending.pop()
        route_path = str(node.route_path or "")
        if route_path:
            found.add(route_path)
        if node.children:
            pending.extend(node.children)
    return found
|
|
|
|
def _isAllowedTargetPath(self, TargetPath: str, AllowedPaths: set[str]) -> bool:
    """Tell whether a home-page target path is covered by the user's routes.

    Paths listed in ``self._DOCUMENT_ENTRY_TARGETS`` are always accepted.
    Otherwise the path must equal an allowed path or sit below one
    (``<allowed>/...``).
    """
    if TargetPath in self._DOCUMENT_ENTRY_TARGETS or TargetPath in AllowedPaths:
        return True

    for allowed in AllowedPaths:
        # The trailing slash keeps "/govdocs" from matching "/govdoc".
        if TargetPath.startswith(f"{allowed}/"):
            return True
    return False
|
|
|
|
def _normalizeTargetPath(self, RawPath: str, HasDocumentTypes: bool) -> str | None:
|
|
"""将首页入口跳转归一到当前最小可用路径集合。"""
|
|
if HasDocumentTypes and (not RawPath or RawPath == "/home" or not RawPath.startswith("/")):
|
|
return "/files/upload"
|
|
|
|
if RawPath == "/contract-template/search" and HasDocumentTypes:
|
|
return "/files/upload"
|
|
|
|
if any(
|
|
RawPath == enabled_path or RawPath.startswith(f"{enabled_path}/")
|
|
for enabled_path in self._MINIMAL_ENABLED_TARGETS
|
|
):
|
|
return RawPath
|
|
|
|
if HasDocumentTypes:
|
|
return "/files/upload"
|
|
|
|
return None
|