Files

934 lines
39 KiB
Python

"""入口模块管理服务实现。"""
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
from typing import Any
from sqlalchemy import bindparam, text
from fastapi_admin.config import OSS_BASE_URL, OSS_BUCKET
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.Dto.entryModuleDto import (
EntryModuleAreaDTO,
EntryModuleCreateDTO,
EntryModuleTenantDTO,
EntryModuleUpdateDTO,
)
from fastapi_modules.fastapi_leaudit.domian.vo.entryModuleAdminVo import (
EntryModuleBusinessScopeVO,
EntryModuleImageUploadVO,
EntryModuleListVO,
EntryModuleTenantVO,
EntryModuleVO,
)
from fastapi_modules.fastapi_leaudit.services.entryModuleAdminService import IEntryModuleAdminService
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolution, TenantResolver
# Menu profile codes this module recognizes; membership is checked before a
# profile value is accepted or echoed back.
_ALLOWED_MENU_PROFILES = {"document_review", "contract", "govdoc", "cross_checking", "custom"}
# Feature codes that may appear in a module's feature list.
_ALLOWED_FEATURES = {
    "home",
    "documents",
    "upload",
    "rules",
    "rule_groups",
    "contract_template_search",
    "contract_template_list",
    "govdoc_audits",
    "govdoc_upload",
    "cross_checking",
    "cross_checking_upload",
    "cross_checking_list",
    "usage_stats",
}
# Feature list used for a profile when no usable feature list is available.
# Every value here is also a member of _ALLOWED_FEATURES.
_DEFAULT_FEATURES_BY_PROFILE = {
    "document_review": ["home", "documents", "upload", "rules", "rule_groups"],
    "contract": ["home", "documents", "upload", "rules", "contract_template_search", "contract_template_list"],
    "govdoc": ["home", "govdoc_audits", "govdoc_upload", "rules"],
    "cross_checking": ["cross_checking", "cross_checking_upload", "cross_checking_list"],
    "custom": ["home", "documents"],
}
class EntryModuleAdminServiceImpl(IEntryModuleAdminService):
"""入口模块管理服务实现。"""
def __init__(self) -> None:
    """Create the collaborators and reset the schema-probe caches."""
    # Tri-state caches: None means "not probed yet"; True/False is the probe result.
    self._entry_module_menu_columns_exist_cache: bool | None = None
    self._entry_module_tenant_table_exists_cache: bool | None = None
    self._tenant_table_exists_cache: bool | None = None

    # Collaborators used for icon uploads and legacy-area resolution.
    self.OssService = OssServiceImpl()
    self.TenantResolver = TenantResolver()
