#!/usr/bin/env python3
"""Preview split document version chains that can be merged under the new rule.

The new rule groups versions by:
- region
- root business group (the first-level group)
- normalized_name

This script is read-only. It prints candidate groups whose records currently
span more than one version_group_key, which means historical uploads were split
into multiple chains under the old same-type-only rule.
"""

from __future__ import annotations

import argparse
import asyncio
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib.parse import quote

import asyncpg

# The config file is looked up one directory above the one holding this script.
ROOT = Path(__file__).resolve().parents[1]
APP_TOML = ROOT / "app.toml"


@dataclass
class CandidateRow:
    """One document row returned by the candidate query in ``fetch_candidates``.

    The field names must match the column names selected by that query,
    because rows are converted with ``CandidateRow(**dict(row))``.
    """

    document_id: int  # leaudit_documents.id
    region: str
    normalized_name: str
    # Root group resolved by the query: from the document's own group, or
    # inferred from the document type when the document has no matching group.
    root_group_id: int | None
    root_group_name: str | None
    type_id: int | None
    type_name: str | None  # leaudit_document_types.name
    group_id: int | None
    group_name: str | None  # name of the group referenced by group_id
    version_group_key: str  # key of the version chain the row currently sits in
    version_no: int
    is_latest_version: bool
    file_name: str | None  # file_name of the active 'primary' file, if any
    updated_at: Any
    created_at: Any


def load_target_dsn() -> str:
    """Build the PostgreSQL DSN from the ``DB`` table of ``app.toml``.

    Reads ``USER``, ``PASSWORD``, ``HOST``, ``PORT`` and ``NAME`` from the
    ``DB`` table of the TOML file at ``APP_TOML``.

    Returns:
        A ``postgresql://user:password@host:port/name`` URL.

    Raises:
        OSError: If ``app.toml`` cannot be opened.
        KeyError: If the ``DB`` table or one of its keys is missing.
    """
    try:
        import tomllib
    except ImportError:  # pragma: no cover
        # Fallback to the ``tomli`` package when ``tomllib`` is unavailable.
        import tomli as tomllib

    with APP_TOML.open("rb") as fh:
        config = tomllib.load(fh)

    db = config["DB"]
    # Percent-encode the credentials: a raw '@', ':', '/', '#' or '%' in the
    # user name or password would otherwise change how the URL is parsed.
    user = quote(str(db["USER"]), safe="")
    password = quote(str(db["PASSWORD"]), safe="")
    return (
        f"postgresql://{user}:{password}"
        f"@{db['HOST']}:{db['PORT']}/{db['NAME']}"
    )


async def fetch_candidates(
    conn: asyncpg.Connection,
    *,
    region: str | None,
    normalized_name: str | None,
) -> list[CandidateRow]:
    """Fetch every document that belongs to a mergeable candidate group.

    A candidate group is a ``(region, normalized_name, root_group_id)`` key
    whose documents carry more than one distinct ``version_group_key``. Rows
    with ``deleted_at`` set, an empty/NULL ``normalized_name`` or an
    unresolvable root group are not considered.

    Root group resolution, as done by the query:
    - if the document's ``group_id`` matches a group, the root is that group
      itself when its ``pid`` is NULL or 0, otherwise its ``pid``;
    - otherwise the root is inferred from the document type, but only when all
      enabled, non-deleted child groups bound to that type share one parent.

    Args:
        conn: Open asyncpg connection used to run the query.
        region: When truthy, restrict to documents of this region.
        normalized_name: When truthy, restrict to this normalized name; the
            value is stripped and lower-cased before comparison.

    Returns:
        Matching rows ordered by region, root group name, normalized name,
        creation time and document id.
    """
    filters = ["d.deleted_at IS NULL"]
    params: list[Any] = []

    # Only positional placeholders ($1, $2, ...) are interpolated into the SQL
    # text; the user-supplied values travel as bound parameters.
    if region:
        params.append(region)
        filters.append(f"d.region = ${len(params)}")
    if normalized_name:
        params.append(normalized_name.strip().lower())
        filters.append(f"d.normalized_name = ${len(params)}")

    where_clause = " AND ".join(filters)
    rows = await conn.fetch(
        f"""
        WITH doc_scope AS (
            SELECT
                d.id AS document_id,
                d.region,
                d.normalized_name,
                d.version_group_key,
                d.version_no,
                d.is_latest_version,
                d.type_id,
                dt.name AS type_name,
                d.group_id,
                child.name AS group_name,
                COALESCE(
                    CASE
                        WHEN child.id IS NULL THEN NULL
                        WHEN COALESCE(child.pid, 0) = 0 THEN child.id
                        ELSE child.pid
                    END,
                    inferred.root_group_id
                ) AS root_group_id,
                COALESCE(
                    root.name,
                    inferred.root_group_name
                ) AS root_group_name,
                d.created_at,
                d.updated_at,
                f.file_name
            FROM leaudit_documents d
            LEFT JOIN leaudit_document_types dt
                ON dt.id = d.type_id
            LEFT JOIN leaudit_evaluation_point_groups child
                ON child.id = d.group_id
            LEFT JOIN leaudit_evaluation_point_groups root
                ON root.id = CASE
                    WHEN child.id IS NULL THEN NULL
                    WHEN COALESCE(child.pid, 0) = 0 THEN child.id
                    ELSE child.pid
                END
            LEFT JOIN (
                SELECT
                    child2.document_type_id,
                    CASE WHEN COUNT(DISTINCT child2.pid) = 1 THEN MIN(child2.pid) END AS root_group_id,
                    CASE WHEN COUNT(DISTINCT child2.pid) = 1 THEN MIN(parent.name) END AS root_group_name
                FROM leaudit_evaluation_point_groups child2
                JOIN leaudit_evaluation_point_groups parent
                    ON parent.id = child2.pid
                WHERE COALESCE(child2.pid, 0) <> 0
                  AND child2.deleted_at IS NULL
                  AND child2.is_enabled = true
                  AND child2.document_type_id IS NOT NULL
                GROUP BY child2.document_type_id
            ) inferred
                ON inferred.document_type_id = d.type_id
            LEFT JOIN leaudit_document_files f
                ON f.document_id = d.id
               AND f.is_active = true
               AND f.file_role = 'primary'
            WHERE {where_clause}
        ),
        conflict_keys AS (
            SELECT
                region,
                normalized_name,
                root_group_id
            FROM doc_scope
            WHERE normalized_name IS NOT NULL
              AND normalized_name <> ''
              AND root_group_id IS NOT NULL
            GROUP BY region, normalized_name, root_group_id
            HAVING COUNT(DISTINCT version_group_key) > 1
        )
        SELECT ds.*
        FROM doc_scope ds
        JOIN conflict_keys ck
          ON ck.region = ds.region
         AND ck.normalized_name = ds.normalized_name
         AND ck.root_group_id = ds.root_group_id
        ORDER BY
            ds.region,
            ds.root_group_name,
            ds.normalized_name,
            ds.created_at ASC,
            ds.document_id ASC
        """,
        *params,
    )
    return [CandidateRow(**dict(row)) for row in rows]


