Files
leaudit-platform-backend/scripts/regenerate_govdoc_html_report.py
2026-05-18 14:35:25 +08:00

299 lines
10 KiB
Python

#!/usr/bin/env python3
"""按已有 govdoc run 重生成 HTML 报告并覆盖 OSS 产物。"""
from __future__ import annotations
import argparse
import asyncio
import hashlib
import json
from typing import Any
from sqlalchemy import text
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import (
AuditResult,
AuditSummary,
CheckedRule,
OutlineNode,
StructureItem,
)
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Finding, Location
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_renderer import render_html
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
def _parse_json(raw: Any) -> Any:
if raw is None or raw == "":
return None
if isinstance(raw, (dict, list)):
return raw
try:
return json.loads(raw)
except Exception:
return None
def _build_checked_rules(rule_rows: list[dict[str, Any]]) -> list[CheckedRule]:
checked_rules: list[CheckedRule] = []
seen_rule_ids: set[str] = set()
for row in rule_rows:
rule_id = str(row["rule_id"])
if rule_id in seen_rule_ids:
continue
seen_rule_ids.add(rule_id)
status = str(row.get("result") or "pass")
checked_rules.append(
CheckedRule(
rule_id=rule_id,
name=row.get("rule_name") or rule_id,
severity=row.get("severity") or "info",
category=row.get("category") or "",
status=status if status in {"pass", "fail", "skipped"} else "pass",
skip_reason=row.get("skip_reason") or "",
)
)
return checked_rules
def _build_findings(rule_rows: list[dict[str, Any]]) -> list[Finding]:
findings: list[Finding] = []
for index, row in enumerate(rule_rows):
if row.get("result") != "fail":
continue
paragraph_index = int(row.get("paragraph_index") or 0)
findings.append(
Finding(
finding_id=f"{row['rule_id']}-{paragraph_index or index}",
rule_id=str(row["rule_id"]),
rule_name=row.get("rule_name") or str(row["rule_id"]),
severity=row.get("severity") or "info",
category=row.get("category") or "",
location=Location(
paragraph_index=paragraph_index,
role=row.get("location_path"),
char_start=0,
char_end=0,
context=row.get("paragraph_text") or "",
),
actual=_parse_json(row.get("actual")) or {},
expected=_parse_json(row.get("expected")) or {},
message=row.get("message") or "",
suggestion=row.get("suggestion") or "",
evidence=str(row.get("evidence") or ""),
confidence=1.0,
)
)
return findings
def _build_summary(run_row: dict[str, Any], findings: list[Finding]) -> AuditSummary:
    """Assemble the ``AuditSummary`` for a run.

    Severity and category tallies are computed from ``findings``; the score
    and the pass/fail/skip counters are read from ``run_row``.

    Args:
        run_row: Mapping-like run row providing ``total_score``,
            ``passed_count``, ``failed_count`` and ``skipped_count``.
        findings: Findings to tally.

    Returns:
        The populated ``AuditSummary``.
    """

    def _counter(column: str) -> int:
        # Empty or missing counters are treated as zero.
        return int(run_row.get(column) or 0)

    per_severity: dict[str, int] = {}
    per_category: dict[str, int] = {}
    for item in findings:
        per_severity.setdefault(item.severity, 0)
        per_severity[item.severity] += 1
        # Findings with an empty category are left out of the category tally.
        if not item.category:
            continue
        per_category.setdefault(item.category, 0)
        per_category[item.category] += 1

    # Go through float first so values such as "87.5" truncate to an int.
    score = int(float(run_row.get("total_score") or 0))
    return AuditSummary(
        score=score,
        total_findings=len(findings),
        by_severity=per_severity,
        by_category=per_category,
        passed_count=_counter("passed_count"),
        failed_count=_counter("failed_count"),
        skipped_count=_counter("skipped_count"),
    )
