feat: migrate rule bindings to group-based flow

This commit is contained in:
wren
2026-05-07 17:43:20 +08:00
parent 75c2111209
commit f8eb2dc817
8 changed files with 871 additions and 361 deletions
@@ -44,6 +44,67 @@ def _normalize_speed(speed: str | None) -> str:
class AuditServiceImpl(IAuditService):
"""评查服务实现。"""
async def _resolve_rule_binding_from_group(self, session, group_id: int | None) -> dict | None:
"""按二级分组解析正式规则绑定。"""
if not group_id:
return None
result = await session.execute(
text(
"""
SELECT
rs.id AS rule_set_id,
COALESCE(rs.current_version_id, fallback_rv.id) AS rule_version_id,
COALESCE(current_rv.oss_url, fallback_rv.oss_url) AS rule_source_oss_url,
COALESCE(current_rv.file_sha256, fallback_rv.file_sha256) AS rule_source_sha256,
COALESCE(current_rv.metadata_type_id, fallback_rv.metadata_type_id) AS rule_type_id
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
LEFT JOIN leaudit_rule_versions current_rv ON current_rv.id = rs.current_version_id
LEFT JOIN LATERAL (
SELECT
rv.id,
rv.oss_url,
rv.file_sha256,
rv.metadata_type_id
FROM leaudit_rule_versions rv
WHERE rv.rule_set_id = rs.id
AND rv.status IN ('published', 'rollback')
ORDER BY rv.version_seq DESC, rv.id DESC
LIMIT 1
) fallback_rv ON TRUE
WHERE rgb.group_id = :group_id
AND rgb.is_active = TRUE
AND rgb.deleted_at IS NULL
ORDER BY rgb.priority DESC, rgb.id ASC
LIMIT 1
"""
),
{"group_id": int(group_id)},
)
return result.mappings().first()
async def _resolve_unique_group_binding_by_doc_type(self, session, doc_type_id: int | None) -> dict | None:
"""当文档尚未落 group_id 时,按文档类型唯一子组兜底解析正式绑定。"""
if not doc_type_id:
return None
group_row = (
await session.execute(
text(
"""
SELECT CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS group_id
FROM leaudit_evaluation_point_groups
WHERE document_type_id = :doc_type_id
AND deleted_at IS NULL
AND is_enabled = TRUE
AND COALESCE(pid, 0) <> 0
"""
),
{"doc_type_id": int(doc_type_id)},
)
).mappings().first()
resolved_group_id = int(group_row["group_id"]) if group_row and group_row.get("group_id") is not None else None
return await self._resolve_rule_binding_from_group(session, resolved_group_id)
async def Run(
self,
DocumentId: int,
@@ -125,44 +186,14 @@ class AuditServiceImpl(IAuditService):
)
latestRunNo = runNoResult.scalar_one_or_none() or 0
binding = None
if getattr(document, "groupId", None):
groupBindingResult = await session.execute(
text(
"""
SELECT
rs.id AS rule_set_id,
COALESCE(rs.current_version_id, fallback_rv.id) AS rule_version_id,
COALESCE(current_rv.oss_url, fallback_rv.oss_url) AS rule_source_oss_url,
COALESCE(current_rv.file_sha256, fallback_rv.file_sha256) AS rule_source_sha256,
COALESCE(current_rv.metadata_type_id, fallback_rv.metadata_type_id) AS rule_type_id
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
LEFT JOIN leaudit_rule_versions current_rv ON current_rv.id = rs.current_version_id
LEFT JOIN LATERAL (
SELECT
rv.id,
rv.oss_url,
rv.file_sha256,
rv.metadata_type_id
FROM leaudit_rule_versions rv
WHERE rv.rule_set_id = rs.id
AND rv.status IN ('published', 'rollback')
ORDER BY rv.version_seq DESC, rv.id DESC
LIMIT 1
) fallback_rv ON TRUE
WHERE rgb.group_id = :group_id
AND rgb.is_active = TRUE
AND rgb.deleted_at IS NULL
ORDER BY rgb.priority DESC, rgb.id ASC
LIMIT 1
"""
),
{"group_id": int(document.groupId)},
)
binding = groupBindingResult.mappings().first()
binding = await self._resolve_rule_binding_from_group(session, getattr(document, "groupId", None))
if binding is None:
binding = await self._resolve_unique_group_binding_by_doc_type(session, getattr(document, "typeId", None))
if binding and getattr(document, "groupId", None) is None:
logger.info("文档未显式记录 group_id,已按文档类型唯一子组解析正式规则绑定")
if not binding or not binding["rule_set_id"] or not binding["rule_version_id"]:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前子类型未绑定可执行规则集,请先检查二级分组规则配置")
if getattr(document, "groupId", None):
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前子类型未绑定可执行规则集,请先检查二级分组规则配置")
if binding is None:
bindingResult = await session.execute(
@@ -1128,7 +1128,6 @@ class DocumentServiceImpl(IDocumentService):
)
).scalar_one()
await self._syncRuleBindings(Session, int(row), Body.ruleSetIds, "default")
await sync_group_bindings_from_doc_type(Session, int(row), Body.ruleSetIds)
await Session.commit()
return await self.GetDocumentType(int(row))
@@ -1171,7 +1170,6 @@ class DocumentServiceImpl(IDocumentService):
if "ruleSetIds" in providedFields and Body.ruleSetIds is not None:
await self._syncRuleBindings(Session, Id, Body.ruleSetIds, "default")
await sync_group_bindings_from_doc_type(Session, Id, Body.ruleSetIds)
await Session.commit()
return await self.GetDocumentType(Id)
@@ -1334,17 +1332,57 @@ class DocumentServiceImpl(IDocumentService):
await Session.execute(
text(
"""
SELECT doc_type_id, rule_set_id
FROM leaudit_rule_type_bindings
WHERE doc_type_id = ANY(:ids) AND deleted_at IS NULL AND is_active = true
ORDER BY priority DESC
SELECT
child.document_type_id AS doc_type_id,
rgb.rule_set_id,
child.sort_order AS child_sort_order,
rgb.priority,
rgb.id
FROM leaudit_evaluation_point_groups child
JOIN leaudit_rule_group_bindings rgb
ON rgb.group_id = child.id
AND rgb.deleted_at IS NULL
AND rgb.is_active = TRUE
WHERE child.document_type_id = ANY(:ids)
AND child.deleted_at IS NULL
AND COALESCE(child.pid, 0) <> 0
ORDER BY
child.document_type_id ASC,
COALESCE(child.sort_order, 0) ASC,
COALESCE(rgb.priority, 0) DESC,
rgb.id ASC
"""
),
{"ids": allIds},
)
).fetchall()
for b in bindingRows:
bindingsMap.setdefault(int(b[0]), []).append(int(b[1]))
for docTypeId, ruleSetId, *_ in bindingRows:
current = bindingsMap.setdefault(int(docTypeId), [])
normalizedRuleSetId = int(ruleSetId)
if normalizedRuleSetId not in current:
current.append(normalizedRuleSetId)
# 兼容历史数据:若某些文档类型尚未完成分组绑定迁移,再回退读取旧绑定表。
missingDocTypeIds = [docTypeId for docTypeId, ruleSetIds in bindingsMap.items() if len(ruleSetIds) == 0]
if missingDocTypeIds:
legacyBindingRows = (
await Session.execute(
text(
"""
SELECT doc_type_id, rule_set_id
FROM leaudit_rule_type_bindings
WHERE doc_type_id = ANY(:ids) AND deleted_at IS NULL AND is_active = true
ORDER BY priority DESC
"""
),
{"ids": missingDocTypeIds},
)
).fetchall()
for docTypeId, ruleSetId in legacyBindingRows:
current = bindingsMap.setdefault(int(docTypeId), [])
normalizedRuleSetId = int(ruleSetId)
if normalizedRuleSetId not in current:
current.append(normalizedRuleSetId)
return rows, bindingsMap
async def _queryDocumentTypeRoots(self, Session, Ids: list[int] | None = None, EntryModuleId: int | None = None):
@@ -2564,21 +2602,8 @@ class DocumentServiceImpl(IDocumentService):
return str(Row.get("rule_name") or Row.get("rule_id") or "")
async def _syncRuleBindings(self, Session, DocTypeId: int, RuleSetIds: list[int], Region: str = "default") -> None:
"""全量替换规则绑定。"""
await Session.execute(
text("UPDATE leaudit_rule_type_bindings SET deleted_at = NOW() WHERE doc_type_id = :id AND deleted_at IS NULL"),
{"id": DocTypeId},
)
for idx, ruleSetId in enumerate(RuleSetIds):
await Session.execute(
text(
"""
INSERT INTO leaudit_rule_type_bindings (doc_type_id, rule_set_id, binding_mode, priority, region, is_active, created_at, updated_at)
VALUES (:doc_type_id, :rule_set_id, 'explicit', :priority, :region, true, NOW(), NOW())
"""
),
{"doc_type_id": DocTypeId, "rule_set_id": ruleSetId, "priority": 100 - idx, "region": Region},
)
"""全量替换规则绑定,仅写入新分组绑定"""
await sync_group_bindings_from_doc_type(Session, DocTypeId, RuleSetIds)
async def _find_latest_version_candidate(
@@ -2,12 +2,22 @@
from __future__ import annotations
import asyncio
from collections import defaultdict
import re
from typing import Any
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from sqlalchemy import text
from fastapi_modules.fastapi_leaudit.domian.vo.ruleConfigVo import RuleConfigPackVO
from fastapi_modules.fastapi_leaudit.domian.vo.ruleConfigVo import (
RuleConfigPackListVO,
RuleConfigPackRuleSummaryVO,
RuleConfigPackVO,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleValidator import RuleValidator
from fastapi_modules.fastapi_leaudit.services import IOssService
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import RuleServiceImpl
@@ -20,41 +30,99 @@ class RuleConfigServiceImpl(IRuleConfigService):
def __init__(self, OssService: IOssService | None = None) -> None:
self.OssService = OssService or OssServiceImpl()
self.RuleService = RuleServiceImpl(self.OssService)
self.Validator = RuleValidator()
async def ListPacks(self) -> list[RuleConfigPackVO]:
"""列出规则配置页所需的全部 pack。"""
async with GetAsyncSession() as session:
rows = (
await session.execute(
text(
"""
SELECT
child.id AS group_id,
child.name AS subtype,
COALESCE(child.document_type_id, root.document_type_id) AS document_type_id,
dt.name AS document_type_name,
root.id AS root_group_id,
COALESCE(root.name, child.name) AS main_type,
em.name AS entry_module_name
FROM leaudit_evaluation_point_groups child
LEFT JOIN leaudit_evaluation_point_groups root
ON root.id = child.pid
AND root.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt
ON dt.id = COALESCE(child.document_type_id, root.document_type_id)
LEFT JOIN leaudit_entry_modules em
ON em.id = COALESCE(child.entry_module_id, root.entry_module_id, dt.entry_module_id)
WHERE child.deleted_at IS NULL
AND COALESCE(child.pid, 0) <> 0
ORDER BY COALESCE(root.sort_order, 0) ASC, root.id ASC, child.sort_order ASC, child.id ASC
"""
)
)
).mappings().all()
rows = await self._load_pack_rows()
rule_set_map = await self._load_rule_set_meta_map()
return [await self._build_pack_vo(row, rule_set_map) for row in rows]
async def ListPackSummaries(self) -> list[RuleConfigPackListVO]:
"""列出规则列表页所需的轻量 pack。"""
rows = await self._load_pack_rows()
if not rows:
return []
group_ids = [int(row["group_id"]) for row in rows]
binding_map = await self._load_effective_binding_map(group_ids)
rule_set_ids = sorted({int(item["rule_set_id"]) for item in binding_map.values() if item.get("rule_set_id") is not None})
rule_set_map = await self._load_rule_set_meta_map_by_ids(rule_set_ids)
latest_version_map = await self._load_latest_version_map(rule_set_ids)
base_items: list[dict[str, Any]] = []
resolved_version_ids: set[int] = set()
for row in rows:
group_id = int(row["group_id"])
binding = binding_map.get(group_id)
document_type = str(row["document_type_name"] or "").strip()
main_type = str(row["main_type"] or "").strip()
subtype = str(row["subtype"] or "").strip() or "通用"
module_type = str(row["entry_module_name"] or "").strip() or (f"{document_type}评查" if document_type else "规则配置")
binding_id: int | None = None
rule_set_id: int | None = None
rule_type: str | None = None
rule_name: str | None = None
current_version_id: int | None = None
fallback_version_id: int | None = None
resolved_version_id: int | None = None
has_usable_version = False
usable_rule_count = 0
if binding:
binding_id = int(binding["id"])
rule_set_id = int(binding["rule_set_id"])
rule_set_meta = rule_set_map.get(rule_set_id, {})
rule_type = str(rule_set_meta.get("rule_type") or "") or None
rule_name = str(rule_set_meta.get("rule_name") or "") or None
current_version_id = self._to_int(rule_set_meta.get("current_version_id"))
fallback_version_id = self._to_int(rule_set_meta.get("fallback_version_id"))
has_usable_version = bool(rule_set_meta.get("has_usable_version"))
resolved_version_id = current_version_id or fallback_version_id or latest_version_map.get(rule_set_id)
if resolved_version_id is not None:
resolved_version_ids.add(resolved_version_id)
base_items.append({
"packId": group_id,
"groupId": group_id,
"rootGroupId": self._to_int(row.get("root_group_id")),
"bindingId": binding_id,
"ruleSetId": rule_set_id,
"ruleType": rule_type,
"ruleName": rule_name,
"currentVersionId": current_version_id,
"fallbackVersionId": fallback_version_id,
"resolvedVersionId": resolved_version_id,
"hasUsableVersion": has_usable_version,
"usableRuleCount": usable_rule_count,
"documentTypeId": self._to_int(row.get("document_type_id")),
"documentType": document_type,
"moduleType": module_type,
"mainType": main_type or document_type,
"subtype": subtype,
})
version_oss_map = await self._load_version_oss_map(sorted(resolved_version_ids))
yaml_summary_map = await self._load_yaml_summaries(version_oss_map)
packs: list[RuleConfigPackListVO] = []
for item in base_items:
summary = yaml_summary_map.get(item["resolvedVersionId"]) if item.get("resolvedVersionId") else None
rules = summary["rules"] if summary else []
source_status = "empty"
yaml_name = item.get("ruleName") or ""
if item.get("resolvedVersionId") is not None:
source_status = "ready" if summary and summary.get("loaded") else "missing"
yaml_name = str(summary.get("yaml_name") or yaml_name or "")
item["sourceStatus"] = source_status
item["yamlName"] = yaml_name
item["rules"] = [RuleConfigPackRuleSummaryVO(**rule) for rule in rules]
item["usableRuleCount"] = len(rules)
packs.append(RuleConfigPackListVO(**item))
return packs
async def GetPack(self, PackId: int) -> RuleConfigPackVO:
"""获取单个规则配置 pack。"""
async with GetAsyncSession() as session:
@@ -128,8 +196,6 @@ class RuleConfigServiceImpl(IRuleConfigService):
has_usable_version = bool(rule_set_meta.get("has_usable_version"))
usable_rule_count = int(rule_set_meta.get("usable_rule_count") or 0)
if resolved_version_id is None:
# 仅在当前没有生效版本时,才回退到最新版本(通常是草稿)。
# 否则规则详情页必须与当前生效版本保持一致,发布/回滚后才能看到真实内容切换。
resolved_version_id = await self._load_latest_version_id(rule_set_id)
if resolved_version_id is not None:
yaml_text = await self._load_yaml_text_by_version_id(resolved_version_id)
@@ -157,6 +223,36 @@ class RuleConfigServiceImpl(IRuleConfigService):
sourceStatus=source_status,
)
async def _load_pack_rows(self):
async with GetAsyncSession() as session:
return (
await session.execute(
text(
"""
SELECT
child.id AS group_id,
child.name AS subtype,
COALESCE(child.document_type_id, root.document_type_id) AS document_type_id,
dt.name AS document_type_name,
root.id AS root_group_id,
COALESCE(root.name, child.name) AS main_type,
em.name AS entry_module_name
FROM leaudit_evaluation_point_groups child
LEFT JOIN leaudit_evaluation_point_groups root
ON root.id = child.pid
AND root.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt
ON dt.id = COALESCE(child.document_type_id, root.document_type_id)
LEFT JOIN leaudit_entry_modules em
ON em.id = COALESCE(child.entry_module_id, root.entry_module_id, dt.entry_module_id)
WHERE child.deleted_at IS NULL
AND COALESCE(child.pid, 0) <> 0
ORDER BY COALESCE(root.sort_order, 0) ASC, root.id ASC, child.sort_order ASC, child.id ASC
"""
)
)
).mappings().all()
async def _load_effective_binding(self, group_id: int):
"""读取当前二级分组实际生效的规则集绑定。"""
async with GetAsyncSession() as session:
@@ -178,8 +274,35 @@ class RuleConfigServiceImpl(IRuleConfigService):
).mappings().first()
return row
async def _load_effective_binding_map(self, group_ids: list[int]) -> dict[int, dict[str, Any]]:
if not group_ids:
return {}
async with GetAsyncSession() as session:
rows = (
await session.execute(
text(
"""
SELECT id, group_id, rule_set_id
FROM (
SELECT
id,
group_id,
rule_set_id,
ROW_NUMBER() OVER (PARTITION BY group_id ORDER BY priority DESC, id ASC) AS rn
FROM leaudit_rule_group_bindings
WHERE deleted_at IS NULL
AND is_active = TRUE
AND group_id = ANY(:group_ids)
) t
WHERE rn = 1
"""
),
{"group_ids": group_ids},
)
).mappings().all()
return {int(row["group_id"]): dict(row) for row in rows}
async def _load_rule_set_meta_map(self) -> dict[int, dict[str, object]]:
"""批量读取规则集元数据。"""
items = await self.RuleService.ListSets()
return {
item.id: {
@@ -193,8 +316,94 @@ class RuleConfigServiceImpl(IRuleConfigService):
for item in items
}
async def _load_rule_set_meta_map_by_ids(self, rule_set_ids: list[int]) -> dict[int, dict[str, object]]:
if not rule_set_ids:
return {}
async with GetAsyncSession() as session:
rows = (
await session.execute(
text(
"""
SELECT
rs.id,
rs.rule_type,
rs.rule_name,
rs.current_version_id,
current_rv.id AS usable_current_version_id,
fallback_rv.id AS fallback_version_id,
CASE
WHEN current_rv.id IS NOT NULL OR fallback_rv.id IS NOT NULL THEN TRUE
ELSE FALSE
END AS has_usable_version
FROM leaudit_rule_sets rs
LEFT JOIN leaudit_rule_versions current_rv
ON current_rv.id = rs.current_version_id
AND current_rv.status = 'published'
LEFT JOIN LATERAL (
SELECT rv.id
FROM leaudit_rule_versions rv
WHERE rv.rule_set_id = rs.id
AND rv.status = 'published'
AND (rs.current_version_id IS NULL OR rv.id <> rs.current_version_id)
ORDER BY rv.version_seq DESC, rv.id DESC
LIMIT 1
) fallback_rv ON TRUE
WHERE rs.deleted_at IS NULL
AND rs.id = ANY(:rule_set_ids)
"""
),
{"rule_set_ids": rule_set_ids},
)
).mappings().all()
return {
int(row["id"]): {
"rule_type": row["rule_type"],
"rule_name": row["rule_name"],
"current_version_id": row["current_version_id"],
"fallback_version_id": row["fallback_version_id"],
"has_usable_version": row["has_usable_version"],
}
for row in rows
}
async def _load_version_oss_map(self, version_ids: list[int]) -> dict[int, str]:
if not version_ids:
return {}
async with GetAsyncSession() as session:
rows = (
await session.execute(
text(
"""
SELECT id, oss_url
FROM leaudit_rule_versions
WHERE id = ANY(:version_ids)
"""
),
{"version_ids": version_ids},
)
).mappings().all()
return {int(row["id"]): str(row["oss_url"] or "") for row in rows if row.get("oss_url")}
async def _load_latest_version_map(self, rule_set_ids: list[int]) -> dict[int, int]:
if not rule_set_ids:
return {}
async with GetAsyncSession() as session:
rows = (
await session.execute(
text(
"""
SELECT DISTINCT ON (rule_set_id) rule_set_id, id
FROM leaudit_rule_versions
WHERE rule_set_id = ANY(:rule_set_ids)
ORDER BY rule_set_id, version_seq DESC, id DESC
"""
),
{"rule_set_ids": rule_set_ids},
)
).mappings().all()
return {int(row["rule_set_id"]): int(row["id"]) for row in rows}
async def _load_yaml_text_by_version_id(self, version_id: int) -> str:
"""按版本ID读取 YAML 正文。"""
async with GetAsyncSession() as session:
row = (
await session.execute(
@@ -218,8 +427,131 @@ class RuleConfigServiceImpl(IRuleConfigService):
except Exception:
return ""
async def _load_yaml_summaries(self, version_oss_map: dict[int, str]) -> dict[int, dict[str, Any]]:
def _extract_rule_dependencies(rule: Any) -> list[str]:
dependencies: list[str] = []
for item in list(getattr(rule, "dependencies", []) or []):
value = str(item or "").strip()
if value:
dependencies.append(value)
for stage in list(getattr(rule, "stages", []) or []):
field = str(getattr(stage, "field", "") or "").strip()
if field:
dependencies.append(field)
for attr_name in ("seal_id", "signature_id", "element"):
attr_value = str(getattr(stage, attr_name, "") or "").strip()
if attr_value:
dependencies.append(attr_value)
for item in list(getattr(stage, "fields", []) or []):
value = str(item or "").strip()
if value:
dependencies.append(value)
prompt = str(getattr(stage, "prompt", "") or "")
if prompt:
dependencies.extend(
match.strip()
for match in re.findall(r"\{\{\s*([^}]+?)\s*\}\}", prompt)
if match.strip()
)
deduped: list[str] = []
seen: set[str] = set()
for item in dependencies:
if item in seen:
continue
seen.add(item)
deduped.append(item)
return deduped
async def _load_one(version_id: int, oss_url: str):
if not oss_url:
return version_id, {"loaded": False, "yaml_name": "", "rules": []}
try:
yaml_text = (await self.OssService.DownloadBytes(oss_url)).decode("utf-8")
if not yaml_text.strip():
return version_id, {"loaded": False, "yaml_name": "", "rules": []}
rules_file = self.Validator.ParseValidated(yaml_text)
groups: dict[str, str] = {}
try:
for group in getattr(rules_file, "rules", []) or []:
group_name = getattr(group, "group", "") or ""
for rule in getattr(group, "rules", []) or []:
groups[getattr(rule, "rule_id", "") or ""] = group_name
except Exception:
groups = {}
summaries = []
for rule in getattr(rules_file, "flat_rules", []) or []:
stage_items = []
check_types = []
for idx, stage in enumerate(getattr(rule, "stages", []) or [], start=1):
check = str(getattr(stage, "check", "") or getattr(stage, "type", "") or "")
check_types.append(check)
content = check
if check == "ai":
content = str(getattr(stage, "prompt", "") or "")
elif hasattr(stage, "fields") and getattr(stage, "fields"):
content = "".join(str(item) for item in getattr(stage, "fields")[:3])
stage_items.append({"id": str(idx), "check": check, "content": content})
summaries.append({
"id": getattr(rule, "rule_id", "") or getattr(rule, "name", "") or "-",
"ruleId": getattr(rule, "rule_id", "") or "-",
"name": getattr(rule, "name", "") or getattr(rule, "rule_id", "") or "未命名规则",
"group": groups.get(getattr(rule, "rule_id", "") or "", getattr(rule, "group", "") or "未分组"),
"risk": str(getattr(rule, "risk", "medium") or "medium"),
"score": str(getattr(rule, "score", "0") or "0"),
"type": str(getattr(rule, "type", "deterministic") or "deterministic"),
"checkTypes": [item for item in check_types if item],
"logic": str(getattr(rule, "logic", "") or ""),
"subRules": stage_items,
"subRuleIds": list(getattr(rule, "rules", []) or []),
"scope": list(getattr(rule, "scope", []) or []),
"dependencies": _extract_rule_dependencies(rule),
"stageCount": len(stage_items),
"appliesIn": list(getattr(rule, "applies_in", []) or []),
"prompt": stage_items[0]["content"] if len(stage_items) == 1 and stage_items[0]["check"] == "ai" else "",
"description": str(getattr(rule, "desc", "") or ""),
})
summary_map = {
str(item.get("ruleId") or item.get("id") or ""): item
for item in summaries
if str(item.get("ruleId") or item.get("id") or "")
}
for item in summaries:
if item.get("dependencies"):
continue
sub_rule_ids = [str(sub_rule_id or "").strip() for sub_rule_id in list(item.get("subRuleIds") or []) if str(sub_rule_id or "").strip()]
if not sub_rule_ids:
continue
merged_dependencies: list[str] = []
seen_dependencies: set[str] = set()
for sub_rule_id in sub_rule_ids:
child_summary = summary_map.get(sub_rule_id)
if not child_summary:
continue
for dependency in list(child_summary.get("dependencies") or []):
normalized_dependency = str(dependency or "").strip()
if not normalized_dependency or normalized_dependency in seen_dependencies:
continue
seen_dependencies.add(normalized_dependency)
merged_dependencies.append(normalized_dependency)
item["dependencies"] = merged_dependencies
return version_id, {
"loaded": True,
"yaml_name": str(getattr(getattr(rules_file, "metadata", None), "name", "") or ""),
"rules": summaries,
}
except Exception:
return version_id, {"loaded": False, "yaml_name": "", "rules": []}
results = await asyncio.gather(*[_load_one(version_id, oss_url) for version_id, oss_url in version_oss_map.items()])
return dict(results)
async def _load_latest_version_id(self, rule_set_id: int) -> int | None:
"""在没有可用发布版本时,退回读取最新草稿版本。"""
async with GetAsyncSession() as session:
row = (
await session.execute(
@@ -250,7 +250,7 @@ async def sync_group_bindings_from_doc_type(session, doc_type_id: int, rule_set_
async def sync_doc_type_bindings_from_group(session, group_id: int) -> int | None:
"""Mirror one doc type's active child-group bindings into the runtime binding table."""
"""兼容空实现:新链路已直接读取分组绑定,不再反向写旧文档类型绑定表。"""
await ensure_rule_group_schema(session)
group_row = (
await session.execute(
@@ -268,94 +268,7 @@ async def sync_doc_type_bindings_from_group(session, group_id: int) -> int | Non
).mappings().first()
if not group_row or group_row.get("document_type_id") is None:
return None
doc_type_id = int(group_row["document_type_id"])
doc_type_code = str(group_row.get("document_type_code") or "") or None
binding_rows = (
await session.execute(
text(
"""
SELECT
rgb.id,
rgb.group_id,
rgb.rule_set_id,
rgb.priority,
rgb.is_active,
rgb.note,
g.sort_order AS group_sort_order
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups g ON g.id = rgb.group_id
WHERE g.document_type_id = :doc_type_id
AND g.deleted_at IS NULL
AND COALESCE(g.pid, 0) <> 0
AND rgb.deleted_at IS NULL
AND rgb.is_active = TRUE
ORDER BY
COALESCE(g.sort_order, 0) ASC,
COALESCE(rgb.priority, 0) DESC,
rgb.id ASC
"""
),
{"doc_type_id": doc_type_id},
)
).mappings().all()
deduped_rows: list[dict[str, Any]] = []
seen_rule_set_ids: set[int] = set()
for row in binding_rows:
rule_set_id = int(row["rule_set_id"])
if rule_set_id in seen_rule_set_ids:
continue
seen_rule_set_ids.add(rule_set_id)
deduped_rows.append(dict(row))
await session.execute(
text(
"UPDATE leaudit_rule_type_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE doc_type_id = :doc_type_id AND deleted_at IS NULL"
),
{"doc_type_id": doc_type_id},
)
for index, row in enumerate(deduped_rows):
await session.execute(
text(
"""
INSERT INTO leaudit_rule_type_bindings (
doc_type_id,
doc_type_code,
rule_set_id,
binding_mode,
priority,
is_active,
note,
created_at,
updated_at,
region
) VALUES (
:doc_type_id,
:doc_type_code,
:rule_set_id,
'explicit',
:priority,
TRUE,
:note,
NOW(),
NOW(),
'default'
)
RETURNING id
"""
),
{
"doc_type_id": doc_type_id,
"doc_type_code": doc_type_code,
"rule_set_id": int(row["rule_set_id"]),
"priority": max(0, 1000 - index),
"note": row.get("note"),
},
)
return doc_type_id
return int(group_row["document_type_id"])
async def ensure_top_group(session, doc_type_row) -> int:
@@ -19,6 +19,7 @@ from fastapi_modules.fastapi_leaudit.domian.vo.ruleVo import (
from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleValidator import RuleValidator
from fastapi_modules.fastapi_leaudit.services import IOssService, IRuleService
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import sync_doc_type_bindings_from_group
class RuleServiceImpl(IRuleService):
@@ -32,6 +33,42 @@ class RuleServiceImpl(IRuleService):
self.OssService = OssService or OssServiceImpl()
self.Validator = Validator or RuleValidator()
async def _resolve_unique_child_group_id(self, Session, DocTypeId: int) -> int | None:
"""仅当文档类型唯一对应一个二级分组时,返回该分组ID。"""
row = (
await Session.execute(
text(
"""
SELECT CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS group_id
FROM leaudit_evaluation_point_groups
WHERE document_type_id = :doc_type_id
AND deleted_at IS NULL
AND COALESCE(pid, 0) <> 0
"""
),
{"doc_type_id": DocTypeId},
)
).mappings().first()
return int(row["group_id"]) if row and row.get("group_id") is not None else None
async def _count_child_groups(self, Session, DocTypeId: int) -> int:
"""统计文档类型下已启用的二级分组数量,用于明确报错。"""
row = (
await Session.execute(
text(
"""
SELECT COUNT(*) AS total
FROM leaudit_evaluation_point_groups
WHERE document_type_id = :doc_type_id
AND deleted_at IS NULL
AND COALESCE(pid, 0) <> 0
"""
),
{"doc_type_id": DocTypeId},
)
).mappings().first()
return int(row["total"] or 0) if row else 0
async def ListSets(self) -> list[RuleSetVO]:
"""列出所有规则集。"""
async with GetAsyncSession() as Session:
@@ -398,119 +435,116 @@ class RuleServiceImpl(IRuleService):
)
async def ListBindings(self, RuleType: str | None = None, Region: str | None = None) -> list[RuleBindingVO]:
"""列出规则类型绑定,可按规则类型/地区过滤"""
"""列出规则类型绑定,优先读取新分组绑定,旧表仅作为兼容兜底"""
region = Region or ""
async with GetAsyncSession() as Session:
if RuleType and region:
Result = await Session.execute(
text(
"""
SELECT
b.id,
b.doc_type_id,
b.doc_type_code,
b.rule_set_id,
b.binding_mode,
b.priority,
b.is_active,
b.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_type_bindings b
JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id
WHERE rs.rule_type = :rule_type
AND rs.deleted_at IS NULL
AND b.region = :region
ORDER BY b.priority DESC, b.id DESC
"""
),
{"rule_type": RuleType, "region": region},
params: dict[str, object] = {}
filters = ["rs.deleted_at IS NULL", "dt.deleted_at IS NULL", "child.deleted_at IS NULL", "rgb.deleted_at IS NULL"]
if RuleType:
filters.append("rs.rule_type = :rule_type")
params["rule_type"] = RuleType
where_clause = " AND ".join(filters)
result = await Session.execute(
text(
f"""
SELECT
rgb.id,
dt.id AS doc_type_id,
dt.code AS doc_type_code,
rgb.rule_set_id,
'explicit' AS binding_mode,
rgb.priority,
rgb.is_active,
rgb.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id
JOIN leaudit_document_types dt ON dt.id = child.document_type_id
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
WHERE {where_clause}
AND COALESCE(child.pid, 0) <> 0
ORDER BY dt.id ASC, COALESCE(child.sort_order, 0) ASC, rgb.priority DESC, rgb.id DESC
"""
),
params,
)
rows = result.mappings().all()
bindings: list[RuleBindingVO] = []
seen_doc_type_rule_pairs: set[tuple[int, int]] = set()
covered_doc_type_ids: set[int] = set()
for row in rows:
pair = (int(row["doc_type_id"]), int(row["rule_set_id"]))
if pair in seen_doc_type_rule_pairs:
continue
seen_doc_type_rule_pairs.add(pair)
covered_doc_type_ids.add(int(row["doc_type_id"]))
bindings.append(
RuleBindingVO(
id=int(row["id"]),
docTypeId=int(row["doc_type_id"]),
docTypeCode=row["doc_type_code"],
ruleSetId=int(row["rule_set_id"]),
ruleType=row["rule_type"],
ruleName=row["rule_name"],
bindingMode=row["binding_mode"],
priority=int(row["priority"]),
isActive=bool(row["is_active"]),
note=row["note"],
)
)
elif region:
Result = await Session.execute(
text(
"""
SELECT
b.id,
b.doc_type_id,
b.doc_type_code,
b.rule_set_id,
b.binding_mode,
b.priority,
b.is_active,
b.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_type_bindings b
JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id
WHERE rs.deleted_at IS NULL
AND b.region = :region
ORDER BY rs.rule_type, b.priority DESC, b.id DESC
"""
),
{"region": region},
legacy_filters = ["rs.deleted_at IS NULL", "b.deleted_at IS NULL"]
legacy_params: dict[str, object] = {}
if RuleType:
legacy_filters.append("rs.rule_type = :rule_type")
legacy_params["rule_type"] = RuleType
if region:
legacy_filters.append("b.region = :region")
legacy_params["region"] = region
if covered_doc_type_ids:
legacy_filters.append("b.doc_type_id <> ALL(:covered_doc_type_ids)")
legacy_params["covered_doc_type_ids"] = list(covered_doc_type_ids)
legacy_where_clause = " AND ".join(legacy_filters)
legacy_result = await Session.execute(
text(
f"""
SELECT
b.id,
b.doc_type_id,
b.doc_type_code,
b.rule_set_id,
b.binding_mode,
b.priority,
b.is_active,
b.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_type_bindings b
JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id
WHERE {legacy_where_clause}
ORDER BY rs.rule_type, b.priority DESC, b.id DESC
"""
),
legacy_params,
)
for row in legacy_result.mappings().all():
bindings.append(
RuleBindingVO(
id=int(row["id"]),
docTypeId=int(row["doc_type_id"]),
docTypeCode=row["doc_type_code"],
ruleSetId=int(row["rule_set_id"]),
ruleType=row["rule_type"],
ruleName=row["rule_name"],
bindingMode=row["binding_mode"],
priority=int(row["priority"]),
isActive=bool(row["is_active"]),
note=row["note"],
)
)
elif RuleType:
Result = await Session.execute(
text(
"""
SELECT
b.id,
b.doc_type_id,
b.doc_type_code,
b.rule_set_id,
b.binding_mode,
b.priority,
b.is_active,
b.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_type_bindings b
JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id
WHERE rs.rule_type = :rule_type
AND rs.deleted_at IS NULL
ORDER BY b.priority DESC, b.id DESC
"""
),
{"rule_type": RuleType},
)
else:
Result = await Session.execute(
text(
"""
SELECT
b.id,
b.doc_type_id,
b.doc_type_code,
b.rule_set_id,
b.binding_mode,
b.priority,
b.is_active,
b.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_type_bindings b
JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id
WHERE rs.deleted_at IS NULL
ORDER BY rs.rule_type, b.priority DESC, b.id DESC
"""
),
)
return [
RuleBindingVO(
id=int(Row["id"]),
docTypeId=int(Row["doc_type_id"]),
docTypeCode=Row["doc_type_code"],
ruleSetId=int(Row["rule_set_id"]),
ruleType=Row["rule_type"],
ruleName=Row["rule_name"],
bindingMode=Row["binding_mode"],
priority=int(Row["priority"]),
isActive=bool(Row["is_active"]),
note=Row["note"],
)
for Row in Result.mappings().all()
]
return bindings
async def CreateBinding(
self,
@@ -522,7 +556,7 @@ class RuleServiceImpl(IRuleService):
DocTypeCode: str | None = None,
Note: str | None = None,
) -> RuleBindingVO:
"""创建规则类型绑定。"""
"""创建规则类型绑定;若文档类型唯一对应一个二级分组,则优先写入新分组绑定"""
async with GetAsyncSession() as Session:
RuleSet = await Session.execute(
text("SELECT id, rule_type, rule_name FROM leaudit_rule_sets WHERE id = :rid AND deleted_at IS NULL LIMIT 1"),
@@ -532,68 +566,79 @@ class RuleServiceImpl(IRuleService):
if not RsRow:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在")
Existing = await Session.execute(
text(
"""
SELECT id FROM leaudit_rule_type_bindings
WHERE doc_type_id = :dtid AND rule_set_id = :rsid
LIMIT 1
"""
),
{"dtid": DocTypeId, "rsid": RuleSetId},
)
if Existing.mappings().first():
raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, "该文档类型已绑定此规则集")
GroupId = await self._resolve_unique_child_group_id(Session, DocTypeId)
if GroupId is not None:
ExistingGroupBinding = await Session.execute(
text(
"""
SELECT id
FROM leaudit_rule_group_bindings
WHERE group_id = :group_id
AND rule_set_id = :rule_set_id
AND deleted_at IS NULL
LIMIT 1
"""
),
{"group_id": GroupId, "rule_set_id": RuleSetId},
)
if ExistingGroupBinding.mappings().first():
raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, "该文档类型对应子组已绑定此规则集")
Result = await Session.execute(
text(
"""
INSERT INTO leaudit_rule_type_bindings (
doc_type_id,
doc_type_code,
rule_set_id,
binding_mode,
priority,
is_active,
region,
note
) VALUES (
:doc_type_id,
:doc_type_code,
:rule_set_id,
:binding_mode,
:priority,
true,
:region,
:note
)
RETURNING id, doc_type_id, doc_type_code, rule_set_id,
binding_mode, priority, is_active, region, note
"""
),
{
"doc_type_id": DocTypeId,
"doc_type_code": DocTypeCode,
"rule_set_id": RuleSetId,
"binding_mode": BindingMode,
"priority": Priority,
"region": Region,
"note": Note,
},
)
await Session.commit()
Row = Result.mappings().first()
return RuleBindingVO(
id=int(Row["id"]),
docTypeId=int(Row["doc_type_id"]),
docTypeCode=Row["doc_type_code"],
ruleSetId=int(Row["rule_set_id"]),
ruleType=RsRow["rule_type"],
ruleName=RsRow["rule_name"],
bindingMode=Row["binding_mode"],
priority=int(Row["priority"]),
isActive=bool(Row["is_active"]),
note=Row["note"],
Result = await Session.execute(
text(
"""
INSERT INTO leaudit_rule_group_bindings (
group_id,
rule_set_id,
priority,
is_active,
note,
created_at,
updated_at
) VALUES (
:group_id,
:rule_set_id,
:priority,
true,
:note,
NOW(),
NOW()
)
RETURNING id, rule_set_id, priority, is_active, note
"""
),
{
"group_id": GroupId,
"rule_set_id": RuleSetId,
"priority": Priority,
"note": Note,
},
)
await sync_doc_type_bindings_from_group(Session, GroupId)
await Session.commit()
Row = Result.mappings().first()
return RuleBindingVO(
id=int(Row["id"]),
docTypeId=DocTypeId,
docTypeCode=DocTypeCode,
ruleSetId=int(Row["rule_set_id"]),
ruleType=RsRow["rule_type"],
ruleName=RsRow["rule_name"],
bindingMode="explicit",
priority=int(Row["priority"]),
isActive=bool(Row["is_active"]),
note=Row["note"],
)
child_group_count = await self._count_child_groups(Session, DocTypeId)
if child_group_count == 0:
raise LeauditException(
StatusCodeEnum.HTTP_409_CONFLICT,
"当前文档类型尚未创建运行子类型,无法绑定规则集;请先在评查点分组管理中补齐二级分组",
)
raise LeauditException(
StatusCodeEnum.HTTP_409_CONFLICT,
"当前文档类型存在多个运行子类型,不能再直接按文档类型绑定规则集;请改为在对应二级分组下绑定规则集",
)
async def UpdateBinding(
@@ -604,8 +649,94 @@ class RuleServiceImpl(IRuleService):
BindingMode: str | None = None,
Note: str | None = None,
) -> RuleBindingVO:
"""更新规则类型绑定。"""
"""更新规则类型绑定;若绑定ID来自新分组绑定,则优先更新新表"""
async with GetAsyncSession() as Session:
GroupBinding = await Session.execute(
text(
"""
SELECT
rgb.id,
rgb.group_id,
child.document_type_id AS doc_type_id,
dt.code AS doc_type_code,
rgb.rule_set_id,
rgb.priority,
rgb.is_active,
rgb.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id
LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
WHERE rgb.id = :bid
AND rgb.deleted_at IS NULL
LIMIT 1
"""
),
{"bid": BindingId},
)
GroupRow = GroupBinding.mappings().first()
if GroupRow:
await Session.execute(
text(
"""
UPDATE leaudit_rule_group_bindings
SET priority = :priority,
is_active = :is_active,
note = :note,
updated_at = NOW()
WHERE id = :binding_id
"""
),
{
"binding_id": BindingId,
"priority": Priority if Priority is not None else int(GroupRow.get("priority") or 0),
"is_active": IsActive if IsActive is not None else bool(GroupRow.get("is_active", True)),
"note": Note if Note is not None else GroupRow.get("note"),
},
)
await sync_doc_type_bindings_from_group(Session, int(GroupRow["group_id"]))
await Session.commit()
refreshed = (
await Session.execute(
text(
"""
SELECT
rgb.id,
rgb.group_id,
child.document_type_id AS doc_type_id,
dt.code AS doc_type_code,
rgb.rule_set_id,
rgb.priority,
rgb.is_active,
rgb.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id
LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
WHERE rgb.id = :bid
LIMIT 1
"""
),
{"bid": BindingId},
)
).mappings().first()
return RuleBindingVO(
id=int(refreshed["id"]),
docTypeId=int(refreshed["doc_type_id"]),
docTypeCode=refreshed["doc_type_code"],
ruleSetId=int(refreshed["rule_set_id"]),
ruleType=refreshed["rule_type"],
ruleName=refreshed["rule_name"],
bindingMode=BindingMode or "explicit",
priority=int(refreshed["priority"]),
isActive=bool(refreshed["is_active"]),
note=refreshed["note"],
)
Existing = await Session.execute(
text(
"""
@@ -679,8 +810,30 @@ class RuleServiceImpl(IRuleService):
)
async def DeleteBinding(self, BindingId: int) -> None:
"""删除规则类型绑定。"""
"""删除规则类型绑定;若绑定ID来自新分组绑定,则优先删除新表"""
async with GetAsyncSession() as Session:
GroupBinding = await Session.execute(
text(
"""
SELECT id, group_id
FROM leaudit_rule_group_bindings
WHERE id = :bid
AND deleted_at IS NULL
LIMIT 1
"""
),
{"bid": BindingId},
)
GroupRow = GroupBinding.mappings().first()
if GroupRow:
await Session.execute(
text("UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE id = :bid"),
{"bid": BindingId},
)
await sync_doc_type_bindings_from_group(Session, int(GroupRow["group_id"]))
await Session.commit()
return
Result = await Session.execute(
text("DELETE FROM leaudit_rule_type_bindings WHERE id = :bid"),
{"bid": BindingId},