#!/usr/bin/env python3
"""Migrate legacy contract templates from docauditai to leaudit_platform.

The script reads categories and templates from the legacy PostgreSQL
database, resolves each template's DOCX/PDF object in the legacy bucket,
and prints the planned object-key mapping.  Without ``--apply`` it stops
there (dry-run).  With ``--apply`` it copies the objects into the target
bucket and then, inside one transaction, truncates and repopulates the
target ``contract_categories`` / ``contract_templates`` tables.
"""

from __future__ import annotations

import argparse
import asyncio
import os
from dataclasses import dataclass
from io import BytesIO
from pathlib import Path, PurePosixPath
from urllib.parse import quote

import asyncpg
from minio import Minio

from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils

ROOT = Path(__file__).resolve().parents[1]
APP_TOML = ROOT / "app.toml"
OLD_BUCKET = "docauditai"


@dataclass(frozen=True)
class LegacyCategory:
    """One row of the legacy ``public.contract_categories`` table."""

    id: int
    name: str
    icon: str | None
    description: str | None
    sort_order: int


@dataclass(frozen=True)
class LegacyTemplate:
    """One row of the legacy ``public.contract_templates`` table."""

    id: int
    template_code: str
    title: str
    category_id: int
    description: str | None
    file_path: str | None
    file_format: str | None
    is_featured: bool | None
    created_at: object
    updated_at: object
    pdf_file_path: str | None
    # Comes from a LEFT JOIN, so it is NULL when the category row is missing.
    category_name: str | None


def _build_postgres_dsn(
    user: object,
    password: object,
    host: object,
    port: object,
    database: object,
) -> str:
    """Return a ``postgresql://`` URI with percent-encoded credentials.

    User and password are encoded so that characters such as ``@``, ``:``,
    ``/`` or ``%`` inside them cannot be mistaken for URI delimiters.
    """
    encoded_user = quote(str(user), safe="")
    encoded_password = quote(str(password), safe="")
    return f"postgresql://{encoded_user}:{encoded_password}@{host}:{port}/{database}"


def load_target_config() -> dict[str, str]:
    """Load target DB and OSS settings from ``app.toml``.

    Returns:
        A dict with the keys ``target_dsn``, ``oss_endpoint``,
        ``oss_base_url``, ``oss_access_key``, ``oss_secret_key`` and
        ``oss_bucket``.

    Raises:
        KeyError: If a required ``DB`` or ``OSS`` entry is missing.
    """
    try:
        import tomllib
    except ImportError:  # pragma: no cover
        import tomli as tomllib

    with APP_TOML.open("rb") as fh:
        config = tomllib.load(fh)

    db = config["DB"]
    oss = config["OSS"]
    return {
        "target_dsn": _build_postgres_dsn(
            db["USER"], db["PASSWORD"], db["HOST"], db["PORT"], db["NAME"]
        ),
        "oss_endpoint": oss["ENDPOINT"],
        "oss_base_url": oss.get("BASE_URL", ""),
        "oss_access_key": oss["ACCESS_KEY"],
        "oss_secret_key": oss["SECRET_KEY"],
        "oss_bucket": oss["BUCKET"],
    }


def build_legacy_dsn(args: argparse.Namespace) -> str:
    """Build the legacy database DSN from the parsed ``--legacy-*`` options."""
    return _build_postgres_dsn(
        args.legacy_user,
        args.legacy_password,
        args.legacy_host,
        args.legacy_port,
        args.legacy_db,
    )


def build_minio_client(config: dict[str, str]) -> Minio:
    """Create a MinIO client from the values of :func:`load_target_config`.

    TLS is chosen from the scheme of ``oss_base_url`` when it has one,
    otherwise from the scheme of ``oss_endpoint``.  The scheme is removed
    from the endpoint before it is handed to ``Minio``.
    """
    endpoint = config["oss_endpoint"]
    base_url = config.get("oss_base_url", "")
    if base_url.startswith("http://"):
        secure = False
    elif base_url.startswith("https://"):
        secure = True
    else:
        secure = endpoint.startswith("https://")
    host = endpoint.replace("http://", "").replace("https://", "")
    return Minio(
        host,
        access_key=config["oss_access_key"],
        secret_key=config["oss_secret_key"],
        secure=secure,
    )


