Files
wren e80e8febd8 feat: multi-region rule isolation — region column + config + queries
- DB: add region column to leaudit_rule_sets + leaudit_rule_type_bindings
- DB: change UNIQUE constraint from (rule_type) to (rule_type, region)
- Config: add APP_REGION to app.toml + AppSettings + __init__.pyi
- AuditServiceImpl: filter bindings by APP_REGION
- RuleServiceImpl: ListBindings/CreateBinding use APP_REGION
- Seed script: accept --region arg, tag rules by region
- OssPathUtils: BuildRuleYamlKey already accepts Region parameter

Each region can now have its own independent copy of the same rule_type,
stored in separate OSS paths and DB rows, keyed by region.
2026-04-28 13:15:26 +08:00

213 lines
8.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""M4 种子数据初始化 — 上传全部规则 YAML + 创建入口模块 + 文档类型 + 绑定。
用法: cd /home/wren-dev/Porject/leaudit-platform
PYTHONPATH=src:/home/wren-dev/Porject/docauditai \
python scripts/m4_seed_rules.py [--region default]
"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
sys.path.insert(0, "/home/wren-dev/Porject/docauditai")
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_common.fastapi_common_storage.oss_client import OssClient
from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils
from sqlalchemy import text
# Directory (two levels above this script) that main() scans for
# sub-directories containing a rules.yaml file.
RULES_DIR = Path(__file__).resolve().parent.parent / "rules"
# Default region name.
# NOTE(review): not referenced by any code visible in this file — the
# effective region comes from the --region CLI argument; confirm whether
# this constant is still needed.
REGION = "default"
def _read_metadata(yaml_path: Path) -> dict:
    """Return the top-level ``metadata`` mapping of a rule YAML file.

    Args:
        yaml_path: Path of the rule YAML file to parse.

    Returns:
        The ``metadata`` mapping. An empty dict is returned when the file is
        empty, when its top level is not a mapping, or when ``metadata`` is
        missing or is not itself a mapping.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the file is not valid YAML.
    """
    import yaml

    # Decode explicitly as UTF-8 (the encoding upload_one uses when it reads
    # the same file) instead of relying on the platform default encoding.
    with open(yaml_path, encoding="utf-8") as f:
        data = yaml.safe_load(f)

    # safe_load yields None for an empty document and may yield a list or a
    # scalar; only a mapping can carry a "metadata" section.
    if not isinstance(data, dict):
        return {}
    metadata = data.get("metadata")
    # "metadata:" with no value parses as None — normalise to an empty dict so
    # callers can always use .get() on the result.
    return metadata if isinstance(metadata, dict) else {}
def _type_id_to_rule_type(type_id: str) -> str:
"""type_id → rule_type (rule_sets 表唯一 key)"""
return type_id
async def _get_or_create_rule_set(
    session, rule_type: str, rule_name: str, domain_type: str, description: str, region: str
) -> dict:
    """Fetch the rule set for ``(rule_type, region)``, inserting it if absent.

    Only rows whose ``deleted_at`` is NULL are considered. When no such row
    exists, a new one is inserted with status ``'draft'`` and
    ``is_builtin = false``; a falsy ``description`` is stored as ``""``.

    This function does not commit.

    Returns:
        The row as a dict. An existing row carries every column
        (``SELECT *``); a freshly inserted row carries only the columns named
        in the ``RETURNING`` clause.
    """
    lookup_sql = text(
        "SELECT * FROM leaudit_rule_sets WHERE rule_type = :rt AND region = :rgn AND deleted_at IS NULL LIMIT 1"
    )
    lookup = await session.execute(lookup_sql, {"rt": rule_type, "rgn": region})
    existing = lookup.mappings().first()
    if existing:
        return dict(existing)

    # Adjacent literals joined with explicit newlines produce the same SQL
    # text as a single multi-line string.
    insert_sql = text(
        "INSERT INTO leaudit_rule_sets (rule_type, rule_name, domain_type, description, region, status, is_builtin)\n"
        "VALUES (:rt, :rn, :dt, :desc, :rgn, 'draft', false)\n"
        "RETURNING id, rule_type, rule_name, domain_type, current_version_id, status"
    )
    insert_params = {
        "rt": rule_type,
        "rn": rule_name,
        "dt": domain_type,
        "desc": description or "",
        "rgn": region,
    }
    inserted = await session.execute(insert_sql, insert_params)
    return dict(inserted.mappings().first())