async def ListModules(
    self,
    Name: str | None,
    Area: str | None,
    TenantCode: str | None,
    Page: int,
    PageSize: int,
) -> EntryModuleListVO:
    """Return one page of non-deleted entry modules.

    Args:
        Name: Optional substring matched case-insensitively against the module name.
        Area: Optional legacy area value; resolved to a tenant code when
            ``TenantCode`` is blank, and matched against the legacy ``areas`` JSON.
        TenantCode: Optional tenant code filter; takes precedence over ``Area``.
        Page: 1-based page number; values below 1 are treated as the first page.
        PageSize: Number of rows per page.

    Returns:
        The total number of matching modules plus the requested page of items.
    """
    conditions = ["em.deleted_at IS NULL"]
    bind_values: dict[str, object] = {
        "limit": PageSize,
        "offset": max(Page - 1, 0) * PageSize,
    }

    if Name:
        conditions.append("em.name ILIKE :name")
        bind_values["name"] = f"%{Name.strip()}%"

    filter_code = await self._resolveFilterTenantCode(Area=Area, TenantCode=TenantCode)
    mapping_table_present = await self._entry_module_tenant_table_exists()

    if filter_code:
        # The legacy areas JSON is matched by name, so fall back to the tenant's name
        # when the caller filtered by code only.
        area_name = (Area or "").strip()
        if not area_name:
            names_by_code = await self._loadTenantNameMap([filter_code])
            area_name = names_by_code.get(filter_code, "")

        if mapping_table_present:
            # Match the mapping table first; modules with no mapping rows at all
            # are matched through their legacy areas JSON instead.
            conditions.append(
                """
(
EXISTS (
SELECT 1
FROM leaudit_entry_module_tenants emt
WHERE emt.entry_module_id = em.id
AND emt.deleted_at IS NULL
AND emt.tenant_code = :tenant_code
)
OR (
NOT EXISTS (
SELECT 1
FROM leaudit_entry_module_tenants emt0
WHERE emt0.entry_module_id = em.id
AND emt0.deleted_at IS NULL
)
AND EXISTS (
SELECT 1
FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
WHERE area_item->>'area' = :legacy_area
)
)
)
"""
            )
            bind_values["tenant_code"] = filter_code
        else:
            conditions.append(
                """
EXISTS (
SELECT 1
FROM jsonb_array_elements(COALESCE(em.areas, '[]'::jsonb)) AS area_item
WHERE area_item->>'area' = :legacy_area
)
"""
            )
        bind_values["legacy_area"] = area_name

    where_sql = " AND ".join(conditions)

    if mapping_table_present:
        tenants_sql = """
COALESCE(
(
SELECT jsonb_agg(
jsonb_build_object(
'tenant_code', emt.tenant_code,
'tenant_name', emt.tenant_name,
'enabled', emt.is_enabled,
'sort_order', emt.sort_order
)
ORDER BY emt.sort_order ASC, emt.id ASC
)
FROM leaudit_entry_module_tenants emt
WHERE emt.entry_module_id = em.id
AND emt.deleted_at IS NULL
),
'[]'::jsonb
) AS tenants
"""
    else:
        tenants_sql = "'[]'::jsonb AS tenants"

    menu_sql = await self._entry_module_menu_select_sql()
    scope_sql = self._entry_module_business_scope_select_sql()

    async with GetAsyncSession() as session:
        count_result = await session.execute(
            text(f"SELECT COUNT(*) FROM leaudit_entry_modules em WHERE {where_sql}"),
            bind_values,
        )
        total = int(count_result.scalar_one())

        page_result = await session.execute(
            text(
                f"""
SELECT
em.id,
em.name,
em.description,
em.path,
em.icon_path,
em.areas,
em.sort_order,
em.is_enabled,
em.created_at,
em.updated_at,
{menu_sql},
{scope_sql},
{tenants_sql}
FROM leaudit_entry_modules em
WHERE {where_sql}
ORDER BY em.sort_order ASC, em.id ASC
LIMIT :limit OFFSET :offset
"""
            ),
            bind_values,
        )
        records = page_result.mappings().all()

    items = []
    for record in records:
        items.append(await self._toModuleVo(record))
    return EntryModuleListVO(total=total, page=Page, page_size=PageSize, items=items)
async def GetModule(self, ModuleId: int) -> EntryModuleVO:
    """Load a single entry module by id and convert it to its VO.

    Raises:
        LeauditException: 404 (raised by ``_getModuleRow``) when no non-deleted module has this id.
    """
    return await self._toModuleVo(await self._getModuleRow(ModuleId))
async def CreateModule(self, Body: EntryModuleCreateDTO) -> EntryModuleVO:
    """Insert a new entry module together with its tenant assignments.

    Args:
        Body: Creation payload; ``route_path`` wins over the older ``path`` field.

    Returns:
        The freshly created module, re-read from the database.

    Raises:
        LeauditException: 400 when no tenant is assigned, when the menu profile or
            a feature code is not supported, or when the insert itself fails.
    """
    raw_route = Body.route_path if Body.route_path is not None else (Body.path or "")
    route_path = raw_route.strip() or None

    tenant_items = await self._normalizeTenants(Tenants=Body.tenants, Areas=Body.areas)
    self._ensureTenantAssignments(tenant_items)
    areas_json = self._legacyAreasJson(tenant_items)

    menu_profile = self._normalizeMenuProfile(Body.menu_profile)
    features = self._normalizeFeatures(Body.features, menu_profile)

    # Older schemas lack the menu columns; only mention them in the INSERT when present.
    if await self._entry_module_menu_columns_exist():
        extra_columns = ", menu_profile, features"
        extra_values = ", :menu_profile, CAST(:features AS jsonb)"
    else:
        extra_columns = ""
        extra_values = ""

    async with GetAsyncSession() as session:
        try:
            bind_values = {
                "name": Body.name.strip(),
                "description": (Body.description or "").strip() or None,
                "route_path": route_path,
                "icon_path": None,
                "areas": areas_json,
                "sort_order": await self._nextSortOrder(session),
                "menu_profile": menu_profile,
                "features": json.dumps(features, ensure_ascii=False),
            }
            insert_result = await session.execute(
                text(
                    f"""
INSERT INTO leaudit_entry_modules (
name, description, path, icon_path, areas, sort_order, is_enabled, created_at, updated_at, deleted_at{extra_columns}
) VALUES (
:name, :description, :route_path, :icon_path, CAST(:areas AS jsonb), :sort_order, TRUE, NOW(), NOW(), NULL{extra_values}
)
RETURNING id
"""
                ),
                bind_values,
            )
            module_id = int(insert_result.mappings().one()["id"])
            await self._syncModuleTenants(session, module_id, tenant_items)
            await session.commit()
        except Exception as exc:
            # Any failure inside the transaction is rolled back and reported as a 400.
            await session.rollback()
            raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, f"创建入口模块失败: {exc}") from exc
    return await self.GetModule(module_id)
