"""Rule-group schema, bootstrap, and sync helpers.""" from __future__ import annotations from typing import Any from sqlalchemy import bindparam, text async def ensure_rule_group_schema(session) -> None: """Create the new rule-group tables when they do not exist yet.""" statements = [ """ CREATE TABLE IF NOT EXISTS leaudit_evaluation_point_groups ( id BIGSERIAL PRIMARY KEY, pid BIGINT NOT NULL DEFAULT 0, code VARCHAR(120) NOT NULL, name VARCHAR(200) NOT NULL, description TEXT NULL, document_type_id BIGINT NULL REFERENCES leaudit_document_types(id), entry_module_id BIGINT NULL REFERENCES leaudit_entry_modules(id), sort_order INTEGER NOT NULL DEFAULT 0, is_enabled BOOLEAN NOT NULL DEFAULT TRUE, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), deleted_at TIMESTAMPTZ NULL ) """, """ CREATE TABLE IF NOT EXISTS leaudit_rule_group_bindings ( id BIGSERIAL PRIMARY KEY, group_id BIGINT NOT NULL REFERENCES leaudit_evaluation_point_groups(id), rule_set_id BIGINT NOT NULL REFERENCES leaudit_rule_sets(id), rule_type_binding_id BIGINT NULL REFERENCES leaudit_rule_type_bindings(id), priority INTEGER NOT NULL DEFAULT 0, is_active BOOLEAN NOT NULL DEFAULT TRUE, note TEXT NULL, created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), deleted_at TIMESTAMPTZ NULL ) """, "CREATE INDEX IF NOT EXISTS idx_leaudit_ep_groups_pid ON leaudit_evaluation_point_groups(pid)", "CREATE INDEX IF NOT EXISTS idx_leaudit_ep_groups_doc_type ON leaudit_evaluation_point_groups(document_type_id)", "CREATE INDEX IF NOT EXISTS idx_leaudit_ep_groups_entry_module ON leaudit_evaluation_point_groups(entry_module_id)", "CREATE INDEX IF NOT EXISTS idx_leaudit_rule_group_bindings_group_id ON leaudit_rule_group_bindings(group_id)", "CREATE INDEX IF NOT EXISTS idx_leaudit_rule_group_bindings_rule_set_id ON leaudit_rule_group_bindings(rule_set_id)", "CREATE UNIQUE INDEX IF NOT EXISTS uq_leaudit_ep_groups_code_active ON leaudit_evaluation_point_groups (LOWER(code)) WHERE deleted_at IS NULL", "DROP INDEX IF EXISTS uq_leaudit_ep_groups_doc_type_active", "CREATE UNIQUE INDEX IF NOT EXISTS uq_leaudit_rule_group_bindings_active ON leaudit_rule_group_bindings (group_id, rule_set_id) WHERE deleted_at IS NULL", ] for statement in statements: await session.execute(text(statement)) async def bootstrap_rule_groups(session) -> None: """Seed doc-type roots and default child groups from current doc-type bindings.""" await ensure_rule_group_schema(session) # Once the system has entered "business root" mode, stop recreating the # historical "doc type as top root" structure on every request. business_root_exists = bool( ( await session.execute( text( """ SELECT 1 FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND COALESCE(pid, 0) = 0 AND document_type_id IS NULL LIMIT 1 """ ) ) ).scalar_one_or_none() ) if business_root_exists: return rows = ( await session.execute( text( """ SELECT dt.id, dt.code, dt.name, dt.description, dt.entry_module_id, dt.sort_order, dt.is_enabled, em.name AS entry_module_name, COALESCE( json_agg( json_build_object( 'id', b.id, 'rule_set_id', b.rule_set_id, 'priority', b.priority, 'is_active', b.is_active, 'note', b.note ) ORDER BY b.priority DESC, b.id ASC ) FILTER (WHERE b.id IS NOT NULL AND b.deleted_at IS NULL), '[]'::json ) AS bindings FROM leaudit_document_types dt LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id LEFT JOIN leaudit_rule_type_bindings b ON b.doc_type_id = dt.id AND b.deleted_at IS NULL WHERE dt.deleted_at IS NULL GROUP BY dt.id, dt.code, dt.name, dt.description, dt.entry_module_id, dt.sort_order, dt.is_enabled, em.name ORDER BY dt.sort_order ASC, dt.id ASC """ ) ) ).mappings().all() for row in rows: top_group_id = await ensure_top_group(session, row) child_group_id = await ensure_default_child_group(session, row, top_group_id) existing_binding_count = int( ( await session.execute( text( "SELECT COUNT(*) FROM leaudit_rule_group_bindings WHERE group_id = :group_id AND deleted_at IS NULL" ), {"group_id": child_group_id}, ) ).scalar_one() ) bindings = list(row.get("bindings") or []) if existing_binding_count == 0 and bindings: await _replace_group_bindings(session, child_group_id, int(row["id"]), bindings) async def ensure_group_for_doc_type(session, doc_type_id: int) -> dict[str, int]: """Ensure the tree nodes for one doc type exist and return their ids.""" row = ( await session.execute( text( """ SELECT dt.id, dt.code, dt.name, dt.description, dt.entry_module_id, dt.sort_order, dt.is_enabled, em.name AS entry_module_name FROM leaudit_document_types dt LEFT JOIN leaudit_entry_modules em ON em.id = dt.entry_module_id WHERE dt.deleted_at IS NULL AND dt.id = :doc_type_id LIMIT 1 """ ), {"doc_type_id": doc_type_id}, ) ).mappings().first() if not row: raise ValueError("文档类型不存在") business_root_exists = bool( ( await session.execute( text( """ SELECT 1 FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND COALESCE(pid, 0) = 0 AND document_type_id IS NULL LIMIT 1 """ ) ) ).scalar_one_or_none() ) if business_root_exists: target_root_code = "root.contract" if int(row.get("entry_module_id") or 0) == 1 else "root.casefile" if int(row.get("entry_module_id") or 0) == 2 else None if target_root_code: root_row = ( await session.execute( text( """ SELECT id FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND COALESCE(pid, 0) = 0 AND code = :code LIMIT 1 """ ), {"code": target_root_code}, ) ).mappings().first() if root_row: child_row = ( await session.execute( text( """ SELECT id FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND pid = :pid AND document_type_id = :doc_type_id ORDER BY sort_order ASC, id ASC LIMIT 1 """ ), {"pid": int(root_row["id"]), "doc_type_id": doc_type_id}, ) ).mappings().first() if child_row: return {"top_group_id": int(root_row["id"]), "child_group_id": int(child_row["id"])} top_group_id = await ensure_top_group(session, row) child_group_id = await ensure_default_child_group(session, row, top_group_id) return {"top_group_id": top_group_id, "child_group_id": child_group_id} async def sync_group_bindings_from_doc_type(session, doc_type_id: int, rule_set_ids: list[int]) -> int: """Mirror doc-type bindings into the child rule-group bindings.""" await ensure_rule_group_schema(session) ids = [int(item) for item in rule_set_ids if item] ids = list(dict.fromkeys(ids)) group_ids = await ensure_group_for_doc_type(session, doc_type_id) child_group_id = group_ids["child_group_id"] legacy_bindings = ( await session.execute( text( """ SELECT id, rule_set_id, priority, is_active, note FROM leaudit_rule_type_bindings WHERE doc_type_id = :doc_type_id AND deleted_at IS NULL ORDER BY priority DESC, id ASC """ ), {"doc_type_id": doc_type_id}, ) ).mappings().all() binding_map = {int(row["rule_set_id"]): row for row in legacy_bindings} payload: list[dict[str, Any]] = [] for index, rule_set_id in enumerate(ids): current = binding_map.get(rule_set_id) payload.append( { "id": current.get("id") if current else None, "rule_set_id": rule_set_id, "priority": int(current.get("priority") if current else 100 - index), "is_active": bool(current.get("is_active") if current else True), "note": current.get("note") if current else None, } ) await _replace_group_bindings(session, child_group_id, doc_type_id, payload) return child_group_id async def sync_doc_type_bindings_from_group(session, group_id: int) -> int | None: """Mirror one doc type's active child-group bindings into the runtime binding table.""" await ensure_rule_group_schema(session) group_row = ( await session.execute( text( """ SELECT g.id, g.document_type_id, dt.code AS document_type_code FROM leaudit_evaluation_point_groups g LEFT JOIN leaudit_document_types dt ON dt.id = g.document_type_id WHERE g.id = :group_id AND g.deleted_at IS NULL LIMIT 1 """ ), {"group_id": group_id}, ) ).mappings().first() if not group_row or group_row.get("document_type_id") is None: return None doc_type_id = int(group_row["document_type_id"]) doc_type_code = str(group_row.get("document_type_code") or "") or None binding_rows = ( await session.execute( text( """ SELECT rgb.id, rgb.group_id, rgb.rule_set_id, rgb.priority, rgb.is_active, rgb.note, g.sort_order AS group_sort_order FROM leaudit_rule_group_bindings rgb JOIN leaudit_evaluation_point_groups g ON g.id = rgb.group_id WHERE g.document_type_id = :doc_type_id AND g.deleted_at IS NULL AND COALESCE(g.pid, 0) <> 0 AND rgb.deleted_at IS NULL AND rgb.is_active = TRUE ORDER BY COALESCE(g.sort_order, 0) ASC, COALESCE(rgb.priority, 0) DESC, rgb.id ASC """ ), {"doc_type_id": doc_type_id}, ) ).mappings().all() deduped_rows: list[dict[str, Any]] = [] seen_rule_set_ids: set[int] = set() for row in binding_rows: rule_set_id = int(row["rule_set_id"]) if rule_set_id in seen_rule_set_ids: continue seen_rule_set_ids.add(rule_set_id) deduped_rows.append(dict(row)) await session.execute( text( "UPDATE leaudit_rule_type_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE doc_type_id = :doc_type_id AND deleted_at IS NULL" ), {"doc_type_id": doc_type_id}, ) for index, row in enumerate(deduped_rows): await session.execute( text( """ INSERT INTO leaudit_rule_type_bindings ( doc_type_id, doc_type_code, rule_set_id, binding_mode, priority, is_active, note, created_at, updated_at, region ) VALUES ( :doc_type_id, :doc_type_code, :rule_set_id, 'explicit', :priority, TRUE, :note, NOW(), NOW(), 'default' ) RETURNING id """ ), { "doc_type_id": doc_type_id, "doc_type_code": doc_type_code, "rule_set_id": int(row["rule_set_id"]), "priority": max(0, 1000 - index), "note": row.get("note"), }, ) return doc_type_id async def ensure_top_group(session, doc_type_row) -> int: """Create or reuse the top-level root group for one doc type.""" doc_type_id = int(doc_type_row["id"]) top_code = str(doc_type_row["code"]) top_name = str(doc_type_row["name"]) top_sort = int(doc_type_row.get("sort_order") or 0) top = ( await session.execute( text( """ SELECT id FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND COALESCE(pid, 0) = 0 AND document_type_id = :doc_type_id LIMIT 1 """ ), {"doc_type_id": doc_type_id}, ) ).mappings().first() if top: await session.execute( text( """ UPDATE leaudit_evaluation_point_groups SET code = :code, name = :name, description = :description, sort_order = :sort_order, is_enabled = :is_enabled, updated_at = NOW() WHERE id = :group_id """ ), { "group_id": int(top["id"]), "code": top_code, "name": top_name, "description": doc_type_row.get("description"), "sort_order": top_sort, "is_enabled": bool(doc_type_row.get("is_enabled", True)), }, ) return int(top["id"]) legacy = ( await session.execute( text( """ SELECT id FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND document_type_id = :doc_type_id AND LOWER(code) = LOWER(:code) ORDER BY CASE WHEN COALESCE(pid, 0) = 0 THEN 0 ELSE 1 END, id ASC LIMIT 1 """ ), {"doc_type_id": doc_type_id, "code": top_code}, ) ).mappings().first() if legacy: await session.execute( text( """ UPDATE leaudit_evaluation_point_groups SET pid = 0, code = :code, name = :name, description = :description, document_type_id = :doc_type_id, sort_order = :sort_order, is_enabled = :is_enabled, updated_at = NOW() WHERE id = :group_id """ ), { "group_id": int(legacy["id"]), "doc_type_id": doc_type_id, "code": top_code, "name": top_name, "description": doc_type_row.get("description"), "sort_order": top_sort, "is_enabled": bool(doc_type_row.get("is_enabled", True)), }, ) return int(legacy["id"]) inserted = ( await session.execute( text( """ INSERT INTO leaudit_evaluation_point_groups ( pid, code, name, description, document_type_id, sort_order, is_enabled, created_at, updated_at ) VALUES ( 0, :code, :name, :description, :doc_type_id, :sort_order, :is_enabled, NOW(), NOW() ) RETURNING id """ ), { "doc_type_id": doc_type_id, "code": top_code, "name": top_name, "description": doc_type_row.get("description"), "sort_order": top_sort, "is_enabled": bool(doc_type_row.get("is_enabled", True)), }, ) ).mappings().one() return int(inserted["id"]) async def ensure_default_child_group(session, doc_type_row, top_group_id: int) -> int: """Ensure there is at least one default second-level group under a doc-type root.""" doc_type_id = int(doc_type_row["id"]) child = ( await session.execute( text( """ SELECT id, pid FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND pid = :pid AND document_type_id = :doc_type_id AND LOWER(code) = LOWER(:code) LIMIT 1 """ ), {"pid": top_group_id, "doc_type_id": doc_type_id, "code": f"{doc_type_row['code']}.default"}, ) ).mappings().first() any_children = int( ( await session.execute( text( """ SELECT COUNT(*) FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND pid = :pid """ ), {"pid": top_group_id}, ) ).scalar_one() ) if child: return int(child["id"]) if any_children > 0: adopted = ( await session.execute( text( """ SELECT id FROM leaudit_evaluation_point_groups WHERE deleted_at IS NULL AND pid = :pid AND document_type_id = :doc_type_id ORDER BY sort_order ASC, id ASC LIMIT 1 """ ), {"pid": top_group_id, "doc_type_id": doc_type_id}, ) ).mappings().first() if adopted: return int(adopted["id"]) payload = { "pid": top_group_id, "code": f"{doc_type_row['code']}.default", "name": "通用", "description": f"{doc_type_row['name']}默认子类型", "document_type_id": doc_type_id, "sort_order": int(doc_type_row.get("sort_order") or 0), "is_enabled": bool(doc_type_row.get("is_enabled", True)), } inserted = ( await session.execute( text( """ INSERT INTO leaudit_evaluation_point_groups ( pid, code, name, description, document_type_id, sort_order, is_enabled, created_at, updated_at ) VALUES ( :pid, :code, :name, :description, :document_type_id, :sort_order, :is_enabled, NOW(), NOW() ) RETURNING id """ ), payload, ) ).mappings().one() default_child_id = int(inserted["id"]) await _move_root_bindings_to_default_child(session, top_group_id, default_child_id) return int(inserted["id"]) async def _move_root_bindings_to_default_child(session, root_group_id: int, child_group_id: int) -> None: await session.execute( text( """ UPDATE leaudit_rule_group_bindings SET group_id = :child_group_id, updated_at = NOW() WHERE group_id = :root_group_id AND deleted_at IS NULL """ ), {"root_group_id": root_group_id, "child_group_id": child_group_id}, ) async def _replace_group_bindings(session, child_group_id: int, doc_type_id: int, bindings: list[dict[str, Any]]) -> None: await session.execute( text( "UPDATE leaudit_rule_group_bindings SET deleted_at = NOW(), updated_at = NOW() WHERE group_id = :group_id AND deleted_at IS NULL" ), {"group_id": child_group_id}, ) for item in bindings: await session.execute( text( """ INSERT INTO leaudit_rule_group_bindings ( group_id, rule_set_id, rule_type_binding_id, priority, is_active, note, created_at, updated_at ) VALUES ( :group_id, :rule_set_id, :rule_type_binding_id, :priority, :is_active, :note, NOW(), NOW() ) """ ), { "group_id": child_group_id, "rule_set_id": int(item["rule_set_id"]), "rule_type_binding_id": int(item["id"]) if item.get("id") else None, "priority": int(item.get("priority") or 0), "is_active": bool(item.get("is_active", True)), "note": item.get("note"), }, ) await sync_doc_type_bindings_from_group(session, child_group_id)