"""M4 种子数据初始化 — 上传全部规则 YAML + 创建入口模块 + 文档类型 + 绑定。 用法: cd /home/wren-dev/Porject/leaudit-platform PYTHONPATH=src:/home/wren-dev/Porject/docauditai \ python scripts/m4_seed_rules.py """ from __future__ import annotations import asyncio import hashlib import os import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) sys.path.insert(0, "/home/wren-dev/Porject/docauditai") from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_storage.oss_client import OssClient from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils from sqlalchemy import text RULES_DIR = Path(__file__).resolve().parent.parent / "rules" def _read_metadata(yaml_path: Path) -> dict: import yaml with open(yaml_path) as f: data = yaml.safe_load(f) return data.get("metadata", {}) def _type_id_to_rule_type(type_id: str) -> str: """type_id → rule_type (rule_sets 表唯一 key)""" return type_id async def _get_or_create_rule_set( session, rule_type: str, rule_name: str, domain_type: str, description: str ) -> dict: result = await session.execute( text("SELECT * FROM leaudit_rule_sets WHERE rule_type = :rt AND deleted_at IS NULL LIMIT 1"), {"rt": rule_type}, ) row = result.mappings().first() if row: return dict(row) result = await session.execute( text( """INSERT INTO leaudit_rule_sets (rule_type, rule_name, domain_type, description, status, is_builtin) VALUES (:rt, :rn, :dt, :desc, 'draft', false) RETURNING id, rule_type, rule_name, domain_type, current_version_id, status""" ), {"rt": rule_type, "rn": rule_name, "dt": domain_type, "desc": description or ""}, ) return dict(result.mappings().first()) async def _version_exists(session, rule_set_id: int, version_no: str) -> bool: result = await session.execute( text("SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rsid AND version_no = :vn LIMIT 1"), {"rsid": rule_set_id, "vn": version_no}, ) return result.mappings().first() is not None async def upload_one(yaml_path: Path, oss: OssClient) -> dict: meta = _read_metadata(yaml_path) type_id = meta.get("type_id", "") rule_type = _type_id_to_rule_type(type_id) rule_name = meta.get("name", rule_type) version_no = meta.get("version", "1.0") description = meta.get("description", "") domain_type = type_id.split(".", 1)[0] if "." in type_id else type_id yaml_text = yaml_path.read_text(encoding="utf-8") file_sha256 = hashlib.sha256(yaml_text.encode()).hexdigest() file_size = len(yaml_text.encode()) async with GetAsyncSession() as session: rs = await _get_or_create_rule_set(session, rule_type, rule_name, domain_type, description) if await _version_exists(session, rs["id"], version_no): return {"rule_type": rule_type, "status": "skipped", "reason": f"version {version_no} exists"} # Upload to OSS object_key = OssPathUtils.BuildRuleYamlKey(rule_type, version_no) oss_url = oss.UploadText( ObjectKey=object_key, Content=yaml_text, ContentType="application/x-yaml; charset=utf-8", ) # Get next version_seq seq_result = await session.execute( text("SELECT COALESCE(MAX(version_seq), 0) + 1 AS ns FROM leaudit_rule_versions WHERE rule_set_id = :rsid"), {"rsid": rs["id"]}, ) next_seq = int(seq_result.mappings().first()["ns"]) # Insert version await session.execute( text( """INSERT INTO leaudit_rule_versions ( rule_set_id, version_no, version_seq, status, source_type, dsl_format, oss_url, file_sha256, file_size, metadata_type_id, metadata_name, metadata_version ) VALUES ( :rsid, :vn, :vs, 'draft', 'oss_yaml', 'yaml', :url, :sha, :fs, :mtid, :mn, :mv )""" ), { "rsid": rs["id"], "vn": version_no, "vs": next_seq, "url": oss_url, "sha": file_sha256, "fs": file_size, "mtid": type_id, "mn": rule_name, "mv": version_no, }, ) await session.commit() return {"rule_type": rule_type, "status": "created", "version": version_no, "oss_url": oss_url} async def publish_one(rule_type: str, version_no: str | None = None) -> dict: async with GetAsyncSession() as session: rs = await session.execute( text("SELECT id FROM leaudit_rule_sets WHERE rule_type = :rt AND deleted_at IS NULL LIMIT 1"), {"rt": rule_type}, ) rs_row = rs.mappings().first() if not rs_row: return {"rule_type": rule_type, "status": "error", "reason": "rule_set not found"} if version_no: v = await session.execute( text("SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rsid AND version_no = :vn LIMIT 1"), {"rsid": rs_row["id"], "vn": version_no}, ) else: v = await session.execute( text("SELECT id FROM leaudit_rule_versions WHERE rule_set_id = :rsid ORDER BY version_seq DESC LIMIT 1"), {"rsid": rs_row["id"]}, ) v_row = v.mappings().first() if not v_row: return {"rule_type": rule_type, "status": "error", "reason": "version not found"} version_id = int(v_row["id"]) await session.execute( text("UPDATE leaudit_rule_versions SET status='published', published_at=now(), updated_at=now() WHERE id=:vid"), {"vid": version_id}, ) await session.execute( text("UPDATE leaudit_rule_sets SET current_version_id=:vid, status='active', updated_at=now() WHERE id=:rsid"), {"vid": version_id, "rsid": rs_row["id"]}, ) await session.commit() return {"rule_type": rule_type, "status": "published", "version_id": version_id} async def main(): oss = OssClient() rules_dirs = sorted(d for d in RULES_DIR.iterdir() if d.is_dir() and (d / "rules.yaml").exists()) print(f"找到 {len(rules_dirs)} 套规则\n") # Step 1: Upload all print("=" * 60) print("Step 1: 上传规则 YAML → OSS + 写 DB") print("=" * 60) for d in rules_dirs: yaml_path = d / "rules.yaml" try: result = await upload_one(yaml_path, oss) icon = "✓" if result["status"] != "skipped" else "⊙" print(f" {icon} {result['rule_type']:40s} {result['status']:8s} v{result.get('version', '?')}") except Exception as e: print(f" ✗ {d.name:40s} ERROR: {e}") # Step 2: Publish all print() print("=" * 60) print("Step 2: 发布规则版本") print("=" * 60) for d in rules_dirs: meta = _read_metadata(d / "rules.yaml") type_id = meta.get("type_id", "") rule_type = _type_id_to_rule_type(type_id) try: result = await publish_one(rule_type) icon = "✓" if result["status"] == "published" else "✗" print(f" {icon} {rule_type:40s} {result['status']}") except Exception as e: print(f" ✗ {rule_type:40s} ERROR: {e}") print() print("完成!") if __name__ == "__main__": asyncio.run(main())