async def fetch_legacy_categories(conn: asyncpg.Connection) -> list[LegacyCategory]:
    """Return all legacy categories ordered by id (NULL sort_order becomes 0)."""
    rows = await conn.fetch(
        """
        SELECT id, name, icon, description, COALESCE(sort_order, 0) AS sort_order
        FROM public.contract_categories
        ORDER BY id
        """
    )
    return [LegacyCategory(**dict(row)) for row in rows]


async def fetch_legacy_templates(conn: asyncpg.Connection) -> list[LegacyTemplate]:
    """Return all legacy templates ordered by id, with their category name."""
    rows = await conn.fetch(
        """
        SELECT
            t.id,
            t.template_code,
            t.title,
            t.category_id,
            t.description,
            t.file_path,
            t.file_format,
            t.is_featured,
            t.created_at,
            t.updated_at,
            t.pdf_file_path,
            c.name AS category_name
        FROM public.contract_templates t
        LEFT JOIN public.contract_categories c ON c.id = t.category_id
        ORDER BY t.id
        """
    )
    return [LegacyTemplate(**dict(row)) for row in rows]


def _docx_sibling_key(pdf_key: str) -> str:
    """Return *pdf_key* with its suffix replaced by ``.docx``.

    Object keys always use ``/`` separators, so ``PurePosixPath`` is used
    to keep the result identical on every operating system.
    """
    return str(PurePosixPath(pdf_key).with_suffix(".docx"))


def resolve_docx_path(template: LegacyTemplate, object_keys: set[str]) -> str:
    """Find the legacy bucket key holding the template's DOCX source.

    The recorded ``file_path`` is used when it exists in the bucket, unless
    the PDF also exists, a ``.docx`` sibling of the PDF exists, and that
    sibling has a different file name - then the sibling is returned.  When
    ``file_path`` is not in the bucket, the PDF's ``.docx`` sibling is used
    as a fallback.

    Raises:
        ValueError: If the template has no ``file_path``.
        FileNotFoundError: If no candidate key exists in *object_keys*.
    """
    file_path = (template.file_path or "").strip()
    if not file_path:
        raise ValueError(f"template {template.id} missing file_path")

    pdf_path = (template.pdf_file_path or "").strip()

    if file_path in object_keys:
        if pdf_path and pdf_path in object_keys:
            expected_docx = _docx_sibling_key(pdf_path)
            if expected_docx in object_keys:
                current_name = PurePosixPath(file_path).name
                expected_name = PurePosixPath(expected_docx).name
                if current_name != expected_name:
                    return expected_docx
        return file_path

    if pdf_path:
        expected_docx = _docx_sibling_key(pdf_path)
        if expected_docx in object_keys:
            return expected_docx

    raise FileNotFoundError(f"template {template.id} docx not found: {file_path}")


def resolve_pdf_path(template: LegacyTemplate, object_keys: set[str]) -> str:
    """Return the template's PDF key after checking it exists in the bucket.

    Raises:
        ValueError: If the template has no ``pdf_file_path``.
        FileNotFoundError: If the key is not present in *object_keys*.
    """
    pdf_path = (template.pdf_file_path or "").strip()
    if not pdf_path:
        raise ValueError(f"template {template.id} missing pdf_file_path")
    if pdf_path in object_keys:
        return pdf_path
    raise FileNotFoundError(f"template {template.id} pdf not found: {pdf_path}")


def build_new_object_keys(
    template: LegacyTemplate,
    docx_path: str,
    pdf_path: str,
) -> tuple[str, str]:
    """Build the target ``(docx_key, pdf_key)`` pair for a template.

    Both keys come from ``OssPathUtils.BuildContractTemplateKey``; the DOCX
    uses the ``"source"`` role and the PDF the ``"preview"`` role, each
    keeping the file name of its legacy key.
    """
    # NOTE(review): category_name may be None (LEFT JOIN); confirm how
    # OssPathUtils.BuildContractTemplateKey treats a missing category.
    docx_key = OssPathUtils.BuildContractTemplateKey(
        CategoryName=template.category_name,
        TemplateCode=template.template_code,
        FileRole="source",
        FileName=PurePosixPath(docx_path).name,
    )
    pdf_key = OssPathUtils.BuildContractTemplateKey(
        CategoryName=template.category_name,
        TemplateCode=template.template_code,
        FileRole="preview",
        FileName=PurePosixPath(pdf_path).name,
    )
    return docx_key, pdf_key