async def _version_exists(session, rule_set_id: int, version_no: str) -> bool:
    """Return True when ``leaudit_rule_versions`` has a row for this rule set and version number."""
    query = text("SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rsid AND version_no = :vn LIMIT 1")
    params = {"rsid": rule_set_id, "vn": version_no}
    found = (await session.execute(query, params)).mappings().first()
    if found is None:
        return False
    return True
async def upload_one(yaml_path: Path, oss: OssClient, region: str) -> dict:
    """Register one rule YAML file as a new draft version for ``region``.

    Reads the file's metadata, gets or creates the matching rule set and,
    unless that version number is already recorded for the rule set, uploads
    the YAML text to OSS and inserts a ``'draft'`` row into
    ``leaudit_rule_versions``, then commits.

    Args:
        yaml_path: Path of the rule YAML file.
        oss: Client used to upload the YAML text.
        region: Region the rule set and the OSS object key are tagged with.

    Returns:
        ``{"rule_type", "status": "skipped", "reason", "version"}`` when the
        version already exists, otherwise
        ``{"rule_type", "status": "created", "version", "oss_url"}``.

    Raises:
        ValueError: If the file's metadata has no (or an empty) ``type_id``.
    """
    meta = _read_metadata(yaml_path)

    # Without a type_id the script would create a rule set keyed by an empty
    # rule_type; refuse instead so the caller can report the broken file.
    type_id = str(meta.get("type_id") or "")
    if not type_id:
        raise ValueError(f"{yaml_path}: metadata.type_id is missing or empty")

    rule_type = _type_id_to_rule_type(type_id)
    # Fall back to rule_type when the name is absent, null or empty.
    rule_name = meta.get("name") or rule_type
    # YAML parses an unquoted "version: 1.0" as a float; the version number is
    # used as text in the OSS key and the SQL parameters, so normalise it.
    raw_version = meta.get("version")
    version_no = "1.0" if raw_version is None else str(raw_version)
    description = meta.get("description", "")
    # Domain is the part of type_id before the first dot (or all of it when
    # there is no dot).
    domain_type = type_id.split(".", 1)[0]

    yaml_text = yaml_path.read_text(encoding="utf-8")
    yaml_bytes = yaml_text.encode()  # encode once for both hash and size
    file_sha256 = hashlib.sha256(yaml_bytes).hexdigest()
    file_size = len(yaml_bytes)

    async with GetAsyncSession() as session:
        rs = await _get_or_create_rule_set(session, rule_type, rule_name, domain_type, description, region)
        if await _version_exists(session, rs["id"], version_no):
            return {
                "rule_type": rule_type,
                "status": "skipped",
                "reason": f"version {version_no} exists",
                "version": version_no,
            }

        # Upload to OSS
        object_key = OssPathUtils.BuildRuleYamlKey(rule_type, version_no, Region=region)
        oss_url = oss.UploadText(
            ObjectKey=object_key,
            Content=yaml_text,
            ContentType="application/x-yaml; charset=utf-8",
        )

        # Next version_seq is one past the highest recorded for this rule set
        # (1 when the rule set has no versions yet).
        seq_result = await session.execute(
            text("SELECT COALESCE(MAX(version_seq), 0) + 1 AS ns FROM leaudit_rule_versions WHERE rule_set_id = :rsid"),
            {"rsid": rs["id"]},
        )
        next_seq = int(seq_result.mappings().first()["ns"])

        # Insert the version row as a draft.
        await session.execute(
            text(
                """INSERT INTO leaudit_rule_versions (
rule_set_id, version_no, version_seq, status, source_type, dsl_format,
oss_url, file_sha256, file_size, metadata_type_id, metadata_name, metadata_version
) VALUES (
:rsid, :vn, :vs, 'draft', 'oss_yaml', 'yaml',
:url, :sha, :fs, :mtid, :mn, :mv
)"""
            ),
            {
                "rsid": rs["id"], "vn": version_no, "vs": next_seq,
                "url": oss_url, "sha": file_sha256, "fs": file_size,
                "mtid": type_id, "mn": rule_name, "mv": version_no,
            },
        )
        await session.commit()
        return {"rule_type": rule_type, "status": "created", "version": version_no, "oss_url": oss_url}
