299 lines
10 KiB
Python
299 lines
10 KiB
Python
#!/usr/bin/env python3
|
|
"""按已有 govdoc run 重生成 HTML 报告并覆盖 OSS 产物。"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import hashlib
|
|
import json
|
|
from typing import Any
|
|
|
|
from sqlalchemy import text
|
|
|
|
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import (
|
|
AuditResult,
|
|
AuditSummary,
|
|
CheckedRule,
|
|
OutlineNode,
|
|
StructureItem,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Finding, Location
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.reporter.html_renderer import render_html
|
|
from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl
|
|
|
|
|
|
def _parse_json(raw: Any) -> Any:
|
|
if raw is None or raw == "":
|
|
return None
|
|
if isinstance(raw, (dict, list)):
|
|
return raw
|
|
try:
|
|
return json.loads(raw)
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def _build_checked_rules(rule_rows: list[dict[str, Any]]) -> list[CheckedRule]:
|
|
checked_rules: list[CheckedRule] = []
|
|
seen_rule_ids: set[str] = set()
|
|
for row in rule_rows:
|
|
rule_id = str(row["rule_id"])
|
|
if rule_id in seen_rule_ids:
|
|
continue
|
|
seen_rule_ids.add(rule_id)
|
|
status = str(row.get("result") or "pass")
|
|
checked_rules.append(
|
|
CheckedRule(
|
|
rule_id=rule_id,
|
|
name=row.get("rule_name") or rule_id,
|
|
severity=row.get("severity") or "info",
|
|
category=row.get("category") or "",
|
|
status=status if status in {"pass", "fail", "skipped"} else "pass",
|
|
skip_reason=row.get("skip_reason") or "",
|
|
)
|
|
)
|
|
return checked_rules
|
|
|
|
|
|
def _build_findings(rule_rows: list[dict[str, Any]]) -> list[Finding]:
|
|
findings: list[Finding] = []
|
|
for index, row in enumerate(rule_rows):
|
|
if row.get("result") != "fail":
|
|
continue
|
|
paragraph_index = int(row.get("paragraph_index") or 0)
|
|
findings.append(
|
|
Finding(
|
|
finding_id=f"{row['rule_id']}-{paragraph_index or index}",
|
|
rule_id=str(row["rule_id"]),
|
|
rule_name=row.get("rule_name") or str(row["rule_id"]),
|
|
severity=row.get("severity") or "info",
|
|
category=row.get("category") or "",
|
|
location=Location(
|
|
paragraph_index=paragraph_index,
|
|
role=row.get("location_path"),
|
|
char_start=0,
|
|
char_end=0,
|
|
context=row.get("paragraph_text") or "",
|
|
),
|
|
actual=_parse_json(row.get("actual")) or {},
|
|
expected=_parse_json(row.get("expected")) or {},
|
|
message=row.get("message") or "",
|
|
suggestion=row.get("suggestion") or "",
|
|
evidence=str(row.get("evidence") or ""),
|
|
confidence=1.0,
|
|
)
|
|
)
|
|
return findings
|
|
|
|
|
|
def _build_summary(run_row: dict[str, Any], findings: list[Finding]) -> AuditSummary:
    """Build the report summary from the stored run counters and the findings.

    Severity and category tallies are recomputed from *findings*; the score
    and the passed/failed/skipped counters are read from *run_row*, with
    missing values treated as 0. The score is truncated to an integer.

    Args:
        run_row: Run row exposing ``total_score``, ``passed_count``,
            ``failed_count`` and ``skipped_count``.
        findings: Findings to tally.

    Returns:
        The populated ``AuditSummary``.
    """
    by_severity: dict[str, int] = {}
    by_category: dict[str, int] = {}
    for entry in findings:
        level = entry.severity
        by_severity[level] = 1 + by_severity.get(level, 0)

        bucket = entry.category
        if not bucket:
            # Findings without a category are left out of the category tally.
            continue
        by_category[bucket] = 1 + by_category.get(bucket, 0)

    score = int(float(run_row.get("total_score") or 0))
    passed = int(run_row.get("passed_count") or 0)
    failed = int(run_row.get("failed_count") or 0)
    skipped = int(run_row.get("skipped_count") or 0)

    return AuditSummary(
        score=score,
        total_findings=len(findings),
        by_severity=by_severity,
        by_category=by_category,
        passed_count=passed,
        failed_count=failed,
        skipped_count=skipped,
    )
|
|
|
|
|
|
def _normalize_structure_item(item: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"role": item.get("role"),
|
|
"label": item.get("label") or "",
|
|
"count": item.get("count") or 0,
|
|
"expected": bool(item.get("expected", False)),
|
|
"paragraph_indices": item.get("paragraphIndices") or item.get("paragraph_indices") or [],
|
|
"samples": item.get("samples") or [],
|
|
"char_total": item.get("charTotal") or item.get("char_total") or 0,
|
|
"dominant_font": item.get("dominantFont") or item.get("dominant_font"),
|
|
"dominant_size_pt": item.get("dominantSizePt") or item.get("dominant_size_pt"),
|
|
"style_uniform": bool(item.get("styleUniform", item.get("style_uniform", True))),
|
|
}
|
|
|
|
|
|
def _normalize_outline_node(item: dict[str, Any]) -> dict[str, Any]:
|
|
return {
|
|
"paragraph_index": item.get("paragraphIndex") or item.get("paragraph_index") or 0,
|
|
"level": item.get("level") or 0,
|
|
"text": item.get("text") or "",
|
|
"children": [_normalize_outline_node(child) for child in (item.get("children") or [])],
|
|
}
|
|
|
|
|
|
def _build_audit_result(
    run_id: int,
    run_row: dict[str, Any],
    rule_rows: list[dict[str, Any]],
) -> AuditResult:
    """Assemble the ``AuditResult`` to render from the stored run data."""
    aux = _parse_json(run_row.get("result_summary_json"))
    if not isinstance(aux, dict):
        # Missing, malformed or non-object summary JSON: render without the
        # auxiliary sections rather than failing on ``.get``.
        aux = {}

    findings = _build_findings(rule_rows)
    return AuditResult(
        audit_id=str(run_id),
        document={
            "documentId": int(run_row["document_id"]),
            "filename": run_row.get("file_name") or "",
        },
        summary=_build_summary(run_row, findings),
        findings=findings,
        checked_rules=_build_checked_rules(rule_rows),
        # ``or []`` / ``or {}`` also covers keys stored with an explicit null,
        # which a plain ``.get(key, default)`` would hand back as None.
        structure=[
            StructureItem.model_validate(_normalize_structure_item(item))
            for item in (aux.get("structure") or [])
        ],
        outline=[
            OutlineNode.model_validate(_normalize_outline_node(item))
            for item in (aux.get("outline") or [])
        ],
        entities={
            name: SemanticEntity.model_validate(value)
            for name, value in (aux.get("entities") or {}).items()
            if value is not None
        },
    )


async def regenerate_html_report(run_id: int) -> None:
    """Re-render the HTML report of an existing run and overwrite its OSS object.

    Loads the run, its rule results and its latest ``html_report`` artifact
    record, renders the HTML, uploads it to the artifact's existing object
    key, updates the artifact's size/hash/MIME metadata, commits, and prints a
    JSON summary to stdout.

    Args:
        run_id: Primary key of the ``govdoc_runs`` row to regenerate.

    Raises:
        RuntimeError: If the run query returns no row, if the run has no
            ``html_report`` artifact record, or if that record has no object
            key to overwrite.
    """
    oss_service = OssServiceImpl()

    async with GetAsyncSession() as session:
        run_row = (
            await session.execute(
                text(
                    """
                    SELECT
                        gr.id,
                        gr.document_id,
                        gr.total_score,
                        gr.passed_count,
                        gr.failed_count,
                        gr.skipped_count,
                        gr.result_summary_json,
                        d.region,
                        f.file_name
                    FROM govdoc_runs gr
                    JOIN leaudit_documents d
                      ON d.id = gr.document_id
                     AND d.deleted_at IS NULL
                    JOIN leaudit_document_files f
                      ON f.document_id = d.id
                     AND f.file_role = 'original'
                     AND f.is_active = true
                     AND f.deleted_at IS NULL
                    WHERE gr.id = :run_id
                      AND gr.deleted_at IS NULL
                    LIMIT 1
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().first()
        # The inner joins mean a run whose document or active original file
        # is gone is also reported as missing.
        if not run_row:
            raise RuntimeError(f"run {run_id} 不存在")

        rule_rows = (
            await session.execute(
                text(
                    """
                    SELECT
                        rule_id,
                        rule_name,
                        severity,
                        category,
                        result,
                        skip_reason,
                        message,
                        suggestion,
                        actual,
                        expected,
                        evidence,
                        paragraph_index,
                        paragraph_text,
                        location_path
                    FROM govdoc_rule_results
                    WHERE run_id = :run_id
                      AND deleted_at IS NULL
                    ORDER BY id ASC
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().all()

        artifact_row = (
            await session.execute(
                text(
                    """
                    SELECT id, file_name, oss_url
                    FROM govdoc_report_artifacts
                    WHERE run_id = :run_id
                      AND artifact_type = 'html_report'
                      AND deleted_at IS NULL
                    ORDER BY id DESC
                    LIMIT 1
                    """
                ),
                {"run_id": run_id},
            )
        ).mappings().first()
        if not artifact_row:
            raise RuntimeError(f"run {run_id} 没有 html_report 产物记录")

        object_key = artifact_row["oss_url"]
        if not object_key:
            # str(None) would otherwise upload the report to an object
            # literally named "None".
            raise RuntimeError(f"run {run_id} 的 html_report 产物缺少 oss_url")

        result = _build_audit_result(run_id, run_row, rule_rows)

        html = render_html(result)
        # Size and digest describe the UTF-8 bytes of the rendered HTML.
        html_bytes = html.encode("utf-8")
        sha256 = hashlib.sha256(html_bytes).hexdigest()

        # Upload first so the metadata is only updated for content that was
        # actually written.
        await oss_service.UploadText(
            ObjectKey=str(object_key),
            Content=html,
            ContentType="text/html; charset=utf-8",
        )

        await session.execute(
            text(
                """
                UPDATE govdoc_report_artifacts
                SET file_size = :file_size,
                    sha256 = :sha256,
                    mime_type = 'text/html; charset=utf-8',
                    updated_at = now()
                WHERE id = :artifact_id
                """
            ),
            {
                "artifact_id": int(artifact_row["id"]),
                "file_size": len(html_bytes),
                "sha256": sha256,
            },
        )
        await session.commit()

    print(
        json.dumps(
            {
                "runId": run_id,
                "documentId": int(run_row["document_id"]),
                "fileName": run_row.get("file_name") or "",
                "artifactOssKey": artifact_row["oss_url"],
                "htmlBytes": len(html_bytes),
                "sha256": sha256,
            },
            ensure_ascii=False,
        )
    )
|
|
|
|
|
|
def main() -> None:
    """Parse the ``run_id`` command-line argument and regenerate that run's report."""
    cli = argparse.ArgumentParser(description="重生成 govdoc HTML 报告")
    cli.add_argument("run_id", type=int, help="govdoc run id")
    requested_run_id = cli.parse_args().run_id
    # The regeneration is a coroutine; drive it to completion on a fresh loop.
    asyncio.run(regenerate_html_report(requested_run_id))
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|