def copy_object_bytes(
    client: Minio,
    *,
    source_bucket: str,
    source_key: str,
    target_bucket: str,
    target_key: str,
) -> None:
    """Copy one object between buckets by downloading and re-uploading it.

    The whole payload is read into memory before it is uploaded.
    """
    response = client.get_object(source_bucket, source_key)
    try:
        payload = response.read()
    finally:
        # Always hand the HTTP connection back, even if the read failed.
        response.close()
        response.release_conn()

    client.put_object(
        target_bucket,
        target_key,
        data=BytesIO(payload),
        length=len(payload),
    )


def ensure_bucket(client: Minio, bucket: str) -> None:
    """Create *bucket* if it does not exist yet."""
    if not client.bucket_exists(bucket):
        client.make_bucket(bucket)


async def reset_target_tables(conn: asyncpg.Connection) -> None:
    """Truncate the target template and category tables and reset their ids."""
    await conn.execute("TRUNCATE TABLE public.contract_templates RESTART IDENTITY CASCADE")
    await conn.execute("TRUNCATE TABLE public.contract_categories RESTART IDENTITY CASCADE")


async def insert_categories(conn: asyncpg.Connection, categories: list[LegacyCategory]) -> None:
    """Insert *categories* with their legacy ids, then resync the id sequence."""
    for category in categories:
        await conn.execute(
            """
            INSERT INTO public.contract_categories (id, name, icon, description, sort_order)
            VALUES ($1, $2, $3, $4, $5)
            """,
            category.id,
            category.name,
            category.icon,
            category.description,
            category.sort_order,
        )
    # Explicit ids bypass the sequence, so move it to the highest id in use.
    await conn.execute(
        """
        SELECT setval(
            pg_get_serial_sequence('public.contract_categories', 'id'),
            COALESCE((SELECT MAX(id) FROM public.contract_categories), 1),
            TRUE
        )
        """
    )


async def insert_templates(
    conn: asyncpg.Connection,
    templates: list[LegacyTemplate],
    template_paths: dict[int, tuple[str, str]],
) -> None:
    """Insert *templates* with their legacy ids and new object keys.

    Args:
        conn: Connection to the target database.
        templates: Legacy template rows to insert.
        template_paths: Maps template id to its new ``(docx_key, pdf_key)``.

    ``file_format`` defaults to ``"docx"`` and is lower-cased; a NULL
    ``is_featured`` is stored as ``False``.
    """
    for template in templates:
        file_path, pdf_file_path = template_paths[template.id]
        await conn.execute(
            """
            INSERT INTO public.contract_templates (
                id,
                template_code,
                title,
                category_id,
                description,
                file_path,
                file_format,
                is_featured,
                created_at,
                updated_at,
                pdf_file_path
            )
            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
            """,
            template.id,
            template.template_code,
            template.title,
            template.category_id,
            template.description,
            file_path,
            (template.file_format or "docx").lower(),
            bool(template.is_featured),
            template.created_at,
            template.updated_at,
            pdf_file_path,
        )
    # Explicit ids bypass the sequence, so move it to the highest id in use.
    await conn.execute(
        """
        SELECT setval(
            pg_get_serial_sequence('public.contract_templates', 'id'),
            COALESCE((SELECT MAX(id) FROM public.contract_templates), 1),
            TRUE
        )
        """
    )