async def UpdateModule(self, ModuleId: int, Body: EntryModuleUpdateDTO) -> EntryModuleVO:
    """Update an entry module; fields left as ``None`` keep their stored value.

    Args:
        ModuleId: Id of the module to update.
        Body: Partial update payload; ``route_path`` wins over the older ``path`` field.

    Returns:
        The updated module, re-read from the database.

    Raises:
        LeauditException: 404 when the module does not exist (or is deleted); 400
            when the resulting tenant list is empty or the menu profile / a
            feature code is not supported.
    """
    current = await self._getModuleRow(ModuleId)
    incoming_route_path = Body.route_path if Body.route_path is not None else Body.path

    if Body.tenants is not None or Body.areas is not None:
        normalized_tenants = await self._normalizeTenants(Tenants=Body.tenants, Areas=Body.areas)
    else:
        # Nothing supplied: keep whatever the stored row currently resolves to.
        normalized_tenants = await self._extractTenantsFromRow(current)
    self._ensureTenantAssignments(normalized_tenants)

    current_menu_profile = self._safeMenuProfile(current.get("menu_profile"))
    menu_profile = (
        self._normalizeMenuProfile(Body.menu_profile)
        if Body.menu_profile is not None
        else current_menu_profile
    )
    features = (
        self._normalizeFeatures(Body.features, menu_profile)
        if Body.features is not None
        else self._parseFeatures(current.get("features"), menu_profile)
    )

    # Blank description / route path are stored as NULL, exactly as CreateModule
    # does, so the same logical value never ends up as both '' and NULL.
    if Body.description is not None:
        description = Body.description.strip() or None
    else:
        description = current.get("description")
    if incoming_route_path is not None:
        route_path = incoming_route_path.strip() or None
    else:
        route_path = current.get("path")

    has_menu_columns = await self._entry_module_menu_columns_exist()
    # Older schemas lack the menu columns; only touch them when present.
    menu_update_sql = (
        ", menu_profile = :menu_profile, features = CAST(:features AS jsonb)" if has_menu_columns else ""
    )

    async with GetAsyncSession() as session:
        params = {
            "module_id": ModuleId,
            "name": Body.name.strip() if Body.name is not None else current["name"],
            "description": description,
            "route_path": route_path,
            "areas": self._legacyAreasJson(normalized_tenants),
            "menu_profile": menu_profile,
            "features": json.dumps(features, ensure_ascii=False),
        }
        row = (
            await session.execute(
                text(
                    f"""
UPDATE leaudit_entry_modules
SET
name = :name,
description = :description,
path = :route_path,
areas = CAST(:areas AS jsonb),
updated_at = NOW()
{menu_update_sql}
WHERE id = :module_id
AND deleted_at IS NULL
RETURNING id
"""
                ),
                params,
            )
        ).mappings().first()
        if not row:
            # The module disappeared between the initial read and the UPDATE.
            await session.rollback()
            raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "入口模块不存在")
        await self._syncModuleTenants(session, ModuleId, normalized_tenants)
        await session.commit()
    return await self.GetModule(ModuleId)
async def DeleteModule(self, ModuleId: int) -> None:
    """Soft-delete an entry module and, when the mapping table exists, its tenant rows.

    Rows are only stamped with ``deleted_at``; nothing is physically removed.
    """
    bind_values = {"module_id": ModuleId}
    retire_module_sql = """
UPDATE leaudit_entry_modules
SET deleted_at = NOW(), updated_at = NOW()
WHERE id = :module_id AND deleted_at IS NULL
"""
    retire_mappings_sql = """
UPDATE leaudit_entry_module_tenants
SET deleted_at = NOW(), updated_at = NOW()
WHERE entry_module_id = :module_id AND deleted_at IS NULL
"""
    async with GetAsyncSession() as session:
        await session.execute(text(retire_module_sql), bind_values)
        mapping_table_present = await self._entry_module_tenant_table_exists()
        if mapping_table_present:
            await session.execute(text(retire_mappings_sql), {"module_id": ModuleId})
        await session.commit()
