"""规则配置页聚合服务实现。""" from __future__ import annotations from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from sqlalchemy import text from fastapi_modules.fastapi_leaudit.domian.vo.ruleConfigVo import RuleConfigPackVO from fastapi_modules.fastapi_leaudit.services import IOssService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.ruleServiceImpl import RuleServiceImpl from fastapi_modules.fastapi_leaudit.services.ruleConfigService import IRuleConfigService class RuleConfigServiceImpl(IRuleConfigService): """规则配置页聚合服务实现。""" def __init__(self, OssService: IOssService | None = None) -> None: self.OssService = OssService or OssServiceImpl() self.RuleService = RuleServiceImpl(self.OssService) async def ListPacks(self) -> list[RuleConfigPackVO]: """列出规则配置页所需的全部 pack。""" async with GetAsyncSession() as session: rows = ( await session.execute( text( """ SELECT child.id AS group_id, child.name AS subtype, COALESCE(child.document_type_id, root.document_type_id) AS document_type_id, dt.name AS document_type_name, root.id AS root_group_id, COALESCE(root.name, child.name) AS main_type, em.name AS entry_module_name FROM leaudit_evaluation_point_groups child LEFT JOIN leaudit_evaluation_point_groups root ON root.id = child.pid AND root.deleted_at IS NULL LEFT JOIN leaudit_document_types dt ON dt.id = COALESCE(child.document_type_id, root.document_type_id) LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(child.entry_module_id, root.entry_module_id, dt.entry_module_id) WHERE child.deleted_at IS NULL AND COALESCE(child.pid, 0) <> 0 ORDER BY COALESCE(root.sort_order, 0) ASC, root.id ASC, child.sort_order ASC, child.id ASC """ ) ) ).mappings().all() rule_set_map = await self._load_rule_set_meta_map() return [await self._build_pack_vo(row, rule_set_map) for row in rows] async def GetPack(self, PackId: int) -> RuleConfigPackVO: """获取单个规则配置 pack。""" async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT child.id AS group_id, child.name AS subtype, COALESCE(child.document_type_id, root.document_type_id) AS document_type_id, dt.name AS document_type_name, root.id AS root_group_id, COALESCE(root.name, child.name) AS main_type, em.name AS entry_module_name FROM leaudit_evaluation_point_groups child LEFT JOIN leaudit_evaluation_point_groups root ON root.id = child.pid AND root.deleted_at IS NULL LEFT JOIN leaudit_document_types dt ON dt.id = COALESCE(child.document_type_id, root.document_type_id) LEFT JOIN leaudit_entry_modules em ON em.id = COALESCE(child.entry_module_id, root.entry_module_id, dt.entry_module_id) WHERE child.id = :group_id AND child.deleted_at IS NULL AND COALESCE(child.pid, 0) <> 0 LIMIT 1 """ ), {"group_id": PackId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则配置 pack 不存在") rule_set_map = await self._load_rule_set_meta_map() return await self._build_pack_vo(row, rule_set_map) async def _build_pack_vo(self, row, rule_set_map: dict[int, dict[str, object]]) -> RuleConfigPackVO: """构建单个 pack 聚合对象。""" group_id = int(row["group_id"]) binding = await self._load_effective_binding(group_id) document_type = str(row["document_type_name"] or "").strip() main_type = str(row["main_type"] or "").strip() subtype = str(row["subtype"] or "").strip() or "通用" module_type = str(row["entry_module_name"] or "").strip() or (f"{document_type}评查" if document_type else "规则配置") source_status = "empty" yaml_text = "" rule_set_id: int | None = None rule_type: str | None = None rule_name: str | None = None current_version_id: int | None = None fallback_version_id: int | None = None resolved_version_id: int | None = None has_usable_version = False usable_rule_count = 0 binding_id: int | None = None if binding: binding_id = int(binding["id"]) rule_set_id = int(binding["rule_set_id"]) rule_set_meta = rule_set_map.get(rule_set_id, {}) rule_type = str(rule_set_meta.get("rule_type") or "") or None rule_name = str(rule_set_meta.get("rule_name") or "") or None current_version_id = self._to_int(rule_set_meta.get("current_version_id")) fallback_version_id = self._to_int(rule_set_meta.get("fallback_version_id")) resolved_version_id = current_version_id or fallback_version_id has_usable_version = bool(rule_set_meta.get("has_usable_version")) usable_rule_count = int(rule_set_meta.get("usable_rule_count") or 0) latest_version_id = await self._load_latest_version_id(rule_set_id) latest_version_seq = await self._load_version_seq_by_id(latest_version_id) resolved_version_seq = await self._load_version_seq_by_id(resolved_version_id) if latest_version_id is not None and (resolved_version_id is None or latest_version_seq > resolved_version_seq): # 规则详情页优先读取最新草稿;上传运行仍以 current/fallback 的可用版本为准。 resolved_version_id = latest_version_id if resolved_version_id is not None: yaml_text = await self._load_yaml_text_by_version_id(resolved_version_id) source_status = "ready" if yaml_text.strip() else "missing" return RuleConfigPackVO( packId=group_id, groupId=group_id, rootGroupId=self._to_int(row.get("root_group_id")), bindingId=binding_id, ruleSetId=rule_set_id, ruleType=rule_type, ruleName=rule_name, currentVersionId=current_version_id, fallbackVersionId=fallback_version_id, resolvedVersionId=resolved_version_id, hasUsableVersion=has_usable_version, usableRuleCount=usable_rule_count, documentTypeId=self._to_int(row.get("document_type_id")), documentType=document_type, moduleType=module_type, mainType=main_type or document_type, subtype=subtype, yamlText=yaml_text, sourceStatus=source_status, ) async def _load_effective_binding(self, group_id: int): """读取当前二级分组实际生效的规则集绑定。""" async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT id, rule_set_id FROM leaudit_rule_group_bindings WHERE group_id = :group_id AND deleted_at IS NULL AND is_active = TRUE ORDER BY priority DESC, id ASC LIMIT 1 """ ), {"group_id": group_id}, ) ).mappings().first() return row async def _load_rule_set_meta_map(self) -> dict[int, dict[str, object]]: """批量读取规则集元数据。""" items = await self.RuleService.ListSets() return { item.id: { "rule_type": item.ruleType, "rule_name": item.ruleName, "current_version_id": item.currentVersionId, "fallback_version_id": item.fallbackVersionId, "has_usable_version": item.hasUsableVersion, "usable_rule_count": item.usableRuleCount, } for item in items } async def _load_yaml_text_by_version_id(self, version_id: int) -> str: """按版本ID读取 YAML 正文。""" async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT oss_url FROM leaudit_rule_versions WHERE id = :version_id LIMIT 1 """ ), {"version_id": version_id}, ) ).mappings().first() if not row or not row["oss_url"]: return "" try: return (await self.OssService.DownloadBytes(row["oss_url"])).decode("utf-8") except Exception: return "" async def _load_latest_version_id(self, rule_set_id: int) -> int | None: """在没有可用发布版本时,退回读取最新草稿版本。""" async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rule_set_id ORDER BY version_seq DESC, id DESC LIMIT 1 """ ), {"rule_set_id": rule_set_id}, ) ).mappings().first() return self._to_int(row.get("id")) if row else None async def _load_version_seq_by_id(self, version_id: int | None) -> int: if version_id is None: return -1 async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT version_seq FROM leaudit_rule_versions WHERE id = :version_id LIMIT 1 """ ), {"version_id": version_id}, ) ).mappings().first() return int(row.get("version_seq") or -1) if row else -1 def _to_int(self, value) -> int | None: if value is None: return None return int(value)