def _parse_args() -> argparse.Namespace:
    """Parse the command-line options of the migration script."""
    parser = argparse.ArgumentParser(description="Migrate legacy contract templates.")
    parser.add_argument("--legacy-host", default="nas.7bm.co")
    parser.add_argument("--legacy-port", type=int, default=54302)
    parser.add_argument("--legacy-db", default="docauditai")
    parser.add_argument("--legacy-user", default="root")
    # NOTE(review): a real-looking credential is hard-coded as the fallback
    # default.  It is kept for backward compatibility; prefer supplying the
    # password via LEGACY_DB_PASSWORD or --legacy-password and remove the
    # literal once callers have migrated.
    parser.add_argument(
        "--legacy-password",
        default=os.environ.get("LEGACY_DB_PASSWORD", "postgresql.2025.qwe"),
    )
    parser.add_argument("--apply", action="store_true", help="Apply migration to OSS and target DB.")
    return parser.parse_args()


async def _migrate(
    *,
    legacy_conn: asyncpg.Connection,
    target_conn: asyncpg.Connection,
    minio_client: Minio,
    target_bucket: str,
    apply: bool,
) -> None:
    """Plan the migration, print it, and execute it when *apply* is true."""
    categories = await fetch_legacy_categories(legacy_conn)
    templates = await fetch_legacy_templates(legacy_conn)
    object_keys = {
        obj.object_name
        for obj in minio_client.list_objects(OLD_BUCKET, prefix="contract-template/", recursive=True)
    }

    # Resolve every template once; the results drive the report, the object
    # copy and the database insert.
    legacy_paths: dict[int, tuple[str, str]] = {}
    template_paths: dict[int, tuple[str, str]] = {}
    for template in templates:
        docx_path = resolve_docx_path(template, object_keys)
        pdf_path = resolve_pdf_path(template, object_keys)
        legacy_paths[template.id] = (docx_path, pdf_path)
        template_paths[template.id] = build_new_object_keys(template, docx_path, pdf_path)

    print(f"legacy categories: {len(categories)}")
    print(f"legacy templates: {len(templates)}")
    for template in templates:
        old_docx, old_pdf = legacy_paths[template.id]
        new_docx, new_pdf = template_paths[template.id]
        print(
            f"[{template.id}] {template.template_code} | "
            f"{old_docx} -> {new_docx} | {old_pdf} -> {new_pdf}"
        )

    if not apply:
        print("dry-run complete; rerun with --apply to execute migration")
        return

    # Only touch the target bucket once the user has asked for changes.
    ensure_bucket(minio_client, target_bucket)

    found_correction = False
    for template in templates:
        old_docx, old_pdf = legacy_paths[template.id]
        new_docx, new_pdf = template_paths[template.id]
        if old_docx != (template.file_path or "").strip():
            print(
                f"corrected docx path for template {template.id}: "
                f"{template.file_path} -> {old_docx}"
            )
            found_correction = True
        copy_object_bytes(
            minio_client,
            source_bucket=OLD_BUCKET,
            source_key=old_docx,
            target_bucket=target_bucket,
            target_key=new_docx,
        )
        copy_object_bytes(
            minio_client,
            source_bucket=OLD_BUCKET,
            source_key=old_pdf,
            target_bucket=target_bucket,
            target_key=new_pdf,
        )
    if not found_correction:
        print("no legacy path corrections required")

    # Truncate and reload atomically so a failure leaves the old rows intact.
    async with target_conn.transaction():
        await reset_target_tables(target_conn)
        await insert_categories(target_conn, categories)
        await insert_templates(target_conn, templates, template_paths)
    print("migration applied successfully")


async def main() -> None:
    """Run the migration (dry-run unless ``--apply`` is given)."""
    args = _parse_args()
    config = load_target_config()
    legacy_dsn = build_legacy_dsn(args)
    minio_client = build_minio_client(config)

    # Nested try/finally: each connection is closed even if opening or
    # closing the other one fails.
    legacy_conn = await asyncpg.connect(legacy_dsn)
    try:
        target_conn = await asyncpg.connect(config["target_dsn"])
        try:
            await _migrate(
                legacy_conn=legacy_conn,
                target_conn=target_conn,
                minio_client=minio_client,
                target_bucket=config["oss_bucket"],
                apply=args.apply,
            )
        finally:
            await target_conn.close()
    finally:
        await legacy_conn.close()


if __name__ == "__main__":
    asyncio.run(main())