def print_preview(rows: list[CandidateRow], limit: int) -> None:
    """Print candidate groups and their documents to stdout.

    Rows are grouped by ``(region, normalized_name, root_group_id)`` and the
    groups are printed sorted by region, root group name and normalized name.
    Within a group, rows keep the order in which they appear in ``rows``.

    Args:
        rows: Candidate rows, typically the result of ``fetch_candidates``.
        limit: Maximum number of groups to print; values below zero are
            treated as zero.
    """
    grouped: dict[tuple[str, str, int | None], list[CandidateRow]] = defaultdict(list)
    for row in rows:
        grouped[(row.region, row.normalized_name, row.root_group_id)].append(row)

    items = sorted(
        grouped.items(),
        key=lambda item: (
            item[0][0],
            # A missing root group name sorts as the empty string.
            item[1][0].root_group_name or "",
            item[0][1],
        ),
    )

    print(f"候选合并组: {len(items)}")
    if not items:
        return

    visible = items[: max(0, limit)]
    for shown, (_, group_rows) in enumerate(visible, start=1):
        sample = group_rows[0]
        distinct_chain_keys = sorted({row.version_group_key for row in group_rows})
        print()
        print(
            f"[{shown}] 地区={sample.region} | 一级分组={sample.root_group_name or sample.root_group_id} | "
            f"归一化名称={sample.normalized_name} | 当前链数={len(distinct_chain_keys)} | 文档数={len(group_rows)}"
        )
        for idx, row in enumerate(group_rows, start=1):
            latest_mark = "latest" if row.is_latest_version else "history"
            display_name = row.file_name or row.normalized_name
            print(
                f" - #{idx} doc={row.document_id} chain={row.version_group_key} v{row.version_no} {latest_mark} "
                f"type={row.type_name or row.type_id} child={row.group_name or row.group_id} "
                f"time={row.updated_at} file={display_name}"
            )

    # Count what was really left out, so the figure stays right for any limit.
    hidden = len(items) - len(visible)
    if hidden > 0:
        print()
        print(f"... 还剩 {hidden} 组未显示,可通过 --limit 查看更多")


async def main() -> None:
    """Parse CLI arguments, query the database and print the preview.

    The connection is closed before anything is printed, including when the
    query raises. ``--limit`` values below 1 are raised to 1.
    """
    parser = argparse.ArgumentParser(description="预览需要合并的历史文档版本链")
    parser.add_argument("--region", help="仅查看指定地区,如 cz / mz / default")
    parser.add_argument("--name", help="仅查看指定 normalized_name")
    parser.add_argument("--limit", type=int, default=50, help="最多展示多少组候选,默认 50")
    args = parser.parse_args()

    conn = await asyncpg.connect(load_target_dsn())
    try:
        rows = await fetch_candidates(
            conn,
            region=args.region,
            normalized_name=args.name,
        )
    finally:
        await conn.close()

    print_preview(rows, max(1, args.limit))


if __name__ == "__main__":
    asyncio.run(main())