237 lines
7.5 KiB
Python
237 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Preview split document version chains that can be merged under the new rule.
|
|
|
|
The new rule groups versions by:
|
|
- region
|
|
- root business group (一级分组)
|
|
- normalized_name
|
|
|
|
This script is read-only. It prints candidate groups whose records currently
|
|
span more than one version_group_key, which means historical uploads were
|
|
split into multiple chains under the old same-type-only rule.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import asyncpg
|
|
|
|
|
|
# Directory one level above the folder that contains this script
# (parents[0] is the script's own folder, parents[1] is its parent).
ROOT = Path(__file__).resolve().parents[1]
# Application config file; load_target_dsn() reads its "DB" table.
APP_TOML = ROOT / "app.toml"
|
|
|
|
|
|
@dataclass
class CandidateRow:
    """One document row returned by the candidate query in ``fetch_candidates``.

    Field names match the column aliases of that query one-to-one, because
    instances are built with ``CandidateRow(**dict(row))``.
    """

    # ``id`` of the row in ``leaudit_documents``.
    document_id: int
    # Region of the document; first component of the merge key.
    region: str
    # Normalized document name; part of the merge key.
    normalized_name: str
    # Resolved top-level group: the document's own group when that group has
    # no parent (pid NULL/0), otherwise the group's pid; falls back to a root
    # inferred from the document type when the document has no group.
    root_group_id: int | None
    # Name of the resolved root group, when one could be looked up.
    root_group_name: str | None
    # Document type id and its name from ``leaudit_document_types`` (LEFT JOIN,
    # so the name may be missing).
    type_id: int | None
    type_name: str | None
    # Group the document is directly attached to, and that group's name.
    group_id: int | None
    group_name: str | None
    # Key of the version chain the document currently belongs to.
    version_group_key: str
    # Version number stored on the document.
    version_no: int
    # Latest-version flag stored on the document.
    is_latest_version: bool
    # ``file_name`` of the active file with role 'primary', if any.
    file_name: str | None
    # Timestamps exactly as returned by the database driver.
    updated_at: Any
    created_at: Any
|
|
|
|
|
|
def load_target_dsn() -> str:
    """Build the PostgreSQL connection URL from the ``DB`` table of ``APP_TOML``.

    The URL has the form ``postgresql://USER:PASSWORD@HOST:PORT/NAME``.

    USER, PASSWORD and NAME are percent-encoded before interpolation so that
    characters with a structural meaning in URLs (``@``, ``:``, ``/``, ``#``,
    ``?`` ...) cannot shift the component boundaries of the DSN. Values that
    contain no such characters yield exactly the same string as plain
    interpolation would.

    Returns:
        The DSN string.

    Raises:
        OSError: if ``APP_TOML`` cannot be opened.
        KeyError: if the ``DB`` table or one of its keys is missing.
    """
    # Local import keeps this block self-contained (no new file-level import).
    from urllib.parse import quote

    try:
        import tomllib
    except ImportError:  # pragma: no cover
        # Fallback package for interpreters that do not ship tomllib.
        import tomli as tomllib

    with APP_TOML.open("rb") as fh:
        config = tomllib.load(fh)
    db = config["DB"]

    # safe="" so that "/" is escaped as well; str() tolerates non-string TOML
    # values (e.g. a numeric password).
    user = quote(str(db["USER"]), safe="")
    password = quote(str(db["PASSWORD"]), safe="")
    database = quote(str(db["NAME"]), safe="")

    return (
        f"postgresql://{user}:{password}"
        f"@{db['HOST']}:{db['PORT']}/{database}"
    )