async def UploadModuleImage(self, ModuleId: int, FileName: str, ContentType: str, Content: bytes) -> EntryModuleImageUploadVO:
    """Upload an icon for an entry module and record its object key on the module.

    Args:
        ModuleId: Id of the module receiving the icon.
        FileName: Original file name; only its lower-cased suffix is used (``.png`` when absent).
        ContentType: MIME type passed to the upload; ``application/octet-stream`` when blank.
        Content: Raw file bytes.

    Returns:
        The stored object key, its URL and a confirmation message.

    Raises:
        LeauditException: 404 (raised by ``_getModuleRow``) when the module does not exist.
    """
    # Loaded first so a missing module fails before anything is uploaded.
    module_row = await self._getModuleRow(ModuleId)

    extension = Path(FileName).suffix.lower()
    if not extension:
        extension = ".png"
    object_key = f"documents/mz/static/img/entry_module_{ModuleId}{extension}"

    await self.OssService.UploadBytes(
        ObjectKey=object_key,
        Content=Content,
        ContentType=ContentType or "application/octet-stream",
    )

    update_sql = """
UPDATE leaudit_entry_modules
SET icon_path = :icon_path, updated_at = NOW()
WHERE id = :module_id AND deleted_at IS NULL
"""
    async with GetAsyncSession() as session:
        await session.execute(text(update_sql), {"module_id": ModuleId, "icon_path": object_key})
        await session.commit()

    base_url = OSS_BASE_URL.rstrip('/')
    return EntryModuleImageUploadVO(
        module_id=ModuleId,
        path=object_key,
        url=f"{base_url}/{OSS_BUCKET}/{object_key}",
        message=f"入口模块 {module_row['name']} 图标上传成功",
    )
async def _getModuleRow(self, ModuleId: int):
    """Fetch the raw row mapping for one non-deleted entry module.

    The row carries the module columns plus the computed ``menu_profile``,
    ``features``, ``business_scope`` and ``tenants`` values.

    Raises:
        LeauditException: 404 when no non-deleted module has this id.
    """
    if await self._entry_module_tenant_table_exists():
        tenants_sql = """
COALESCE(
(
SELECT jsonb_agg(
jsonb_build_object(
'tenant_code', emt.tenant_code,
'tenant_name', emt.tenant_name,
'enabled', emt.is_enabled,
'sort_order', emt.sort_order
)
ORDER BY emt.sort_order ASC, emt.id ASC
)
FROM leaudit_entry_module_tenants emt
WHERE emt.entry_module_id = em.id
AND emt.deleted_at IS NULL
),
'[]'::jsonb
) AS tenants
"""
    else:
        # No mapping table in this schema: expose an empty tenant list.
        tenants_sql = "'[]'::jsonb AS tenants"

    menu_sql = await self._entry_module_menu_select_sql()
    scope_sql = self._entry_module_business_scope_select_sql()

    statement = text(
        f"""
SELECT
em.id,
em.name,
em.description,
em.path,
em.icon_path,
em.areas,
em.sort_order,
em.is_enabled,
em.created_at,
em.updated_at,
{menu_sql},
{scope_sql},
{tenants_sql}
FROM leaudit_entry_modules em
WHERE em.id = :module_id
AND em.deleted_at IS NULL
"""
    )
    async with GetAsyncSession() as session:
        result = await session.execute(statement, {"module_id": ModuleId})
        record = result.mappings().first()

    if record:
        return record
    raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "入口模块不存在")
async def _nextSortOrder(self, Session) -> int:
    """Return the sort order for a new module: current maximum among non-deleted modules plus 10."""
    result = await Session.execute(
        text("SELECT COALESCE(MAX(sort_order), 0) FROM leaudit_entry_modules WHERE deleted_at IS NULL")
    )
    current_max = result.scalar_one()
    return 10 + int(current_max or 0)
async def _toModuleVo(self, Row) -> EntryModuleVO:
    """Convert a module row mapping into an ``EntryModuleVO``.

    Note the column mapping: the VO's ``path`` carries the row's ``icon_path``
    and the VO's ``route_path`` carries the row's ``path``.
    """
    tenant_items = await self._extractTenantsFromRow(Row)
    stored_profile = Row.get("menu_profile")
    vo_fields = {
        "id": int(Row["id"]),
        "name": str(Row["name"] or ""),
        "description": Row.get("description"),
        "path": Row.get("icon_path"),
        "route_path": Row.get("path"),
        "menu_profile": self._safeMenuProfile(stored_profile),
        "features": self._parseFeatures(Row.get("features"), stored_profile),
        "sort_order": int(Row.get("sort_order") or 0),
        "is_enabled": bool(Row.get("is_enabled", True)),
        "tenants": tenant_items,
        "business_scope": self._parseBusinessScope(Row.get("business_scope")),
        "created_at": self._toIso(Row.get("created_at")),
        "updated_at": self._toIso(Row.get("updated_at")),
    }
    return EntryModuleVO(**vo_fields)
