#!/usr/bin/env python3
"""Regenerate the HTML report of an existing govdoc run and overwrite its OSS artifact."""

from __future__ import annotations

import argparse
import asyncio
import hashlib
import json
from typing import Any

from sqlalchemy import text

from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import (
    AuditResult,
    AuditSummary,
    CheckedRule,
    OutlineNode,
    StructureItem,
)
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Finding, Location
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_renderer import render_html
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl


def _parse_json(raw: Any) -> Any:
    """Decode a JSON column value, tolerating empty and already-decoded input.

    Args:
        raw: The raw column value: ``None``, an empty string, an already
            decoded ``dict``/``list``, or a JSON document.

    Returns:
        The decoded value; ``dict``/``list`` inputs are returned unchanged.
        ``None`` is returned for ``None``, ``""`` and anything that cannot be
        decoded.
    """
    if raw is None or raw == "":
        return None
    if isinstance(raw, (dict, list)):
        return raw
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        # json.JSONDecodeError is a ValueError; TypeError covers inputs that
        # are not str/bytes/bytearray. Undecodable values are deliberately
        # treated as "no data" rather than aborting the regeneration.
        return None


def _build_checked_rules(rule_rows: list[dict[str, Any]]) -> list[CheckedRule]:
    """Build one ``CheckedRule`` per distinct rule id.

    Only the first row seen for each ``rule_id`` is used; later rows with the
    same id are ignored. A ``result`` value outside ``pass``/``fail``/
    ``skipped`` (including a missing one) is reported as ``pass``.

    Args:
        rule_rows: Mapping-like rule result rows, in the order they should be
            considered.

    Returns:
        The checked rules in first-seen order.
    """
    checked_rules: list[CheckedRule] = []
    seen_rule_ids: set[str] = set()
    for row in rule_rows:
        rule_id = str(row["rule_id"])
        if rule_id in seen_rule_ids:
            continue
        seen_rule_ids.add(rule_id)

        status = str(row.get("result") or "pass")
        checked_rules.append(
            CheckedRule(
                rule_id=rule_id,
                name=row.get("rule_name") or rule_id,
                severity=row.get("severity") or "info",
                category=row.get("category") or "",
                status=status if status in {"pass", "fail", "skipped"} else "pass",
                skip_reason=row.get("skip_reason") or "",
            )
        )
    return checked_rules


def _build_findings(rule_rows: list[dict[str, Any]]) -> list[Finding]:
    """Build a ``Finding`` for every row whose ``result`` is ``"fail"``.

    Args:
        rule_rows: Mapping-like rule result rows.

    Returns:
        The findings, in row order. Character offsets are set to 0 and the
        confidence to 1.0 for every finding.
    """
    findings: list[Finding] = []
    for index, row in enumerate(rule_rows):
        if row.get("result") != "fail":
            continue

        paragraph_index = int(row.get("paragraph_index") or 0)
        findings.append(
            Finding(
                # Falls back to the row position when the paragraph index is
                # missing or 0.
                finding_id=f"{row['rule_id']}-{paragraph_index or index}",
                rule_id=str(row["rule_id"]),
                rule_name=row.get("rule_name") or str(row["rule_id"]),
                severity=row.get("severity") or "info",
                category=row.get("category") or "",
                location=Location(
                    paragraph_index=paragraph_index,
                    role=row.get("location_path"),
                    char_start=0,
                    char_end=0,
                    context=row.get("paragraph_text") or "",
                ),
                actual=_parse_json(row.get("actual")) or {},
                expected=_parse_json(row.get("expected")) or {},
                message=row.get("message") or "",
                suggestion=row.get("suggestion") or "",
                evidence=str(row.get("evidence") or ""),
                confidence=1.0,
            )
        )
    return findings