def _normalize_structure_item(item: dict[str, Any]) -> dict[str, Any]:
return {
"role": item.get("role"),
"label": item.get("label") or "",
"count": item.get("count") or 0,
"expected": bool(item.get("expected", False)),
"paragraph_indices": item.get("paragraphIndices") or item.get("paragraph_indices") or [],
"samples": item.get("samples") or [],
"char_total": item.get("charTotal") or item.get("char_total") or 0,
"dominant_font": item.get("dominantFont") or item.get("dominant_font"),
"dominant_size_pt": item.get("dominantSizePt") or item.get("dominant_size_pt"),
"style_uniform": bool(item.get("styleUniform", item.get("style_uniform", True))),
}
def _normalize_outline_node(item: dict[str, Any]) -> dict[str, Any]:
return {
"paragraph_index": item.get("paragraphIndex") or item.get("paragraph_index") or 0,
"level": item.get("level") or 0,
"text": item.get("text") or "",
"children": [_normalize_outline_node(child) for child in (item.get("children") or [])],
}
async def regenerate_html_report(run_id: int) -> None:
    """Re-render the HTML report of an existing govdoc run and overwrite it.

    Loads the run, its rule results and its most recent ``html_report``
    artifact record, rebuilds an ``AuditResult`` from them, renders it to
    HTML, uploads the HTML to the object key stored on the artifact record,
    updates that record's size/hash/mime metadata, and prints a JSON summary
    to stdout.

    Args:
        run_id: Primary key of the ``govdoc_runs`` row to regenerate.

    Raises:
        RuntimeError: If the run query returns no row, if the run has no
            ``html_report`` artifact record, or if that record has no object
            key to upload to.
    """
    oss_service = OssServiceImpl()
    async with GetAsyncSession() as session:
        # The inner joins mean that a run whose document or active original
        # file is missing or soft-deleted also yields no row here.
        run_row = (
            await session.execute(
                text(
                    """
                    SELECT
                        gr.id,
                        gr.document_id,
                        gr.total_score,
                        gr.passed_count,
                        gr.failed_count,
                        gr.skipped_count,
                        gr.result_summary_json,
                        d.region,
                        f.file_name
                    FROM govdoc_runs gr
                    JOIN leaudit_documents d
                      ON d.id = gr.document_id
                     AND d.deleted_at IS NULL
                    JOIN leaudit_document_files f
                      ON f.document_id = d.id
                     AND f.file_role = 'original'
                     AND f.is_active = true
                     AND f.deleted_at IS NULL
                    WHERE gr.id = :run_id
                      AND gr.deleted_at IS NULL
                    LIMIT 1
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().first()
        if not run_row:
            raise RuntimeError(f"run {run_id} 不存在")

        # Ordered by id so findings and checked rules keep a stable order.
        rule_rows = (
            await session.execute(
                text(
                    """
                    SELECT
                        rule_id,
                        rule_name,
                        severity,
                        category,
                        result,
                        skip_reason,
                        message,
                        suggestion,
                        actual,
                        expected,
                        evidence,
                        paragraph_index,
                        paragraph_text,
                        location_path
                    FROM govdoc_rule_results
                    WHERE run_id = :run_id
                      AND deleted_at IS NULL
                    ORDER BY id ASC
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().all()

        # Only the newest html_report artifact record is overwritten.
        artifact_row = (
            await session.execute(
                text(
                    """
                    SELECT id, file_name, oss_url
                    FROM govdoc_report_artifacts
                    WHERE run_id = :run_id
                      AND artifact_type = 'html_report'
                      AND deleted_at IS NULL
                    ORDER BY id DESC
                    LIMIT 1
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().first()
        if not artifact_row:
            raise RuntimeError(f"run {run_id} 没有 html_report 产物记录")

        # The oss_url column is passed to the uploader as the object key.
        # Fail early on an empty value: str(None) would otherwise upload the
        # report to an object literally named "None".
        object_key = artifact_row["oss_url"]
        if not object_key:
            raise RuntimeError(f"run {run_id} 的 html_report 产物记录缺少 oss_url")

        document_id = int(run_row["document_id"])
        file_name = run_row.get("file_name") or ""

        aux = _parse_json(run_row.get("result_summary_json"))
        if not isinstance(aux, dict):
            # Missing, malformed or non-object summary JSON: render the
            # report without the auxiliary sections instead of crashing.
            aux = {}

        findings = _build_findings(rule_rows)
        result = AuditResult(
            audit_id=str(run_id),
            document={
                "documentId": document_id,
                "filename": file_name,
            },
            summary=_build_summary(run_row, findings),
            findings=findings,
            checked_rules=_build_checked_rules(rule_rows),
            # `or []` / `or {}` also cover keys stored with an explicit null,
            # which a plain .get(key, default) would hand back as None.
            structure=[
                StructureItem.model_validate(_normalize_structure_item(item))
                for item in (aux.get("structure") or [])
            ],
            outline=[
                OutlineNode.model_validate(_normalize_outline_node(item))
                for item in (aux.get("outline") or [])
            ],
            entities={
                name: SemanticEntity.model_validate(value)
                for name, value in (aux.get("entities") or {}).items()
                if value is not None
            },
        )

        html = render_html(result)
        # Size and hash describe the UTF-8 bytes, matching the charset
        # declared in the content type below.
        html_bytes = html.encode("utf-8")
        sha256 = hashlib.sha256(html_bytes).hexdigest()

        await oss_service.UploadText(
            ObjectKey=str(object_key),
            Content=html,
            ContentType="text/html; charset=utf-8",
        )

        # Record the new metadata only after the upload call has returned.
        await session.execute(
            text(
                """
                UPDATE govdoc_report_artifacts
                SET file_size = :file_size,
                    sha256 = :sha256,
                    mime_type = 'text/html; charset=utf-8',
                    updated_at = now()
                WHERE id = :artifact_id
                """
            ),
            {
                "artifact_id": int(artifact_row["id"]),
                "file_size": len(html_bytes),
                "sha256": sha256,
            },
        )
        await session.commit()

        print(
            json.dumps(
                {
                    "runId": run_id,
                    "documentId": document_id,
                    "fileName": file_name,
                    "artifactOssKey": object_key,
                    "htmlBytes": len(html_bytes),
                    "sha256": sha256,
                },
                ensure_ascii=False,
            )
        )
def main() -> None:
    """Read the run id from the command line and regenerate that run's report."""
    cli = argparse.ArgumentParser(description="重生成 govdoc HTML 报告")
    cli.add_argument("run_id", type=int, help="govdoc run id")
    run_id: int = cli.parse_args().run_id
    # Drive the async workflow to completion on a fresh event loop.
    asyncio.run(regenerate_html_report(run_id))


# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()