async def _extractTenantsFromRow(self, Row) -> list[EntryModuleTenantVO]:
raw_tenants = Row.get("tenants") or []
normalized: list[EntryModuleTenantVO] = []
if isinstance(raw_tenants, list) and raw_tenants:
for item in raw_tenants:
if not isinstance(item, dict) or not item.get("tenant_code"):
continue
normalized.append(
EntryModuleTenantVO(
tenant_code=str(item["tenant_code"]),
tenant_name=item.get("tenant_name"),
enabled=bool(item.get("enabled", True)),
sort_order=int(item.get("sort_order", 0)),
)
)
normalized.sort(key=lambda item: (item.sort_order, item.tenant_code))
if normalized:
return normalized
raw_areas = Row.get("areas") or []
if not isinstance(raw_areas, list):
return []
fallback_tenants: list[EntryModuleTenantVO] = []
for index, item in enumerate(raw_areas, start=1):
if not isinstance(item, dict) or not item.get("area"):
continue
resolution = await self._resolveLegacyTenantValue(
RawValue=str(item.get("area") or ""),
Source="entry_module_legacy_area",
)
fallback_tenants.append(
EntryModuleTenantVO(
tenant_code=resolution.tenant_code,
tenant_name=resolution.tenant_name,
enabled=bool(item.get("enabled", True)),
sort_order=int(item.get("sort_order", index)),
)
)
unique: dict[str, EntryModuleTenantVO] = {}
for item in fallback_tenants:
unique[item.tenant_code] = item
return sorted(unique.values(), key=lambda item: (item.sort_order, item.tenant_code))
async def _normalizeTenants(
self,
*,
Tenants: list[EntryModuleTenantDTO] | None,
Areas: list[EntryModuleAreaDTO] | None,
) -> list[dict[str, Any]]:
if Tenants is not None:
return await self._normalizeTenantDtos(Tenants)
if Areas is not None:
# 仅兼容旧请求体,优先要求前端使用 tenants。
return await self._normalizeAreas(Areas)
return []
async def _normalizeTenantDtos(self, Tenants: list[EntryModuleTenantDTO]) -> list[dict[str, Any]]:
if not Tenants:
return []
tenant_codes = [str(item.tenant_code).strip() for item in Tenants if str(item.tenant_code).strip()]
tenant_name_map = await self._loadTenantNameMap(tenant_codes)
normalized: dict[str, dict[str, Any]] = {}
for index, item in enumerate(Tenants, start=1):
tenant_code = str(item.tenant_code).strip()
if not tenant_code:
continue
if tenant_code not in tenant_name_map:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, f"未知租户编码: {tenant_code}")
normalized[tenant_code] = {
"tenant_code": tenant_code,
"tenant_name": str(item.tenant_name or tenant_name_map[tenant_code] or "").strip() or tenant_name_map[tenant_code],
"enabled": bool(item.enabled),
"sort_order": int(item.sort_order or index),
}
return sorted(normalized.values(), key=lambda item: (int(item["sort_order"]), str(item["tenant_code"])))
async def _normalizeAreas(self, Areas: list[EntryModuleAreaDTO]) -> list[dict[str, Any]]:
if not Areas:
return []
normalized: dict[str, dict[str, Any]] = {}
for index, item in enumerate(Areas, start=1):
area_name = str(item.area or "").strip()
if not area_name:
continue
resolution = await self._resolveLegacyTenantValue(
RawValue=area_name,
Source="entry_module_area_payload",
)
normalized[resolution.tenant_code] = {
"tenant_code": resolution.tenant_code,
"tenant_name": resolution.tenant_name or area_name,
"enabled": bool(item.enabled),
"sort_order": int(item.sort_order or index),
}
return sorted(normalized.values(), key=lambda item: (int(item["sort_order"]), str(item["tenant_code"])))
async def _loadTenantNameMap(self, TenantCodes: list[str]) -> dict[str, str]:
tenant_codes = [code for code in {str(item).strip() for item in TenantCodes} if code]
if not tenant_codes:
return {}
if not await self._tenant_table_exists():
return {
code: ("公共" if code == "PUBLIC" else "省局" if code == "PROVINCIAL" else code)
for code in tenant_codes
}
async with GetAsyncSession() as session:
rows = (
await session.execute(
text(
"""
SELECT tenant_code, tenant_name
FROM sys_tenants
WHERE tenant_code IN :tenant_codes
AND deleted_at IS NULL
AND is_enabled = TRUE
"""
).bindparams(bindparam("tenant_codes", expanding=True)),
{"tenant_codes": tenant_codes},
)
).mappings().all()
return {str(row["tenant_code"]): str(row["tenant_name"] or "") for row in rows}
async def _tenant_table_exists(self) -> bool:
"""兼容旧环境未落 sys_tenants 时的入口模块读写。"""
if self._tenant_table_exists_cache is not None:
return self._tenant_table_exists_cache
async with GetAsyncSession() as session:
exists = bool(
(
await session.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = current_schema()
AND table_name = 'sys_tenants'
)
"""
)
)
).scalar_one()
)
self._tenant_table_exists_cache = exists
return exists
async def _entry_module_tenant_table_exists(self) -> bool:
"""兼容旧环境未落入口模块租户映射表时的读写。"""
if self._entry_module_tenant_table_exists_cache is not None:
return self._entry_module_tenant_table_exists_cache
async with GetAsyncSession() as session:
exists = bool(
(
await session.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = current_schema()
AND table_name = 'leaudit_entry_module_tenants'
)
"""
)
)
).scalar_one()
)
self._entry_module_tenant_table_exists_cache = exists
return exists
async def _entry_module_menu_columns_exist(self) -> bool:
if self._entry_module_menu_columns_exist_cache is not None:
return self._entry_module_menu_columns_exist_cache
async with GetAsyncSession() as session:
count = int(
(
await session.execute(
text(
"""
SELECT COUNT(*)
FROM information_schema.columns
WHERE table_schema = current_schema()
AND table_name = 'leaudit_entry_modules'
AND column_name IN ('menu_profile', 'features')
"""
)
)
).scalar_one()
)
self._entry_module_menu_columns_exist_cache = count == 2
return self._entry_module_menu_columns_exist_cache
async def _entry_module_menu_select_sql(self) -> str:
if await self._entry_module_menu_columns_exist():
return "em.menu_profile, em.features"
return "'document_review'::varchar AS menu_profile, '[]'::jsonb AS features"
def _entry_module_business_scope_select_sql(self) -> str:
return """
COALESCE(
(
SELECT jsonb_build_object(
'category_count', COUNT(DISTINCT dt.id),
'business_type_count', GREATEST(
COUNT(DISTINCT child_by_type.id),
COALESCE(
(
SELECT COUNT(DISTINCT child_by_entry.id)
FROM leaudit_evaluation_point_groups root_by_entry
JOIN leaudit_evaluation_point_groups child_by_entry
ON child_by_entry.pid = root_by_entry.id
AND child_by_entry.deleted_at IS NULL
WHERE root_by_entry.entry_module_id = em.id
AND root_by_entry.pid = 0
AND root_by_entry.deleted_at IS NULL
),
0
)
),
'categories', COALESCE(
(
SELECT jsonb_agg(category_name ORDER BY category_name)
FROM (
SELECT DISTINCT dt2.name AS category_name
FROM leaudit_document_types dt2
WHERE dt2.entry_module_id = em.id
AND dt2.deleted_at IS NULL
AND dt2.is_enabled = TRUE
) category_rows
),
'[]'::jsonb
)
)
FROM leaudit_document_types dt
LEFT JOIN leaudit_evaluation_point_groups root
ON root.document_type_id = dt.id
AND root.pid = 0
AND root.deleted_at IS NULL
LEFT JOIN leaudit_evaluation_point_groups child_by_type
ON child_by_type.pid = root.id
AND child_by_type.deleted_at IS NULL
WHERE dt.entry_module_id = em.id
AND dt.deleted_at IS NULL
AND dt.is_enabled = TRUE
),
jsonb_build_object('category_count', 0, 'business_type_count', 0, 'categories', '[]'::jsonb)
) AS business_scope
"""
def _parseBusinessScope(self, RawValue: object) -> EntryModuleBusinessScopeVO:
    """Turn the raw ``business_scope`` value into its VO.

    Accepts a dict or a JSON-encoded string; anything else (including
    undecodable JSON) is treated as an empty scope. ``category_count`` falls
    back to the number of parsed categories when missing or zero.
    """
    payload = RawValue
    if isinstance(payload, str):
        try:
            payload = json.loads(payload)
        except json.JSONDecodeError:
            payload = {}
    if not isinstance(payload, dict):
        payload = {}

    categories: list[str] = []
    raw_categories = payload.get("categories") or []
    if isinstance(raw_categories, list):
        for entry in raw_categories:
            # Falsy entries (None, "", 0) and whitespace-only names are dropped.
            if str(entry or "").strip():
                categories.append(str(entry).strip())

    category_count = int(payload.get("category_count") or len(categories))
    business_type_count = int(payload.get("business_type_count") or 0)
    return EntryModuleBusinessScopeVO(
        category_count=category_count,
        business_type_count=business_type_count,
        categories=categories,
    )