|
|
|
|
|
|
async def fetch_candidates(
    conn: asyncpg.Connection,
    *,
    region: str | None,
    normalized_name: str | None,
) -> list[CandidateRow]:
    """Fetch every document that belongs to a split version chain.

    The query works in three steps:

    * ``doc_scope`` selects non-deleted documents (optionally narrowed by
      ``region`` / ``normalized_name``) and resolves a root group for each:
      the document's own group when that group has no parent (pid NULL or 0),
      otherwise the group's ``pid``. When the document has no group, the root
      is inferred from its document type, but only if all enabled,
      non-deleted child groups of that type share exactly one parent.
    * ``conflict_keys`` keeps the ``(region, normalized_name, root_group_id)``
      combinations with a non-empty name and a non-NULL root that span more
      than one distinct ``version_group_key``.
    * The final SELECT returns all ``doc_scope`` rows matching such a key,
      ordered by region, root group name, normalized name, creation time and
      document id.

    NOTE(review): only one parent level is followed (``child.pid`` is treated
    as the root), which presumably assumes a two-level group hierarchy.
    NOTE(review): the file LEFT JOIN would yield one row per matching file if a
    document had several active 'primary' files; confirm that cannot happen.

    Args:
        conn: Open asyncpg connection used to run the query.
        region: Exact region to filter on; falsy values disable the filter.
        normalized_name: Name to filter on; it is stripped and lower-cased
            before comparison. Falsy values disable the filter.

    Returns:
        One ``CandidateRow`` per returned record, in query order.
    """
    # Soft-deleted documents are always excluded.
    filters = ["d.deleted_at IS NULL"]
    params: list[Any] = []

    # Each value is appended before its placeholder is rendered, so
    # len(params) is the 1-based positional index ($1, $2, ...).
    if region:
        params.append(region)
        filters.append(f"d.region = ${len(params)}")
    if normalized_name:
        params.append(normalized_name.strip().lower())
        filters.append(f"d.normalized_name = ${len(params)}")

    # Only fixed SQL fragments and placeholders are interpolated below; user
    # supplied values travel separately through *params.
    where_clause = " AND ".join(filters)

    rows = await conn.fetch(
        f"""
        WITH doc_scope AS (
            SELECT
                d.id AS document_id,
                d.region,
                d.normalized_name,
                d.version_group_key,
                d.version_no,
                d.is_latest_version,
                d.type_id,
                dt.name AS type_name,
                d.group_id,
                child.name AS group_name,
                COALESCE(
                    CASE
                        WHEN child.id IS NULL THEN NULL
                        WHEN COALESCE(child.pid, 0) = 0 THEN child.id
                        ELSE child.pid
                    END,
                    inferred.root_group_id
                ) AS root_group_id,
                COALESCE(
                    root.name,
                    inferred.root_group_name
                ) AS root_group_name,
                d.created_at,
                d.updated_at,
                f.file_name
            FROM leaudit_documents d
            LEFT JOIN leaudit_document_types dt
                ON dt.id = d.type_id
            LEFT JOIN leaudit_evaluation_point_groups child
                ON child.id = d.group_id
            LEFT JOIN leaudit_evaluation_point_groups root
                ON root.id = CASE
                    WHEN child.id IS NULL THEN NULL
                    WHEN COALESCE(child.pid, 0) = 0 THEN child.id
                    ELSE child.pid
                END
            LEFT JOIN (
                SELECT
                    child2.document_type_id,
                    CASE WHEN COUNT(DISTINCT child2.pid) = 1 THEN MIN(child2.pid) END AS root_group_id,
                    CASE WHEN COUNT(DISTINCT child2.pid) = 1 THEN MIN(parent.name) END AS root_group_name
                FROM leaudit_evaluation_point_groups child2
                JOIN leaudit_evaluation_point_groups parent
                    ON parent.id = child2.pid
                WHERE COALESCE(child2.pid, 0) <> 0
                    AND child2.deleted_at IS NULL
                    AND child2.is_enabled = true
                    AND child2.document_type_id IS NOT NULL
                GROUP BY child2.document_type_id
            ) inferred
                ON inferred.document_type_id = d.type_id
            LEFT JOIN leaudit_document_files f
                ON f.document_id = d.id
                AND f.is_active = true
                AND f.file_role = 'primary'
            WHERE {where_clause}
        ),
        conflict_keys AS (
            SELECT
                region,
                normalized_name,
                root_group_id
            FROM doc_scope
            WHERE normalized_name IS NOT NULL
                AND normalized_name <> ''
                AND root_group_id IS NOT NULL
            GROUP BY region, normalized_name, root_group_id
            HAVING COUNT(DISTINCT version_group_key) > 1
        )
        SELECT ds.*
        FROM doc_scope ds
        JOIN conflict_keys ck
            ON ck.region = ds.region
            AND ck.normalized_name = ds.normalized_name
            AND ck.root_group_id = ds.root_group_id
        ORDER BY ds.region, ds.root_group_name, ds.normalized_name, ds.created_at ASC, ds.document_id ASC
        """,
        *params,
    )

    # Column aliases match the CandidateRow field names, so each record can be
    # expanded directly into keyword arguments.
    return [CandidateRow(**dict(row)) for row in rows]
