feat: add rag backend and review access fixes
This commit is contained in:
@@ -0,0 +1,277 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Merge historical split document version chains under the new root-group rule.
|
||||
|
||||
Default mode is dry-run. Use --apply to write changes.
|
||||
|
||||
Merge key:
|
||||
- region
|
||||
- root business group (一级分组)
|
||||
- normalized_name
|
||||
|
||||
Within one merge group:
|
||||
- sort by created_at ASC, then document_id ASC
|
||||
- keep the earliest document as root version
|
||||
- reuse the earliest document's version_group_key as the canonical chain key
|
||||
- renumber version_no from 1..N
|
||||
- link previous_version_id sequentially
|
||||
- mark only the newest document as is_latest_version = true
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import asyncio
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import asyncpg
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
APP_TOML = ROOT / "app.toml"
|
||||
|
||||
|
||||
@dataclass
|
||||
class CandidateRow:
|
||||
document_id: int
|
||||
region: str
|
||||
normalized_name: str
|
||||
root_group_id: int | None
|
||||
root_group_name: str | None
|
||||
type_id: int | None
|
||||
type_name: str | None
|
||||
group_id: int | None
|
||||
group_name: str | None
|
||||
version_group_key: str
|
||||
version_no: int
|
||||
is_latest_version: bool
|
||||
file_name: str | None
|
||||
updated_at: Any
|
||||
created_at: Any
|
||||
|
||||
|
||||
def load_target_dsn() -> str:
|
||||
try:
|
||||
import tomllib
|
||||
except ImportError: # pragma: no cover
|
||||
import tomli as tomllib
|
||||
|
||||
with APP_TOML.open("rb") as fh:
|
||||
config = tomllib.load(fh)
|
||||
db = config["DB"]
|
||||
return (
|
||||
f"postgresql://{db['USER']}:{db['PASSWORD']}"
|
||||
f"@{db['HOST']}:{db['PORT']}/{db['NAME']}"
|
||||
)
|
||||
|
||||
|
||||
async def fetch_candidates(
|
||||
conn: asyncpg.Connection,
|
||||
*,
|
||||
region: str | None,
|
||||
normalized_name: str | None,
|
||||
) -> list[CandidateRow]:
|
||||
filters = ["d.deleted_at IS NULL"]
|
||||
params: list[Any] = []
|
||||
|
||||
if region:
|
||||
params.append(region)
|
||||
filters.append(f"d.region = ${len(params)}")
|
||||
if normalized_name:
|
||||
params.append(normalized_name.strip().lower())
|
||||
filters.append(f"d.normalized_name = ${len(params)}")
|
||||
|
||||
where_clause = " AND ".join(filters)
|
||||
|
||||
rows = await conn.fetch(
|
||||
f"""
|
||||
WITH doc_scope AS (
|
||||
SELECT
|
||||
d.id AS document_id,
|
||||
d.region,
|
||||
d.normalized_name,
|
||||
d.version_group_key,
|
||||
d.version_no,
|
||||
d.is_latest_version,
|
||||
d.type_id,
|
||||
dt.name AS type_name,
|
||||
d.group_id,
|
||||
child.name AS group_name,
|
||||
COALESCE(
|
||||
CASE
|
||||
WHEN child.id IS NULL THEN NULL
|
||||
WHEN COALESCE(child.pid, 0) = 0 THEN child.id
|
||||
ELSE child.pid
|
||||
END,
|
||||
inferred.root_group_id
|
||||
) AS root_group_id,
|
||||
COALESCE(root.name, inferred.root_group_name) AS root_group_name,
|
||||
d.created_at,
|
||||
d.updated_at,
|
||||
f.file_name
|
||||
FROM leaudit_documents d
|
||||
LEFT JOIN leaudit_document_types dt
|
||||
ON dt.id = d.type_id
|
||||
LEFT JOIN leaudit_evaluation_point_groups child
|
||||
ON child.id = d.group_id
|
||||
LEFT JOIN leaudit_evaluation_point_groups root
|
||||
ON root.id = CASE
|
||||
WHEN child.id IS NULL THEN NULL
|
||||
WHEN COALESCE(child.pid, 0) = 0 THEN child.id
|
||||
ELSE child.pid
|
||||
END
|
||||
LEFT JOIN (
|
||||
SELECT
|
||||
child2.document_type_id,
|
||||
CASE WHEN COUNT(DISTINCT child2.pid) = 1 THEN MIN(child2.pid) END AS root_group_id,
|
||||
CASE WHEN COUNT(DISTINCT child2.pid) = 1 THEN MIN(parent.name) END AS root_group_name
|
||||
FROM leaudit_evaluation_point_groups child2
|
||||
JOIN leaudit_evaluation_point_groups parent
|
||||
ON parent.id = child2.pid
|
||||
WHERE COALESCE(child2.pid, 0) <> 0
|
||||
AND child2.deleted_at IS NULL
|
||||
AND child2.is_enabled = true
|
||||
AND child2.document_type_id IS NOT NULL
|
||||
GROUP BY child2.document_type_id
|
||||
) inferred
|
||||
ON inferred.document_type_id = d.type_id
|
||||
LEFT JOIN leaudit_document_files f
|
||||
ON f.document_id = d.id
|
||||
AND f.is_active = true
|
||||
AND f.file_role = 'primary'
|
||||
WHERE {where_clause}
|
||||
),
|
||||
conflict_keys AS (
|
||||
SELECT
|
||||
region,
|
||||
normalized_name,
|
||||
root_group_id
|
||||
FROM doc_scope
|
||||
WHERE normalized_name IS NOT NULL
|
||||
AND normalized_name <> ''
|
||||
AND root_group_id IS NOT NULL
|
||||
GROUP BY region, normalized_name, root_group_id
|
||||
HAVING COUNT(DISTINCT version_group_key) > 1
|
||||
)
|
||||
SELECT ds.*
|
||||
FROM doc_scope ds
|
||||
JOIN conflict_keys ck
|
||||
ON ck.region = ds.region
|
||||
AND ck.normalized_name = ds.normalized_name
|
||||
AND ck.root_group_id = ds.root_group_id
|
||||
ORDER BY ds.region, ds.root_group_name, ds.normalized_name, ds.created_at ASC, ds.document_id ASC
|
||||
""",
|
||||
*params,
|
||||
)
|
||||
|
||||
return [CandidateRow(**dict(row)) for row in rows]
|
||||
|
||||
|
||||
def group_candidates(rows: list[CandidateRow]) -> list[list[CandidateRow]]:
|
||||
grouped: dict[tuple[str, str, int], list[CandidateRow]] = defaultdict(list)
|
||||
for row in rows:
|
||||
if row.root_group_id is None:
|
||||
continue
|
||||
grouped[(row.region, row.normalized_name, row.root_group_id)].append(row)
|
||||
|
||||
result: list[list[CandidateRow]] = []
|
||||
for group_rows in grouped.values():
|
||||
group_rows.sort(key=lambda item: (item.created_at, item.document_id))
|
||||
result.append(group_rows)
|
||||
result.sort(key=lambda items: (items[0].region, items[0].root_group_name or "", items[0].normalized_name))
|
||||
return result
|
||||
|
||||
|
||||
def print_plan(groups: list[list[CandidateRow]], limit: int) -> None:
|
||||
print(f"计划处理候选组: {len(groups)}")
|
||||
if not groups:
|
||||
return
|
||||
|
||||
for index, rows in enumerate(groups[:limit], start=1):
|
||||
first = rows[0]
|
||||
canonical_key = first.version_group_key
|
||||
root_id = first.document_id
|
||||
latest_id = rows[-1].document_id
|
||||
print()
|
||||
print(
|
||||
f"[{index}] 地区={first.region} | 一级分组={first.root_group_name or first.root_group_id} | "
|
||||
f"名称={first.normalized_name} | 目标链={canonical_key} | 根文档={root_id} | 最新文档={latest_id}"
|
||||
)
|
||||
for version_no, row in enumerate(rows, start=1):
|
||||
previous_id = rows[version_no - 2].document_id if version_no > 1 else None
|
||||
latest_mark = "latest" if row.document_id == latest_id else "history"
|
||||
print(
|
||||
f" - doc={row.document_id} old_chain={row.version_group_key} old_v={row.version_no} "
|
||||
f"-> new_v={version_no} prev={previous_id} {latest_mark} type={row.type_name or row.type_id}"
|
||||
)
|
||||
|
||||
if len(groups) > limit:
|
||||
print()
|
||||
print(f"... 还剩 {len(groups) - limit} 组未显示,可通过 --limit 查看更多")
|
||||
|
||||
|
||||
async def apply_groups(conn: asyncpg.Connection, groups: list[list[CandidateRow]]) -> None:
|
||||
for rows in groups:
|
||||
root_row = rows[0]
|
||||
canonical_key = root_row.version_group_key
|
||||
root_document_id = root_row.document_id
|
||||
latest_document_id = rows[-1].document_id
|
||||
|
||||
for version_no, row in enumerate(rows, start=1):
|
||||
previous_id = rows[version_no - 2].document_id if version_no > 1 else None
|
||||
await conn.execute(
|
||||
"""
|
||||
UPDATE leaudit_documents
|
||||
SET version_group_key = $1,
|
||||
version_no = $2,
|
||||
previous_version_id = $3,
|
||||
root_version_id = $4,
|
||||
is_latest_version = $5,
|
||||
updated_at = NOW()
|
||||
WHERE id = $6
|
||||
""",
|
||||
canonical_key,
|
||||
version_no,
|
||||
previous_id,
|
||||
root_document_id,
|
||||
row.document_id == latest_document_id,
|
||||
row.document_id,
|
||||
)
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
parser = argparse.ArgumentParser(description="合并历史上被拆开的文档版本链")
|
||||
parser.add_argument("--region", help="仅处理指定地区")
|
||||
parser.add_argument("--name", help="仅处理指定 normalized_name")
|
||||
parser.add_argument("--limit", type=int, default=50, help="最多展示多少组计划")
|
||||
parser.add_argument("--apply", action="store_true", help="真正写库;默认仅预览")
|
||||
args = parser.parse_args()
|
||||
|
||||
conn = await asyncpg.connect(load_target_dsn())
|
||||
try:
|
||||
rows = await fetch_candidates(
|
||||
conn,
|
||||
region=args.region,
|
||||
normalized_name=args.name,
|
||||
)
|
||||
groups = group_candidates(rows)
|
||||
print_plan(groups, max(1, args.limit))
|
||||
|
||||
if not args.apply:
|
||||
print()
|
||||
print("dry-run complete; rerun with --apply to write data")
|
||||
return
|
||||
|
||||
async with conn.transaction():
|
||||
await apply_groups(conn, groups)
|
||||
|
||||
print()
|
||||
print(f"apply complete; merged groups: {len(groups)}")
|
||||
finally:
|
||||
await conn.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
Reference in New Issue
Block a user