async def _resolveLegacyTenantValue(self, *, RawValue: str, Source: str) -> TenantResolution:
    """Resolve a legacy area value to a tenant, with built-in fallbacks.

    The tenant resolver is asked first. When it yields no tenant code:
    blank / ``default`` / ``省级`` / ``公共`` resolve to whatever the resolver
    returns for an empty value, or else to a built-in ``PUBLIC`` tenant;
    ``省局`` resolves to a built-in ``PROVINCIAL`` tenant; any other value is
    used verbatim as both tenant code and name with type ``LEGACY_AREA``.

    Args:
        RawValue: The legacy area text to resolve.
        Source: Label passed through to the resolver and recorded on the result.
    """
    resolved = await self.TenantResolver.Resolve(RawValue=RawValue, Source=Source)
    if resolved.tenant_code:
        return resolved

    cleaned = str(RawValue or "").strip()

    def _fallback(code, name, kind, public) -> TenantResolution:
        # All built-in results share the same bookkeeping fields.
        return TenantResolution(
            tenant_code=code,
            tenant_name=name,
            tenant_type=kind,
            raw_value=RawValue,
            normalized_value=cleaned,
            source=Source,
            is_public=public,
        )

    if cleaned in {"", "default", "省级", "公共"}:
        public_default = await self.TenantResolver.Resolve(RawValue="", Source=Source)
        if public_default.tenant_code:
            return public_default
        return _fallback("PUBLIC", "公共", "PUBLIC", True)

    if cleaned == "省局":
        return _fallback("PROVINCIAL", "省局", "PROVINCIAL", False)

    return _fallback(
        cleaned or None,
        cleaned or None,
        "LEGACY_AREA" if cleaned else None,
        False,
    )