|
|
|
|
|
|
def print_preview(rows: list[CandidateRow], limit: int) -> None:
    """Print the candidate merge groups contained in ``rows``.

    Rows are grouped by ``(region, normalized_name, root_group_id)``. Groups
    are sorted by region, then root group name (missing names sort as ""),
    then normalized name; rows inside a group keep their input order.

    For each of the first ``limit`` groups a header line (region, root group,
    normalized name, number of distinct version chains, number of documents)
    is printed, followed by one line per document. If more groups exist than
    were shown, a trailing line reports how many were omitted.

    Args:
        rows: Candidate rows, typically the result of ``fetch_candidates``.
        limit: Maximum number of groups to display. Negative values are
            treated as 0 so the "remaining" count can never exceed the total
            number of groups.
    """
    # A negative limit would make ``len(groups) - limit`` larger than the
    # number of groups that actually exist.
    limit = max(0, limit)

    grouped: dict[tuple[str, str, int | None], list[CandidateRow]] = defaultdict(list)
    for row in rows:
        grouped[(row.region, row.normalized_name, row.root_group_id)].append(row)

    # Every member of a group shares region and normalized_name with its key,
    # so the first member is enough to build the sort key.
    groups = sorted(
        grouped.values(),
        key=lambda members: (
            members[0].region,
            members[0].root_group_name or "",
            members[0].normalized_name,
        ),
    )

    print(f"候选合并组: {len(groups)}")
    if not groups:
        return

    for shown, group_rows in enumerate(groups[:limit], start=1):
        sample = group_rows[0]
        chain_count = len({row.version_group_key for row in group_rows})
        print()
        print(
            f"[{shown}] 地区={sample.region} | 一级分组={sample.root_group_name or sample.root_group_id} | "
            f"归一化名称={sample.normalized_name} | 当前链数={chain_count} | 文档数={len(group_rows)}"
        )

        for idx, row in enumerate(group_rows, start=1):
            latest_mark = "latest" if row.is_latest_version else "history"
            # Fall back to the normalized name when no primary file is known.
            display_name = row.file_name or row.normalized_name
            print(
                f" - #{idx} doc={row.document_id} chain={row.version_group_key} v{row.version_no} {latest_mark} "
                f"type={row.type_name or row.type_id} child={row.group_name or row.group_id} "
                f"time={row.updated_at} file={display_name}"
            )

    hidden = len(groups) - limit
    if hidden > 0:
        print()
        print(f"... 还剩 {hidden} 组未显示,可通过 --limit 查看更多")
|
|
|
|
|
|
async def main() -> None:
    """Parse the command line, query the candidates and print the preview.

    Options: ``--region`` and ``--name`` narrow the query, ``--limit`` caps
    the number of groups printed (values below 1 are raised to 1). The
    database connection is closed before anything is printed, whether or not
    the query succeeded.
    """
    cli = argparse.ArgumentParser(description="预览需要合并的历史文档版本链")
    cli.add_argument("--region", help="仅查看指定地区,如 cz / mz / default")
    cli.add_argument("--name", help="仅查看指定 normalized_name")
    cli.add_argument("--limit", type=int, default=50, help="最多展示多少组候选,默认 50")
    opts = cli.parse_args()

    dsn = load_target_dsn()
    connection = await asyncpg.connect(dsn)
    try:
        candidates = await fetch_candidates(
            connection,
            region=opts.region,
            normalized_name=opts.name,
        )
    finally:
        # Release the connection even when the query raises.
        await connection.close()

    # Always show at least one group.
    group_limit = opts.limit if opts.limit > 1 else 1
    print_preview(candidates, group_limit)
|
|
|
|
|
|
# Run the preview only when this file is executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())
|