Files
2026-05-25 09:50:01 +08:00

1714 lines
73 KiB
Python

"""规则服务实现。"""
from __future__ import annotations
import hashlib
import logging
import time
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum
from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from sqlalchemy import text
from fastapi_modules.fastapi_leaudit.domian.vo.ruleVo import (
RuleBindingVO,
RuleContentVO,
RuleSetVO,
RuleValidationVO,
RuleVersionVO,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleValidator import RuleValidator
from fastapi_modules.fastapi_leaudit.services import IOssService, IRuleService
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
from fastapi_modules.fastapi_leaudit.services.impl.ruleGroupSupport import sync_doc_type_bindings_from_group
from fastapi_modules.fastapi_leaudit.services.impl.ruleTenantScope import normalize_scoped_tenant_code, pick_effective_scoped_row
from fastapi_modules.fastapi_leaudit.services.impl.ssoUserCompat import SsoUserCompat
from fastapi_modules.fastapi_leaudit.services.impl.tenantResolver import TenantResolver
class RuleServiceImpl(IRuleService):
"""规则服务实现。"""
_GLOBAL_LIST_SETS_CACHE: tuple[float, list[RuleSetVO]] | None = None
_GLOBAL_RULE_COUNT_CACHE: dict[int, tuple[float, int]] = {}
def __init__(
self,
OssService: IOssService | None = None,
Validator: RuleValidator | None = None,
) -> None:
self.OssService = OssService or OssServiceImpl()
self.Validator = Validator or RuleValidator()
self.TenantResolver = TenantResolver()
self._list_sets_cache = self.__class__._GLOBAL_LIST_SETS_CACHE
self._rule_count_cache = self.__class__._GLOBAL_RULE_COUNT_CACHE
self._entry_module_tenant_table_exists_cache: bool | None = None
self._column_exists_cache: dict[str, bool] = {}
@classmethod
def _set_list_sets_cache(cls, value: tuple[float, list[RuleSetVO]] | None) -> None:
cls._GLOBAL_LIST_SETS_CACHE = value
def InvalidateCaches(self, version_ids: list[int] | None = None) -> None:
"""清理规则集及规则数量缓存。"""
self.__class__._set_list_sets_cache(None)
if version_ids is None:
self._rule_count_cache.clear()
return
for version_id in {int(item) for item in version_ids if item is not None}:
self._rule_count_cache.pop(version_id, None)
async def _resolve_unique_child_group_id(self, Session, DocTypeId: int) -> int | None:
"""仅当文档类型唯一对应一个二级分组时,返回该分组ID。"""
row = (
await Session.execute(
text(
"""
SELECT CASE WHEN COUNT(*) = 1 THEN MIN(id) END AS group_id
FROM leaudit_evaluation_point_groups
WHERE document_type_id = :doc_type_id
AND deleted_at IS NULL
AND COALESCE(pid, 0) <> 0
"""
),
{"doc_type_id": DocTypeId},
)
).mappings().first()
return int(row["group_id"]) if row and row.get("group_id") is not None else None
async def _count_child_groups(self, Session, DocTypeId: int) -> int:
"""统计文档类型下已启用的二级分组数量,用于明确报错。"""
row = (
await Session.execute(
text(
"""
SELECT COUNT(*) AS total
FROM leaudit_evaluation_point_groups
WHERE document_type_id = :doc_type_id
AND deleted_at IS NULL
AND COALESCE(pid, 0) <> 0
"""
),
{"doc_type_id": DocTypeId},
)
).mappings().first()
return int(row["total"] or 0) if row else 0
async def ListSets(self, CurrentUserId: int | None = None) -> list[RuleSetVO]:
"""列出所有规则集。"""
now = time.monotonic()
cached = self.__class__._GLOBAL_LIST_SETS_CACHE
if CurrentUserId is None and cached and now - cached[0] <= 30:
return cached[1]
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
if await self._column_exists(Session, "leaudit_rule_sets", "tenant_code"):
rows = await self._load_scoped_rule_set_rows(Session, current_user=current_user)
else:
Result = await Session.execute(
text(
"""
SELECT
rs.id,
rs.rule_type,
rs.rule_name,
rs.domain_type,
rs.current_version_id,
current_rv.id AS usable_current_version_id,
fallback_rv.id AS fallback_version_id,
CASE
WHEN current_rv.id IS NOT NULL OR fallback_rv.id IS NOT NULL THEN TRUE
ELSE FALSE
END AS has_usable_version,
rs.status
FROM leaudit_rule_sets rs
LEFT JOIN leaudit_rule_versions current_rv
ON current_rv.id = rs.current_version_id
AND current_rv.status = 'published'
LEFT JOIN LATERAL (
SELECT rv.id
FROM leaudit_rule_versions rv
WHERE rv.rule_set_id = rs.id
AND rv.status = 'published'
AND (rs.current_version_id IS NULL OR rv.id <> rs.current_version_id)
ORDER BY rv.version_seq DESC, rv.id DESC
LIMIT 1
) fallback_rv ON TRUE
WHERE rs.deleted_at IS NULL
ORDER BY rs.id DESC
"""
)
)
rows = Result.mappings().all()
usable_counts: dict[int, int] = {}
for row in rows:
usable_version_id = row["usable_current_version_id"] or row["fallback_version_id"]
if usable_version_id is not None and int(usable_version_id) not in usable_counts:
usable_counts[int(usable_version_id)] = await self._GetRuleCountByVersionId(int(usable_version_id))
items = [
RuleSetVO(
id=int(Row["id"]),
ruleType=Row["rule_type"],
ruleName=Row["rule_name"],
effectiveTenantCode=str(Row.get("tenant_code") or "").strip() or None,
effectiveScopeType="PUBLIC"
if str(Row.get("tenant_code") or "").strip().upper() == "PUBLIC"
else "PROVINCIAL"
if str(Row.get("tenant_code") or "").strip().upper() in {"", "PROVINCIAL"}
else "TENANT",
isInherited=bool(current_user)
and not bool(current_user.get("is_global"))
and bool(str(current_user.get("tenant_code") or "").strip())
and str(Row.get("tenant_code") or "").strip().upper()
not in {"", str(current_user.get("tenant_code") or "").strip().upper()},
sourceRuleSetId=int(Row["source_rule_set_id"]) if Row.get("source_rule_set_id") is not None else None,
domainType=Row["domain_type"],
currentVersionId=Row["current_version_id"],
fallbackVersionId=Row["fallback_version_id"],
hasUsableVersion=bool(Row["has_usable_version"]),
usableRuleCount=usable_counts.get(int(Row["usable_current_version_id"] or Row["fallback_version_id"]), 0)
if (Row["usable_current_version_id"] or Row["fallback_version_id"]) is not None
else 0,
status=Row["status"],
)
for Row in rows
]
cache_value = (time.monotonic(), items)
if CurrentUserId is None:
self.__class__._set_list_sets_cache(cache_value)
self._list_sets_cache = cache_value
return items
async def _GetRuleCountByVersionId(self, VersionId: int) -> int:
"""读取指定可用规则版本的规则数。"""
cached = self._rule_count_cache.get(VersionId)
now = time.monotonic()
if cached and now - cached[0] <= 120:
return cached[1]
async with GetAsyncSession() as Session:
Result = await Session.execute(
text(
"""
SELECT oss_url
FROM leaudit_rule_versions
WHERE id = :version_id
LIMIT 1
"""
),
{"version_id": VersionId},
)
Row = Result.mappings().first()
if not Row or not Row["oss_url"]:
return 0
try:
yaml_text = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8")
validation = self.Validator.ValidateYaml(yaml_text)
count = int(validation.ruleCount or 0)
self._rule_count_cache[VersionId] = (time.monotonic(), count)
return count
except Exception:
return 0
async def GetVersions(self, RuleType: str, CurrentUserId: int | None = None) -> list[RuleVersionVO]:
"""获取规则集的所有版本。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
if await self._column_exists(Session, "leaudit_rule_sets", "tenant_code"):
rule_set_row = await self._get_effective_rule_set_row_by_type(Session, RuleType, current_user=current_user)
if not rule_set_row:
return []
Result = await Session.execute(
text(
"""
SELECT
rv.id,
rv.rule_set_id,
rv.version_no,
rv.status,
rv.oss_url,
rv.change_note,
rv.published_at
FROM leaudit_rule_versions rv
WHERE rv.rule_set_id = :rule_set_id
AND rv.deleted_at IS NULL
ORDER BY rv.version_seq DESC, rv.id DESC
"""
),
{"rule_set_id": int(rule_set_row["id"])},
)
else:
Result = await Session.execute(
text(
"""
SELECT
rv.id,
rv.rule_set_id,
rv.version_no,
rv.status,
rv.oss_url,
rv.change_note,
rv.published_at
FROM leaudit_rule_versions rv
JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id
WHERE rs.rule_type = :rule_type
AND rs.deleted_at IS NULL
AND rv.deleted_at IS NULL
ORDER BY rv.version_seq DESC, rv.id DESC
"""
),
{"rule_type": RuleType},
)
return [self._BuildRuleVersionVo(Row) for Row in Result.mappings().all()]
async def GetContent(self, VersionId: int, CurrentUserId: int | None = None) -> RuleContentVO:
"""获取指定版本的规则正文。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
Result = await Session.execute(
text(
"""
SELECT
rv.id,
rv.rule_set_id,
rv.version_no,
rv.oss_url,
rs.rule_type
FROM leaudit_rule_versions rv
JOIN leaudit_rule_sets rs ON rs.id = rv.rule_set_id
WHERE rv.id = :version_id
AND rv.deleted_at IS NULL
LIMIT 1
"""
),
{"version_id": VersionId},
)
Row = Result.mappings().first()
if not Row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在")
if await self._column_exists(Session, "leaudit_rule_sets", "tenant_code"):
effective_rule_set = await self._get_effective_rule_set_row_by_type(
Session,
str(Row["rule_type"] or ""),
current_user=current_user,
)
if effective_rule_set is None or int(effective_rule_set["id"]) != int(Row["rule_set_id"]):
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不可访问该规则版本")
YamlText = (await self.OssService.DownloadBytes(Row["oss_url"])).decode("utf-8")
return RuleContentVO(
id=int(Row["id"]),
ruleSetId=int(Row["rule_set_id"]),
ruleType=Row["rule_type"],
versionNo=Row["version_no"],
yamlText=YamlText,
ossUrl=Row["oss_url"],
)
async def Validate(self, RuleType: str, YamlText: str) -> RuleValidationVO:
"""校验规则 YAML。"""
Validation = self.Validator.ValidateYaml(YamlText)
if Validation.ruleType and Validation.ruleType != RuleType:
Validation = RuleValidationVO(
valid=False,
ruleType=Validation.ruleType,
ruleName=Validation.ruleName,
versionNo=Validation.versionNo,
ruleCount=Validation.ruleCount,
extractCount=Validation.extractCount,
errors=[f"路径规则类型 {RuleType} 与 YAML metadata.type_id {Validation.ruleType} 不一致"],
)
return Validation
return RuleValidationVO(
valid=Validation.valid,
ruleType=Validation.ruleType,
ruleName=Validation.ruleName,
versionNo=Validation.versionNo,
ruleCount=Validation.ruleCount,
extractCount=Validation.extractCount,
errors=Validation.errors or [],
)
def _pick_writable_rule_set_row(
self,
rows: list[dict[str, object]],
*,
current_user: dict[str, object] | None,
) -> dict[str, object] | None:
if not rows:
return None
if current_user is None:
return dict(rows[0])
current_user_tenant = str(current_user.get("tenant_code") or "").strip().upper()
if current_user.get("is_global") and not current_user_tenant:
effective = pick_effective_scoped_row(rows, "PROVINCIAL")
return dict(effective) if effective is not None else dict(rows[0])
effective = pick_effective_scoped_row(rows, current_user_tenant or None)
if effective is None:
return None
normalized = str(effective.get("tenant_code") or "").strip().upper() or "PROVINCIAL"
if normalized == "PUBLIC":
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不能直接修改公共规则资产")
return dict(effective)
def _assert_version_belongs_to_writable_rule_set(
self,
version_row: dict[str, object] | None,
writable_rule_set: dict[str, object] | None,
) -> None:
if version_row is None or writable_rule_set is None:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在或当前租户无可写规则集")
if int(version_row["rule_set_id"]) != int(writable_rule_set["id"]):
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不能发布或回滚其他租户的规则版本")
def _assert_rollback_target(
self,
version_row: dict[str, object] | None,
rule_set_row: dict[str, object] | None,
) -> None:
if version_row is None or rule_set_row is None:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则版本不存在或当前租户无可写规则集")
current_version_id = rule_set_row.get("current_version_id")
if current_version_id is not None and int(version_row["id"]) == int(current_version_id):
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前版本已经生效,无需回滚")
status = str(version_row.get("status") or "").strip().lower()
if status in {"draft", "validated"}:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "草稿版本不能作为回滚目标,请先发布或选择历史版本")
async def CreateVersion(
self,
RuleType: str,
YamlText: str,
ChangeNote: str | None = None,
EditorUserId: int | None = None,
CurrentUserId: int | None = None,
) -> RuleVersionVO:
"""创建规则版本。"""
Validation = await self.Validate(RuleType, YamlText)
if not Validation.valid:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "; ".join(Validation.errors))
RulesFile = self.Validator.ParseValidated(YamlText)
FileSha256 = hashlib.sha256(YamlText.encode("utf-8")).hexdigest()
FileSize = len(YamlText.encode("utf-8"))
DomainType = self._ResolveDomainType(RuleType)
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
use_tenant_scope = await self._column_exists(Session, "leaudit_rule_sets", "tenant_code")
self._assert_rule_tenant_schema_ready_for_write(use_tenant_scope=use_tenant_scope, current_user=current_user)
writable_tenant_code: str | None = None
writable_scope_type = "PROVINCIAL"
source_rule_set_id: int | None = None
source_rule_set_binding_ids: list[int] = []
if use_tenant_scope:
candidate_rows = await self._load_rule_set_rows_by_type(Session, RuleType)
writable_rule_set = self._pick_writable_rule_set_row(candidate_rows, current_user=current_user)
RuleSet = writable_rule_set
if current_user and not current_user.get("is_global"):
writable_tenant_code = str(current_user.get("tenant_code") or "").strip().upper() or None
writable_scope_type = "TENANT"
elif RuleSet is not None:
writable_tenant_code = str(RuleSet.get("tenant_code") or "").strip().upper() or "PROVINCIAL"
writable_scope_type = "PROVINCIAL" if writable_tenant_code != "PUBLIC" else "PUBLIC"
if RuleSet is not None:
normalized_rule_set_tenant = str(RuleSet.get("tenant_code") or "").strip().upper() or "PROVINCIAL"
if (
writable_scope_type == "TENANT"
and writable_tenant_code
and normalized_rule_set_tenant != writable_tenant_code
):
source_rule_set_id = int(RuleSet["id"])
source_rule_set_binding_ids = await self._load_source_group_binding_ids(Session, source_rule_set_id)
RuleSet = None
else:
RuleSet = await self._GetRuleSetByType(Session, RuleType)
if not RuleSet:
if use_tenant_scope:
CreateResult = await Session.execute(
text(
"""
INSERT INTO leaudit_rule_sets (
rule_type,
rule_name,
domain_type,
description,
region,
status,
is_builtin,
owner_user_id,
tenant_code,
scope_type,
source_rule_set_id,
tenant_name_snapshot
) VALUES (
:rule_type,
:rule_name,
:domain_type,
:description,
:region,
'draft',
false,
:owner_user_id,
:tenant_code,
:scope_type,
:source_rule_set_id,
:tenant_name_snapshot
)
RETURNING id, rule_type, rule_name, domain_type, current_version_id, status, tenant_code, source_rule_set_id
"""
),
{
"rule_type": RuleType,
"rule_name": RulesFile.metadata.name,
"domain_type": DomainType,
"description": RulesFile.metadata.description,
"region": self._legacy_region_for_scope(writable_tenant_code, writable_scope_type),
"owner_user_id": EditorUserId,
"tenant_code": writable_tenant_code or "PROVINCIAL",
"scope_type": writable_scope_type,
"source_rule_set_id": source_rule_set_id,
"tenant_name_snapshot": str(current_user.get("tenant_name") or "") if current_user else None,
},
)
else:
CreateResult = await Session.execute(
text(
"""
INSERT INTO leaudit_rule_sets (
rule_type,
rule_name,
domain_type,
description,
status,
is_builtin,
owner_user_id
) VALUES (
:rule_type,
:rule_name,
:domain_type,
:description,
'draft',
false,
:owner_user_id
)
RETURNING id, rule_type, rule_name, domain_type, current_version_id, status
"""
),
{
"rule_type": RuleType,
"rule_name": RulesFile.metadata.name,
"domain_type": DomainType,
"description": RulesFile.metadata.description,
"owner_user_id": EditorUserId,
},
)
RuleSet = CreateResult.mappings().first()
if use_tenant_scope and writable_scope_type == "TENANT" and source_rule_set_binding_ids:
await self._clone_source_group_bindings_for_tenant(
Session,
source_binding_ids=source_rule_set_binding_ids,
tenant_rule_set_id=int(RuleSet["id"]),
current_user=current_user,
)
else:
await Session.execute(
text(
"""
UPDATE leaudit_rule_sets
SET
rule_name = :rule_name,
domain_type = :domain_type,
description = :description,
updated_at = now()
WHERE id = :rule_set_id
"""
),
{
"rule_name": RulesFile.metadata.name,
"domain_type": DomainType,
"description": RulesFile.metadata.description,
"rule_set_id": RuleSet["id"],
},
)
SeqResult = await Session.execute(
text(
"""
SELECT COALESCE(MAX(version_seq), 0) AS max_seq
FROM leaudit_rule_versions
WHERE rule_set_id = :rule_set_id
"""
),
{"rule_set_id": RuleSet["id"]},
)
NextSeq = int(SeqResult.mappings().first()["max_seq"]) + 1
VersionNo = RulesFile.metadata.version or f"v{NextSeq}"
source_version_id: int | None = None
if source_rule_set_id is not None:
source_version_id = await self._load_latest_version_id_by_rule_set(Session, source_rule_set_id)
ExistingVersion = await Session.execute(
text(
"""
SELECT id
FROM leaudit_rule_versions
WHERE rule_set_id = :rule_set_id
AND version_no = :version_no
LIMIT 1
"""
),
{
"rule_set_id": RuleSet["id"],
"version_no": VersionNo,
},
)
if ExistingVersion.mappings().first():
raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, f"版本号 {VersionNo} 已存在")
ObjectKey = OssPathUtils.BuildRuleYamlKey(RuleType, VersionNo)
OssUrl = await self.OssService.UploadText(
ObjectKey=ObjectKey,
Content=YamlText,
ContentType="application/x-yaml; charset=utf-8",
)
VersionResult = await Session.execute(
text(
"""
INSERT INTO leaudit_rule_versions (
rule_set_id,
version_no,
version_seq,
status,
source_type,
dsl_format,
oss_url,
file_sha256,
file_size,
metadata_type_id,
metadata_name,
metadata_version,
change_note,
editor_user_id
) VALUES (
:rule_set_id,
:version_no,
:version_seq,
'draft',
'oss_yaml',
'yaml',
:oss_url,
:file_sha256,
:file_size,
:metadata_type_id,
:metadata_name,
:metadata_version,
:change_note,
:editor_user_id
)
RETURNING id, rule_set_id, version_no, status, oss_url, change_note, published_at
"""
),
{
"rule_set_id": RuleSet["id"],
"version_no": VersionNo,
"version_seq": NextSeq,
"oss_url": OssUrl,
"file_sha256": FileSha256,
"file_size": FileSize,
"metadata_type_id": RulesFile.metadata.type_id,
"metadata_name": RulesFile.metadata.name,
"metadata_version": RulesFile.metadata.version,
"change_note": ChangeNote,
"editor_user_id": EditorUserId,
},
)
version_row = VersionResult.mappings().first()
if use_tenant_scope and await self._column_exists(Session, "leaudit_rule_versions", "tenant_code_snapshot"):
snapshot = self._build_version_scope_snapshot(
writable_tenant_code=writable_tenant_code,
writable_scope_type=writable_scope_type,
rule_set_row=dict(RuleSet),
source_version_id=source_version_id,
)
await Session.execute(
text(
"""
UPDATE leaudit_rule_versions
SET
tenant_code_snapshot = :tenant_code_snapshot,
scope_type_snapshot = :scope_type_snapshot,
source_version_id = :source_version_id
WHERE id = :version_id
"""
),
{
"tenant_code_snapshot": snapshot["tenant_code_snapshot"],
"scope_type_snapshot": snapshot["scope_type_snapshot"],
"source_version_id": snapshot["source_version_id"],
"version_id": int(version_row["id"]),
},
)
await Session.commit()
return self._BuildRuleVersionVo(version_row)
async def Publish(
self,
RuleType: str,
VersionId: int,
OperatorUserId: int | None = None,
CurrentUserId: int | None = None,
) -> RuleVersionVO:
"""发布指定版本。"""
return await self._SwitchVersion(
RuleType=RuleType,
VersionId=VersionId,
TargetStatus="published",
OperatorUserId=OperatorUserId,
CurrentUserId=CurrentUserId,
)
async def Rollback(
self,
RuleType: str,
VersionId: int,
OperatorUserId: int | None = None,
CurrentUserId: int | None = None,
) -> RuleVersionVO:
"""回滚到指定历史版本。"""
return await self._SwitchVersion(
RuleType=RuleType,
VersionId=VersionId,
TargetStatus="published",
OperatorUserId=OperatorUserId,
CurrentUserId=CurrentUserId,
Rollback=True,
)
async def ListBindings(
self,
RuleType: str | None = None,
Region: str | None = None,
CurrentUserId: int | None = None,
) -> list[RuleBindingVO]:
"""列出规则类型绑定,只读取新分组绑定。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
params: dict[str, object] = {}
filters = ["rs.deleted_at IS NULL", "dt.deleted_at IS NULL", "child.deleted_at IS NULL", "rgb.deleted_at IS NULL"]
filters.append(
await self._entry_module_tenant_scope_sql(
session=Session,
entry_module_expr="COALESCE(child.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=current_user,
params=params,
param_prefix="rule_binding_scope",
)
)
if RuleType:
filters.append("rs.rule_type = :rule_type")
params["rule_type"] = RuleType
where_clause = " AND ".join(filters)
result = await Session.execute(
text(
f"""
SELECT
rgb.id,
dt.id AS doc_type_id,
dt.code AS doc_type_code,
rgb.rule_set_id,
'explicit' AS binding_mode,
rgb.priority,
rgb.is_active,
rgb.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = child.pid AND parent.deleted_at IS NULL
JOIN leaudit_document_types dt ON dt.id = child.document_type_id
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
WHERE {where_clause}
AND COALESCE(child.pid, 0) <> 0
ORDER BY dt.id ASC, COALESCE(child.sort_order, 0) ASC, rgb.priority DESC, rgb.id DESC
"""
),
params,
)
rows = result.mappings().all()
bindings: list[RuleBindingVO] = []
seen_doc_type_rule_pairs: set[tuple[int, int]] = set()
for row in rows:
pair = (int(row["doc_type_id"]), int(row["rule_set_id"]))
if pair in seen_doc_type_rule_pairs:
continue
seen_doc_type_rule_pairs.add(pair)
bindings.append(
RuleBindingVO(
id=int(row["id"]),
docTypeId=int(row["doc_type_id"]),
docTypeCode=row["doc_type_code"],
ruleSetId=int(row["rule_set_id"]),
ruleType=row["rule_type"],
ruleName=row["rule_name"],
bindingMode=row["binding_mode"],
priority=int(row["priority"]),
isActive=bool(row["is_active"]),
note=row["note"],
)
)
return bindings
async def CreateBinding(
self,
DocTypeId: int,
RuleSetId: int,
Region: str = "default",
BindingMode: str = "explicit",
Priority: int = 0,
DocTypeCode: str | None = None,
Note: str | None = None,
CurrentUserId: int | None = None,
) -> RuleBindingVO:
"""创建规则类型绑定;若文档类型唯一对应一个二级分组,则优先写入新分组绑定。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
RuleSet = await Session.execute(
text("SELECT id, rule_type, rule_name FROM leaudit_rule_sets WHERE id = :rid AND deleted_at IS NULL LIMIT 1"),
{"rid": RuleSetId},
)
RsRow = RuleSet.mappings().first()
if not RsRow:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在")
await self._assert_document_type_access(Session, DocTypeId, current_user)
GroupId = await self._resolve_unique_accessible_child_group_id(Session, DocTypeId, current_user)
if GroupId is not None:
binding_scope = self._build_group_binding_scope_payload(current_user)
ExistingGroupBinding = await Session.execute(
text(
"""
SELECT id
FROM leaudit_rule_group_bindings
WHERE group_id = :group_id
AND rule_set_id = :rule_set_id
AND COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') = :tenant_code
AND deleted_at IS NULL
LIMIT 1
"""
),
{
"group_id": GroupId,
"rule_set_id": RuleSetId,
"tenant_code": binding_scope["tenant_code"],
},
)
if ExistingGroupBinding.mappings().first():
raise LeauditException(StatusCodeEnum.HTTP_409_CONFLICT, "该文档类型对应子组已绑定此规则集")
Result = await Session.execute(
text(
"""
INSERT INTO leaudit_rule_group_bindings (
group_id,
rule_set_id,
tenant_code,
scope_type,
tenant_name_snapshot,
priority,
is_active,
note,
created_at,
updated_at
) VALUES (
:group_id,
:rule_set_id,
:tenant_code,
:scope_type,
:tenant_name_snapshot,
:priority,
true,
:note,
NOW(),
NOW()
)
RETURNING id, rule_set_id, priority, is_active, note
"""
),
{
"group_id": GroupId,
"rule_set_id": RuleSetId,
"tenant_code": binding_scope["tenant_code"],
"scope_type": binding_scope["scope_type"],
"tenant_name_snapshot": binding_scope["tenant_name_snapshot"],
"priority": Priority,
"note": Note,
},
)
await sync_doc_type_bindings_from_group(Session, GroupId)
await Session.commit()
Row = Result.mappings().first()
return RuleBindingVO(
id=int(Row["id"]),
docTypeId=DocTypeId,
docTypeCode=DocTypeCode,
ruleSetId=int(Row["rule_set_id"]),
ruleType=RsRow["rule_type"],
ruleName=RsRow["rule_name"],
bindingMode="explicit",
priority=int(Row["priority"]),
isActive=bool(Row["is_active"]),
note=Row["note"],
)
child_group_count = await self._count_accessible_child_groups(Session, DocTypeId, current_user)
if child_group_count == 0:
raise LeauditException(
StatusCodeEnum.HTTP_409_CONFLICT,
"当前文档类型尚未创建运行子类型,无法绑定规则集;请先在评查点分组管理中补齐二级分组",
)
raise LeauditException(
StatusCodeEnum.HTTP_409_CONFLICT,
"当前文档类型存在多个运行子类型,不能再直接按文档类型绑定规则集;请改为在对应二级分组下绑定规则集",
)
async def UpdateBinding(
self,
BindingId: int,
IsActive: bool | None = None,
Priority: int | None = None,
BindingMode: str | None = None,
Note: str | None = None,
CurrentUserId: int | None = None,
) -> RuleBindingVO:
"""更新规则类型绑定;若绑定ID来自新分组绑定,则优先更新新表。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
GroupRow = await self._get_binding_row(Session, BindingId, current_user)
if GroupRow:
await Session.execute(
text(
"""
UPDATE leaudit_rule_group_bindings
SET priority = :priority,
is_active = :is_active,
note = :note,
updated_at = NOW()
WHERE id = :binding_id
"""
),
{
"binding_id": BindingId,
"priority": Priority if Priority is not None else int(GroupRow.get("priority") or 0),
"is_active": IsActive if IsActive is not None else bool(GroupRow.get("is_active", True)),
"note": Note if Note is not None else GroupRow.get("note"),
},
)
await sync_doc_type_bindings_from_group(Session, int(GroupRow["group_id"]))
await Session.commit()
refreshed = await self._get_binding_row(Session, BindingId, current_user)
return RuleBindingVO(
id=int(refreshed["id"]),
docTypeId=int(refreshed["doc_type_id"]),
docTypeCode=refreshed["doc_type_code"],
ruleSetId=int(refreshed["rule_set_id"]),
ruleType=refreshed["rule_type"],
ruleName=refreshed["rule_name"],
bindingMode=BindingMode or "explicit",
priority=int(refreshed["priority"]),
isActive=bool(refreshed["is_active"]),
note=refreshed["note"],
)
raise LeauditException(
StatusCodeEnum.HTTP_404_NOT_FOUND,
"绑定记录不存在或已迁移到新分组绑定,请刷新列表后重试",
)
async def DeleteBinding(self, BindingId: int, CurrentUserId: int | None = None) -> None:
"""删除规则类型绑定;若绑定ID来自新分组绑定,则优先删除新表。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
GroupRow = await self._get_binding_row(Session, BindingId, current_user)
if GroupRow:
await Session.execute(
text("UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE id = :bid"),
{"bid": BindingId},
)
await sync_doc_type_bindings_from_group(Session, int(GroupRow["group_id"]))
await Session.commit()
return
raise LeauditException(
StatusCodeEnum.HTTP_404_NOT_FOUND,
"绑定记录不存在或已迁移到新分组绑定,请刷新列表后重试",
)
async def _SwitchVersion(
self,
*,
RuleType: str,
VersionId: int,
TargetStatus: str,
OperatorUserId: int | None = None,
CurrentUserId: int | None = None,
Rollback: bool = False,
) -> RuleVersionVO:
"""切换当前生效版本。"""
async with GetAsyncSession() as Session:
current_user = await self._get_current_user_context(Session, CurrentUserId)
use_tenant_scope = await self._column_exists(Session, "leaudit_rule_sets", "tenant_code")
self._assert_rule_tenant_schema_ready_for_write(use_tenant_scope=use_tenant_scope, current_user=current_user)
if use_tenant_scope:
candidate_rows = await self._load_rule_set_rows_by_type(Session, RuleType)
RuleSet = self._pick_writable_rule_set_row(candidate_rows, current_user=current_user)
else:
RuleSet = await self._GetRuleSetByType(Session, RuleType)
if not RuleSet:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "规则集不存在")
VersionRow = await self._GetVersion(Session, VersionId)
self._assert_version_belongs_to_writable_rule_set(
dict(VersionRow) if VersionRow else None,
dict(RuleSet) if RuleSet else None,
)
if Rollback:
self._assert_rollback_target(
dict(VersionRow) if VersionRow else None,
dict(RuleSet) if RuleSet else None,
)
await Session.execute(
text(
"""
UPDATE leaudit_rule_versions
SET
status = CASE
WHEN id = :version_id THEN :target_status
WHEN status = 'published' THEN :previous_status
ELSE status
END,
publisher_user_id = CASE
WHEN id = :version_id THEN :operator_user_id
ELSE publisher_user_id
END,
published_at = CASE
WHEN id = :version_id THEN now()
ELSE published_at
END,
updated_at = now()
WHERE rule_set_id = :rule_set_id
"""
),
{
"version_id": VersionId,
"target_status": TargetStatus,
"previous_status": "rollback" if Rollback else "deprecated",
"operator_user_id": OperatorUserId,
"rule_set_id": RuleSet["id"],
},
)
await Session.execute(
text(
"""
UPDATE leaudit_rule_sets
SET
current_version_id = :version_id,
status = 'active',
updated_at = now()
WHERE id = :rule_set_id
"""
),
{
"version_id": VersionId,
"rule_set_id": RuleSet["id"],
},
)
await Session.commit()
self.InvalidateCaches()
try:
from fastapi_modules.fastapi_leaudit.services.impl.ruleConfigServiceImpl import GetRuleConfigServiceSingleton
RuleConfigService = GetRuleConfigServiceSingleton()
RuleConfigService.InvalidateSummaryCaches()
await RuleConfigService.WarmPackSummaries(force=False)
except Exception as exc:
logging.getLogger("RULE").warning("刷新规则配置摘要缓存失败: %s", exc)
VersionRow = await self._GetVersion(Session, VersionId)
return self._BuildRuleVersionVo(VersionRow)
async def _GetRuleSetByType(self, Session, RuleType: str):
"""按规则类型查询规则集。"""
Result = await Session.execute(
text(
"""
SELECT id, rule_type, rule_name, domain_type, current_version_id, status
FROM leaudit_rule_sets
WHERE rule_type = :rule_type
AND deleted_at IS NULL
LIMIT 1
"""
),
{"rule_type": RuleType},
)
return Result.mappings().first()
async def _load_rule_set_rows_by_type(self, Session, RuleType: str) -> list[dict[str, object]]:
if not await self._column_exists(Session, "leaudit_rule_sets", "tenant_code"):
row = await self._GetRuleSetByType(Session, RuleType)
return [dict(row)] if row else []
Result = await Session.execute(
text(
"""
SELECT
id,
rule_type,
rule_name,
domain_type,
current_version_id,
status,
COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') AS tenant_code,
source_rule_set_id
FROM leaudit_rule_sets
WHERE rule_type = :rule_type
AND deleted_at IS NULL
ORDER BY id DESC
"""
),
{"rule_type": RuleType},
)
return [dict(row) for row in Result.mappings().all()]
async def _GetVersion(self, Session, VersionId: int):
"""按版本 ID 查询规则版本。"""
Result = await Session.execute(
text(
"""
SELECT
id,
rule_set_id,
version_no,
status,
oss_url,
change_note,
published_at
FROM leaudit_rule_versions
WHERE id = :version_id
AND deleted_at IS NULL
LIMIT 1
"""
),
{"version_id": VersionId},
)
return Result.mappings().first()
async def _load_latest_version_id_by_rule_set(self, Session, RuleSetId: int) -> int | None:
row = (
await Session.execute(
text(
"""
SELECT id
FROM leaudit_rule_versions
WHERE rule_set_id = :rule_set_id
AND deleted_at IS NULL
ORDER BY version_seq DESC, id DESC
LIMIT 1
"""
),
{"rule_set_id": RuleSetId},
)
).mappings().first()
return int(row["id"]) if row and row.get("id") is not None else None
def _build_version_scope_snapshot(
self,
*,
writable_tenant_code: str | None,
writable_scope_type: str,
rule_set_row: dict[str, object],
source_version_id: int | None,
) -> dict[str, object | None]:
return {
"tenant_code_snapshot": writable_tenant_code or str(rule_set_row.get("tenant_code") or "") or "PROVINCIAL",
"scope_type_snapshot": writable_scope_type,
"source_version_id": source_version_id,
}
def _legacy_region_for_scope(self, tenant_code: str | None, scope_type: str | None) -> str:
"""兼容旧唯一约束 rule_type + region;租户私有规则集不能再共用 default。"""
normalized_tenant = normalize_scoped_tenant_code(str(tenant_code or ""), default="")
normalized_scope = str(scope_type or "").strip().upper()
if normalized_scope == "TENANT" and normalized_tenant:
return normalized_tenant
if normalized_tenant == "PUBLIC" or normalized_scope == "PUBLIC":
return "PUBLIC"
return "default"
def _assert_rule_tenant_schema_ready_for_write(
self,
*,
use_tenant_scope: bool,
current_user: dict[str, object] | None,
) -> None:
"""租户用户不能在缺少规则域租户列时写共享规则集。"""
if use_tenant_scope:
return
if current_user is None or current_user.get("is_global"):
return
if str(current_user.get("tenant_code") or "").strip():
raise LeauditException(
StatusCodeEnum.HTTP_409_CONFLICT,
"规则域租户隔离表结构未就绪,已拒绝写入共享规则集;请先执行 schema_rule_domain_tenant_phase1.sql",
)
def _build_tenant_binding_clone_payload(
self,
*,
current_user: dict[str, object],
source_binding: dict[str, object],
tenant_rule_set_id: int,
) -> dict[str, object | None]:
tenant_code = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="")
if not tenant_code:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户上下文缺失,不能派生规则绑定")
return {
"group_id": int(source_binding["group_id"]),
"rule_set_id": int(tenant_rule_set_id),
"tenant_code": tenant_code,
"scope_type": "TENANT",
"tenant_name_snapshot": str(current_user.get("tenant_name") or "").strip() or None,
"priority": int(source_binding.get("priority") or 0),
"note": "由租户规则集派生自动补绑",
}
def _build_group_binding_scope_payload(self, current_user: dict[str, object] | None) -> dict[str, object | None]:
if current_user and not current_user.get("is_global"):
tenant_code = normalize_scoped_tenant_code(str(current_user.get("tenant_code") or ""), default="")
if not tenant_code:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户上下文缺失,不能绑定规则集")
return {
"tenant_code": tenant_code,
"scope_type": "TENANT",
"tenant_name_snapshot": str(current_user.get("tenant_name") or "").strip() or None,
}
return {
"tenant_code": "PROVINCIAL",
"scope_type": "PROVINCIAL",
"tenant_name_snapshot": None,
}
async def _load_source_group_binding_ids(self, Session, source_rule_set_id: int) -> list[int]:
if not await self._column_exists(Session, "leaudit_rule_group_bindings", "tenant_code"):
return []
rows = (
await Session.execute(
text(
"""
SELECT id
FROM leaudit_rule_group_bindings
WHERE rule_set_id = :rule_set_id
AND deleted_at IS NULL
AND is_active = TRUE
ORDER BY priority DESC, id ASC
"""
),
{"rule_set_id": source_rule_set_id},
)
).mappings().all()
return [int(row["id"]) for row in rows]
async def _clone_source_group_bindings_for_tenant(
self,
Session,
*,
source_binding_ids: list[int],
tenant_rule_set_id: int,
current_user: dict[str, object] | None,
) -> None:
if not source_binding_ids or not current_user or current_user.get("is_global"):
return
rows = (
await Session.execute(
text(
"""
SELECT id, group_id, priority, note
FROM leaudit_rule_group_bindings
WHERE id = ANY(:binding_ids)
AND deleted_at IS NULL
AND is_active = TRUE
ORDER BY priority DESC, id ASC
"""
),
{"binding_ids": source_binding_ids},
)
).mappings().all()
for row in rows:
payload = self._build_tenant_binding_clone_payload(
current_user=current_user,
source_binding=dict(row),
tenant_rule_set_id=tenant_rule_set_id,
)
await Session.execute(
text(
"""
INSERT INTO leaudit_rule_group_bindings (
group_id,
rule_set_id,
tenant_code,
scope_type,
tenant_name_snapshot,
priority,
is_active,
note,
created_at,
updated_at
)
SELECT
:group_id,
:rule_set_id,
CAST(:tenant_code AS varchar),
:scope_type,
:tenant_name_snapshot,
:priority,
TRUE,
:note,
NOW(),
NOW()
WHERE NOT EXISTS (
SELECT 1
FROM leaudit_rule_group_bindings
WHERE group_id = :group_id
AND tenant_code = CAST(:tenant_code AS varchar)
AND deleted_at IS NULL
)
"""
),
payload,
)
async def _column_exists(self, Session, table_name: str, column_name: str) -> bool:
cache_key = f"{table_name}.{column_name}"
cached = self._column_exists_cache.get(cache_key)
if cached is not None:
return cached
exists = bool(
(
await Session.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM information_schema.columns
WHERE table_schema = current_schema()
AND table_name = :table_name
AND column_name = :column_name
)
"""
),
{"table_name": table_name, "column_name": column_name},
)
).scalar_one()
)
if exists:
self._column_exists_cache[cache_key] = exists
return exists
async def _load_scoped_rule_set_rows(
self,
Session,
*,
current_user: dict[str, object] | None,
) -> list[dict[str, object]]:
Result = await Session.execute(
text(
"""
SELECT
rs.id,
rs.rule_type,
rs.rule_name,
rs.domain_type,
rs.current_version_id,
current_rv.id AS usable_current_version_id,
fallback_rv.id AS fallback_version_id,
CASE
WHEN current_rv.id IS NOT NULL OR fallback_rv.id IS NOT NULL THEN TRUE
ELSE FALSE
END AS has_usable_version,
rs.status,
COALESCE(NULLIF(BTRIM(rs.tenant_code), ''), 'PROVINCIAL') AS tenant_code,
rs.source_rule_set_id
FROM leaudit_rule_sets rs
LEFT JOIN leaudit_rule_versions current_rv
ON current_rv.id = rs.current_version_id
AND current_rv.status = 'published'
LEFT JOIN LATERAL (
SELECT rv.id
FROM leaudit_rule_versions rv
WHERE rv.rule_set_id = rs.id
AND rv.status = 'published'
AND (rs.current_version_id IS NULL OR rv.id <> rs.current_version_id)
ORDER BY rv.version_seq DESC, rv.id DESC
LIMIT 1
) fallback_rv ON TRUE
WHERE rs.deleted_at IS NULL
ORDER BY rs.rule_type ASC, rs.id DESC
"""
)
)
rows = [dict(row) for row in Result.mappings().all()]
current_user_tenant = str((current_user or {}).get("tenant_code") or "").strip().upper()
if current_user is None or (current_user.get("is_global") and not current_user_tenant):
return rows
grouped: dict[str, list[dict[str, object]]] = {}
for row in rows:
grouped.setdefault(str(row.get("rule_type") or ""), []).append(row)
effective_rows: list[dict[str, object]] = []
for group_rows in grouped.values():
effective = pick_effective_scoped_row(group_rows, current_user_tenant or None)
if effective is not None:
effective_rows.append(dict(effective))
effective_rows.sort(key=lambda item: int(item["id"]), reverse=True)
return effective_rows
async def _get_effective_rule_set_row_by_type(
self,
Session,
RuleType: str,
*,
current_user: dict[str, object] | None,
) -> dict[str, object] | None:
if not await self._column_exists(Session, "leaudit_rule_sets", "tenant_code"):
return await self._GetRuleSetByType(Session, RuleType)
Result = await Session.execute(
text(
"""
SELECT
rs.id,
rs.rule_type,
rs.rule_name,
rs.domain_type,
rs.current_version_id,
rs.status,
COALESCE(NULLIF(BTRIM(rs.tenant_code), ''), 'PROVINCIAL') AS tenant_code,
rs.source_rule_set_id
FROM leaudit_rule_sets rs
WHERE rs.rule_type = :rule_type
AND rs.deleted_at IS NULL
ORDER BY rs.id DESC
"""
),
{"rule_type": RuleType},
)
rows = [dict(row) for row in Result.mappings().all()]
if not rows:
return None
current_user_tenant = str((current_user or {}).get("tenant_code") or "").strip().upper()
if current_user is None or (current_user.get("is_global") and not current_user_tenant):
return rows[0]
effective = pick_effective_scoped_row(rows, current_user_tenant or None)
return dict(effective) if effective is not None else None
async def _get_current_user_context(self, Session, CurrentUserId: int | None) -> dict[str, object] | None:
if CurrentUserId is None:
return None
sso_user_columns = await SsoUserCompat.get_columns(Session)
tenant_code_expr = SsoUserCompat.raw_optional_column(
sso_user_columns,
alias="u",
column="tenant_code",
)
tenant_name_expr = SsoUserCompat.raw_optional_column(
sso_user_columns,
alias="u",
column="tenant_name",
)
row = (
await Session.execute(
text(
f"""
SELECT
u.id,
COALESCE(u.area, '') AS area,
COALESCE(MAX({tenant_code_expr}), '') AS tenant_code,
COALESCE(MAX({tenant_name_expr}), '') AS tenant_name,
COALESCE(bool_or(r.role_key IN ('super_admin', 'provincial_admin')), FALSE) AS is_global
FROM sso_users u
LEFT JOIN user_role ur ON ur.user_id = u.id
LEFT JOIN roles r ON r.id = ur.role_id
WHERE u.id = :user_id
AND u.deleted_at IS NULL
AND u.status = 0
GROUP BY u.id, u.area
"""
),
{"user_id": CurrentUserId},
)
).mappings().first()
if not row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "当前用户不存在或已停用")
tenant = await self.TenantResolver.ResolveUserContext(
Area=str(row["area"] or ""),
TenantCode=str(row.get("tenant_code") or "") or None,
TenantName=str(row.get("tenant_name") or "") or None,
Source="rule_binding_user_context",
)
return {
"id": int(row["id"]),
"tenant_code": tenant.tenant_code or (str(row.get("tenant_code") or "") or None),
"tenant_name": tenant.tenant_name or (str(row.get("tenant_name") or "") or None),
"is_global": bool(row["is_global"]),
}
async def _entry_module_tenant_table_exists(self, session) -> bool:
if self._entry_module_tenant_table_exists_cache is not None:
return self._entry_module_tenant_table_exists_cache
exists = bool(
(
await session.execute(
text(
"""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = current_schema()
AND table_name = 'leaudit_entry_module_tenants'
)
"""
)
)
).scalar_one()
)
self._entry_module_tenant_table_exists_cache = exists
return exists
async def _entry_module_tenant_scope_sql(
self,
*,
session,
entry_module_expr: str,
current_user: dict[str, object] | None,
params: dict[str, object],
param_prefix: str,
) -> str:
if current_user is None or current_user.get("is_global"):
return "1=1"
if not await self._entry_module_tenant_table_exists(session):
return "1=0"
params[f"{param_prefix}_tenant_code"] = str(current_user.get("tenant_code") or "").strip()
return f"""
EXISTS (
SELECT 1
FROM leaudit_entry_module_tenants emt
WHERE emt.entry_module_id = {entry_module_expr}
AND emt.deleted_at IS NULL
AND emt.is_enabled = TRUE
AND (
emt.tenant_code = :{param_prefix}_tenant_code
OR emt.tenant_code = 'PUBLIC'
)
)
"""
async def _assert_entry_module_access(self, Session, EntryModuleId: int | None, CurrentUser: dict[str, object] | None) -> None:
if CurrentUser is None or CurrentUser.get("is_global"):
return
if EntryModuleId is None:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前资源未绑定入口模块,当前租户不可访问")
params: dict[str, object] = {"entry_module_id": int(EntryModuleId)}
access_filter = await self._entry_module_tenant_scope_sql(
session=Session,
entry_module_expr="em.id",
current_user=CurrentUser,
params=params,
param_prefix="assert_rule_binding_scope",
)
exists = (
await Session.execute(
text(
f"""
SELECT em.id
FROM leaudit_entry_modules em
WHERE em.id = :entry_module_id
AND em.deleted_at IS NULL
AND {access_filter}
LIMIT 1
"""
),
params,
)
).scalar_one_or_none()
if exists is None:
raise LeauditException(StatusCodeEnum.HTTP_403_FORBIDDEN, "当前租户不可访问该入口模块")
async def _assert_document_type_access(self, Session, DocTypeId: int, CurrentUser: dict[str, object] | None) -> None:
row = (
await Session.execute(
text(
"""
SELECT id, entry_module_id
FROM leaudit_document_types
WHERE id = :doc_type_id
AND deleted_at IS NULL
LIMIT 1
"""
),
{"doc_type_id": DocTypeId},
)
).mappings().first()
if not row:
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "文档类型不存在")
await self._assert_entry_module_access(
Session,
int(row["entry_module_id"]) if row.get("entry_module_id") is not None else None,
CurrentUser,
)
async def _resolve_unique_accessible_child_group_id(self, Session, DocTypeId: int, CurrentUser: dict[str, object] | None) -> int | None:
params: dict[str, object] = {"doc_type_id": DocTypeId}
access_filter = await self._entry_module_tenant_scope_sql(
session=Session,
entry_module_expr="COALESCE(child.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=CurrentUser,
params=params,
param_prefix="unique_child_scope",
)
row = (
await Session.execute(
text(
f"""
SELECT CASE WHEN COUNT(*) = 1 THEN MIN(child.id) END AS group_id
FROM leaudit_evaluation_point_groups child
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = child.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id
WHERE child.document_type_id = :doc_type_id
AND child.deleted_at IS NULL
AND COALESCE(child.pid, 0) <> 0
AND {access_filter}
"""
),
params,
)
).mappings().first()
return int(row["group_id"]) if row and row.get("group_id") is not None else None
async def _count_accessible_child_groups(self, Session, DocTypeId: int, CurrentUser: dict[str, object] | None) -> int:
params: dict[str, object] = {"doc_type_id": DocTypeId}
access_filter = await self._entry_module_tenant_scope_sql(
session=Session,
entry_module_expr="COALESCE(child.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=CurrentUser,
params=params,
param_prefix="count_child_scope",
)
row = (
await Session.execute(
text(
f"""
SELECT COUNT(*) AS total
FROM leaudit_evaluation_point_groups child
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = child.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id
WHERE child.document_type_id = :doc_type_id
AND child.deleted_at IS NULL
AND COALESCE(child.pid, 0) <> 0
AND {access_filter}
"""
),
params,
)
).mappings().first()
return int(row["total"] or 0) if row else 0
async def _get_binding_row(self, Session, BindingId: int, CurrentUser: dict[str, object] | None):
params: dict[str, object] = {"bid": BindingId}
access_filter = await self._entry_module_tenant_scope_sql(
session=Session,
entry_module_expr="COALESCE(child.entry_module_id, parent.entry_module_id, dt.entry_module_id)",
current_user=CurrentUser,
params=params,
param_prefix="binding_detail_scope",
)
row = (
await Session.execute(
text(
f"""
SELECT
rgb.id,
rgb.group_id,
child.document_type_id AS doc_type_id,
dt.code AS doc_type_code,
rgb.rule_set_id,
rgb.priority,
rgb.is_active,
rgb.note,
rs.rule_type,
rs.rule_name
FROM leaudit_rule_group_bindings rgb
JOIN leaudit_evaluation_point_groups child ON child.id = rgb.group_id
LEFT JOIN leaudit_evaluation_point_groups parent ON parent.id = child.pid AND parent.deleted_at IS NULL
LEFT JOIN leaudit_document_types dt ON dt.id = child.document_type_id
JOIN leaudit_rule_sets rs ON rs.id = rgb.rule_set_id
WHERE rgb.id = :bid
AND rgb.deleted_at IS NULL
AND {access_filter}
LIMIT 1
"""
),
params,
)
).mappings().first()
if not row:
raise LeauditException(
StatusCodeEnum.HTTP_404_NOT_FOUND,
"绑定记录不存在或当前租户不可访问,请刷新列表后重试",
)
return row
def _BuildRuleVersionVo(self, Row) -> RuleVersionVO:
"""构造规则版本响应。"""
return RuleVersionVO(
id=int(Row["id"]),
ruleSetId=int(Row["rule_set_id"]),
versionNo=Row["version_no"],
status=Row["status"],
ossUrl=Row["oss_url"],
changeNote=Row["change_note"],
publishedAt=Row["published_at"].isoformat() if Row["published_at"] else None,
)
def _ResolveDomainType(self, RuleType: str) -> str:
"""按规则类型推导领域类型。"""
Prefix = RuleType.split(".", 1)[0]
Mapping = {
"contract": "contract",
"admin_license": "admin_license",
"legal_doc": "legal_doc",
}
return Mapping.get(Prefix, Prefix or "unknown")
_RULE_SERVICE_SINGLETON: RuleServiceImpl | None = None
def GetRuleServiceSingleton() -> RuleServiceImpl:
"""返回共享规则服务实例,供控制器与聚合服务复用缓存。"""
global _RULE_SERVICE_SINGLETON
if _RULE_SERVICE_SINGLETON is None:
_RULE_SERVICE_SINGLETON = RuleServiceImpl()
return _RULE_SERVICE_SINGLETON