815 lines
38 KiB
Python
815 lines
38 KiB
Python
"""评查点分组服务实现(新链路:文档类型 -> 子类型 -> 规则集)。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import bindparam, text
|
|
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
|
|
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
|
|
from fastapi_modules.fastapi_leaudit.domian.Dto.evaluationPointGroupDto import (
|
|
EvaluationPointGroupBatchDeleteDTO,
|
|
EvaluationPointGroupBatchStatusDTO,
|
|
EvaluationPointGroupBindingCreateDTO,
|
|
EvaluationPointGroupBindingUpdateDTO,
|
|
EvaluationPointGroupCreateDTO,
|
|
EvaluationPointGroupRebindDTO,
|
|
EvaluationPointGroupUpdateDTO,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.domian.vo.evaluationPointGroupVo import (
|
|
EvaluationPointGroupBatchDeleteVO,
|
|
EvaluationPointGroupBatchStatusVO,
|
|
EvaluationPointGroupDeleteVO,
|
|
EvaluationPointGroupListVO,
|
|
EvaluationPointGroupRebindVO,
|
|
EvaluationPointGroupVO,
|
|
RuleGroupBindingVO,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.services.evaluationPointGroupService import IEvaluationPointGroupService
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import (
|
|
bootstrap_rule_groups,
|
|
ensure_rule_group_schema,
|
|
sync_doc_type_bindings_from_group,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import RuleServiceImpl
|
|
|
|
|
|
class EvaluationPointGroupServiceImpl(IEvaluationPointGroupService):
|
|
"""评查点分组服务实现。"""
|
|
|
|
def __init__(self) -> None:
|
|
self.RuleService = RuleServiceImpl()
|
|
|
|
async def ListGroups(
|
|
self,
|
|
Name: str | None,
|
|
Code: str | None,
|
|
IsEnabled: bool | None,
|
|
Pid: int | None,
|
|
Page: int,
|
|
PageSize: int,
|
|
) -> EvaluationPointGroupListVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
offset = max(Page - 1, 0) * PageSize
|
|
filters = ["g.deleted_at IS NULL", "(COALESCE(g.pid, 0) <> 0 OR g.document_type_id IS NOT NULL OR g.entry_module_id IS NOT NULL)"]
|
|
params: dict[str, Any] = {"limit": PageSize, "offset": offset}
|
|
|
|
if Name:
|
|
filters.append("g.name ILIKE :name")
|
|
params["name"] = f"%{Name.strip()}%"
|
|
if Code:
|
|
filters.append("g.code ILIKE :code")
|
|
params["code"] = f"%{Code.strip()}%"
|
|
if IsEnabled is not None:
|
|
filters.append("g.is_enabled = :is_enabled")
|
|
params["is_enabled"] = IsEnabled
|
|
if Pid is not None:
|
|
filters.append("COALESCE(g.pid, 0) = :pid")
|
|
params["pid"] = self._normalize_pid(Pid)
|
|
|
|
where_clause = " AND ".join(filters)
|
|
total = int(
|
|
(
|
|
await session.execute(
|
|
text(f"SELECT COUNT(*) FROM leaudit_evaluation_point_groups g WHERE {where_clause}"),
|
|
params,
|
|
)
|
|
).scalar_one()
|
|
)
|
|
rows = (
|
|
await session.execute(
|
|
text(
|
|
f"""
|
|
SELECT
|
|
g.id,
|
|
g.pid,
|
|
g.name,
|
|
g.code,
|
|
g.description,
|
|
g.document_type_id,
|
|
dt.name AS document_type_name,
|
|
COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) AS entry_module_id,
|
|
em.name AS entry_module_name,
|
|
g.sort_order,
|
|
g.is_enabled,
|
|
g.created_at,
|
|
g.updated_at,
|
|
COALESCE(bg.binding_count, 0) AS rule_count
|
|
FROM leaudit_evaluation_point_groups g
|
|
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
|
|
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
|
|
LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)
|
|
LEFT JOIN (
|
|
SELECT group_id, COUNT(*)::int AS binding_count
|
|
FROM leaudit_rule_group_bindings
|
|
WHERE deleted_at IS NULL
|
|
GROUP BY group_id
|
|
) bg ON bg.group_id = g.id
|
|
WHERE {where_clause}
|
|
ORDER BY COALESCE(g.sort_order, 0) ASC, g.id ASC
|
|
LIMIT :limit OFFSET :offset
|
|
"""
|
|
),
|
|
params,
|
|
)
|
|
).mappings().all()
|
|
binding_map = await self._load_binding_map(session, [int(row["id"]) for row in rows])
|
|
return EvaluationPointGroupListVO(
|
|
data=[self._to_group_vo(row, binding_map.get(int(row["id"]), []), include_rule_count=True) for row in rows],
|
|
total=total,
|
|
page=Page,
|
|
page_size=PageSize,
|
|
)
|
|
|
|
async def ListAllGroups(self, IncludeDisabled: bool, WithRuleCount: bool) -> list[EvaluationPointGroupVO]:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
filters = ["g.deleted_at IS NULL", "(COALESCE(g.pid, 0) <> 0 OR g.document_type_id IS NOT NULL OR g.entry_module_id IS NOT NULL)"]
|
|
if not IncludeDisabled:
|
|
filters.append("g.is_enabled = TRUE")
|
|
rows = (
|
|
await session.execute(
|
|
text(
|
|
f"""
|
|
SELECT
|
|
g.id,
|
|
g.pid,
|
|
g.name,
|
|
g.code,
|
|
g.description,
|
|
g.document_type_id,
|
|
dt.name AS document_type_name,
|
|
COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) AS entry_module_id,
|
|
em.name AS entry_module_name,
|
|
g.sort_order,
|
|
g.is_enabled,
|
|
g.created_at,
|
|
g.updated_at,
|
|
COALESCE(bg.binding_count, 0) AS rule_count
|
|
FROM leaudit_evaluation_point_groups g
|
|
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
|
|
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
|
|
LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)
|
|
LEFT JOIN (
|
|
SELECT group_id, COUNT(*)::int AS binding_count
|
|
FROM leaudit_rule_group_bindings
|
|
WHERE deleted_at IS NULL
|
|
GROUP BY group_id
|
|
) bg ON bg.group_id = g.id
|
|
WHERE {' AND '.join(filters)}
|
|
ORDER BY COALESCE(g.sort_order, 0) ASC, g.id ASC
|
|
"""
|
|
)
|
|
)
|
|
).mappings().all()
|
|
binding_map = await self._load_binding_map(session, [int(row["id"]) for row in rows])
|
|
|
|
groups = [self._to_group_vo(row, binding_map.get(int(row["id"]), []), include_rule_count=WithRuleCount) for row in rows]
|
|
by_parent: dict[int, list[EvaluationPointGroupVO]] = {}
|
|
roots: list[EvaluationPointGroupVO] = []
|
|
for group in groups:
|
|
parent_id = self._normalize_pid(group.pid)
|
|
if parent_id == 0:
|
|
roots.append(group)
|
|
else:
|
|
by_parent.setdefault(parent_id, []).append(group)
|
|
for group in groups:
|
|
children = by_parent.get(group.id)
|
|
if children:
|
|
group.children = children
|
|
return roots
|
|
|
|
async def ListGroupsByDocumentTypes(
|
|
self,
|
|
DocumentTypeIds: list[int],
|
|
IncludeDisabled: bool,
|
|
WithRuleCount: bool,
|
|
) -> list[EvaluationPointGroupVO]:
|
|
normalized_ids = sorted({int(item) for item in DocumentTypeIds if item})
|
|
if not normalized_ids:
|
|
return []
|
|
roots = await self.ListAllGroups(IncludeDisabled=IncludeDisabled, WithRuleCount=WithRuleCount)
|
|
result: list[EvaluationPointGroupVO] = []
|
|
for root in roots:
|
|
if root.document_type_id in normalized_ids:
|
|
result.append(root)
|
|
continue
|
|
children = root.children or []
|
|
if any(child.document_type_id in normalized_ids for child in children):
|
|
result.append(root)
|
|
return result
|
|
|
|
async def GetGroup(self, GroupId: int, WithRuleCount: bool) -> EvaluationPointGroupVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
row = await self._get_group_row(session, GroupId)
|
|
binding_map = await self._load_binding_map(session, [GroupId])
|
|
return self._to_group_vo(row, binding_map.get(GroupId, []), include_rule_count=WithRuleCount)
|
|
|
|
async def GetChildren(self, GroupId: int, IsEnabled: bool | None, Page: int, PageSize: int) -> EvaluationPointGroupListVO:
|
|
await self.GetGroup(GroupId, WithRuleCount=False)
|
|
return await self.ListGroups(Name=None, Code=None, IsEnabled=IsEnabled, Pid=GroupId, Page=Page, PageSize=PageSize)
|
|
|
|
async def CreateGroup(self, Body: EvaluationPointGroupCreateDTO) -> EvaluationPointGroupVO:
|
|
payload = self._normalize_create_payload(Body)
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
await self._ensure_code_unique(session, payload["code"], None)
|
|
parent = await self._ensure_parent_valid(session, payload["pid"])
|
|
payload["entry_module_id"] = await self._ensure_entry_module_valid(
|
|
session, payload["pid"], payload["entry_module_id"], parent
|
|
)
|
|
payload["document_type_id"] = await self._ensure_document_type_valid(
|
|
session, payload["pid"], payload["document_type_id"], payload["entry_module_id"], None, parent
|
|
)
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
INSERT INTO leaudit_evaluation_point_groups (
|
|
pid, code, name, description, document_type_id, entry_module_id, sort_order, is_enabled, created_at, updated_at
|
|
) VALUES (
|
|
:pid, :code, :name, :description, :document_type_id, :entry_module_id, :sort_order, :is_enabled, NOW(), NOW()
|
|
)
|
|
RETURNING id
|
|
"""
|
|
),
|
|
payload,
|
|
)
|
|
).mappings().one()
|
|
await session.commit()
|
|
group_id = int(row["id"])
|
|
return await self.GetGroup(group_id, WithRuleCount=True)
|
|
|
|
async def UpdateGroup(self, GroupId: int, Body: EvaluationPointGroupUpdateDTO) -> EvaluationPointGroupVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
current = await self._get_group_row(session, GroupId)
|
|
provided_fields = set(getattr(Body, "model_fields_set", set()))
|
|
next_pid = self._normalize_pid(Body.pid) if Body.pid is not None else self._normalize_pid(current["pid"])
|
|
name = (Body.name.strip() if Body.name is not None else str(current.get("name") or "")).strip()
|
|
code = (Body.code.strip() if Body.code is not None else str(current.get("code") or "")).strip()
|
|
description = Body.description.strip() if Body.description is not None and Body.description else current.get("description")
|
|
document_type_id = Body.document_type_id if "document_type_id" in provided_fields else current.get("document_type_id")
|
|
entry_module_id = Body.entry_module_id if "entry_module_id" in provided_fields else current.get("entry_module_id")
|
|
sort_order = Body.sort_order if Body.sort_order is not None else int(current.get("sort_order") or 0)
|
|
is_enabled = Body.is_enabled if Body.is_enabled is not None else bool(current.get("is_enabled", True))
|
|
|
|
if not name:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组名称不能为空")
|
|
if not code:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组编码不能为空")
|
|
|
|
await self._ensure_code_unique(session, code, GroupId)
|
|
parent = await self._ensure_parent_valid(session, next_pid)
|
|
entry_module_id = await self._ensure_entry_module_valid(session, next_pid, entry_module_id, parent)
|
|
document_type_id = await self._ensure_document_type_valid(
|
|
session, next_pid, document_type_id, entry_module_id, GroupId, parent
|
|
)
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
UPDATE leaudit_evaluation_point_groups
|
|
SET pid = :pid,
|
|
code = :code,
|
|
name = :name,
|
|
description = :description,
|
|
document_type_id = :document_type_id,
|
|
entry_module_id = :entry_module_id,
|
|
sort_order = :sort_order,
|
|
is_enabled = :is_enabled,
|
|
updated_at = NOW()
|
|
WHERE id = :group_id
|
|
"""
|
|
),
|
|
{
|
|
"group_id": GroupId,
|
|
"pid": next_pid,
|
|
"code": code,
|
|
"name": name,
|
|
"description": description,
|
|
"document_type_id": document_type_id,
|
|
"entry_module_id": entry_module_id,
|
|
"sort_order": sort_order,
|
|
"is_enabled": is_enabled,
|
|
},
|
|
)
|
|
await sync_doc_type_bindings_from_group(session, GroupId)
|
|
await session.commit()
|
|
return await self.GetGroup(GroupId, WithRuleCount=True)
|
|
|
|
async def DeleteGroup(self, GroupId: int) -> EvaluationPointGroupDeleteVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
current = await self._get_group_row(session, GroupId)
|
|
is_root = self._normalize_pid(current["pid"]) == 0
|
|
if is_root:
|
|
child_count = int(
|
|
(
|
|
await session.execute(
|
|
text(
|
|
"SELECT COUNT(*) FROM leaudit_evaluation_point_groups WHERE pid = :group_id AND deleted_at IS NULL"
|
|
),
|
|
{"group_id": GroupId},
|
|
)
|
|
).scalar_one()
|
|
)
|
|
if child_count > 0:
|
|
return EvaluationPointGroupDeleteVO(
|
|
success=False,
|
|
message="当前一级分组下仍存在二级分组,请先迁移或删除二级分组",
|
|
deleted_groups=0,
|
|
deleted_points=0,
|
|
)
|
|
|
|
binding_count = int(
|
|
(
|
|
await session.execute(
|
|
text(
|
|
"SELECT COUNT(*) FROM leaudit_rule_group_bindings WHERE group_id = :group_id AND deleted_at IS NULL"
|
|
),
|
|
{"group_id": GroupId},
|
|
)
|
|
).scalar_one()
|
|
)
|
|
await session.execute(
|
|
text(
|
|
"UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE group_id = :group_id AND deleted_at IS NULL"
|
|
),
|
|
{"group_id": GroupId},
|
|
)
|
|
await sync_doc_type_bindings_from_group(session, GroupId)
|
|
result = await session.execute(
|
|
text(
|
|
"UPDATE leaudit_evaluation_point_groups SET deleted_at = NOW(), updated_at = NOW() WHERE id = :group_id AND deleted_at IS NULL"
|
|
),
|
|
{"group_id": GroupId},
|
|
)
|
|
await session.commit()
|
|
deleted_groups = int(result.rowcount or 0)
|
|
return EvaluationPointGroupDeleteVO(
|
|
success=True,
|
|
message="规则分组删除成功",
|
|
deleted_count=deleted_groups,
|
|
deleted_groups=deleted_groups,
|
|
deleted_points=binding_count,
|
|
)
|
|
|
|
async def RebindGroup(self, GroupId: int, Body: EvaluationPointGroupRebindDTO) -> EvaluationPointGroupRebindVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
current = await self._get_group_row(session, GroupId)
|
|
target = await self._get_group_row(session, Body.new_parent_id)
|
|
if self._normalize_pid(current["pid"]) != 0 or self._normalize_pid(target["pid"]) != 0:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "仅支持一级分组之间迁移二级分组")
|
|
if GroupId == Body.new_parent_id:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "迁移目标不能与原分组相同")
|
|
result = await session.execute(
|
|
text(
|
|
"""
|
|
UPDATE leaudit_evaluation_point_groups
|
|
SET pid = :new_parent_id,
|
|
updated_at = NOW()
|
|
WHERE pid = :old_group_id AND deleted_at IS NULL
|
|
"""
|
|
),
|
|
{"old_group_id": GroupId, "new_parent_id": Body.new_parent_id},
|
|
)
|
|
await session.commit()
|
|
moved = int(result.rowcount or 0)
|
|
return EvaluationPointGroupRebindVO(success=True, message="二级分组迁移成功", rebind_count=moved, doc_types_updated=moved)
|
|
|
|
async def BatchUpdateStatus(self, Body: EvaluationPointGroupBatchStatusDTO) -> EvaluationPointGroupBatchStatusVO:
|
|
ids = sorted({int(item) for item in Body.ids if item})
|
|
if not ids:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请选择至少一个分组")
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
result = await session.execute(
|
|
text(
|
|
"""
|
|
UPDATE leaudit_evaluation_point_groups
|
|
SET is_enabled = :is_enabled,
|
|
updated_at = NOW()
|
|
WHERE id IN :ids AND deleted_at IS NULL
|
|
"""
|
|
).bindparams(bindparam("ids", expanding=True)),
|
|
{"ids": ids, "is_enabled": Body.is_enabled},
|
|
)
|
|
await session.commit()
|
|
updated_count = int(result.rowcount or 0)
|
|
return EvaluationPointGroupBatchStatusVO(success=True, updated_count=updated_count, message=f"成功更新 {updated_count} 个分组状态")
|
|
|
|
async def BatchDelete(self, Body: EvaluationPointGroupBatchDeleteDTO) -> EvaluationPointGroupBatchDeleteVO:
|
|
ids = sorted({int(item) for item in Body.ids if item})
|
|
if not ids:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "请选择至少一个分组")
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
rows = (
|
|
await session.execute(
|
|
text(
|
|
"SELECT id, pid, document_type_id FROM leaudit_evaluation_point_groups WHERE id IN :ids AND deleted_at IS NULL"
|
|
).bindparams(bindparam("ids", expanding=True)),
|
|
{"ids": ids},
|
|
)
|
|
).mappings().all()
|
|
if len(rows) != len(ids):
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "部分分组不存在")
|
|
if any(self._normalize_pid(row["pid"]) == 0 for row in rows):
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "批量删除仅支持二级分组")
|
|
|
|
deleted_bindings = 0
|
|
for row in rows:
|
|
deleted_bindings += int(
|
|
(
|
|
await session.execute(
|
|
text(
|
|
"UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE group_id = :group_id AND deleted_at IS NULL"
|
|
),
|
|
{"group_id": int(row["id"])},
|
|
)
|
|
).rowcount
|
|
or 0
|
|
)
|
|
await sync_doc_type_bindings_from_group(session, int(row["id"]))
|
|
result = await session.execute(
|
|
text(
|
|
"UPDATE leaudit_evaluation_point_groups SET deleted_at = NOW(), updated_at = NOW() WHERE id IN :ids AND deleted_at IS NULL"
|
|
).bindparams(bindparam("ids", expanding=True)),
|
|
{"ids": ids},
|
|
)
|
|
await session.commit()
|
|
deleted_groups = int(result.rowcount or 0)
|
|
return EvaluationPointGroupBatchDeleteVO(success=True, deleted_groups=deleted_groups, deleted_points=deleted_bindings, message=f"成功删除 {deleted_groups} 个分组")
|
|
|
|
async def CreateBinding(self, GroupId: int, Body: EvaluationPointGroupBindingCreateDTO) -> RuleGroupBindingVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
group = await self._get_group_row(session, GroupId)
|
|
if self._normalize_pid(group["pid"]) == 0:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "一级分组不能直接绑定规则集,请先选择二级分组")
|
|
await self._ensure_rule_set_valid(session, Body.rule_set_id)
|
|
existing = (
|
|
await session.execute(
|
|
text(
|
|
"SELECT id FROM leaudit_rule_group_bindings WHERE group_id = :group_id AND rule_set_id = :rule_set_id AND deleted_at IS NULL LIMIT 1"
|
|
),
|
|
{"group_id": GroupId, "rule_set_id": Body.rule_set_id},
|
|
)
|
|
).mappings().first()
|
|
if existing:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "该规则集已绑定到当前二级分组")
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
INSERT INTO leaudit_rule_group_bindings (
|
|
group_id, rule_set_id, priority, is_active, note, created_at, updated_at
|
|
) VALUES (
|
|
:group_id, :rule_set_id, :priority, :is_active, :note, NOW(), NOW()
|
|
)
|
|
RETURNING id
|
|
"""
|
|
),
|
|
{
|
|
"group_id": GroupId,
|
|
"rule_set_id": Body.rule_set_id,
|
|
"priority": Body.priority,
|
|
"is_active": Body.is_active,
|
|
"note": Body.note.strip() if Body.note else None,
|
|
},
|
|
)
|
|
).mappings().one()
|
|
await sync_doc_type_bindings_from_group(session, GroupId)
|
|
await session.commit()
|
|
binding_id = int(row["id"])
|
|
binding_row = await self._get_binding_row(session, binding_id)
|
|
binding_vo = await self._build_binding_vo(binding_row)
|
|
return binding_vo
|
|
|
|
async def UpdateBinding(self, BindingId: int, Body: EvaluationPointGroupBindingUpdateDTO) -> RuleGroupBindingVO:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
current = await self._get_binding_row(session, BindingId)
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
UPDATE leaudit_rule_group_bindings
|
|
SET priority = :priority,
|
|
is_active = :is_active,
|
|
note = :note,
|
|
updated_at = NOW()
|
|
WHERE id = :binding_id
|
|
"""
|
|
),
|
|
{
|
|
"binding_id": BindingId,
|
|
"priority": Body.priority if Body.priority is not None else int(current.get("priority") or 0),
|
|
"is_active": Body.is_active if Body.is_active is not None else bool(current.get("is_active", True)),
|
|
"note": Body.note.strip() if Body.note else (current.get("note") if Body.note is None else None),
|
|
},
|
|
)
|
|
await sync_doc_type_bindings_from_group(session, int(current["group_id"]))
|
|
await session.commit()
|
|
binding_row = await self._get_binding_row(session, BindingId)
|
|
binding_vo = await self._build_binding_vo(binding_row)
|
|
return binding_vo
|
|
|
|
async def DeleteBinding(self, BindingId: int) -> None:
|
|
async with GetAsyncSession() as session:
|
|
await self._ensure_ready(session)
|
|
current = await self._get_binding_row(session, BindingId)
|
|
await session.execute(
|
|
text(
|
|
"UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE id = :binding_id"
|
|
),
|
|
{"binding_id": BindingId},
|
|
)
|
|
await sync_doc_type_bindings_from_group(session, int(current["group_id"]))
|
|
await session.commit()
|
|
|
|
async def _ensure_ready(self, session) -> None:
|
|
self._rule_set_meta_cache = None
|
|
await ensure_rule_group_schema(session)
|
|
await bootstrap_rule_groups(session)
|
|
|
|
async def _get_group_row(self, session, group_id: int):
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT
|
|
g.id,
|
|
g.pid,
|
|
g.name,
|
|
g.code,
|
|
g.description,
|
|
g.document_type_id,
|
|
dt.name AS document_type_name,
|
|
COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id) AS entry_module_id,
|
|
em.name AS entry_module_name,
|
|
g.sort_order,
|
|
g.is_enabled,
|
|
g.created_at,
|
|
g.updated_at,
|
|
COALESCE(bg.binding_count, 0) AS rule_count
|
|
FROM leaudit_evaluation_point_groups g
|
|
LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id
|
|
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = g.pid AND parent.deleted_at IS NULL
|
|
LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(g.entry_module_id, parent.entry_module_id, dt.entry_module_id)
|
|
LEFT JOIN (
|
|
SELECT group_id, COUNT(*)::int AS binding_count
|
|
FROM leaudit_rule_group_bindings
|
|
WHERE deleted_at IS NULL
|
|
GROUP BY group_id
|
|
) bg ON bg.group_id = g.id
|
|
WHERE g.id = :group_id AND g.deleted_at IS NULL
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
{"group_id": group_id},
|
|
)
|
|
).mappings().first()
|
|
if not row:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则分组不存在")
|
|
return row
|
|
|
|
async def _get_binding_row(self, session, binding_id: int):
|
|
row = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT id, group_id, rule_set_id, rule_type_binding_id, priority, is_active, note, created_at, updated_at
|
|
FROM leaudit_rule_group_bindings
|
|
WHERE id = :binding_id AND deleted_at IS NULL
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
{"binding_id": binding_id},
|
|
)
|
|
).mappings().first()
|
|
if not row:
|
|
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则组绑定不存在")
|
|
return row
|
|
|
|
async def _load_binding_map(self, session, group_ids: list[int]) -> dict[int, list[RuleGroupBindingVO]]:
|
|
group_ids = [int(item) for item in group_ids if item]
|
|
if not group_ids:
|
|
return {}
|
|
rows = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT id, group_id, rule_set_id, rule_type_binding_id, priority, is_active, note, created_at, updated_at
|
|
FROM leaudit_rule_group_bindings
|
|
WHERE group_id IN :group_ids AND deleted_at IS NULL
|
|
ORDER BY priority DESC, id ASC
|
|
"""
|
|
).bindparams(bindparam("group_ids", expanding=True)),
|
|
{"group_ids": group_ids},
|
|
)
|
|
).mappings().all()
|
|
result: dict[int, list[RuleGroupBindingVO]] = {}
|
|
for row in rows:
|
|
result.setdefault(int(row["group_id"]), []).append(await self._build_binding_vo(row))
|
|
return result
|
|
|
|
async def _build_binding_vo(self, row) -> RuleGroupBindingVO:
|
|
rule_set_meta = await self._get_rule_set_meta(int(row["rule_set_id"]))
|
|
return RuleGroupBindingVO(
|
|
id=int(row["id"]),
|
|
group_id=int(row["group_id"]),
|
|
rule_set_id=int(row["rule_set_id"]),
|
|
rule_type_binding_id=int(row["rule_type_binding_id"]) if row.get("rule_type_binding_id") else None,
|
|
priority=int(row.get("priority") or 0),
|
|
is_active=bool(row.get("is_active", True)),
|
|
note=row.get("note"),
|
|
rule_type=rule_set_meta.get("rule_type"),
|
|
rule_name=rule_set_meta.get("rule_name"),
|
|
current_version_id=rule_set_meta.get("current_version_id"),
|
|
fallback_version_id=rule_set_meta.get("fallback_version_id"),
|
|
has_usable_version=bool(rule_set_meta.get("has_usable_version", False)),
|
|
usable_rule_count=int(rule_set_meta.get("usable_rule_count") or 0),
|
|
)
|
|
|
|
async def _get_rule_set_meta(self, rule_set_id: int) -> dict[str, Any]:
|
|
if not getattr(self, "_rule_set_meta_cache", None):
|
|
rule_sets = await self.RuleService.ListSets()
|
|
self._rule_set_meta_cache = {
|
|
int(item.id): {
|
|
"rule_type": item.ruleType,
|
|
"rule_name": item.ruleName,
|
|
"current_version_id": item.currentVersionId,
|
|
"fallback_version_id": item.fallbackVersionId,
|
|
"has_usable_version": item.hasUsableVersion,
|
|
"usable_rule_count": item.usableRuleCount,
|
|
}
|
|
for item in rule_sets
|
|
}
|
|
return self._rule_set_meta_cache.get(rule_set_id, {})
|
|
|
|
async def _ensure_parent_valid(self, session, pid: int):
|
|
if pid == 0:
|
|
return None
|
|
parent = await self._get_group_row(session, pid)
|
|
if self._normalize_pid(parent["pid"]) != 0:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "上级分组必须是一级分组")
|
|
return parent
|
|
|
|
async def _ensure_entry_module_valid(
|
|
self,
|
|
session,
|
|
pid: int,
|
|
entry_module_id: int | None,
|
|
parent: Any | None,
|
|
) -> int | None:
|
|
if pid != 0:
|
|
if parent is not None and parent.get("entry_module_id") is not None:
|
|
return int(parent["entry_module_id"])
|
|
return None if entry_module_id in (None, 0, "0", "") else int(entry_module_id)
|
|
|
|
if entry_module_id in (None, 0, "0", ""):
|
|
return None
|
|
|
|
exists = (
|
|
await session.execute(
|
|
text("SELECT id FROM leaudit_entry_modules WHERE id = :entry_module_id AND deleted_at IS NULL LIMIT 1"),
|
|
{"entry_module_id": int(entry_module_id)},
|
|
)
|
|
).scalar_one_or_none()
|
|
if exists is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "关联入口模块不存在")
|
|
return int(entry_module_id)
|
|
|
|
async def _ensure_document_type_valid(
|
|
self,
|
|
session,
|
|
pid: int,
|
|
document_type_id: int | None,
|
|
entry_module_id: int | None,
|
|
group_id: int | None,
|
|
parent: Any | None,
|
|
) -> int | None:
|
|
if pid == 0:
|
|
if document_type_id is None and entry_module_id is None:
|
|
return None
|
|
else:
|
|
if parent is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "二级分组必须挂到一级分组下")
|
|
if parent.get("document_type_id") is not None:
|
|
parent_doc_type_id = int(parent["document_type_id"])
|
|
if document_type_id is None:
|
|
document_type_id = parent_doc_type_id
|
|
elif int(document_type_id) != parent_doc_type_id:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "二级分组的文档类型必须与所属一级分组一致")
|
|
elif document_type_id is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "二级分组必须明确绑定具体文档类型")
|
|
|
|
if document_type_id is None:
|
|
return None
|
|
exists = (
|
|
await session.execute(
|
|
text(
|
|
"SELECT id FROM leaudit_document_types WHERE id = :doc_type_id AND deleted_at IS NULL LIMIT 1"
|
|
),
|
|
{"doc_type_id": int(document_type_id)},
|
|
)
|
|
).scalar_one_or_none()
|
|
if exists is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "关联文档类型不存在")
|
|
if pid == 0:
|
|
duplicated_root = (
|
|
await session.execute(
|
|
text(
|
|
"""
|
|
SELECT id
|
|
FROM leaudit_evaluation_point_groups
|
|
WHERE deleted_at IS NULL
|
|
AND COALESCE(pid, 0) = 0
|
|
AND document_type_id = :doc_type_id
|
|
AND (:group_id IS NULL OR id <> :group_id)
|
|
LIMIT 1
|
|
"""
|
|
),
|
|
{"doc_type_id": int(document_type_id), "group_id": group_id},
|
|
)
|
|
).scalar_one_or_none()
|
|
if duplicated_root is not None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "该文档类型已存在一级分组")
|
|
return int(document_type_id)
|
|
|
|
async def _ensure_code_unique(self, session, code: str, group_id: int | None) -> None:
|
|
sql = "SELECT id FROM leaudit_evaluation_point_groups WHERE LOWER(code) = LOWER(:code) AND deleted_at IS NULL"
|
|
params: dict[str, Any] = {"code": code}
|
|
if group_id is not None:
|
|
sql += " AND id <> :group_id"
|
|
params["group_id"] = group_id
|
|
duplicated = (await session.execute(text(sql + " LIMIT 1"), params)).scalar_one_or_none()
|
|
if duplicated is not None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组编码已存在")
|
|
|
|
async def _ensure_rule_set_valid(self, session, rule_set_id: int) -> None:
|
|
exists = (
|
|
await session.execute(
|
|
text("SELECT id FROM leaudit_rule_sets WHERE id = :rule_set_id AND deleted_at IS NULL LIMIT 1"),
|
|
{"rule_set_id": rule_set_id},
|
|
)
|
|
).scalar_one_or_none()
|
|
if exists is None:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "规则集不存在")
|
|
|
|
def _normalize_create_payload(self, body: EvaluationPointGroupCreateDTO) -> dict[str, Any]:
|
|
name = body.name.strip()
|
|
code = body.code.strip()
|
|
if not name:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组名称不能为空")
|
|
if not code:
|
|
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "分组编码不能为空")
|
|
return {
|
|
"name": name,
|
|
"code": code,
|
|
"pid": self._normalize_pid(body.pid),
|
|
"description": body.description.strip() if body.description else None,
|
|
"document_type_id": body.document_type_id,
|
|
"entry_module_id": body.entry_module_id,
|
|
"sort_order": int(body.sort_order or 0),
|
|
"is_enabled": body.is_enabled,
|
|
}
|
|
|
|
def _to_group_vo(self, row, bindings: list[RuleGroupBindingVO], include_rule_count: bool) -> EvaluationPointGroupVO:
|
|
return EvaluationPointGroupVO(
|
|
id=int(row["id"]),
|
|
pid=self._normalize_pid(row.get("pid")),
|
|
name=str(row.get("name") or ""),
|
|
code=str(row.get("code") or ""),
|
|
description=row.get("description"),
|
|
document_type_id=int(row["document_type_id"]) if row.get("document_type_id") is not None else None,
|
|
document_type_name=row.get("document_type_name"),
|
|
entry_module_id=int(row["entry_module_id"]) if row.get("entry_module_id") is not None else None,
|
|
entry_module_name=row.get("entry_module_name"),
|
|
sort_order=int(row.get("sort_order") or 0),
|
|
is_enabled=bool(row.get("is_enabled", True)),
|
|
created_at=self._to_iso(row.get("created_at")),
|
|
updated_at=self._to_iso(row.get("updated_at")),
|
|
rule_count=len(bindings) if include_rule_count else None,
|
|
bindings=bindings,
|
|
children=None,
|
|
)
|
|
|
|
def _normalize_pid(self, value: Any) -> int:
|
|
if value in (None, "", "0", 0):
|
|
return 0
|
|
return int(value)
|
|
|
|
def _to_iso(self, value: Any) -> str | None:
|
|
if value is None:
|
|
return None
|
|
if isinstance(value, datetime):
|
|
return value.isoformat()
|
|
return str(value)
|