From d310ba8bc08a6625f394d92be4566bb5b2c086c1 Mon Sep 17 00:00:00 2001 From: wren <“porlong@qq.com”> Date: Tue, 28 Apr 2026 11:44:20 +0800 Subject: [PATCH] feat: implement binding CRUD in RuleServiceImpl --- .../services/impl/ruleServiceImpl.py | 709 ++++++++++++++++++ 1 file changed, 709 insertions(+) create mode 100644 fastapi_modules/fastapi_leaudit/services/impl/ruleServiceImpl.py diff --git a/fastapi_modules/fastapi_leaudit/services/impl/ruleServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/ruleServiceImpl.py new file mode 100644 index 0000000..93dcb61 --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/services/impl/ruleServiceImpl.py @@ -0,0 +1,709 @@ +"""规则服务实现。""" + +from __future__ import annotations + +import hashlib +from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession +from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum +from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException +from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils +from sqlalchemy import text + +from fastapi_modules.fastapi_leaudit.domian.vo.ruleVo import ( + RuleBindingVO, + RuleContentVO, + RuleSetVO, + RuleValidationVO, + RuleVersionVO, +) +from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleValidator import RuleValidator +from fastapi_modules.fastapi_leaudit.services import IOssService, IRuleService +from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl + + +class RuleServiceImpl(IRuleService): + """规则服务实现。""" + + def __init__( + self, + OssService: IOssService | None = None, + Validator: RuleValidator | None = None, + ) -> None: + self.OssService = OssService or OssServiceImpl() + self.Validator = Validator or RuleValidator() + + async def ListSets(self) -> list[RuleSetVO]: + """列出所有规则集。""" + async with GetAsyncSession() as Session: + Result = await Session.execute( + text( + """ + SELECT + id, + rule_type, + rule_name, + domain_type, + current_version_id, + status + FROM leaudit_rule_sets + WHERE delete_time IS NULL + ORDER BY id DESC + """ + ) + ) + return [ + RuleSetVO( + id=int(Row["id"]), + ruleType=Row["rule_type"], + ruleName=Row["rule_name"], + domainType=Row["domain_type"], + currentVersionId=Row["current_version_id"], + status=Row["status"], + ) + for Row in Result.mappings().all() + ] + + async def GetVersions(self, RuleType: str) -> list[RuleVersionVO]: + """获取规则集的所有版本。""" + async with GetAsyncSession() as Session: + Result = await Session.execute( + text( + """ + SELECT + rv.id, + rv.rule_set_id, + rv.version_no, + rv.status, + rv.oss_url, + rv.change_note, + rv.published_at + FROM leaudit_rule_versions rv + JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id + WHERE rs.rule_type = :rule_type + AND rs.delete_time IS NULL + ORDER BY rv.version_seq DESC, rv.id DESC + """ + ), + {"rule_type": RuleType}, + ) + return [self._BuildRuleVersionVo(Row) for Row in Result.mappings().all()] + + async def GetContent(self, VersionId: int) -> RuleContentVO: + """获取指定版本的规则正文。""" + async with GetAsyncSession() as Session: + Result = await Session.execute( + text( + """ + SELECT + rv.id, + rv.rule_set_id, + rv.version_no, + rv.oss_url, + rs.rule_type + FROM leaudit_rule_versions rv + JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id + WHERE rv.id = :version_id + LIMIT 1 + """ + ), + {"version_id": VersionId}, + ) + Row = Result.mappings().first() + if not Row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在") + + YamlText = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8") + return RuleContentVO( + id=int(Row["id"]), + ruleSetId=int(Row["rule_set_id"]), + ruleType=Row["rule_type"], + versionNo=Row["version_no"], + yamlText=YamlText, + ossUrl=Row["oss_url"], + ) + + async def Validate(self, RuleType: str, YamlText: str) -> RuleValidationVO: + """校验规则 YAML。""" + Validation = self.Validator.ValidateYaml(YamlText) + if Validation.ruleType and Validation.ruleType != RuleType: + Validation = RuleValidationVO( + valid=False, + ruleType=Validation.ruleType, + ruleName=Validation.ruleName, + versionNo=Validation.versionNo, + ruleCount=Validation.ruleCount, + extractCount=Validation.extractCount, + errors=[f"路径规则类型 {RuleType} 与 YAML metadata.type_id {Validation.ruleType} 不一致"], + ) + return Validation + + return RuleValidationVO( + valid=Validation.valid, + ruleType=Validation.ruleType, + ruleName=Validation.ruleName, + versionNo=Validation.versionNo, + ruleCount=Validation.ruleCount, + extractCount=Validation.extractCount, + errors=Validation.errors or [], + ) + + async def CreateVersion( + self, + RuleType: str, + YamlText: str, + ChangeNote: str | None = None, + EditorUserId: int | None = None, + ) -> RuleVersionVO: + """创建规则版本。""" + Validation = await self.Validate(RuleType, YamlText) + if not Validation.valid: + raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "; ".join(Validation.errors)) + + RulesFile = self.Validator.ParseValidated(YamlText) + FileSha256 = hashlib.sha256(YamlText.encode("utf-8")).hexdigest() + FileSize = len(YamlText.encode("utf-8")) + DomainType = self._ResolveDomainType(RuleType) + + async with GetAsyncSession() as Session: + RuleSet = await self._GetRuleSetByType(Session, RuleType) + if not RuleSet: + CreateResult = await Session.execute( + text( + """ + INSERT INTO leaudit_rule_sets ( + rule_type, + rule_name, + domain_type, + description, + status, + is_builtin, + owner_user_id + ) VALUES ( + :rule_type, + :rule_name, + :domain_type, + :description, + 'draft', + false, + :owner_user_id + ) + RETURNING id, rule_type, rule_name, domain_type, current_version_id, status + """ + ), + { + "rule_type": RuleType, + "rule_name": RulesFile.metadata.name, + "domain_type": DomainType, + "description": RulesFile.metadata.description, + "owner_user_id": EditorUserId, + }, + ) + RuleSet = CreateResult.mappings().first() + else: + await Session.execute( + text( + """ + UPDATE leaudit_rule_sets + SET + rule_name = :rule_name, + domain_type = :domain_type, + description = :description, + update_time = now() + WHERE id = :rule_set_id + """ + ), + { + "rule_name": RulesFile.metadata.name, + "domain_type": DomainType, + "description": RulesFile.metadata.description, + "rule_set_id": RuleSet["id"], + }, + ) + + SeqResult = await Session.execute( + text( + """ + SELECT COALESCE(MAX(version_seq), 0) AS max_seq + FROM leaudit_rule_versions + WHERE rule_set_id = :rule_set_id + """ + ), + {"rule_set_id": RuleSet["id"]}, + ) + NextSeq = int(SeqResult.mappings().first()["max_seq"]) + 1 + VersionNo = RulesFile.metadata.version or f"v{NextSeq}" + ExistingVersion = await Session.execute( + text( + """ + SELECT id + FROM leaudit_rule_versions + WHERE rule_set_id = :rule_set_id + AND version_no = :version_no + LIMIT 1 + """ + ), + { + "rule_set_id": RuleSet["id"], + "version_no": VersionNo, + }, + ) + if ExistingVersion.mappings().first(): + raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"版本号 {VersionNo} 已存在") + + ObjectKey = OssPathUtils.BuildRuleYamlKey(RuleType, VersionNo) + OssUrl = await self.OssService.UploadText( + ObjectKey=ObjectKey, + Content=YamlText, + ContentType="application/x-yaml; charset=utf-8", + ) + + VersionResult = await Session.execute( + text( + """ + INSERT INTO leaudit_rule_versions ( + rule_set_id, + version_no, + version_seq, + status, + source_type, + dsl_format, + oss_url, + file_sha256, + file_size, + metadata_type_id, + metadata_name, + metadata_version, + change_note, + editor_user_id + ) VALUES ( + :rule_set_id, + :version_no, + :version_seq, + 'draft', + 'oss_yaml', + 'yaml', + :oss_url, + :file_sha256, + :file_size, + :metadata_type_id, + :metadata_name, + :metadata_version, + :change_note, + :editor_user_id + ) + RETURNING id, rule_set_id, version_no, status, oss_url, change_note, published_at + """ + ), + { + "rule_set_id": RuleSet["id"], + "version_no": VersionNo, + "version_seq": NextSeq, + "oss_url": OssUrl, + "file_sha256": FileSha256, + "file_size": FileSize, + "metadata_type_id": RulesFile.metadata.type_id, + "metadata_name": RulesFile.metadata.name, + "metadata_version": RulesFile.metadata.version, + "change_note": ChangeNote, + "editor_user_id": EditorUserId, + }, + ) + await Session.commit() + return self._BuildRuleVersionVo(VersionResult.mappings().first()) + + async def Publish( + self, + RuleType: str, + VersionId: int, + OperatorUserId: int | None = None, + ) -> RuleVersionVO: + """发布指定版本。""" + return await self._SwitchVersion( + RuleType=RuleType, + VersionId=VersionId, + TargetStatus="published", + OperatorUserId=OperatorUserId, + ) + + async def Rollback( + self, + RuleType: str, + VersionId: int, + OperatorUserId: int | None = None, + ) -> RuleVersionVO: + """回滚到指定历史版本。""" + return await self._SwitchVersion( + RuleType=RuleType, + VersionId=VersionId, + TargetStatus="published", + OperatorUserId=OperatorUserId, + Rollback=True, + ) + + async def ListBindings(self, RuleType: str | None = None) -> list[RuleBindingVO]: + """列出规则类型绑定,可按规则类型过滤。""" + async with GetAsyncSession() as Session: + if RuleType: + Result = await Session.execute( + text( + """ + SELECT + b.id, + b.doc_type_id, + b.doc_type_code, + b.rule_set_id, + b.binding_mode, + b.priority, + b.is_active, + b.note, + rs.rule_type, + rs.rule_name + FROM leaudit_rule_type_bindings b + JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id + WHERE rs.rule_type = :rule_type + AND rs.delete_time IS NULL + ORDER BY b.priority DESC, b.id DESC + """ + ), + {"rule_type": RuleType}, + ) + else: + Result = await Session.execute( + text( + """ + SELECT + b.id, + b.doc_type_id, + b.doc_type_code, + b.rule_set_id, + b.binding_mode, + b.priority, + b.is_active, + b.note, + rs.rule_type, + rs.rule_name + FROM leaudit_rule_type_bindings b + JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id + WHERE rs.delete_time IS NULL + ORDER BY rs.rule_type, b.priority DESC, b.id DESC + """ + ), + ) + return [ + RuleBindingVO( + id=int(Row["id"]), + docTypeId=int(Row["doc_type_id"]), + docTypeCode=Row["doc_type_code"], + ruleSetId=int(Row["rule_set_id"]), + ruleType=Row["rule_type"], + ruleName=Row["rule_name"], + bindingMode=Row["binding_mode"], + priority=int(Row["priority"]), + isActive=bool(Row["is_active"]), + note=Row["note"], + ) + for Row in Result.mappings().all() + ] + + async def CreateBinding( + self, + DocTypeId: int, + RuleSetId: int, + BindingMode: str = "explicit", + Priority: int = 0, + DocTypeCode: str | None = None, + Note: str | None = None, + ) -> RuleBindingVO: + """创建规则类型绑定。""" + async with GetAsyncSession() as Session: + RuleSet = await Session.execute( + text("SELECT id, rule_type, rule_name FROM leaudit_rule_sets WHERE id = :rid AND delete_time IS NULL LIMIT 1"), + {"rid": RuleSetId}, + ) + RsRow = RuleSet.mappings().first() + if not RsRow: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在") + + Existing = await Session.execute( + text( + """ + SELECT id FROM leaudit_rule_type_bindings + WHERE doc_type_id = :dtid AND rule_set_id = :rsid + LIMIT 1 + """ + ), + {"dtid": DocTypeId, "rsid": RuleSetId}, + ) + if Existing.mappings().first(): + raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, "该文档类型已绑定此规则集") + + Result = await Session.execute( + text( + """ + INSERT INTO leaudit_rule_type_bindings ( + doc_type_id, + doc_type_code, + rule_set_id, + binding_mode, + priority, + is_active, + note + ) VALUES ( + :doc_type_id, + :doc_type_code, + :rule_set_id, + :binding_mode, + :priority, + true, + :note + ) + RETURNING id, doc_type_id, doc_type_code, rule_set_id, + binding_mode, priority, is_active, note + """ + ), + { + "doc_type_id": DocTypeId, + "doc_type_code": DocTypeCode, + "rule_set_id": RuleSetId, + "binding_mode": BindingMode, + "priority": Priority, + "note": Note, + }, + ) + await Session.commit() + Row = Result.mappings().first() + return RuleBindingVO( + id=int(Row["id"]), + docTypeId=int(Row["doc_type_id"]), + docTypeCode=Row["doc_type_code"], + ruleSetId=int(Row["rule_set_id"]), + ruleType=RsRow["rule_type"], + ruleName=RsRow["rule_name"], + bindingMode=Row["binding_mode"], + priority=int(Row["priority"]), + isActive=bool(Row["is_active"]), + note=Row["note"], + ) + + async def UpdateBinding( + self, + BindingId: int, + IsActive: bool | None = None, + Priority: int | None = None, + BindingMode: str | None = None, + Note: str | None = None, + ) -> RuleBindingVO: + """更新规则类型绑定。""" + async with GetAsyncSession() as Session: + Existing = await Session.execute( + text( + """ + SELECT + b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, + b.binding_mode, b.priority, b.is_active, b.note, + rs.rule_type, rs.rule_name + FROM leaudit_rule_type_bindings b + JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id + WHERE b.id = :bid + LIMIT 1 + """ + ), + {"bid": BindingId}, + ) + Row = Existing.mappings().first() + if not Row: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "绑定记录不存在") + + SetClauses: list[str] = [] + Params: dict[str, object] = {"bid": BindingId} + + if IsActive is not None: + SetClauses.append("is_active = :is_active") + Params["is_active"] = IsActive + if Priority is not None: + SetClauses.append("priority = :priority") + Params["priority"] = Priority + if BindingMode is not None: + SetClauses.append("binding_mode = :binding_mode") + Params["binding_mode"] = BindingMode + if Note is not None: + SetClauses.append("note = :note") + Params["note"] = Note + + if SetClauses: + SetClauses.append("update_time = now()") + await Session.execute( + text(f"UPDATE leaudit_rule_type_bindings SET {', '.join(SetClauses)} WHERE id = :bid"), + Params, + ) + await Session.commit() + + Result = await Session.execute( + text( + """ + SELECT + b.id, b.doc_type_id, b.doc_type_code, b.rule_set_id, + b.binding_mode, b.priority, b.is_active, b.note, + rs.rule_type, rs.rule_name + FROM leaudit_rule_type_bindings b + JOIN leaudit_rule_sets rs ON rs.id = b.rule_set_id + WHERE b.id = :bid + LIMIT 1 + """ + ), + {"bid": BindingId}, + ) + Row = Result.mappings().first() + return RuleBindingVO( + id=int(Row["id"]), + docTypeId=int(Row["doc_type_id"]), + docTypeCode=Row["doc_type_code"], + ruleSetId=int(Row["rule_set_id"]), + ruleType=Row["rule_type"], + ruleName=Row["rule_name"], + bindingMode=Row["binding_mode"], + priority=int(Row["priority"]), + isActive=bool(Row["is_active"]), + note=Row["note"], + ) + + async def DeleteBinding(self, BindingId: int) -> None: + """删除规则类型绑定。""" + async with GetAsyncSession() as Session: + Result = await Session.execute( + text("DELETE FROM leaudit_rule_type_bindings WHERE id = :bid"), + {"bid": BindingId}, + ) + await Session.commit() + if Result.rowcount == 0: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "绑定记录不存在") + + async def _SwitchVersion( + self, + *, + RuleType: str, + VersionId: int, + TargetStatus: str, + OperatorUserId: int | None = None, + Rollback: bool = False, + ) -> RuleVersionVO: + """切换当前生效版本。""" + async with GetAsyncSession() as Session: + RuleSet = await self._GetRuleSetByType(Session, RuleType) + if not RuleSet: + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在") + + VersionRow = await self._GetVersion(Session, VersionId) + if not VersionRow or int(VersionRow["rule_set_id"]) != int(RuleSet["id"]): + raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在或不属于当前规则集") + + await Session.execute( + text( + """ + UPDATE leaudit_rule_versions + SET + status = CASE + WHEN id = :version_id THEN :target_status + WHEN status = 'published' THEN :previous_status + ELSE status + END, + publisher_user_id = CASE + WHEN id = :version_id THEN :operator_user_id + ELSE publisher_user_id + END, + published_at = CASE + WHEN id = :version_id THEN now() + ELSE published_at + END, + update_time = now() + WHERE rule_set_id = :rule_set_id + """ + ), + { + "version_id": VersionId, + "target_status": TargetStatus, + "previous_status": "rollback" if Rollback else "deprecated", + "operator_user_id": OperatorUserId, + "rule_set_id": RuleSet["id"], + }, + ) + await Session.execute( + text( + """ + UPDATE leaudit_rule_sets + SET + current_version_id = :version_id, + status = 'active', + update_time = now() + WHERE id = :rule_set_id + """ + ), + { + "version_id": VersionId, + "rule_set_id": RuleSet["id"], + }, + ) + await Session.commit() + + VersionRow = await self._GetVersion(Session, VersionId) + return self._BuildRuleVersionVo(VersionRow) + + async def _GetRuleSetByType(self, Session, RuleType: str): + """按规则类型查询规则集。""" + Result = await Session.execute( + text( + """ + SELECT id, rule_type, rule_name, domain_type, current_version_id, status + FROM leaudit_rule_sets + WHERE rule_type = :rule_type + AND delete_time IS NULL + LIMIT 1 + """ + ), + {"rule_type": RuleType}, + ) + return Result.mappings().first() + + async def _GetVersion(self, Session, VersionId: int): + """按版本 ID 查询规则版本。""" + Result = await Session.execute( + text( + """ + SELECT + id, + rule_set_id, + version_no, + status, + oss_url, + change_note, + published_at + FROM leaudit_rule_versions + WHERE id = :version_id + LIMIT 1 + """ + ), + {"version_id": VersionId}, + ) + return Result.mappings().first() + + def _BuildRuleVersionVo(self, Row) -> RuleVersionVO: + """构造规则版本响应。""" + return RuleVersionVO( + id=int(Row["id"]), + ruleSetId=int(Row["rule_set_id"]), + versionNo=Row["version_no"], + status=Row["status"], + ossUrl=Row["oss_url"], + changeNote=Row["change_note"], + publishedAt=Row["published_at"].isoformat() if Row["published_at"] else None, + ) + + def _ResolveDomainType(self, RuleType: str) -> str: + """按规则类型推导领域类型。""" + Prefix = RuleType.split(".", 1)[0] + Mapping = { + "contract": "contract", + "admin_license": "admin_license", + "legal_doc": "legal_doc", + } + return Mapping.get(Prefix, Prefix or "unknown")