"""规则服务实现。""" from __future__ import annotations import hashlib from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from sqlalchemy import text from fastapi_modules.fastapi_leaudit.domian.vo.ruleVo import ( RuleBindingVO, RuleContentVO, RuleSetVO, RuleValidationVO, RuleVersionVO, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleValidator import RuleValidator from fastapi_modules.fastapi_leaudit.services import IOssService, IRuleService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import sync_doc_type_bindings_from_group class RuleServiceImpl(IRuleService): """规则服务实现。""" def __init__( self, OssService: IOssService | None = None, Validator: RuleValidator | None = None, ) -> None: self.OssService = OssService or OssServiceImpl() self.Validator = Validator or RuleValidator() async def _resolve_unique_child_group_id(self, Session, DocTypeId: int) -> int | None: """仅当文档类型唯一对应一个二级分组时,返回该分组ID。""" row = ( await Session.execute( text( """ SELECT CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS group_id FROM leaudit_evaluation_point_groups WHERE document_type_id = :doc_type_id AND deleted_at IS NULL AND COALESCE(pid, 0) <> 0 """ ), {"doc_type_id": DocTypeId}, ) ).mappings().first() return int(row["group_id"]) if row and row.get("group_id") is not None else None async def _count_child_groups(self, Session, DocTypeId: int) -> int: """统计文档类型下已启用的二级分组数量,用于明确报错。""" row = ( await Session.execute( text( """ SELECT COUNT(*) AS total FROM leaudit_evaluation_point_groups WHERE document_type_id = :doc_type_id AND deleted_at IS NULL AND COALESCE(pid, 0) <> 0 """ ), {"doc_type_id": DocTypeId}, ) ).mappings().first() return int(row["total"] or 0) if row else 0 async def ListSets(self) -> list[RuleSetVO]: """列出所有规则集。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT rs.id, rs.rule_type, rs.rule_name, rs.domain_type, rs.current_version_id, current_rv.id AS usable_current_version_id, fallback_rv.id AS fallback_version_id, CASE WHEN current_rv.id IS NOT NULL OR fallback_rv.id IS NOT NULL THEN TRUE ELSE FALSE END AS has_usable_version, rs.status FROM leaudit_rule_sets rs LEFT JOIN leaudit_rule_versions current_rv ON current_rv.id = rs.current_version_id AND current_rv.status = 'published' LEFT JOIN LATERAL ( SELECT rv.id FROM leaudit_rule_versions rv WHERE rv.rule_set_id = rs.id AND rv.status = 'published' AND (rs.current_version_id IS NULL OR rv.id <> rs.current_version_id) ORDER BY rv.version_seq DESC, rv.id DESC LIMIT 1 ) fallback_rv ON TRUE WHERE rs.deleted_at IS NULL ORDER BY rs.id DESC """ ) ) rows = Result.mappings().all() usable_counts: dict[int, int] = {} for row in rows: usable_version_id = row["usable_current_version_id"] or row["fallback_version_id"] if usable_version_id is not None and int(usable_version_id) not in usable_counts: usable_counts[int(usable_version_id)] = await self._GetRuleCountByVersionId(int(usable_version_id)) return [ RuleSetVO( id=int(Row["id"]), ruleType=Row["rule_type"], ruleName=Row["rule_name"], domainType=Row["domain_type"], currentVersionId=Row["current_version_id"], fallbackVersionId=Row["fallback_version_id"], hasUsableVersion=bool(Row["has_usable_version"]), usableRuleCount=usable_counts.get(int(Row["usable_current_version_id"] or Row["fallback_version_id"]), 0) if (Row["usable_current_version_id"] or Row["fallback_version_id"]) is not None else 0, status=Row["status"], ) for Row in rows ] async def _GetRuleCountByVersionId(self, VersionId: int) -> int: """读取指定可用规则版本的规则数。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT oss_url FROM leaudit_rule_versions WHERE id = :version_id LIMIT 1 """ ), {"version_id": VersionId}, ) Row = Result.mappings().first() if not Row or not Row["oss_url"]: return 0 try: yaml_text = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8") validation = self.Validator.ValidateYaml(yaml_text) return int(validation.ruleCount or 0) except Exception: return 0 async def GetVersions(self, RuleType: str) -> list[RuleVersionVO]: """获取规则集的所有版本。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT rv.id, rv.rule_set_id, rv.version_no, rv.status, rv.oss_url, rv.change_note, rv.published_at FROM leaudit_rule_versions rv JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id WHERE rs.rule_type = :rule_type AND rs.deleted_at IS NULL ORDER BY rv.version_seq DESC, rv.id DESC """ ), {"rule_type": RuleType}, ) return [self._BuildRuleVersionVo(Row) for Row in Result.mappings().all()] async def GetContent(self, VersionId: int) -> RuleContentVO: """获取指定版本的规则正文。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT rv.id, rv.rule_set_id, rv.version_no, rv.oss_url, rs.rule_type FROM leaudit_rule_versions rv JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id WHERE rv.id = :version_id LIMIT 1 """ ), {"version_id": VersionId}, ) Row = Result.mappings().first() if not Row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在") YamlText = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8") return RuleContentVO( id=int(Row["id"]), ruleSetId=int(Row["rule_set_id"]), ruleType=Row["rule_type"], versionNo=Row["version_no"], yamlText=YamlText, ossUrl=Row["oss_url"], ) async def Validate(self, RuleType: str, YamlText: str) -> RuleValidationVO: """校验规则 YAML。""" Validation = self.Validator.ValidateYaml(YamlText) if Validation.ruleType and Validation.ruleType != RuleType: Validation = RuleValidationVO( valid=False, ruleType=Validation.ruleType, ruleName=Validation.ruleName, versionNo=Validation.versionNo, ruleCount=Validation.ruleCount, extractCount=Validation.extractCount, errors=[f"路径规则类型 {RuleType} 与 YAML metadata.type_id {Validation.ruleType} 不一致"], ) return Validation return RuleValidationVO( valid=Validation.valid, ruleType=Validation.ruleType, ruleName=Validation.ruleName, versionNo=Validation.versionNo, ruleCount=Validation.ruleCount, extractCount=Validation.extractCount, errors=Validation.errors or [], ) async def CreateVersion( self, RuleType: str, YamlText: str, ChangeNote: str | None = None, EditorUserId: int | None = None, ) -> RuleVersionVO: """创建规则版本。""" Validation = await self.Validate(RuleType, YamlText) if not Validation.valid: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "; ".join(Validation.errors)) RulesFile = self.Validator.ParseValidated(YamlText) FileSha256 = hashlib.sha256(YamlText.encode("utf-8")).hexdigest() FileSize = len(YamlText.encode("utf-8")) DomainType = self._ResolveDomainType(RuleType) async with GetAsyncSession() as Session: RuleSet = await self._GetRuleSetByType(Session, RuleType) if not RuleSet: CreateResult = await Session.execute( text( """ INSERT INTO leaudit_rule_sets ( rule_type, rule_name, domain_type, description, status, is_builtin, owner_user_id ) VALUES ( :rule_type, :rule_name, :domain_type, :description, 'draft', false, :owner_user_id ) RETURNING id, rule_type, rule_name, domain_type, current_version_id, status """ ), { "rule_type": RuleType, "rule_name": RulesFile.metadata.name, "domain_type": DomainType, "description": RulesFile.metadata.description, "owner_user_id": EditorUserId, }, ) RuleSet = CreateResult.mappings().first() else: await Session.execute( text( """ UPDATE leaudit_rule_sets SET rule_name = :rule_name, domain_type = :domain_type, description = :description, updated_at = now() WHERE id = :rule_set_id """ ), { "rule_name": RulesFile.metadata.name, "domain_type": DomainType, "description": RulesFile.metadata.description, "rule_set_id": RuleSet["id"], }, ) SeqResult = await Session.execute( text( """ SELECT COALESCE(MAX(version_seq), 0) AS max_seq FROM leaudit_rule_versions WHERE rule_set_id = :rule_set_id """ ), {"rule_set_id": RuleSet["id"]}, ) NextSeq = int(SeqResult.mappings().first()["max_seq"]) + 1 VersionNo = RulesFile.metadata.version or f"v{NextSeq}" ExistingVersion = await Session.execute( text( """ SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rule_set_id AND version_no = :version_no LIMIT 1 """ ), { "rule_set_id": RuleSet["id"], "version_no": VersionNo, }, ) if ExistingVersion.mappings().first(): raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"版本号 {VersionNo} 已存在") ObjectKey = OssPathUtils.BuildRuleYamlKey(RuleType, VersionNo) OssUrl = await self.OssService.UploadText( ObjectKey=ObjectKey, Content=YamlText, ContentType="application/x-yaml; charset=utf-8", ) VersionResult = await Session.execute( text( """ INSERT INTO leaudit_rule_versions ( rule_set_id, version_no, version_seq, status, source_type, dsl_format, oss_url, file_sha256, file_size, metadata_type_id, metadata_name, metadata_version, change_note, editor_user_id ) VALUES ( :rule_set_id, :version_no, :version_seq, 'draft', 'oss_yaml', 'yaml', :oss_url, :file_sha256, :file_size, :metadata_type_id, :metadata_name, :metadata_version, :change_note, :editor_user_id ) RETURNING id, rule_set_id, version_no, status, oss_url, change_note, published_at """ ), { "rule_set_id": RuleSet["id"], "version_no": VersionNo, "version_seq": NextSeq, "oss_url": OssUrl, "file_sha256": FileSha256, "file_size": FileSize, "metadata_type_id": RulesFile.metadata.type_id, "metadata_name": RulesFile.metadata.name, "metadata_version": RulesFile.metadata.version, "change_note": ChangeNote, "editor_user_id": EditorUserId, }, ) await Session.commit() return self._BuildRuleVersionVo(VersionResult.mappings().first()) async def Publish( self, RuleType: str, VersionId: int, OperatorUserId: int | None = None, ) -> RuleVersionVO: """发布指定版本。""" return await self._SwitchVersion( RuleType=RuleType, VersionId=VersionId, TargetStatus="published", OperatorUserId=OperatorUserId, ) async def Rollback( self, RuleType: str, VersionId: int, OperatorUserId: int | None = None, ) -> RuleVersionVO: """回滚到指定历史版本。""" return await self._SwitchVersion( RuleType=RuleType, VersionId=VersionId, TargetStatus="published", OperatorUserId=OperatorUserId, Rollback=True, ) async def ListBindings(self, RuleType: str | None = None, Region: str | None = None) -> list[RuleBindingVO]: """列出规则类型绑定,只读取新分组绑定。""" async with GetAsyncSession() as Session: params: dict[str, object] = {} filters = ["rs.deleted_at IS NULL", "dt.deleted_at IS NULL", "child.deleted_at IS NULL", "rgb.deleted_at IS NULL"] if RuleType: filters.append("rs.rule_type = :rule_type") params["rule_type"] = RuleType where_clause = " AND ".join(filters) result = await Session.execute( text( f""" SELECT rgb.id, dt.id AS doc_type_id, dt.code AS doc_type_code, rgb.rule_set_id, 'explicit' AS binding_mode, rgb.priority, rgb.is_active, rgb.note, rs.rule_type, rs.rule_name FROM leaudit_rule_group_bindings rgb JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id JOIN leaudit_document_types dt ON dt.id = child.document_type_id JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id WHERE {where_clause} AND COALESCE(child.pid, 0) <> 0 ORDER BY dt.id ASC, COALESCE(child.sort_order, 0) ASC, rgb.priority DESC, rgb.id DESC """ ), params, ) rows = result.mappings().all() bindings: list[RuleBindingVO] = [] seen_doc_type_rule_pairs: set[tuple[int, int]] = set() for row in rows: pair = (int(row["doc_type_id"]), int(row["rule_set_id"])) if pair in seen_doc_type_rule_pairs: continue seen_doc_type_rule_pairs.add(pair) bindings.append( RuleBindingVO( id=int(row["id"]), docTypeId=int(row["doc_type_id"]), docTypeCode=row["doc_type_code"], ruleSetId=int(row["rule_set_id"]), ruleType=row["rule_type"], ruleName=row["rule_name"], bindingMode=row["binding_mode"], priority=int(row["priority"]), isActive=bool(row["is_active"]), note=row["note"], ) ) return bindings async def CreateBinding( self, DocTypeId: int, RuleSetId: int, Region: str = "default", BindingMode: str = "explicit", Priority: int = 0, DocTypeCode: str | None = None, Note: str | None = None, ) -> RuleBindingVO: """创建规则类型绑定;若文档类型唯一对应一个二级分组,则优先写入新分组绑定。""" async with GetAsyncSession() as Session: RuleSet = await Session.execute( text("SELECT id, rule_type, rule_name FROM leaudit_rule_sets WHERE id = :rid AND deleted_at IS NULL LIMIT 1"), {"rid": RuleSetId}, ) RsRow = RuleSet.mappings().first() if not RsRow: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在") GroupId = await self._resolve_unique_child_group_id(Session, DocTypeId) if GroupId is not None: ExistingGroupBinding = await Session.execute( text( """ SELECT id FROM leaudit_rule_group_bindings WHERE group_id = :group_id AND rule_set_id = :rule_set_id AND deleted_at IS NULL LIMIT 1 """ ), {"group_id": GroupId, "rule_set_id": RuleSetId}, ) if ExistingGroupBinding.mappings().first(): raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, "该文档类型对应子组已绑定此规则集") Result = await Session.execute( text( """ INSERT INTO leaudit_rule_group_bindings ( group_id, rule_set_id, priority, is_active, note, created_at, updated_at ) VALUES ( :group_id, :rule_set_id, :priority, true, :note, NOW(), NOW() ) RETURNING id, rule_set_id, priority, is_active, note """ ), { "group_id": GroupId, "rule_set_id": RuleSetId, "priority": Priority, "note": Note, }, ) await sync_doc_type_bindings_from_group(Session, GroupId) await Session.commit() Row = Result.mappings().first() return RuleBindingVO( id=int(Row["id"]), docTypeId=DocTypeId, docTypeCode=DocTypeCode, ruleSetId=int(Row["rule_set_id"]), ruleType=RsRow["rule_type"], ruleName=RsRow["rule_name"], bindingMode="explicit", priority=int(Row["priority"]), isActive=bool(Row["is_active"]), note=Row["note"], ) child_group_count = await self._count_child_groups(Session, DocTypeId) if child_group_count == 0: raise LeauditException( StatusCodeEnum.HTTP_409_CONFLICT, "当前文档类型尚未创建运行子类型,无法绑定规则集;请先在评查点分组管理中补齐二级分组", ) raise LeauditException( StatusCodeEnum.HTTP_409_CONFLICT, "当前文档类型存在多个运行子类型,不能再直接按文档类型绑定规则集;请改为在对应二级分组下绑定规则集", ) async def UpdateBinding( self, BindingId: int, IsActive: bool | None = None, Priority: int | None = None, BindingMode: str | None = None, Note: str | None = None, ) -> RuleBindingVO: """更新规则类型绑定;若绑定ID来自新分组绑定,则优先更新新表。""" async with GetAsyncSession() as Session: GroupBinding = await Session.execute( text( """ SELECT rgb.id, rgb.group_id, child.document_type_id AS doc_type_id, dt.code AS doc_type_code, rgb.rule_set_id, rgb.priority, rgb.is_active, rgb.note, rs.rule_type, rs.rule_name FROM leaudit_rule_group_bindings rgb JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id WHERE rgb.id = :bid AND rgb.deleted_at IS NULL LIMIT 1 """ ), {"bid": BindingId}, ) GroupRow = GroupBinding.mappings().first() if GroupRow: await Session.execute( text( """ UPDATE leaudit_rule_group_bindings SET priority = :priority, is_active = :is_active, note = :note, updated_at = NOW() WHERE id = :binding_id """ ), { "binding_id": BindingId, "priority": Priority if Priority is not None else int(GroupRow.get("priority") or 0), "is_active": IsActive if IsActive is not None else bool(GroupRow.get("is_active", True)), "note": Note if Note is not None else GroupRow.get("note"), }, ) await sync_doc_type_bindings_from_group(Session, int(GroupRow["group_id"])) await Session.commit() refreshed = ( await Session.execute( text( """ SELECT rgb.id, rgb.group_id, child.document_type_id AS doc_type_id, dt.code AS doc_type_code, rgb.rule_set_id, rgb.priority, rgb.is_active, rgb.note, rs.rule_type, rs.rule_name FROM leaudit_rule_group_bindings rgb JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id WHERE rgb.id = :bid LIMIT 1 """ ), {"bid": BindingId}, ) ).mappings().first() return RuleBindingVO( id=int(refreshed["id"]), docTypeId=int(refreshed["doc_type_id"]), docTypeCode=refreshed["doc_type_code"], ruleSetId=int(refreshed["rule_set_id"]), ruleType=refreshed["rule_type"], ruleName=refreshed["rule_name"], bindingMode=BindingMode or "explicit", priority=int(refreshed["priority"]), isActive=bool(refreshed["is_active"]), note=refreshed["note"], ) raise LeauditException( StatusCodeEnum.HTTP_404_NOT_FOUND, "绑定记录不存在或已迁移到新分组绑定,请刷新列表后重试", ) async def DeleteBinding(self, BindingId: int) -> None: """删除规则类型绑定;若绑定ID来自新分组绑定,则优先删除新表。""" async with GetAsyncSession() as Session: GroupBinding = await Session.execute( text( """ SELECT id, group_id FROM leaudit_rule_group_bindings WHERE id = :bid AND deleted_at IS NULL LIMIT 1 """ ), {"bid": BindingId}, ) GroupRow = GroupBinding.mappings().first() if GroupRow: await Session.execute( text("UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE id = :bid"), {"bid": BindingId}, ) await sync_doc_type_bindings_from_group(Session, int(GroupRow["group_id"])) await Session.commit() return raise LeauditException( StatusCodeEnum.HTTP_404_NOT_FOUND, "绑定记录不存在或已迁移到新分组绑定,请刷新列表后重试", ) async def _SwitchVersion( self, *, RuleType: str, VersionId: int, TargetStatus: str, OperatorUserId: int | None = None, Rollback: bool = False, ) -> RuleVersionVO: """切换当前生效版本。""" async with GetAsyncSession() as Session: RuleSet = await self._GetRuleSetByType(Session, RuleType) if not RuleSet: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在") VersionRow = await self._GetVersion(Session, VersionId) if not VersionRow or int(VersionRow["rule_set_id"]) != int(RuleSet["id"]): raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在或不属于当前规则集") await Session.execute( text( """ UPDATE leaudit_rule_versions SET status = CASE WHEN id = :version_id THEN :target_status WHEN status = 'published' THEN :previous_status ELSE status END, publisher_user_id = CASE WHEN id = :version_id THEN :operator_user_id ELSE publisher_user_id END, published_at = CASE WHEN id = :version_id THEN now() ELSE published_at END, updated_at = now() WHERE rule_set_id = :rule_set_id """ ), { "version_id": VersionId, "target_status": TargetStatus, "previous_status": "rollback" if Rollback else "deprecated", "operator_user_id": OperatorUserId, "rule_set_id": RuleSet["id"], }, ) await Session.execute( text( """ UPDATE leaudit_rule_sets SET current_version_id = :version_id, status = 'active', updated_at = now() WHERE id = :rule_set_id """ ), { "version_id": VersionId, "rule_set_id": RuleSet["id"], }, ) await Session.commit() VersionRow = await self._GetVersion(Session, VersionId) return self._BuildRuleVersionVo(VersionRow) async def _GetRuleSetByType(self, Session, RuleType: str): """按规则类型查询规则集。""" Result = await Session.execute( text( """ SELECT id, rule_type, rule_name, domain_type, current_version_id, status FROM leaudit_rule_sets WHERE rule_type = :rule_type AND deleted_at IS NULL LIMIT 1 """ ), {"rule_type": RuleType}, ) return Result.mappings().first() async def _GetVersion(self, Session, VersionId: int): """按版本 ID 查询规则版本。""" Result = await Session.execute( text( """ SELECT id, rule_set_id, version_no, status, oss_url, change_note, published_at FROM leaudit_rule_versions WHERE id = :version_id LIMIT 1 """ ), {"version_id": VersionId}, ) return Result.mappings().first() def _BuildRuleVersionVo(self, Row) -> RuleVersionVO: """构造规则版本响应。""" return RuleVersionVO( id=int(Row["id"]), ruleSetId=int(Row["rule_set_id"]), versionNo=Row["version_no"], status=Row["status"], ossUrl=Row["oss_url"], changeNote=Row["change_note"], publishedAt=Row["published_at"].isoformat() if Row["published_at"] else None, ) def _ResolveDomainType(self, RuleType: str) -> str: """按规则类型推导领域类型。""" Prefix = RuleType.split(".", 1)[0] Mapping = { "contract": "contract", "admin_license": "admin_license", "legal_doc": "legal_doc", } return Mapping.get(Prefix, Prefix or "unknown")