def _build_summary(run_row: dict[str, Any], findings: list[Finding]) -> AuditSummary:
    """Build the report summary from the run row and the rebuilt findings.

    The score and the passed/failed/skipped counts are read from ``run_row``;
    the per-severity and per-category counts are tallied from ``findings``.
    Findings with an empty category are left out of the category tally.

    Args:
        run_row: Mapping-like run row.
        findings: Findings rebuilt for this run.

    Returns:
        The populated ``AuditSummary``.
    """
    severity_stats: dict[str, int] = {}
    category_stats: dict[str, int] = {}
    for finding in findings:
        severity_stats[finding.severity] = severity_stats.get(finding.severity, 0) + 1
        if finding.category:
            category_stats[finding.category] = category_stats.get(finding.category, 0) + 1

    return AuditSummary(
        # The score is truncated to an integer.
        score=int(float(run_row.get("total_score") or 0)),
        total_findings=len(findings),
        by_severity=severity_stats,
        by_category=category_stats,
        passed_count=int(run_row.get("passed_count") or 0),
        failed_count=int(run_row.get("failed_count") or 0),
        skipped_count=int(run_row.get("skipped_count") or 0),
    )


def _normalize_structure_item(item: dict[str, Any]) -> dict[str, Any]:
    """Map a stored structure item onto snake_case keys.

    Both camelCase and snake_case spellings are accepted for the multi-word
    keys; the camelCase value wins when it is truthy (for ``styleUniform``:
    when the key is present).

    Args:
        item: One entry of the stored ``structure`` list.

    Returns:
        A dict with snake_case keys and defaults filled in.
    """
    return {
        "role": item.get("role"),
        "label": item.get("label") or "",
        "count": item.get("count") or 0,
        "expected": bool(item.get("expected", False)),
        "paragraph_indices": item.get("paragraphIndices") or item.get("paragraph_indices") or [],
        "samples": item.get("samples") or [],
        "char_total": item.get("charTotal") or item.get("char_total") or 0,
        "dominant_font": item.get("dominantFont") or item.get("dominant_font"),
        "dominant_size_pt": item.get("dominantSizePt") or item.get("dominant_size_pt"),
        "style_uniform": bool(item.get("styleUniform", item.get("style_uniform", True))),
    }


def _normalize_outline_node(item: dict[str, Any]) -> dict[str, Any]:
    """Recursively map a stored outline node onto snake_case keys.

    Args:
        item: One outline node; ``children`` may be missing or empty.

    Returns:
        A dict with ``paragraph_index``, ``level``, ``text`` and normalized
        ``children``.
    """
    return {
        "paragraph_index": item.get("paragraphIndex") or item.get("paragraph_index") or 0,
        "level": item.get("level") or 0,
        "text": item.get("text") or "",
        "children": [_normalize_outline_node(child) for child in (item.get("children") or [])],
    }


def _build_audit_result(
    run_id: int,
    run_row: dict[str, Any],
    rule_rows: list[dict[str, Any]],
) -> AuditResult:
    """Assemble the ``AuditResult`` passed to the HTML renderer.

    Args:
        run_id: Id of the run; used as the audit id.
        run_row: Mapping-like run row, including ``result_summary_json``.
        rule_rows: Mapping-like rule result rows of the run.

    Returns:
        The assembled ``AuditResult``.
    """
    aux = _parse_json(run_row.get("result_summary_json"))
    if not isinstance(aux, dict):
        # Missing, undecodable or non-object summaries carry no usable
        # structure/outline/entities.
        aux = {}

    findings = _build_findings(rule_rows)
    return AuditResult(
        audit_id=str(run_id),
        document={
            "documentId": int(run_row["document_id"]),
            "filename": run_row.get("file_name") or "",
        },
        summary=_build_summary(run_row, findings),
        findings=findings,
        checked_rules=_build_checked_rules(rule_rows),
        # `or []` / `or {}` also covers keys stored with an explicit null.
        structure=[
            StructureItem.model_validate(_normalize_structure_item(item))
            for item in (aux.get("structure") or [])
        ],
        outline=[
            OutlineNode.model_validate(_normalize_outline_node(item))
            for item in (aux.get("outline") or [])
        ],
        entities={
            name: SemanticEntity.model_validate(value)
            for name, value in (aux.get("entities") or {}).items()
            if value is not None
        },
    )