async def _syncModuleTenants(self, Session, ModuleId: int, Tenants: list[dict[str, Any]]) -> None:
if not await self._entry_module_tenant_table_exists():
return
tenant_codes = [str(item["tenant_code"]) for item in Tenants if item.get("tenant_code")]
if tenant_codes:
await Session.execute(
text(
"""
UPDATE leaudit_entry_module_tenants
SET deleted_at = NOW(), updated_at = NOW()
WHERE entry_module_id = :module_id
AND deleted_at IS NULL
AND tenant_code NOT IN :tenant_codes
"""
).bindparams(bindparam("tenant_codes", expanding=True)),
{"module_id": ModuleId, "tenant_codes": tenant_codes},
)
else:
await Session.execute(
text(
"""
UPDATE leaudit_entry_module_tenants
SET deleted_at = NOW(), updated_at = NOW()
WHERE entry_module_id = :module_id
AND deleted_at IS NULL
"""
),
{"module_id": ModuleId},
)
for item in Tenants:
await Session.execute(
text(
"""
INSERT INTO leaudit_entry_module_tenants (
entry_module_id, tenant_code, tenant_name, is_enabled, sort_order, created_at, updated_at, deleted_at
) VALUES (
:module_id, :tenant_code, :tenant_name, :is_enabled, :sort_order, NOW(), NOW(), NULL
)
ON CONFLICT (entry_module_id, tenant_code)
DO UPDATE SET
tenant_name = EXCLUDED.tenant_name,
is_enabled = EXCLUDED.is_enabled,
sort_order = EXCLUDED.sort_order,
updated_at = NOW(),
deleted_at = NULL
"""
),
{
"module_id": ModuleId,
"tenant_code": item["tenant_code"],
"tenant_name": item.get("tenant_name"),
"is_enabled": bool(item.get("enabled", True)),
"sort_order": int(item.get("sort_order", 0)),
},
)
async def _resolveFilterTenantCode(self, *, Area: str | None, TenantCode: str | None) -> str | None:
if TenantCode and TenantCode.strip():
return TenantCode.strip()
if Area and Area.strip():
resolution = await self._resolveLegacyTenantValue(
RawValue=Area.strip(),
Source="entry_module_filter_area",
)
return resolution.tenant_code
return None
@staticmethod
def _legacyAreasJson(Tenants: list[dict[str, Any]]) -> str:
# 继续回写 areas 仅用于兼容旧读链路,主配置来源是 leaudit_entry_module_tenants。
areas = [
{
"area": str(item.get("tenant_name") or item.get("tenant_code") or ""),
"enabled": bool(item.get("enabled", True)),
"sort_order": int(item.get("sort_order", 0)),
}
for item in Tenants
if item.get("tenant_code")
]
return json.dumps(areas, ensure_ascii=False)
@staticmethod
def _ensureTenantAssignments(Tenants: list[dict[str, Any]]) -> None:
if Tenants:
return
raise LeauditException(
StatusCodeEnum.HTTP_400_BAD_REQUEST,
"入口模块至少需要配置一个适用租户",
)
@staticmethod
def _normalizeMenuProfile(MenuProfile: str | None) -> str:
    """Validate a requested menu profile; blank input means ``document_review``.

    Raises:
        LeauditException: 400 when the profile is not in ``_ALLOWED_MENU_PROFILES``.
    """
    profile = str(MenuProfile or "document_review").strip() or "document_review"
    if profile in _ALLOWED_MENU_PROFILES:
        return profile
    raise LeauditException(
        StatusCodeEnum.HTTP_400_BAD_REQUEST,
        f"不支持的菜单模板: {profile}",
    )