async def publish_one(rule_type: str, region: str, version_no: str | None = None) -> dict:
    """Publish a version of the rule set identified by ``(rule_type, region)``.

    When ``version_no`` is given (and truthy) that version is selected;
    otherwise the version with the highest ``version_seq`` is used. The
    selected version is marked ``'published'``, the rule set is pointed at it
    and marked ``'active'``, and the session is committed.

    Returns:
        ``{"rule_type", "status": "published", "version_id"}`` on success, or
        ``{"rule_type", "status": "error", "reason"}`` when the rule set or
        the version cannot be found.
    """
    async with GetAsyncSession() as session:
        rule_set_query = text(
            "SELECT id FROM leaudit_rule_sets WHERE rule_type = :rt AND region = :rgn AND deleted_at IS NULL LIMIT 1"
        )
        rule_set_row = (
            await session.execute(rule_set_query, {"rt": rule_type, "rgn": region})
        ).mappings().first()
        if not rule_set_row:
            return {"rule_type": rule_type, "status": "error", "reason": "rule_set not found"}
        rule_set_id = rule_set_row["id"]

        # Pick the statement and its parameters first, then run one query.
        if version_no:
            version_sql = "SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rsid AND version_no = :vn LIMIT 1"
            version_params = {"rsid": rule_set_id, "vn": version_no}
        else:
            version_sql = "SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rsid ORDER BY version_seq DESC LIMIT 1"
            version_params = {"rsid": rule_set_id}
        version_row = (await session.execute(text(version_sql), version_params)).mappings().first()
        if not version_row:
            return {"rule_type": rule_type, "status": "error", "reason": "version not found"}
        version_id = int(version_row["id"])

        mark_published = text(
            "UPDATE leaudit_rule_versions SET status='published', published_at=now(), updated_at=now() WHERE id=:vid"
        )
        await session.execute(mark_published, {"vid": version_id})

        activate_rule_set = text(
            "UPDATE leaudit_rule_sets SET current_version_id=:vid, status='active', updated_at=now() WHERE id=:rsid"
        )
        await session.execute(activate_rule_set, {"vid": version_id, "rsid": rule_set_id})

        await session.commit()
        return {"rule_type": rule_type, "status": "published", "version_id": version_id}
async def main():
    """CLI entry point: upload every rule set under ``RULES_DIR``, then publish each one.

    Parses ``--region`` (default ``"default"``), collects every sub-directory
    of ``RULES_DIR`` that contains a ``rules.yaml`` and runs two passes over
    them: ``upload_one`` for each file, then ``publish_one`` for each rule
    type. A failure on one rule set is printed and does not stop the run.
    """
    parser = argparse.ArgumentParser(description="M4 种子数据初始化")
    parser.add_argument("--region", default="default", help="地区标识(默认: default)")
    args = parser.parse_args()
    region = args.region

    oss = OssClient()
    rules_dirs = sorted(d for d in RULES_DIR.iterdir() if d.is_dir() and (d / "rules.yaml").exists())
    print(f"地区: {region}")
    print(f"找到 {len(rules_dirs)} 套规则\n")

    # Step 1: Upload all
    print("=" * 60)
    print("Step 1: 上传规则 YAML → OSS + 写 DB")
    print("=" * 60)
    for d in rules_dirs:
        yaml_path = d / "rules.yaml"
        try:
            result = await upload_one(yaml_path, oss, region)
            # Distinct markers so created and skipped rows can be told apart.
            icon = "✅" if result["status"] != "skipped" else "⏭️"
            print(f" {icon} {result['rule_type']:40s} {result['status']:8s} v{result.get('version', '?')}")
        except Exception as e:
            # Deliberate per-item boundary: report and continue with the next rule set.
            print(f" ❌ {d.name:40s} ERROR: {e}")

    # Step 2: Publish all
    print()
    print("=" * 60)
    print("Step 2: 发布规则版本")
    print("=" * 60)
    for d in rules_dirs:
        # Until the metadata has been read, the directory name is the only
        # label available for an error line.
        label = d.name
        try:
            # Read the metadata inside the try: a YAML file that cannot be
            # parsed must not abort the remaining rule sets.
            meta = _read_metadata(d / "rules.yaml")
            rule_type = _type_id_to_rule_type(meta.get("type_id", ""))
            label = rule_type
            result = await publish_one(rule_type, region)
            icon = "✅" if result["status"] == "published" else "❌"
            # Show the reason when publish_one reports an error.
            reason = result.get("reason")
            suffix = f" ({reason})" if reason else ""
            print(f" {icon} {rule_type:40s} {result['status']}{suffix}")
        except Exception as e:
            print(f" ❌ {label:40s} ERROR: {e}")

    print()
    print("完成!")
# Run the async entry point only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())