async def regenerate_html_report(run_id: int) -> None:
    """Re-render the HTML report of a run and overwrite the stored artifact.

    Loads the run, its rule results and its latest ``html_report`` artifact
    row, renders the HTML, uploads it under the artifact's ``oss_url`` value,
    updates the artifact's size/hash/mime type, commits, and prints a JSON
    summary to stdout.

    Args:
        run_id: Id of the govdoc run to regenerate.

    Raises:
        RuntimeError: If the run query returns no row, or the run has no
            ``html_report`` artifact row.
    """
    oss_service = OssServiceImpl()

    async with GetAsyncSession() as session:
        # The run is inner-joined with its document and the active original
        # file, so a run lacking either is also reported as not existing.
        run_row = (
            await session.execute(
                text(
                    """
                    SELECT
                        gr.id,
                        gr.document_id,
                        gr.total_score,
                        gr.passed_count,
                        gr.failed_count,
                        gr.skipped_count,
                        gr.result_summary_json,
                        d.region,
                        f.file_name
                    FROM govdoc_runs gr
                    JOIN leaudit_documents d
                        ON d.id = gr.document_id
                        AND d.deleted_at IS NULL
                    JOIN leaudit_document_files f
                        ON f.document_id = d.id
                        AND f.file_role = 'original'
                        AND f.is_active = true
                        AND f.deleted_at IS NULL
                    WHERE gr.id = :run_id
                        AND gr.deleted_at IS NULL
                    LIMIT 1
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().first()
        if not run_row:
            raise RuntimeError(f"run {run_id} 不存在")

        rule_rows = (
            await session.execute(
                text(
                    """
                    SELECT
                        rule_id,
                        rule_name,
                        severity,
                        category,
                        result,
                        skip_reason,
                        message,
                        suggestion,
                        actual,
                        expected,
                        evidence,
                        paragraph_index,
                        paragraph_text,
                        location_path
                    FROM govdoc_rule_results
                    WHERE run_id = :run_id
                        AND deleted_at IS NULL
                    ORDER BY id ASC
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().all()

        # Only the most recent html_report artifact row is overwritten.
        artifact_row = (
            await session.execute(
                text(
                    """
                    SELECT id, file_name, oss_url
                    FROM govdoc_report_artifacts
                    WHERE run_id = :run_id
                        AND artifact_type = 'html_report'
                        AND deleted_at IS NULL
                    ORDER BY id DESC
                    LIMIT 1
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().first()
        if not artifact_row:
            raise RuntimeError(f"run {run_id} 没有 html_report 产物记录")

        result = _build_audit_result(run_id, run_row, rule_rows)
        html = render_html(result)
        html_bytes = html.encode("utf-8")
        sha256 = hashlib.sha256(html_bytes).hexdigest()

        # NOTE: the object is uploaded before the metadata row is updated; if
        # the UPDATE or commit fails, the recorded size/hash can be stale
        # relative to the uploaded content.
        await oss_service.UploadText(
            ObjectKey=str(artifact_row["oss_url"]),
            Content=html,
            ContentType="text/html; charset=utf-8",
        )

        await session.execute(
            text(
                """
                UPDATE govdoc_report_artifacts
                SET file_size = :file_size,
                    sha256 = :sha256,
                    mime_type = 'text/html; charset=utf-8',
                    updated_at = now()
                WHERE id = :artifact_id
                """
            ),
            {
                "artifact_id": int(artifact_row["id"]),
                "file_size": len(html_bytes),
                "sha256": sha256,
            },
        )
        await session.commit()

    print(
        json.dumps(
            {
                "runId": run_id,
                "documentId": int(run_row["document_id"]),
                "fileName": run_row.get("file_name") or "",
                "artifactOssKey": artifact_row["oss_url"],
                "htmlBytes": len(html_bytes),
                "sha256": sha256,
            },
            ensure_ascii=False,
        )
    )


def main() -> None:
    """Parse the ``run_id`` command-line argument and regenerate its report."""
    parser = argparse.ArgumentParser(description="重生成 govdoc HTML 报告")
    parser.add_argument("run_id", type=int, help="govdoc run id")
    args = parser.parse_args()
    asyncio.run(regenerate_html_report(args.run_id))


if __name__ == "__main__":
    main()