Files
leaudit-platform-backend/fastapi_modules/fastapi_leaudit/services/impl/ruleTenantMaterializer.py
T

308 lines
14 KiB
Python

"""规则租户物化服务。"""
from __future__ import annotations
from typing import Any
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
class RuleTenantMaterializer:
"""将平台模板规则物化为租户私有规则资产。"""
_PLATFORM_TEMPLATE_TENANTS = {"PUBLIC", "PROVINCIAL"}
def _template_source_order(self) -> list[str]:
return ["PUBLIC", "PROVINCIAL"]
def _filter_materializable_tenants(self, tenants: list[dict[str, Any]]) -> list[str]:
result: list[str] = []
for tenant in tenants:
tenant_code = str(tenant.get("tenant_code") or "").strip().upper()
if not tenant_code or tenant_code in self._PLATFORM_TEMPLATE_TENANTS:
continue
if not bool(tenant.get("is_enabled", True)):
continue
result.append(tenant_code)
return result
def _legacy_region_for_tenant(self, tenant_code: str) -> str:
return str(tenant_code or "").strip().upper()
async def MaterializeAllEnabledTenants(self) -> dict[str, int]:
async with GetAsyncSession() as session:
await self._ensure_rule_tenant_schema(session)
await self._promote_legacy_provincial_templates_to_public(session)
tenants = await self._load_enabled_tenants(session)
tenant_codes = self._filter_materializable_tenants(tenants)
stats = await self._materialize_tenants(session, tenant_codes)
await session.commit()
return stats
async def MaterializeTenant(self, TenantCode: str) -> dict[str, int]:
tenant_code = str(TenantCode or "").strip().upper()
if not tenant_code or tenant_code in self._PLATFORM_TEMPLATE_TENANTS:
return {"tenants": 0, "rule_sets": 0, "versions": 0, "bindings": 0}
async with GetAsyncSession() as session:
await self._ensure_rule_tenant_schema(session)
await self._promote_legacy_provincial_templates_to_public(session)
stats = await self._materialize_tenants(session, [tenant_code])
await session.commit()
return stats
async def _ensure_rule_tenant_schema(self, session) -> None:
required = {
"leaudit_rule_sets": {"tenant_code", "scope_type", "source_rule_set_id", "tenant_name_snapshot"},
"leaudit_rule_versions": {"tenant_code_snapshot", "scope_type_snapshot", "source_version_id"},
"leaudit_rule_group_bindings": {"tenant_code", "scope_type", "tenant_name_snapshot"},
}
rows = (
await session.execute(
text(
"""
SELECT table_name, column_name
FROM information_schema.columns
WHERE table_schema = current_schema()
AND table_name = ANY(:table_names)
"""
),
{"table_names": list(required)},
)
).mappings().all()
existing: dict[str, set[str]] = {}
for row in rows:
existing.setdefault(str(row["table_name"]), set()).add(str(row["column_name"]))
missing = [
f"{table}.{column}"
for table, columns in required.items()
for column in sorted(columns - existing.get(table, set()))
]
if missing:
raise RuntimeError("规则域租户字段未就绪: " + ", ".join(missing))
async def _load_enabled_tenants(self, session) -> list[dict[str, Any]]:
rows = (
await session.execute(
text(
"""
SELECT tenant_code, is_enabled
FROM sys_tenants
WHERE deleted_at IS NULL
AND is_enabled = TRUE
ORDER BY display_order ASC, id ASC
"""
)
)
).mappings().all()
return [dict(row) for row in rows]
async def _promote_legacy_provincial_templates_to_public(self, session) -> None:
"""把历史 PROVINCIAL 模板归并成 PUBLIC 模板源。"""
await session.execute(
text(
"""
UPDATE leaudit_rule_sets
SET tenant_code = 'PUBLIC',
scope_type = 'PUBLIC',
region = 'PUBLIC',
updated_at = NOW()
WHERE deleted_at IS NULL
AND COALESCE(NULLIF(BTRIM(tenant_code), ''), 'PROVINCIAL') = 'PROVINCIAL'
AND NOT EXISTS (
SELECT 1
FROM leaudit_rule_sets existing
WHERE existing.rule_type = leaudit_rule_sets.rule_type
AND existing.tenant_code = 'PUBLIC'
AND existing.deleted_at IS NULL
)
"""
)
)
await session.execute(
text(
"""
UPDATE leaudit_rule_versions rv
SET tenant_code_snapshot = 'PUBLIC',
scope_type_snapshot = 'PUBLIC',
updated_at = NOW()
FROM leaudit_rule_sets rs
WHERE rv.rule_set_id = rs.id
AND rs.tenant_code = 'PUBLIC'
AND (rv.tenant_code_snapshot IS NULL OR rv.tenant_code_snapshot = 'PROVINCIAL')
"""
)
)
await session.execute(
text(
"""
UPDATE leaudit_rule_group_bindings rgb
SET tenant_code = 'PUBLIC',
scope_type = 'PUBLIC',
updated_at = NOW()
FROM leaudit_rule_sets rs
WHERE rgb.rule_set_id = rs.id
AND rs.tenant_code = 'PUBLIC'
AND rgb.deleted_at IS NULL
AND COALESCE(NULLIF(BTRIM(rgb.tenant_code), ''), 'PROVINCIAL') = 'PROVINCIAL'
"""
)
)
async def _materialize_tenants(self, session, tenant_codes: list[str]) -> dict[str, int]:
stats = {"tenants": len(tenant_codes), "rule_sets": 0, "versions": 0, "bindings": 0}
for tenant_code in tenant_codes:
stats["rule_sets"] += await self._materialize_rule_sets(session, tenant_code)
stats["versions"] += await self._materialize_versions(session, tenant_code)
stats["bindings"] += await self._materialize_bindings(session, tenant_code)
return stats
async def _materialize_rule_sets(self, session, tenant_code: str) -> int:
result = await session.execute(
text(
"""
INSERT INTO leaudit_rule_sets (
rule_type, rule_name, domain_type, description, entry_module,
current_version_id, status, is_builtin, owner_user_id,
tenant_code, scope_type, source_rule_set_id, tenant_name_snapshot,
created_at, updated_at, deleted_at, region
)
SELECT
src.rule_type, src.rule_name, src.domain_type, src.description, src.entry_module,
NULL, 'draft', FALSE, src.owner_user_id,
CAST(:tenant_code AS varchar), 'TENANT', src.id, t.tenant_name,
NOW(), NOW(), NULL, CAST(:tenant_code AS varchar)
FROM leaudit_rule_sets src
JOIN sys_tenants t ON t.tenant_code = CAST(:tenant_code AS varchar)
WHERE src.deleted_at IS NULL
AND src.tenant_code = 'PUBLIC'
AND NOT EXISTS (
SELECT 1
FROM leaudit_rule_sets existing
WHERE existing.rule_type = src.rule_type
AND existing.tenant_code = CAST(:tenant_code AS varchar)
AND existing.deleted_at IS NULL
)
"""
),
{"tenant_code": tenant_code},
)
return int(result.rowcount or 0)
async def _materialize_versions(self, session, tenant_code: str) -> int:
result = await session.execute(
text(
"""
INSERT INTO leaudit_rule_versions (
rule_set_id, version_no, version_seq, status, source_type, dsl_format,
oss_url, file_sha256, file_size, local_cache_path,
metadata_type_id, metadata_name, metadata_version, change_note,
editor_user_id, publisher_user_id, published_at,
created_at, updated_at, deleted_at,
tenant_code_snapshot, scope_type_snapshot, source_version_id
)
SELECT
tenant_rs.id, src_v.version_no, src_v.version_seq, src_v.status,
src_v.source_type, src_v.dsl_format, src_v.oss_url, src_v.file_sha256,
src_v.file_size, src_v.local_cache_path, src_v.metadata_type_id,
src_v.metadata_name, src_v.metadata_version, src_v.change_note,
src_v.editor_user_id, src_v.publisher_user_id, src_v.published_at,
NOW(), NOW(), NULL,
CAST(:tenant_code AS varchar), 'TENANT', src_v.id
FROM leaudit_rule_sets tenant_rs
JOIN leaudit_rule_sets src_rs ON src_rs.id = tenant_rs.source_rule_set_id
JOIN leaudit_rule_versions src_v ON src_v.rule_set_id = src_rs.id
WHERE tenant_rs.deleted_at IS NULL
AND tenant_rs.tenant_code = CAST(:tenant_code AS varchar)
AND src_rs.tenant_code = 'PUBLIC'
AND src_v.deleted_at IS NULL
AND NOT EXISTS (
SELECT 1
FROM leaudit_rule_versions tenant_existing
WHERE tenant_existing.rule_set_id = tenant_rs.id
AND tenant_existing.deleted_at IS NULL
)
AND NOT EXISTS (
SELECT 1
FROM leaudit_rule_versions existing
WHERE existing.rule_set_id = tenant_rs.id
AND existing.deleted_at IS NULL
AND (
existing.source_version_id = src_v.id
OR existing.version_no = src_v.version_no
OR existing.version_seq = src_v.version_seq
)
)
"""
),
{"tenant_code": tenant_code},
)
await session.execute(
text(
"""
UPDATE leaudit_rule_sets tenant_rs
SET current_version_id = tenant_v.id,
status = 'active',
updated_at = NOW()
FROM leaudit_rule_sets src_rs
JOIN leaudit_rule_versions src_current ON src_current.id = src_rs.current_version_id
JOIN leaudit_rule_versions tenant_v ON tenant_v.source_version_id = src_current.id
WHERE tenant_rs.source_rule_set_id = src_rs.id
AND tenant_rs.tenant_code = CAST(:tenant_code AS varchar)
AND tenant_v.rule_set_id = tenant_rs.id
AND tenant_rs.current_version_id IS NULL
"""
),
{"tenant_code": tenant_code},
)
return int(result.rowcount or 0)
async def _materialize_bindings(self, session, tenant_code: str) -> int:
result = await session.execute(
text(
"""
INSERT INTO leaudit_rule_group_bindings (
group_id, rule_set_id, rule_type_binding_id, priority, is_active, note,
tenant_code, scope_type, tenant_name_snapshot,
created_at, updated_at, deleted_at
)
SELECT
src_b.group_id, tenant_rs.id, src_b.rule_type_binding_id, src_b.priority,
src_b.is_active, '由公共规则模板自动物化',
CAST(:tenant_code AS varchar), 'TENANT', t.tenant_name,
NOW(), NOW(), NULL
FROM leaudit_rule_group_bindings src_b
JOIN leaudit_rule_sets src_rs ON src_rs.id = src_b.rule_set_id
JOIN leaudit_rule_sets tenant_rs
ON tenant_rs.source_rule_set_id = src_rs.id
AND tenant_rs.tenant_code = CAST(:tenant_code AS varchar)
AND tenant_rs.deleted_at IS NULL
JOIN sys_tenants t ON t.tenant_code = CAST(:tenant_code AS varchar)
WHERE src_b.deleted_at IS NULL
AND src_b.is_active = TRUE
AND src_rs.tenant_code = 'PUBLIC'
AND NOT EXISTS (
SELECT 1
FROM leaudit_rule_group_bindings existing
WHERE existing.group_id = src_b.group_id
AND existing.tenant_code = CAST(:tenant_code AS varchar)
AND existing.rule_set_id = tenant_rs.id
AND existing.deleted_at IS NULL
)
"""
),
{"tenant_code": tenant_code},
)
return int(result.rowcount or 0)
_RULE_TENANT_MATERIALIZER_SINGLETON: RuleTenantMaterializer | None = None
def GetRuleTenantMaterializerSingleton() -> RuleTenantMaterializer:
global _RULE_TENANT_MATERIALIZER_SINGLETON
if _RULE_TENANT_MATERIALIZER_SINGLETON is None:
_RULE_TENANT_MATERIALIZER_SINGLETON = RuleTenantMaterializer()
return _RULE_TENANT_MATERIALIZER_SINGLETON