"""规则服务实现。""" from __future__ import annotations import hashlib from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from sqlalchemy import text from fastapi_modules.fastapi_leaudit.domian.vo.ruleVo import ( RuleBindingVO, RuleContentVO, RuleSetVO, RuleValidationVO, RuleVersionVO, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleValidator import RuleValidator from fastapi_modules.fastapi_leaudit.services import IOssService, IRuleService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl class RuleServiceImpl(IRuleService): """规则服务实现。""" def __init__( self, OssService: IOssService | None = None, Validator: RuleValidator | None = None, ) -> None: self.OssService = OssService or OssServiceImpl() self.Validator = Validator or RuleValidator() async def ListSets(self) -> list[RuleSetVO]: """列出所有规则集。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT rs.id, rs.rule_type, rs.rule_name, rs.domain_type, rs.current_version_id, current_rv.id AS usable_current_version_id, fallback_rv.id AS fallback_version_id, CASE WHEN current_rv.id IS NOT NULL OR fallback_rv.id IS NOT NULL THEN TRUE ELSE FALSE END AS has_usable_version, rs.status FROM leaudit_rule_sets rs LEFT JOIN leaudit_rule_versions current_rv ON current_rv.id = rs.current_version_id AND current_rv.status = 'published' LEFT JOIN LATERAL ( SELECT rv.id FROM leaudit_rule_versions rv WHERE rv.rule_set_id = rs.id AND rv.status = 'published' AND (rs.current_version_id IS NULL OR rv.id <> rs.current_version_id) ORDER BY rv.version_seq DESC, rv.id DESC LIMIT 1 ) fallback_rv ON TRUE WHERE rs.deleted_at IS NULL ORDER BY rs.id DESC """ ) ) rows = Result.mappings().all() usable_counts: dict[int, int] = {} for row in rows: usable_version_id = row["usable_current_version_id"] or row["fallback_version_id"] if usable_version_id is not None and int(usable_version_id) not in usable_counts: usable_counts[int(usable_version_id)] = await self._GetRuleCountByVersionId(int(usable_version_id)) return [ RuleSetVO( id=int(Row["id"]), ruleType=Row["rule_type"], ruleName=Row["rule_name"], domainType=Row["domain_type"], currentVersionId=Row["current_version_id"], fallbackVersionId=Row["fallback_version_id"], hasUsableVersion=bool(Row["has_usable_version"]), usableRuleCount=usable_counts.get(int(Row["usable_current_version_id"] or Row["fallback_version_id"]), 0) if (Row["usable_current_version_id"] or Row["fallback_version_id"]) is not None else 0, status=Row["status"], ) for Row in rows ] async def _GetRuleCountByVersionId(self, VersionId: int) -> int: """读取指定可用规则版本的规则数。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT oss_url FROM leaudit_rule_versions WHERE id = :version_id LIMIT 1 """ ), {"version_id": VersionId}, ) Row = Result.mappings().first() if not Row or not Row["oss_url"]: return 0 try: yaml_text = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8") validation = self.Validator.ValidateYaml(yaml_text) return int(validation.ruleCount or 0) except Exception: return 0 async def GetVersions(self, RuleType: str) -> list[RuleVersionVO]: """获取规则集的所有版本。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT rv.id, rv.rule_set_id, rv.version_no, rv.status, rv.oss_url, rv.change_note, rv.published_at FROM leaudit_rule_versions rv JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id WHERE rs.rule_type = :rule_type AND rs.deleted_at IS NULL ORDER BY rv.version_seq DESC, rv.id DESC """ ), {"rule_type": RuleType}, ) return [self._BuildRuleVersionVo(Row) for Row in Result.mappings().all()] async def GetContent(self, VersionId: int) -> RuleContentVO: """获取指定版本的规则正文。""" async with GetAsyncSession() as Session: Result = await Session.execute( text( """ SELECT rv.id, rv.rule_set_id, rv.version_no, rv.oss_url, rs.rule_type FROM leaudit_rule_versions rv JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id WHERE rv.id = :version_id LIMIT 1 """ ), {"version_id": VersionId}, ) Row = Result.mappings().first() if not Row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在") YamlText = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8") return RuleContentVO( id=int(Row["id"]), ruleSetId=int(Row["rule_set_id"]), ruleType=Row["rule_type"], versionNo=Row["version_no"], yamlText=YamlText, ossUrl=Row["oss_url"], ) async def Validate(self, RuleType: str, YamlText: str) -> RuleValidationVO: """校验规则 YAML。""" Validation = self.Validator.ValidateYaml(YamlText) if Validation.ruleType and Validation.ruleType != RuleType: Validation = RuleValidationVO( valid=False, ruleType=Validation.ruleType, ruleName=Validation.ruleName, versionNo=Validation.versionNo, ruleCount=Validation.ruleCount, extractCount=Validation.extractCount, errors=[f"路径规则类型 {RuleType} 与 YAML metadata.type_id {Validation.ruleType} 不一致"], ) return Validation return RuleValidationVO( valid=Validation.valid, ruleType=Validation.ruleType, ruleName=Validation.ruleName, versionNo=Validation.versionNo, ruleCount=Validation.ruleCount, extractCount=Validation.extractCount, errors=Validation.errors or [], ) async def CreateVersion( self, RuleType: str, YamlText: str, ChangeNote: str | None = None, EditorUserId: int | None = None, ) -> RuleVersionVO: """创建规则版本。""" Validation = await self.Validate(RuleType, YamlText) if not Validation.valid: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "; ".join(Validation.errors)) RulesFile = self.Validator.ParseValidated(YamlText) FileSha256 = hashlib.sha256(YamlText.encode("utf-8")).hexdigest() FileSize = len(YamlText.encode("utf-8")) DomainType = self._ResolveDomainType(RuleType) async with GetAsyncSession() as Session: RuleSet = await self._GetRuleSetByType(Session, RuleType) if not RuleSet: CreateResult = await Session.execute( text( """ INSERT INTO leaudit_rule_sets ( rule_type, rule_name, domain_type, description, status, is_builtin, owner_user_id ) VALUES ( :rule_type, :rule_name, :domain_type, :description, 'draft', false, :owner_user_id ) RETURNING id, rule_type, rule_name, domain_type, current_version_id, status """ ), { "rule_type": RuleType, "rule_name": RulesFile.metadata.name, "domain_type": DomainType, "description": RulesFile.metadata.description, "owner_user_id": EditorUserId, }, ) RuleSet = CreateResult.mappings().first() else: await Session.execute( text( """ UPDATE leaudit_rule_sets SET rule_name = :rule_name, domain_type = :domain_type, description = :description, updated_at = now() WHERE id = :rule_set_id """ ), { "rule_name": RulesFile.metadata.name, "domain_type": DomainType, "description": RulesFile.metadata.description, "rule_set_id": RuleSet["id"], }, ) SeqResult = await Session.execute( text( """ SELECT COALESCE(MAX(version_seq), 0) AS max_seq FROM leaudit_rule_versions WHERE rule_set_id = :rule_set_id """ ), {"rule_set_id": RuleSet["id"]}, ) NextSeq = int(SeqResult.mappings().first()["max_seq"]) + 1 VersionNo = RulesFile.metadata.version or f"v{NextSeq}" ExistingVersion = await Session.execute( text( """ SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rule_set_id AND version_no = :version_no LIMIT 1 """ ), { "rule_set_id": RuleSet["id"], "version_no": VersionNo, }, ) if ExistingVersion.mappings().first(): raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"版本号 {VersionNo} 已存在") ObjectKey = OssPathUtils.BuildRuleYamlKey(RuleType, VersionNo) OssUrl = await self.OssService.UploadText( ObjectKey=ObjectKey, Content=YamlText, ContentType="application/x-yaml; charset=utf-8", ) VersionResult = await Session.execute( text( """ INSERT INTO leaudit_rule_versions ( rule_set_id, version_no, version_seq, status, source_type, dsl_format, oss_url, file_sha256, file_size, metadata_type_id, metadata_name, metadata_version, change_note, editor_user_id ) VALUES ( :rule_set_id, :version_no, :version_seq, 'draft', 'oss_yaml', 'yaml', :oss_url, :file_sha256, :file_size, :metadata_type_id, :metadata_name, :metadata_version, :change_note, :editor_user_id ) RETURNING id, rule_set_id, version_no, status, oss_url, change_note, published_at """ ), { "rule_set_id": RuleSet["id"], "version_no": VersionNo, "version_seq": NextSeq, "oss_url": OssUrl, "file_sha256": FileSha256, "file_size": FileSize, "metadata_type_id": RulesFile.metadata.type_id, "metadata_name": RulesFile.metadata.name, "metadata_version": RulesFile.metadata.version, "change_note": ChangeNote, "editor_user_id": EditorUserId, }, ) await Session.commit() return self._BuildRuleVersionVo(VersionResult.mappings().first()) async def Publish( self, RuleType: str, VersionId: int, OperatorUserId: int | None = None, ) -> RuleVersionVO: """发布指定版本。""" return await self._SwitchVersion( RuleType=RuleType, VersionId=VersionId, TargetStatus="published", OperatorUserId=OperatorUserId, ) async def Rollback( self, RuleType: str, VersionId: int, OperatorUserId: int | None = None, ) -> RuleVersionVO: """回滚到指定历史版本。""" return await self._SwitchVersion( RuleType=RuleType, VersionId=VersionId, TargetStatus="published", OperatorUserId=OperatorUserId, Rollback=True, ) async def ListBindings(self, RuleType: str | None = None, Region: str | None = None) -> list[RuleBindingVO]: """列出规则类型绑定,可按规则类型/地区过滤。""" region = Region or "" async with GetAsyncSession() as Session: if RuleType and region: Result = await Session.execute( text( """ SELECT b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, b.binding_mode, b.priority, b.is_active, b.note, rs.rule_type, rs.rule_name FROM leaudit_rule_type_bindings b JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id WHERE rs.rule_type = :rule_type AND rs.deleted_at IS NULL AND b.region = :region ORDER BY b.priority DESC, b.id DESC """ ), {"rule_type": RuleType, "region": region}, ) elif region: Result = await Session.execute( text( """ SELECT b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, b.binding_mode, b.priority, b.is_active, b.note, rs.rule_type, rs.rule_name FROM leaudit_rule_type_bindings b JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id WHERE rs.deleted_at IS NULL AND b.region = :region ORDER BY rs.rule_type, b.priority DESC, b.id DESC """ ), {"region": region}, ) elif RuleType: Result = await Session.execute( text( """ SELECT b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, b.binding_mode, b.priority, b.is_active, b.note, rs.rule_type, rs.rule_name FROM leaudit_rule_type_bindings b JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id WHERE rs.rule_type = :rule_type AND rs.deleted_at IS NULL ORDER BY b.priority DESC, b.id DESC """ ), {"rule_type": RuleType}, ) else: Result = await Session.execute( text( """ SELECT b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, b.binding_mode, b.priority, b.is_active, b.note, rs.rule_type, rs.rule_name FROM leaudit_rule_type_bindings b JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id WHERE rs.deleted_at IS NULL ORDER BY rs.rule_type, b.priority DESC, b.id DESC """ ), ) return [ RuleBindingVO( id=int(Row["id"]), docTypeId=int(Row["doc_type_id"]), docTypeCode=Row["doc_type_code"], ruleSetId=int(Row["rule_set_id"]), ruleType=Row["rule_type"], ruleName=Row["rule_name"], bindingMode=Row["binding_mode"], priority=int(Row["priority"]), isActive=bool(Row["is_active"]), note=Row["note"], ) for Row in Result.mappings().all() ] async def CreateBinding( self, DocTypeId: int, RuleSetId: int, Region: str = "default", BindingMode: str = "explicit", Priority: int = 0, DocTypeCode: str | None = None, Note: str | None = None, ) -> RuleBindingVO: """创建规则类型绑定。""" async with GetAsyncSession() as Session: RuleSet = await Session.execute( text("SELECT id, rule_type, rule_name FROM leaudit_rule_sets WHERE id = :rid AND deleted_at IS NULL LIMIT 1"), {"rid": RuleSetId}, ) RsRow = RuleSet.mappings().first() if not RsRow: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在") Existing = await Session.execute( text( """ SELECT id FROM leaudit_rule_type_bindings WHERE doc_type_id = :dtid AND rule_set_id = :rsid LIMIT 1 """ ), {"dtid": DocTypeId, "rsid": RuleSetId}, ) if Existing.mappings().first(): raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, "该文档类型已绑定此规则集") Result = await Session.execute( text( """ INSERT INTO leaudit_rule_type_bindings ( doc_type_id, doc_type_code, rule_set_id, binding_mode, priority, is_active, region, note ) VALUES ( :doc_type_id, :doc_type_code, :rule_set_id, :binding_mode, :priority, true, :region, :note ) RETURNING id, doc_type_id, doc_type_code, rule_set_id, binding_mode, priority, is_active, region, note """ ), { "doc_type_id": DocTypeId, "doc_type_code": DocTypeCode, "rule_set_id": RuleSetId, "binding_mode": BindingMode, "priority": Priority, "region": Region, "note": Note, }, ) await Session.commit() Row = Result.mappings().first() return RuleBindingVO( id=int(Row["id"]), docTypeId=int(Row["doc_type_id"]), docTypeCode=Row["doc_type_code"], ruleSetId=int(Row["rule_set_id"]), ruleType=RsRow["rule_type"], ruleName=RsRow["rule_name"], bindingMode=Row["binding_mode"], priority=int(Row["priority"]), isActive=bool(Row["is_active"]), note=Row["note"], ) async def UpdateBinding( self, BindingId: int, IsActive: bool | None = None, Priority: int | None = None, BindingMode: str | None = None, Note: str | None = None, ) -> RuleBindingVO: """更新规则类型绑定。""" async with GetAsyncSession() as Session: Existing = await Session.execute( text( """ SELECT b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, b.binding_mode, b.priority, b.is_active, b.note, rs.rule_type, rs.rule_name FROM leaudit_rule_type_bindings b JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id WHERE b.id = :bid LIMIT 1 """ ), {"bid": BindingId}, ) Row = Existing.mappings().first() if not Row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "绑定记录不存在") SetClauses: list[str] = [] Params: dict[str, object] = {"bid": BindingId} if IsActive is not None: SetClauses.append("is_active = :is_active") Params["is_active"] = IsActive if Priority is not None: SetClauses.append("priority = :priority") Params["priority"] = Priority if BindingMode is not None: SetClauses.append("binding_mode = :binding_mode") Params["binding_mode"] = BindingMode if Note is not None: SetClauses.append("note = :note") Params["note"] = Note if SetClauses: SetClauses.append("updated_at = now()") await Session.execute( text(f"UPDATE leaudit_rule_type_bindings SET {', '.join(SetClauses)} WHERE id = :bid"), Params, ) await Session.commit() Result = await Session.execute( text( """ SELECT b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, b.binding_mode, b.priority, b.is_active, b.note, rs.rule_type, rs.rule_name FROM leaudit_rule_type_bindings b JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id WHERE b.id = :bid LIMIT 1 """ ), {"bid": BindingId}, ) Row = Result.mappings().first() return RuleBindingVO( id=int(Row["id"]), docTypeId=int(Row["doc_type_id"]), docTypeCode=Row["doc_type_code"], ruleSetId=int(Row["rule_set_id"]), ruleType=Row["rule_type"], ruleName=Row["rule_name"], bindingMode=Row["binding_mode"], priority=int(Row["priority"]), isActive=bool(Row["is_active"]), note=Row["note"], ) async def DeleteBinding(self, BindingId: int) -> None: """删除规则类型绑定。""" async with GetAsyncSession() as Session: Result = await Session.execute( text("DELETE FROM leaudit_rule_type_bindings WHERE id = :bid"), {"bid": BindingId}, ) await Session.commit() if Result.rowcount == 0: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "绑定记录不存在") async def _SwitchVersion( self, *, RuleType: str, VersionId: int, TargetStatus: str, OperatorUserId: int | None = None, Rollback: bool = False, ) -> RuleVersionVO: """切换当前生效版本。""" async with GetAsyncSession() as Session: RuleSet = await self._GetRuleSetByType(Session, RuleType) if not RuleSet: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在") VersionRow = await self._GetVersion(Session, VersionId) if not VersionRow or int(VersionRow["rule_set_id"]) != int(RuleSet["id"]): raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在或不属于当前规则集") await Session.execute( text( """ UPDATE leaudit_rule_versions SET status = CASE WHEN id = :version_id THEN :target_status WHEN status = 'published' THEN :previous_status ELSE status END, publisher_user_id = CASE WHEN id = :version_id THEN :operator_user_id ELSE publisher_user_id END, published_at = CASE WHEN id = :version_id THEN now() ELSE published_at END, updated_at = now() WHERE rule_set_id = :rule_set_id """ ), { "version_id": VersionId, "target_status": TargetStatus, "previous_status": "rollback" if Rollback else "deprecated", "operator_user_id": OperatorUserId, "rule_set_id": RuleSet["id"], }, ) await Session.execute( text( """ UPDATE leaudit_rule_sets SET current_version_id = :version_id, status = 'active', updated_at = now() WHERE id = :rule_set_id """ ), { "version_id": VersionId, "rule_set_id": RuleSet["id"], }, ) await Session.commit() VersionRow = await self._GetVersion(Session, VersionId) return self._BuildRuleVersionVo(VersionRow) async def _GetRuleSetByType(self, Session, RuleType: str): """按规则类型查询规则集。""" Result = await Session.execute( text( """ SELECT id, rule_type, rule_name, domain_type, current_version_id, status FROM leaudit_rule_sets WHERE rule_type = :rule_type AND deleted_at IS NULL LIMIT 1 """ ), {"rule_type": RuleType}, ) return Result.mappings().first() async def _GetVersion(self, Session, VersionId: int): """按版本 ID 查询规则版本。""" Result = await Session.execute( text( """ SELECT id, rule_set_id, version_no, status, oss_url, change_note, published_at FROM leaudit_rule_versions WHERE id = :version_id LIMIT 1 """ ), {"version_id": VersionId}, ) return Result.mappings().first() def _BuildRuleVersionVo(self, Row) -> RuleVersionVO: """构造规则版本响应。""" return RuleVersionVO( id=int(Row["id"]), ruleSetId=int(Row["rule_set_id"]), versionNo=Row["version_no"], status=Row["status"], ossUrl=Row["oss_url"], changeNote=Row["change_note"], publishedAt=Row["published_at"].isoformat() if Row["published_at"] else None, ) def _ResolveDomainType(self, RuleType: str) -> str: """按规则类型推导领域类型。""" Prefix = RuleType.split(".", 1)[0] Mapping = { "contract": "contract", "admin_license": "admin_license", "legal_doc": "legal_doc", } return Mapping.get(Prefix, Prefix or "unknown")