Files
2026-05-25 09:50:01 +08:00

1546 lines
73 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""评查点分组服务实现(新链路:文档类型 -> 子类型 -> 规则集)。"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy import bindparam, text
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_modules.fastapi_leaudit.domian.Dto.evaluationPointGroupDto import (
EvaluationPointGroupBatchDeleteDTO,
EvaluationPointGroupBatchStatusDTO,
EvaluationPointGroupBindingCreateDTO,
EvaluationPointGroupBindingUpdateDTO,
EvaluationPointGroupCreateDTO,
EvaluationPointGroupRuleDraftCreateDTO,
EvaluationPointGroupRebindDTO,
EvaluationPointGroupUpdateDTO,
)
from fastapi_modules.fastapi_leaudit.domian.vo.evaluationPointGroupVo import (
EvaluationPointGroupBatchDeleteVO,
EvaluationPointGroupBatchStatusVO,
EvaluationPointGroupDeleteVO,
EvaluationPointGroupListVO,
EvaluationPointGroupRuleDraftVO,
EvaluationPointGroupRuleTemplateContextVO,
EvaluationPointGroupRuleTemplateVO,
EvaluationPointGroupRebindVO,
EvaluationPointGroupVO,
RuleGroupBindingVO,
)
from fastapi_modules.fastapi_leaudit.services.evaluationPointGroupService import IEvaluationPointGroupService
from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import (
bootstrap_rule_groups,
ensure_rule_group_schema,
sync_doc_type_bindings_from_group,
)
from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import GetRuleServiceSingleton
from fastapi_modules.fastapi_leaudit.services.impl.ruleTenantScope import normalize_scoped_tenant_code
from fastapi_modules.fastapi_leaudit.services.impl.ssoUserCompat import SsoUserCompat
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolver
class EvaluationPointGroupServiceImpl(IEvaluationPointGroupService):
"""评查点分组服务实现。"""
def __init__(self) -> None:
self.RuleService = GetRuleServiceSingleton()
self.TenantResolver = TenantResolver()
self._entry_module_tenant_table_exists_cache: bool | None = None
async def ListGroups(
self,
Name: str | None,
Code: str | None,
IsEnabled: bool | None,
Pid: int | None,
Page: int,
PageSize: int,
CurrentUserId: int,
EntryModuleId: int | None = None,
) -> EvaluationPointGroupListVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
offset = max(Page - 1, 0) * PageSize
filters = ["g.deleted_at IS NULL", "(COALESCE(g.pid, 0) <> 0 OR g.document_type_id IS NOT NULL OR g.entry_module_id IS NOT NULL)"]
params: dict[str, Any] = {"limit": PageSize, "offset": offset}
from_clause = """
FROM leaudit_evaluation_point_groups g
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)
"""
filters.append(
await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=current_user,
params=params,
param_prefix="group_scope",
)
)
if Name:
filters.append("g.name ILIKE :name")
params["name"] = f"%{Name.strip()}%"
if Code:
filters.append("g.code ILIKE :code")
params["code"] = f"%{Code.strip()}%"
if IsEnabled is not None:
filters.append("g.is_enabled = :is_enabled")
params["is_enabled"] = IsEnabled
if Pid is not None:
filters.append("COALESCE(g.pid, 0) = :pid")
params["pid"] = self._normalize_pid(Pid)
if EntryModuleId is not None:
await self._assert_entry_module_access(session, EntryModuleId, current_user)
filters.append("COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) = :entry_module_id")
params["entry_module_id"] = int(EntryModuleId)
where_clause = " AND ".join(filters)
total = int(
(
await session.execute(
text(f"SELECT COUNT(*) {from_clause} WHERE {where_clause}"),
params,
)
).scalar_one()
)
rows = (
await session.execute(
text(
f"""
SELECT
g.id,
g.pid,
g.name,
g.code,
g.description,
g.document_type_id,
dt.name AS document_type_name,
COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) AS entry_module_id,
em.name AS entry_module_name,
g.sort_order,
g.is_enabled,
g.created_at,
g.updated_at,
COALESCE(bg.binding_count, 0) AS rule_count
{from_clause}
LEFT JOIN (
SELECT group_id, COUNT(*)::int AS binding_count
FROM leaudit_rule_group_bindings
WHERE deleted_at IS NULL
GROUP BY group_id
) bg ON bg.group_id = g.id
WHERE {where_clause}
ORDER BY COALESCE(g.sort_order, 0) ASC, g.id ASC
LIMIT :limit OFFSET :offset
"""
),
params,
)
).mappings().all()
binding_map = await self._load_binding_map(session, [int(row["id"]) for row in rows], current_user=current_user)
return EvaluationPointGroupListVO(
data=[self._to_group_vo(row, binding_map.get(int(row["id"]), []), include_rule_count=True) for row in rows],
total=total,
page=Page,
page_size=PageSize,
)
async def ListAllGroups(
self,
IncludeDisabled: bool,
WithRuleCount: bool,
CurrentUserId: int,
EntryModuleId: int | None = None,
) -> list[EvaluationPointGroupVO]:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
filters = ["g.deleted_at IS NULL", "(COALESCE(g.pid, 0) <> 0 OR g.document_type_id IS NOT NULL OR g.entry_module_id IS NOT NULL)"]
params: dict[str, Any] = {}
filters.append(
await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=current_user,
params=params,
param_prefix="group_scope",
)
)
if not IncludeDisabled:
filters.append("g.is_enabled = TRUE")
if EntryModuleId is not None:
await self._assert_entry_module_access(session, EntryModuleId, current_user)
filters.append("COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) = :entry_module_id")
params["entry_module_id"] = int(EntryModuleId)
rows = (
await session.execute(
text(
f"""
SELECT
g.id,
g.pid,
g.name,
g.code,
g.description,
g.document_type_id,
dt.name AS document_type_name,
COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) AS entry_module_id,
em.name AS entry_module_name,
g.sort_order,
g.is_enabled,
g.created_at,
g.updated_at,
COALESCE(bg.binding_count, 0) AS rule_count
FROM leaudit_evaluation_point_groups g
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)
LEFT JOIN (
SELECT group_id, COUNT(*)::int AS binding_count
FROM leaudit_rule_group_bindings
WHERE deleted_at IS NULL
GROUP BY group_id
) bg ON bg.group_id = g.id
WHERE {' AND '.join(filters)}
ORDER BY COALESCE(g.sort_order, 0) ASC, g.id ASC
"""
),
params,
)
).mappings().all()
binding_map = await self._load_binding_map(session, [int(row["id"]) for row in rows], current_user=current_user)
groups = [self._to_group_vo(row, binding_map.get(int(row["id"]), []), include_rule_count=WithRuleCount) for row in rows]
by_parent: dict[int, list[EvaluationPointGroupVO]] = {}
roots: list[EvaluationPointGroupVO] = []
for group in groups:
parent_id = self._normalize_pid(group.pid)
if parent_id == 0:
roots.append(group)
else:
by_parent.setdefault(parent_id, []).append(group)
for group in groups:
children = by_parent.get(group.id)
if children:
group.children = children
return roots
async def ListGroupsByDocumentTypes(
self,
DocumentTypeIds: list[int],
IncludeDisabled: bool,
WithRuleCount: bool,
CurrentUserId: int,
) -> list[EvaluationPointGroupVO]:
normalized_ids = sorted({int(item) for item in DocumentTypeIds if item})
if not normalized_ids:
return []
roots = await self.ListAllGroups(
IncludeDisabled=IncludeDisabled,
WithRuleCount=WithRuleCount,
CurrentUserId=CurrentUserId,
EntryModuleId=None,
)
result: list[EvaluationPointGroupVO] = []
for root in roots:
if root.document_type_id in normalized_ids:
result.append(root)
continue
children = root.children or []
if any(child.document_type_id in normalized_ids for child in children):
result.append(root)
return result
async def GetGroup(self, GroupId: int, WithRuleCount: bool, CurrentUserId: int) -> EvaluationPointGroupVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
row = await self._get_group_row(session, GroupId, current_user)
binding_map = await self._load_binding_map(session, [GroupId], current_user=current_user)
return self._to_group_vo(row, binding_map.get(GroupId, []), include_rule_count=WithRuleCount)
async def GetChildren(self, GroupId: int, IsEnabled: bool | None, Page: int, PageSize: int, CurrentUserId: int) -> EvaluationPointGroupListVO:
await self.GetGroup(GroupId, WithRuleCount=False, CurrentUserId=CurrentUserId)
return await self.ListGroups(
Name=None,
Code=None,
IsEnabled=IsEnabled,
Pid=GroupId,
Page=Page,
PageSize=PageSize,
CurrentUserId=CurrentUserId,
EntryModuleId=None,
)
async def CreateGroup(self, Body: EvaluationPointGroupCreateDTO, CurrentUserId: int) -> EvaluationPointGroupVO:
payload = self._normalize_create_payload(Body)
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
await self._ensure_code_unique(session, payload["code"], None)
parent = await self._ensure_parent_valid(session, payload["pid"], current_user)
payload["entry_module_id"] = await self._ensure_entry_module_valid(
session, payload["pid"], payload["entry_module_id"], parent, current_user
)
payload["document_type_id"] = await self._ensure_document_type_valid(
session, payload["pid"], payload["document_type_id"], payload["entry_module_id"], None, parent, current_user
)
if not current_user["is_global"] and payload["pid"] == 0 and payload["entry_module_id"] is None and payload["document_type_id"] is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前租户创建一级分组时必须绑定入口模块或文档类型")
row = (
await session.execute(
text(
"""
INSERT INTO leaudit_evaluation_point_groups (
pid, code, name, description, document_type_id, entry_module_id, sort_order, is_enabled, created_at, updated_at
) VALUES (
:pid, :code, :name, :description, :document_type_id, :entry_module_id, :sort_order, :is_enabled, NOW(), NOW()
)
RETURNING id
"""
),
payload,
)
).mappings().one()
await session.commit()
group_id = int(row["id"])
return await self.GetGroup(group_id, WithRuleCount=True, CurrentUserId=CurrentUserId)
async def UpdateGroup(self, GroupId: int, Body: EvaluationPointGroupUpdateDTO, CurrentUserId: int) -> EvaluationPointGroupVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
current = await self._get_group_row(session, GroupId, current_user)
provided_fields = set(getattr(Body, "model_fields_set", set()))
next_pid = self._normalize_pid(Body.pid) if Body.pid is not None else self._normalize_pid(current["pid"])
name = (Body.name.strip() if Body.name is not None else str(current.get("name") or "")).strip()
code = (Body.code.strip() if Body.code is not None else str(current.get("code") or "")).strip()
description = Body.description.strip() if Body.description is not None and Body.description else current.get("description")
document_type_id = Body.document_type_id if "document_type_id" in provided_fields else current.get("document_type_id")
entry_module_id = Body.entry_module_id if "entry_module_id" in provided_fields else current.get("entry_module_id")
sort_order = Body.sort_order if Body.sort_order is not None else int(current.get("sort_order") or 0)
is_enabled = Body.is_enabled if Body.is_enabled is not None else bool(current.get("is_enabled", True))
if not name:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组名称不能为空")
if not code:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组编码不能为空")
await self._ensure_code_unique(session, code, GroupId)
parent = await self._ensure_parent_valid(session, next_pid, current_user)
entry_module_id = await self._ensure_entry_module_valid(session, next_pid, entry_module_id, parent, current_user)
document_type_id = await self._ensure_document_type_valid(
session, next_pid, document_type_id, entry_module_id, GroupId, parent, current_user
)
if not current_user["is_global"] and next_pid == 0 and entry_module_id is None and document_type_id is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前租户的一级分组必须绑定入口模块或文档类型")
await session.execute(
text(
"""
UPDATE leaudit_evaluation_point_groups
SET pid = :pid,
code = :code,
name = :name,
description = :description,
document_type_id = :document_type_id,
entry_module_id = :entry_module_id,
sort_order = :sort_order,
is_enabled = :is_enabled,
updated_at = NOW()
WHERE id = :group_id
"""
),
{
"group_id": GroupId,
"pid": next_pid,
"code": code,
"name": name,
"description": description,
"document_type_id": document_type_id,
"entry_module_id": entry_module_id,
"sort_order": sort_order,
"is_enabled": is_enabled,
},
)
await sync_doc_type_bindings_from_group(session, GroupId)
await session.commit()
return await self.GetGroup(GroupId, WithRuleCount=True, CurrentUserId=CurrentUserId)
async def DeleteGroup(self, GroupId: int, CurrentUserId: int) -> EvaluationPointGroupDeleteVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
current = await self._get_group_row(session, GroupId, current_user)
is_root = self._normalize_pid(current["pid"]) == 0
if is_root:
child_count = int(
(
await session.execute(
text(
"SELECT COUNT(*) FROM leaudit_evaluation_point_groups WHERE pid = :group_id AND deleted_at IS NULL"
),
{"group_id": GroupId},
)
).scalar_one()
)
if child_count > 0:
return EvaluationPointGroupDeleteVO(
success=False,
message="当前一级分组下仍存在二级分组,请先迁移或删除二级分组",
deleted_groups=0,
deleted_points=0,
)
binding_count = int(
(
await session.execute(
text(
"SELECT COUNT(*) FROM leaudit_rule_group_bindings WHERE group_id = :group_id AND deleted_at IS NULL"
),
{"group_id": GroupId},
)
).scalar_one()
)
await session.execute(
text(
"UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE group_id = :group_id AND deleted_at IS NULL"
),
{"group_id": GroupId},
)
await sync_doc_type_bindings_from_group(session, GroupId)
result = await session.execute(
text(
"UPDATE leaudit_evaluation_point_groups SET deleted_at = NOW(), updated_at = NOW() WHERE id = :group_id AND deleted_at IS NULL"
),
{"group_id": GroupId},
)
await session.commit()
deleted_groups = int(result.rowcount or 0)
return EvaluationPointGroupDeleteVO(
success=True,
message="规则分组删除成功",
deleted_count=deleted_groups,
deleted_groups=deleted_groups,
deleted_points=binding_count,
)
async def RebindGroup(self, GroupId: int, Body: EvaluationPointGroupRebindDTO, CurrentUserId: int) -> EvaluationPointGroupRebindVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
current = await self._get_group_row(session, GroupId, current_user)
target = await self._get_group_row(session, Body.new_parent_id, current_user)
if self._normalize_pid(current["pid"]) != 0 or self._normalize_pid(target["pid"]) != 0:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "仅支持一级分组之间迁移二级分组")
if GroupId == Body.new_parent_id:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "迁移目标不能与原分组相同")
result = await session.execute(
text(
"""
UPDATE leaudit_evaluation_point_groups
SET pid = :new_parent_id,
updated_at = NOW()
WHERE pid = :old_group_id AND deleted_at IS NULL
"""
),
{"old_group_id": GroupId, "new_parent_id": Body.new_parent_id},
)
await session.commit()
moved = int(result.rowcount or 0)
return EvaluationPointGroupRebindVO(success=True, message="二级分组迁移成功", rebind_count=moved, doc_types_updated=moved)
async def BatchUpdateStatus(self, Body: EvaluationPointGroupBatchStatusDTO, CurrentUserId: int) -> EvaluationPointGroupBatchStatusVO:
ids = sorted({int(item) for item in Body.ids if item})
if not ids:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请选择至少一个分组")
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
accessible_rows = await self._list_accessible_group_rows(session, ids, current_user)
if len(accessible_rows) != len(ids):
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "部分分组不存在或当前租户不可访问")
result = await session.execute(
text(
"""
UPDATE leaudit_evaluation_point_groups
SET is_enabled = :is_enabled,
updated_at = NOW()
WHERE id IN :ids AND deleted_at IS NULL
"""
).bindparams(bindparam("ids", expanding=True)),
{"ids": ids, "is_enabled": Body.is_enabled},
)
await session.commit()
updated_count = int(result.rowcount or 0)
return EvaluationPointGroupBatchStatusVO(success=True, updated_count=updated_count, message=f"成功更新 {updated_count} 个分组状态")
async def BatchDelete(self, Body: EvaluationPointGroupBatchDeleteDTO, CurrentUserId: int) -> EvaluationPointGroupBatchDeleteVO:
ids = sorted({int(item) for item in Body.ids if item})
if not ids:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请选择至少一个分组")
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
rows = await self._list_accessible_group_rows(session, ids, current_user)
if len(rows) != len(ids):
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "部分分组不存在或当前租户不可访问")
if any(self._normalize_pid(row["pid"]) == 0 for row in rows):
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "批量删除仅支持二级分组")
deleted_bindings = 0
for row in rows:
deleted_bindings += int(
(
await session.execute(
text(
"UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE group_id = :group_id AND deleted_at IS NULL"
),
{"group_id": int(row["id"])},
)
).rowcount
or 0
)
await sync_doc_type_bindings_from_group(session, int(row["id"]))
result = await session.execute(
text(
"UPDATE leaudit_evaluation_point_groups SET deleted_at = NOW(), updated_at = NOW() WHERE id IN :ids AND deleted_at IS NULL"
).bindparams(bindparam("ids", expanding=True)),
{"ids": ids},
)
await session.commit()
deleted_groups = int(result.rowcount or 0)
return EvaluationPointGroupBatchDeleteVO(success=True, deleted_groups=deleted_groups, deleted_points=deleted_bindings, message=f"成功删除 {deleted_groups} 个分组")
async def CreateBinding(self, GroupId: int, Body: EvaluationPointGroupBindingCreateDTO, CurrentUserId: int) -> RuleGroupBindingVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
group = await self._get_group_row(session, GroupId, current_user)
if self._normalize_pid(group["pid"]) == 0:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "一级分组不能直接绑定规则集,请先选择二级分组")
rule_set_meta = await self._ensure_rule_set_valid(session, Body.rule_set_id, current_user)
scope_payload = self._build_binding_scope_payload(current_user=current_user, rule_set_meta=rule_set_meta)
existing = (
await session.execute(
text(
"""
SELECT id
FROM leaudit_rule_group_bindings
WHERE group_id = :group_id
AND rule_set_id = :rule_set_id
AND COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') = :tenant_code
AND deleted_at IS NULL
LIMIT 1
"""
),
{
"group_id": GroupId,
"rule_set_id": Body.rule_set_id,
"tenant_code": scope_payload["tenant_code"],
},
)
).mappings().first()
if existing:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "该规则集已绑定到当前二级分组")
row = (
await session.execute(
text(
"""
INSERT INTO leaudit_rule_group_bindings (
group_id, rule_set_id, tenant_code, scope_type, tenant_name_snapshot, priority, is_active, note, created_at, updated_at
) VALUES (
:group_id, :rule_set_id, :tenant_code, :scope_type, :tenant_name_snapshot, :priority, :is_active, :note, NOW(), NOW()
)
RETURNING id
"""
),
{
"group_id": GroupId,
"rule_set_id": Body.rule_set_id,
"tenant_code": scope_payload["tenant_code"],
"scope_type": scope_payload["scope_type"],
"tenant_name_snapshot": scope_payload["tenant_name_snapshot"],
"priority": Body.priority,
"is_active": Body.is_active,
"note": Body.note.strip() if Body.note else None,
},
)
).mappings().one()
await sync_doc_type_bindings_from_group(session, GroupId)
await session.commit()
binding_id = int(row["id"])
binding_row = await self._get_binding_row(session, binding_id, current_user)
binding_vo = await self._build_binding_vo(binding_row, current_user=current_user)
return binding_vo
async def UpdateBinding(self, BindingId: int, Body: EvaluationPointGroupBindingUpdateDTO, CurrentUserId: int) -> RuleGroupBindingVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
current = await self._get_binding_row(session, BindingId, current_user)
self._assert_binding_mutable_by_user(current, current_user)
await session.execute(
text(
"""
UPDATE leaudit_rule_group_bindings
SET priority = :priority,
is_active = :is_active,
note = :note,
updated_at = NOW()
WHERE id = :binding_id
"""
),
{
"binding_id": BindingId,
"priority": Body.priority if Body.priority is not None else int(current.get("priority") or 0),
"is_active": Body.is_active if Body.is_active is not None else bool(current.get("is_active", True)),
"note": Body.note.strip() if Body.note else (current.get("note") if Body.note is None else None),
},
)
await sync_doc_type_bindings_from_group(session, int(current["group_id"]))
await session.commit()
binding_row = await self._get_binding_row(session, BindingId, current_user)
binding_vo = await self._build_binding_vo(binding_row, current_user=current_user)
return binding_vo
async def DeleteBinding(self, BindingId: int, CurrentUserId: int) -> None:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
current = await self._get_binding_row(session, BindingId, current_user)
self._assert_binding_mutable_by_user(current, current_user)
await session.execute(
text(
"UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE id = :binding_id"
),
{"binding_id": BindingId},
)
await sync_doc_type_bindings_from_group(session, int(current["group_id"]))
await session.commit()
async def GetRuleTemplate(self, GroupId: int, CurrentUserId: int) -> EvaluationPointGroupRuleTemplateVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
context = await self._load_rule_context(session, GroupId, current_user)
template = self._build_rule_yaml_template(context)
existing_binding_id = await self._get_group_binding_id_for_rule_type(session, GroupId, context["rule_type"])
return EvaluationPointGroupRuleTemplateVO(
context=EvaluationPointGroupRuleTemplateContextVO(
group_id=context["group_id"],
group_code=context["group_code"],
group_name=context["group_name"],
parent_group_id=context["parent_group_id"],
parent_group_code=context["parent_group_code"],
parent_group_name=context["parent_group_name"],
document_type_id=context["document_type_id"],
document_type_code=context["document_type_code"],
document_type_name=context["document_type_name"],
entry_module_id=context["entry_module_id"],
entry_module_name=context["entry_module_name"],
),
ruleType=context["rule_type"],
ruleName=context["rule_name"],
nextVersionNo=context["next_version_no"],
ossPreviewKey=context["oss_preview_key"],
yamlTemplate=template,
existingRuleSetId=context["existing_rule_set_id"],
existingBindingId=existing_binding_id,
)
async def CreateRuleDraft(self, GroupId: int, Body: EvaluationPointGroupRuleDraftCreateDTO, CurrentUserId: int) -> EvaluationPointGroupRuleDraftVO:
async with GetAsyncSession() as session:
await self._ensure_ready(session)
current_user = await self._get_current_user_context(session, CurrentUserId)
context = await self._load_rule_context(session, GroupId, current_user)
yaml_text = Body.yaml_text.strip()
if not yaml_text:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "YAML 内容不能为空")
created_version = await self.RuleService.CreateVersion(
RuleType=context["rule_type"],
YamlText=yaml_text,
ChangeNote=Body.change_note.strip() if Body.change_note else None,
EditorUserId=Body.editor_user_id,
CurrentUserId=CurrentUserId,
)
async with GetAsyncSession() as session:
await self._ensure_ready(session)
rule_set_id = int(created_version.ruleSetId)
if rule_set_id is None:
raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "规则版本已创建,但未找到对应规则集")
current_user = await self._get_current_user_context(session, CurrentUserId)
binding_id, auto_bound = await self._ensure_group_binding_for_rule_set(
session,
GroupId,
rule_set_id,
current_user=current_user,
)
await sync_doc_type_bindings_from_group(session, GroupId)
await session.commit()
binding_row = await self._get_binding_row(session, binding_id, current_user)
binding_vo = await self._build_binding_vo(binding_row, current_user=current_user)
return EvaluationPointGroupRuleDraftVO(
packId=GroupId,
groupId=GroupId,
ruleName=context["rule_name"],
ruleType=context["rule_type"],
ruleSetId=rule_set_id,
ossKey=OssPathUtils.BuildRuleYamlKey(context["rule_type"], created_version.versionNo),
version=created_version,
binding=binding_vo,
autoBound=auto_bound,
)
async def _ensure_ready(self, session) -> None:
self._rule_set_meta_cache = None
await ensure_rule_group_schema(session)
await bootstrap_rule_groups(session)
async def _get_group_row(self, session, group_id: int, current_user: dict[str, Any] | None = None):
params: dict[str, Any] = {"group_id": group_id}
access_filter = "1=1"
if current_user is not None:
access_filter = await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=current_user,
params=params,
param_prefix="group_detail_scope",
)
row = (
await session.execute(
text(
f"""
SELECT
g.id,
g.pid,
g.name,
g.code,
g.description,
g.document_type_id,
dt.code AS document_type_code,
dt.name AS document_type_name,
parent.id AS root_group_id,
parent.name AS root_group_name,
parent.code AS root_group_code,
COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) AS entry_module_id,
em.name AS entry_module_name,
g.sort_order,
g.is_enabled,
g.created_at,
g.updated_at,
COALESCE(bg.binding_count, 0) AS rule_count
FROM leaudit_evaluation_point_groups g
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)
LEFT JOIN (
SELECT group_id, COUNT(*)::int AS binding_count
FROM leaudit_rule_group_bindings
WHERE deleted_at IS NULL
GROUP BY group_id
) bg ON bg.group_id = g.id
WHERE g.id = :group_id AND g.deleted_at IS NULL AND {access_filter}
LIMIT 1
"""
),
params,
)
).mappings().first()
if not row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则分组不存在")
return row
async def _get_binding_row(self, session, binding_id: int, current_user: dict[str, Any] | None = None):
params: dict[str, Any] = {"binding_id": binding_id}
access_filter = "1=1"
if current_user is not None:
access_filter = await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="COALESCE(child.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=current_user,
params=params,
param_prefix="binding_scope",
)
row = (
await session.execute(
text(
f"""
SELECT
rgb.id,
rgb.group_id,
rgb.rule_set_id,
rgb.rule_type_binding_id,
COALESCE(NULLIF(BTRIM(rgb.tenant_code), ''), 'PROVINCIAL') AS tenant_code,
COALESCE(NULLIF(BTRIM(rgb.scope_type), ''), 'PROVINCIAL') AS scope_type,
rgb.tenant_name_snapshot,
rgb.priority,
rgb.is_active,
rgb.note,
rgb.created_at,
rgb.updated_at
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = child.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id
WHERE rgb.id = :binding_id AND rgb.deleted_at IS NULL AND {access_filter}
LIMIT 1
"""
),
params,
)
).mappings().first()
if not row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则组绑定不存在")
return row
async def _load_binding_map(
self,
session,
group_ids: list[int],
current_user: dict[str, Any] | None = None,
) -> dict[int, list[RuleGroupBindingVO]]:
group_ids = [int(item) for item in group_ids if item]
if not group_ids:
return {}
params: dict[str, Any] = {"group_ids": group_ids}
tenant_filter = ""
if current_user is not None and not current_user.get("is_global"):
visible_tenant_codes = ["PUBLIC", "PROVINCIAL"]
tenant_code = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="")
if tenant_code:
visible_tenant_codes.append(tenant_code)
tenant_filter = """
AND COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') IN :visible_tenant_codes
"""
params["visible_tenant_codes"] = sorted(set(visible_tenant_codes))
query = text(
f"""
SELECT
id,
group_id,
rule_set_id,
rule_type_binding_id,
COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') AS tenant_code,
COALESCE(NULLIF(BTRIM(scope_type), ''), 'PROVINCIAL') AS scope_type,
tenant_name_snapshot,
priority,
is_active,
note,
created_at,
updated_at
FROM leaudit_rule_group_bindings
WHERE group_id IN :group_ids
AND deleted_at IS NULL
{tenant_filter}
ORDER BY priority DESC, id ASC
"""
).bindparams(bindparam("group_ids", expanding=True))
if "visible_tenant_codes" in params:
query = query.bindparams(bindparam("visible_tenant_codes", expanding=True))
rows = (
await session.execute(query, params)
).mappings().all()
result: dict[int, list[RuleGroupBindingVO]] = {}
for row in rows:
result.setdefault(int(row["group_id"]), []).append(await self._build_binding_vo(row, current_user=current_user))
return result
async def _build_binding_vo(self, row, current_user: dict[str, Any] | None = None) -> RuleGroupBindingVO:
rule_set_meta = await self._get_rule_set_meta(int(row["rule_set_id"]), current_user=current_user)
scope_state = self._build_binding_scope_state(
binding_row=row,
current_user=current_user,
rule_set_meta=rule_set_meta,
)
return RuleGroupBindingVO(
id=int(row["id"]),
group_id=int(row["group_id"]),
rule_set_id=int(row["rule_set_id"]),
rule_type_binding_id=int(row["rule_type_binding_id"]) if row.get("rule_type_binding_id") else None,
priority=int(row.get("priority") or 0),
is_active=bool(row.get("is_active", True)),
note=row.get("note"),
tenant_code=scope_state["effective_tenant_code"],
scope_type=scope_state["effective_scope_type"],
tenant_name_snapshot=row.get("tenant_name_snapshot"),
rule_type=rule_set_meta.get("rule_type"),
rule_name=rule_set_meta.get("rule_name"),
current_version_id=rule_set_meta.get("current_version_id"),
fallback_version_id=rule_set_meta.get("fallback_version_id"),
has_usable_version=bool(rule_set_meta.get("has_usable_version", False)),
usable_rule_count=int(rule_set_meta.get("usable_rule_count") or 0),
effectiveTenantCode=scope_state["effective_tenant_code"],
effectiveScopeType=scope_state["effective_scope_type"],
isInherited=bool(scope_state["is_inherited"]),
sourceRuleSetId=self._to_int(rule_set_meta.get("source_rule_set_id")),
)
async def _get_rule_set_meta(self, rule_set_id: int, current_user: dict[str, Any] | None = None) -> dict[str, Any]:
if not getattr(self, "_rule_set_meta_cache", None):
rule_sets = await self.RuleService.ListSets(
CurrentUserId=int(current_user["id"]) if current_user and current_user.get("id") is not None else None
)
self._rule_set_meta_cache = {
int(item.id): {
"rule_type": item.ruleType,
"rule_name": item.ruleName,
"current_version_id": item.currentVersionId,
"fallback_version_id": item.fallbackVersionId,
"has_usable_version": item.hasUsableVersion,
"usable_rule_count": item.usableRuleCount,
"source_rule_set_id": getattr(item, "sourceRuleSetId", None),
"effective_tenant_code": getattr(item, "effectiveTenantCode", None),
"effective_scope_type": getattr(item, "effectiveScopeType", None),
}
for item in rule_sets
}
return self._rule_set_meta_cache.get(rule_set_id, {})
async def _ensure_parent_valid(self, session, pid: int, current_user: dict[str, Any]):
if pid == 0:
return None
parent = await self._get_group_row(session, pid, current_user)
if self._normalize_pid(parent["pid"]) != 0:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上级分组必须是一级分组")
return parent
async def _ensure_entry_module_valid(
self,
session,
pid: int,
entry_module_id: int | None,
parent: Any | None,
current_user: dict[str, Any],
) -> int | None:
if pid != 0:
if parent is not None and parent.get("entry_module_id") is not None:
return int(parent["entry_module_id"])
return None if entry_module_id in (None, 0, "0", "") else int(entry_module_id)
if entry_module_id in (None, 0, "0", ""):
return None
params: dict[str, Any] = {"entry_module_id": int(entry_module_id)}
access_filter = await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="em.id",
current_user=current_user,
params=params,
param_prefix="entry_module_scope",
)
exists = (
await session.execute(
text(
f"""
SELECT em.id
FROM leaudit_entry_modules em
WHERE em.id = :entry_module_id
AND em.deleted_at IS NULL
AND {access_filter}
LIMIT 1
"""
),
params,
)
).scalar_one_or_none()
if exists is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "关联入口模块不存在或当前租户不可访问")
return int(entry_module_id)
async def _ensure_document_type_valid(
self,
session,
pid: int,
document_type_id: int | None,
entry_module_id: int | None,
group_id: int | None,
parent: Any | None,
current_user: dict[str, Any],
) -> int | None:
if pid == 0:
if document_type_id is None and entry_module_id is None:
return None
else:
if parent is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "二级分组必须挂到一级分组下")
if parent.get("document_type_id") is not None:
parent_doc_type_id = int(parent["document_type_id"])
if document_type_id is None:
document_type_id = parent_doc_type_id
elif int(document_type_id) != parent_doc_type_id:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "二级分组的文档类型必须与所属一级分组一致")
elif document_type_id is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "二级分组必须明确绑定具体文档类型")
if document_type_id is None:
return None
row = (
await session.execute(
text(
"""
SELECT id, entry_module_id
FROM leaudit_document_types
WHERE id = :doc_type_id AND deleted_at IS NULL
LIMIT 1
"""
),
{"doc_type_id": int(document_type_id)},
)
).mappings().first()
if row is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "关联文档类型不存在")
doc_type_entry_module_id = int(row["entry_module_id"]) if row.get("entry_module_id") is not None else None
effective_entry_module_id = int(entry_module_id) if entry_module_id is not None else doc_type_entry_module_id
if doc_type_entry_module_id is not None and effective_entry_module_id is not None and doc_type_entry_module_id != effective_entry_module_id:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "文档类型所属入口模块与分组入口模块不一致")
if doc_type_entry_module_id is not None:
await self._assert_entry_module_access(session, doc_type_entry_module_id, current_user)
if pid == 0:
duplicated_root = (
await session.execute(
text(
"""
SELECT id
FROM leaudit_evaluation_point_groups
WHERE deleted_at IS NULL
AND COALESCE(pid, 0) = 0
AND document_type_id = :doc_type_id
AND (:group_id IS NULL OR id <> :group_id)
LIMIT 1
"""
),
{"doc_type_id": int(document_type_id), "group_id": group_id},
)
).scalar_one_or_none()
if duplicated_root is not None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "该文档类型已存在一级分组")
return int(document_type_id)
async def _ensure_code_unique(self, session, code: str, group_id: int | None) -> None:
sql = "SELECT id FROM leaudit_evaluation_point_groups WHERE LOWER(code) = LOWER(:code) AND deleted_at IS NULL"
params: dict[str, Any] = {"code": code}
if group_id is not None:
sql += " AND id <> :group_id"
params["group_id"] = group_id
duplicated = (await session.execute(text(sql + " LIMIT 1"), params)).scalar_one_or_none()
if duplicated is not None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组编码已存在")
async def _ensure_rule_set_valid(self, session, rule_set_id: int, current_user: dict[str, Any]) -> dict[str, Any]:
row = (
await session.execute(
text(
"""
SELECT
id,
COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') AS tenant_code,
COALESCE(NULLIF(BTRIM(scope_type), ''), 'PROVINCIAL') AS scope_type,
source_rule_set_id
FROM leaudit_rule_sets
WHERE id = :rule_set_id AND deleted_at IS NULL
LIMIT 1
"""
),
{"rule_set_id": rule_set_id},
)
).mappings().first()
if row is None:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "规则集不存在")
if not current_user.get("is_global"):
rule_set_tenant = normalize_scoped_tenant_code(str(row.get("tenant_code") or ""))
user_tenant = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="")
if not user_tenant:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户上下文缺失,不能绑定规则集")
if rule_set_tenant not in {user_tenant, "PROVINCIAL"}:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不能直接绑定公共或其他租户规则资产")
return dict(row)
async def _load_rule_context(self, session, group_id: int, current_user: dict[str, Any]) -> dict[str, Any]:
row = await self._get_group_row(session, group_id, current_user)
if self._normalize_pid(row["pid"]) == 0:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请先选择二级分组,再创建规则 YAML")
rule_type = self._resolve_rule_type(row)
if not rule_type:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前二级分组缺少可推导的规则类型编码,请先补充分组编码或文档类型编码")
rule_name = str(row.get("document_type_name") or row.get("name") or "").strip()
if not rule_name:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前二级分组缺少规则名称,请先补充分组名称")
next_version_no = await self._get_next_rule_version_no(session, rule_type)
return {
"group_id": int(row["id"]),
"group_code": str(row.get("code") or "").strip(),
"group_name": str(row.get("name") or ""),
"parent_group_id": int(row.get("root_group_id") or 0),
"parent_group_code": str(row.get("root_group_code") or "").strip(),
"parent_group_name": str(row.get("root_group_name") or row.get("entry_module_name") or ""),
"document_type_id": int(row["document_type_id"]) if row.get("document_type_id") is not None else None,
"document_type_code": str(row.get("document_type_code") or "").strip() or None,
"document_type_name": str(row.get("document_type_name") or "").strip() or None,
"entry_module_id": int(row["entry_module_id"]) if row.get("entry_module_id") is not None else None,
"entry_module_name": str(row.get("entry_module_name") or "").strip() or None,
"rule_type": rule_type,
"rule_name": rule_name,
"next_version_no": next_version_no,
"oss_preview_key": OssPathUtils.BuildRuleYamlKey(rule_type, next_version_no),
"existing_rule_set_id": await self._get_rule_set_id_by_type(session, rule_type),
}
def _resolve_rule_type(self, row: Any) -> str:
doc_type_code = str(row.get("document_type_code") or "").strip()
if doc_type_code:
return self._normalize_rule_code(doc_type_code)
child_code = str(row.get("code") or "").strip()
root_code = str(row.get("root_group_code") or "").strip()
if child_code and root_code and child_code != root_code and not child_code.startswith(f"{root_code}."):
return self._normalize_rule_code(f"{root_code}.{child_code}")
if child_code:
return self._normalize_rule_code(child_code)
group_name = str(row.get("name") or "").strip()
if root_code and group_name:
return self._normalize_rule_code(f"{root_code}.{group_name}")
if group_name:
return self._normalize_rule_code(group_name)
return ""
def _normalize_rule_code(self, value: str) -> str:
normalized = ".".join(segment.strip() for segment in value.strip().split(".") if segment.strip())
if not normalized:
return ""
return normalized.replace("/", "_").replace("\\", "_").replace(" ", "_")
async def _get_next_rule_version_no(self, session, rule_type: str) -> str:
row = (
await session.execute(
text(
"""
SELECT COALESCE(MAX(rv.version_seq), 0) AS max_seq
FROM leaudit_rule_versions rv
JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id
WHERE rs.rule_type = :rule_type
AND rs.deleted_at IS NULL
"""
),
{"rule_type": rule_type},
)
).mappings().first()
next_seq = int((row or {}).get("max_seq") or 0) + 1
return f"v{next_seq}"
async def _get_current_user_context(self, session, current_user_id: int) -> dict[str, Any]:
sso_user_columns = await SsoUserCompat.get_columns(session)
tenant_code_select = SsoUserCompat.optional_coalesce_as(
sso_user_columns,
alias="u",
column="tenant_code",
fallback_sql="''",
)
tenant_name_select = SsoUserCompat.optional_coalesce_as(
sso_user_columns,
alias="u",
column="tenant_name",
fallback_sql="''",
)
row = (
await session.execute(
text(
f"""
SELECT
u.id,
COALESCE(u.area, '') AS area,
{tenant_code_select},
{tenant_name_select},
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global
FROM sso_users u
LEFT JOIN user_role ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
WHERE u.id = :user_id
AND u.deleted_at IS NULL
AND u.status = 0
GROUP BY u.id, u.area
"""
),
{"user_id": current_user_id},
)
).mappings().first()
if not row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用")
tenant = await self.TenantResolver.ResolveUserContext(
Area=str(row["area"] or ""),
TenantCode=str(row.get("tenant_code") or "") or None,
TenantName=str(row.get("tenant_name") or "") or None,
Source="evaluation_group_user_context",
)
return {
"id": int(row["id"]),
"area": str(row["area"] or ""),
"tenant_code": tenant.tenant_code or (str(row.get("tenant_code") or "") or None),
"tenant_name": tenant.tenant_name or (str(row.get("tenant_name") or "") or None),
"tenant_scope_value": tenant.tenant_name or tenant.normalized_value or str(row["area"] or ""),
"is_global": bool(row["is_global"]),
}
async def _entry_module_tenant_table_exists(self, session) -> bool:
if self._entry_module_tenant_table_exists_cache is not None:
return self._entry_module_tenant_table_exists_cache
exists = bool(
(
await session.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = current_schema()
AND table_name = 'leaudit_entry_module_tenants'
)
"""
)
)
).scalar_one()
)
self._entry_module_tenant_table_exists_cache = exists
return exists
async def _entry_module_tenant_scope_sql(
self,
*,
session,
entry_module_expr: str,
current_user: dict[str, Any],
params: dict[str, Any],
param_prefix: str,
) -> str:
if current_user.get("is_global"):
return "1=1"
if not await self._entry_module_tenant_table_exists(session):
return "1=0"
params[f"{param_prefix}_tenant_code"] = str(current_user.get("tenant_code") or "").strip()
return f"""
EXISTS (
SELECT 1
FROM leaudit_entry_module_tenants emt
WHERE emt.entry_module_id = {entry_module_expr}
AND emt.deleted_at IS NULL
AND emt.is_enabled = TRUE
AND (
emt.tenant_code = :{param_prefix}_tenant_code
OR emt.tenant_code = 'PUBLIC'
)
)
"""
async def _assert_entry_module_access(self, session, entry_module_id: int | None, current_user: dict[str, Any]) -> None:
if current_user.get("is_global"):
return
if entry_module_id is None:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前资源未绑定入口模块,当前租户不可访问")
params: dict[str, Any] = {"entry_module_id": int(entry_module_id)}
access_filter = await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="em.id",
current_user=current_user,
params=params,
param_prefix="assert_entry_module_scope",
)
exists = (
await session.execute(
text(
f"""
SELECT em.id
FROM leaudit_entry_modules em
WHERE em.id = :entry_module_id
AND em.deleted_at IS NULL
AND {access_filter}
LIMIT 1
"""
),
params,
)
).scalar_one_or_none()
if exists is None:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不可访问该入口模块")
async def _list_accessible_group_rows(self, session, group_ids: list[int], current_user: dict[str, Any]) -> list[Any]:
params: dict[str, Any] = {"ids": group_ids}
access_filter = await self._entry_module_tenant_scope_sql(
session=session,
entry_module_expr="COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=current_user,
params=params,
param_prefix="batch_group_scope",
)
rows = (
await session.execute(
text(
f"""
SELECT g.id, g.pid, g.document_type_id
FROM leaudit_evaluation_point_groups g
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
WHERE g.id IN :ids
AND g.deleted_at IS NULL
AND {access_filter}
"""
).bindparams(bindparam("ids", expanding=True)),
params,
)
).mappings().all()
return rows
def _build_rule_yaml_template(self, context: dict[str, Any]) -> str:
keywords = [
item
for item in dict.fromkeys(
[
context.get("parent_group_name"),
context.get("group_name"),
context.get("document_type_name"),
context.get("entry_module_name"),
]
)
if item
]
keyword_lines = "\n".join(f" - {item}" for item in keywords) or " - 待补充"
description = (
f"{context['rule_name']} 规则模板。"
f" 入口模块:{context.get('entry_module_name') or '未配置'}"
f" 一级分组:{context.get('parent_group_name') or '未配置'}"
f" 二级分组:{context.get('group_name') or '未配置'}"
)
parent_type = context.get("parent_group_name") or context.get("group_name") or context["rule_type"]
return (
"metadata:\n"
f" type_id: {context['rule_type']}\n"
f" name: {context['rule_name']}\n"
f" version: {context['next_version_no']}\n"
f" last_updated: '{datetime.now().date().isoformat()}'\n"
f" parent: {parent_type}\n"
" classification_keywords:\n"
f"{keyword_lines}\n"
" description: >\n"
f" {description}\n\n"
"sub_documents: []\n"
"rules: []\n"
)
async def _get_group_binding_id_for_rule_type(self, session, group_id: int, rule_type: str) -> int | None:
rule_set_id = await self._get_rule_set_id_by_type(session, rule_type)
if rule_set_id is None:
return None
row = (
await session.execute(
text(
"""
SELECT id
FROM leaudit_rule_group_bindings
WHERE group_id = :group_id
AND rule_set_id = :rule_set_id
AND deleted_at IS NULL
LIMIT 1
"""
),
{"group_id": group_id, "rule_set_id": rule_set_id},
)
).mappings().first()
return int(row["id"]) if row else None
async def _get_rule_set_id_by_type(self, session, rule_type: str) -> int | None:
row = (
await session.execute(
text("SELECT id FROM leaudit_rule_sets WHERE rule_type = :rule_type AND deleted_at IS NULL LIMIT 1"),
{"rule_type": rule_type},
)
).mappings().first()
return int(row["id"]) if row else None
async def _ensure_group_binding_for_rule_set(
self,
session,
group_id: int,
rule_set_id: int,
*,
current_user: dict[str, Any],
) -> tuple[int, bool]:
rule_set_meta = await self._ensure_rule_set_valid(session, rule_set_id, current_user)
scope_payload = self._build_binding_scope_payload(current_user=current_user, rule_set_meta=rule_set_meta)
existing = (
await session.execute(
text(
"""
SELECT id
FROM leaudit_rule_group_bindings
WHERE group_id = :group_id
AND rule_set_id = :rule_set_id
AND COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') = :tenant_code
AND deleted_at IS NULL
LIMIT 1
"""
),
{
"group_id": group_id,
"rule_set_id": rule_set_id,
"tenant_code": scope_payload["tenant_code"],
},
)
).mappings().first()
if existing:
return int(existing["id"]), False
row = (
await session.execute(
text(
"""
INSERT INTO leaudit_rule_group_bindings (
group_id, rule_set_id, tenant_code, scope_type, tenant_name_snapshot, priority, is_active, note, created_at, updated_at
) VALUES (
:group_id, :rule_set_id, :tenant_code, :scope_type, :tenant_name_snapshot, 100, TRUE, :note, NOW(), NOW()
)
RETURNING id
"""
),
{
"group_id": group_id,
"rule_set_id": rule_set_id,
"tenant_code": scope_payload["tenant_code"],
"scope_type": scope_payload["scope_type"],
"tenant_name_snapshot": scope_payload["tenant_name_snapshot"],
"note": "由二级分组新建规则 YAML 时自动补绑",
},
)
).mappings().one()
return int(row["id"]), True
def _build_binding_scope_payload(
self,
*,
current_user: dict[str, Any],
rule_set_meta: dict[str, Any],
) -> dict[str, Any]:
if not current_user.get("is_global"):
tenant_code = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="")
if not tenant_code:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户上下文缺失,不能修改规则绑定")
rule_set_tenant = normalize_scoped_tenant_code(
str(rule_set_meta.get("effective_tenant_code") or rule_set_meta.get("tenant_code") or "")
)
if rule_set_tenant == "PUBLIC":
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不能直接绑定公共规则资产")
return {
"tenant_code": tenant_code,
"scope_type": "TENANT",
"tenant_name_snapshot": str(current_user.get("tenant_name") or "").strip() or None,
}
effective_tenant_code = normalize_scoped_tenant_code(
str(rule_set_meta.get("effective_tenant_code") or rule_set_meta.get("tenant_code") or "")
)
effective_scope_type = str(rule_set_meta.get("effective_scope_type") or "").strip().upper()
if not effective_scope_type:
effective_scope_type = "PUBLIC" if effective_tenant_code == "PUBLIC" else "PROVINCIAL"
return {
"tenant_code": effective_tenant_code,
"scope_type": effective_scope_type,
"tenant_name_snapshot": None,
}
def _build_binding_scope_state(
self,
*,
binding_row: dict[str, Any] | Any,
current_user: dict[str, Any] | None,
rule_set_meta: dict[str, Any],
) -> dict[str, Any]:
effective_tenant_code = normalize_scoped_tenant_code(str(binding_row.get("tenant_code") or ""))
effective_scope_type = str(binding_row.get("scope_type") or "").strip().upper()
if not effective_scope_type:
effective_scope_type = "PUBLIC" if effective_tenant_code == "PUBLIC" else "PROVINCIAL" if effective_tenant_code == "PROVINCIAL" else "TENANT"
user_tenant = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="") if current_user else ""
is_tenant_user = bool(current_user) and not bool(current_user.get("is_global")) and bool(user_tenant)
is_inherited = is_tenant_user and effective_tenant_code != user_tenant
return {
"effective_tenant_code": effective_tenant_code,
"effective_scope_type": effective_scope_type,
"is_inherited": is_inherited,
"source_rule_set_id": self._to_int(rule_set_meta.get("source_rule_set_id")),
}
def _assert_binding_mutable_by_user(self, binding_row: dict[str, Any] | Any, current_user: dict[str, Any]) -> None:
if current_user.get("is_global"):
return
binding_tenant = normalize_scoped_tenant_code(str(binding_row.get("tenant_code") or ""))
user_tenant = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="")
if not user_tenant or binding_tenant != user_tenant:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不能修改继承或其他租户的规则绑定")
def _normalize_create_payload(self, body: EvaluationPointGroupCreateDTO) -> dict[str, Any]:
name = body.name.strip()
code = body.code.strip()
if not name:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组名称不能为空")
if not code:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组编码不能为空")
return {
"name": name,
"code": code,
"pid": self._normalize_pid(body.pid),
"description": body.description.strip() if body.description else None,
"document_type_id": body.document_type_id,
"entry_module_id": body.entry_module_id,
"sort_order": int(body.sort_order or 0),
"is_enabled": body.is_enabled,
}
def _to_group_vo(self, row, bindings: list[RuleGroupBindingVO], include_rule_count: bool) -> EvaluationPointGroupVO:
return EvaluationPointGroupVO(
id=int(row["id"]),
pid=self._normalize_pid(row.get("pid")),
name=str(row.get("name") or ""),
code=str(row.get("code") or ""),
description=row.get("description"),
document_type_id=int(row["document_type_id"]) if row.get("document_type_id") is not None else None,
document_type_name=row.get("document_type_name"),
entry_module_id=int(row["entry_module_id"]) if row.get("entry_module_id") is not None else None,
entry_module_name=row.get("entry_module_name"),
sort_order=int(row.get("sort_order") or 0),
is_enabled=bool(row.get("is_enabled", True)),
created_at=self._to_iso(row.get("created_at")),
updated_at=self._to_iso(row.get("updated_at")),
rule_count=len(bindings) if include_rule_count else None,
bindings=bindings,
children=None,
)
def _normalize_pid(self, value: Any) -> int:
if value in (None, "", "0", 0):
return 0
return int(value)
def _to_iso(self, value: Any) -> str | None:
if value is None:
return None
if isinstance(value, datetime):
return value.isoformat()
return str(value)
def _to_int(self, value: Any) -> int | None:
if value is None:
return None
return int(value)