@staticmethod
def _safeMenuProfile(MenuProfile: str | None) -> str:
    """Return a usable menu profile, substituting ``document_review`` for blank or unknown values."""
    profile = str(MenuProfile or "document_review").strip() or "document_review"
    if profile not in _ALLOWED_MENU_PROFILES:
        return "document_review"
    return profile
@staticmethod
def _normalizeFeatures(Features: list[str] | None, MenuProfile: str) -> list[str]:
    """Validate a requested feature list for a menu profile.

    An empty or missing list means the profile's defaults. Blank entries are
    skipped, duplicates are removed while keeping first-seen order, and for the
    ``govdoc`` profile ``rule_groups`` is rewritten to ``rules``.

    Raises:
        LeauditException: 400 listing every feature code not in ``_ALLOWED_FEATURES``.
    """
    profile_defaults = _DEFAULT_FEATURES_BY_PROFILE.get(
        MenuProfile, _DEFAULT_FEATURES_BY_PROFILE["document_review"]
    )
    accepted: list[str] = []
    rejected: list[str] = []

    for raw in Features or profile_defaults:
        code = str(raw or "").strip()
        if not code:
            continue
        if code == "rule_groups" and MenuProfile == "govdoc":
            code = "rules"
        if code not in _ALLOWED_FEATURES:
            rejected.append(code)
        elif code not in accepted:
            accepted.append(code)

    if rejected:
        raise LeauditException(
            StatusCodeEnum.HTTP_400_BAD_REQUEST,
            f"不支持的功能编码: {', '.join(rejected)}",
        )
    if accepted:
        return accepted
    return list(profile_defaults)
@classmethod
def _parseFeatures(cls, RawFeatures: Any, MenuProfile: str | None) -> list[str]:
    """Read a stored feature list leniently.

    Accepts a list or a JSON-encoded string. Unknown codes are dropped rather
    than rejected, and the profile's defaults are returned when nothing usable
    remains or the value has another shape.
    """
    profile = cls._safeMenuProfile(MenuProfile)

    candidate = RawFeatures
    if isinstance(candidate, str) and candidate.strip():
        try:
            candidate = json.loads(candidate)
        except json.JSONDecodeError:
            # Undecodable text behaves like an empty list, i.e. the defaults.
            candidate = []

    if isinstance(candidate, list):
        return cls._filterFeatures([str(entry) for entry in candidate], profile)

    fallback = _DEFAULT_FEATURES_BY_PROFILE.get(profile, _DEFAULT_FEATURES_BY_PROFILE["document_review"])
    return list(fallback)
@staticmethod
def _filterFeatures(Features: list[str], MenuProfile: str) -> list[str]:
    """Keep only allowed feature codes, de-duplicated in first-seen order.

    For the ``govdoc`` profile ``rule_groups`` is rewritten to ``rules``. The
    profile's defaults are returned when no allowed code remains.
    """
    kept: list[str] = []
    for raw in Features:
        code = str(raw or "").strip()
        if code == "rule_groups" and MenuProfile == "govdoc":
            code = "rules"
        if code not in _ALLOWED_FEATURES or code in kept:
            continue
        kept.append(code)

    if kept:
        return kept
    return list(_DEFAULT_FEATURES_BY_PROFILE.get(MenuProfile, _DEFAULT_FEATURES_BY_PROFILE["document_review"]))
@staticmethod
def _toIso(Value) -> str | None:
"""时间转 ISO 字符串。"""
if Value is None:
return None
if isinstance(Value, datetime):
return Value.isoformat()
return str(Value)