# Source file: leaudit-platform-backend/fastapi_modules/fastapi_leaudit/govdoc_engine/pipeline.py
# (repository viewer header: 249 lines, 8.2 KiB, Python)

"""Govdoc 引擎主编排入口。
将旧 govdoc-audit 的 audit_file() 函数适配为异步 Pipeline 接口,
供 govdoc_bridge.runner 调用。
迁移自: govdoc-audit/src/govdoc_audit/pipeline.py
移除依赖: RunRecorder, config.py (local file logging)
适配平台: 异步执行、直接返回 AuditResult
"""
from __future__ import annotations
import logging
import uuid
from pathlib import Path
from typing import Any
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.docx_parser import parse_docx
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.role_tagger import RoleTagger
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.extractor import FieldExtractor
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entity_builder import (
EntityBuilder,
BUILTIN_LLM_DESCRIPTION,
)
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.loader import load_rules
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.schema import EntitySpec, RuleSet
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.runner import RuleRunner
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import AuditResult, CheckedRule
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.structure import build_outline, build_structure
from fastapi_modules.fastapi_leaudit.govdoc_engine.llm.client import LlmClient
_log = logging.getLogger(__name__)
# ── 辅助函数 ────────────────────────────────────────────
def _outcomes_to_checked(outcomes) -> list[CheckedRule]:
    """Summarise rule-execution outcomes as a list of ``CheckedRule`` rows.

    One row is produced per outcome, in input order. A row's status is
    ``"skipped"`` when the outcome is flagged as skipped, ``"fail"`` when it
    carries at least one finding, and ``"pass"`` otherwise.

    Args:
        outcomes: Iterable of rule outcomes; each must expose ``rule``
            (with ``rule_id``, ``name``, ``severity``, ``category``),
            ``skipped``, ``findings`` and ``skip_reason``.

    Returns:
        The ``CheckedRule`` rows, one per outcome.
    """

    def _status_of(outcome) -> str:
        # The skipped flag takes precedence over any findings.
        if outcome.skipped:
            return "skipped"
        return "fail" if outcome.findings else "pass"

    return [
        CheckedRule(
            rule_id=outcome.rule.rule_id,
            name=outcome.rule.name,
            severity=outcome.rule.severity,
            category=outcome.rule.category,
            status=_status_of(outcome),
            skip_reason=outcome.skip_reason,
        )
        for outcome in outcomes
    ]
def _build_result(
    docx_path: Path, doc, findings, entities, outcomes,
) -> AuditResult:
    """Assemble the ``AuditResult`` for one audited document.

    Args:
        docx_path: Path of the audited file; its name and string form are
            recorded in the document metadata.
        doc: Parsed document; ``doc.meta`` and ``doc.paragraphs`` are read,
            and the object is passed to ``build_structure`` / ``build_outline``.
        findings: Findings produced by rule evaluation (stored as-is).
        entities: Entity mapping produced by entity building (stored as-is).
        outcomes: Per-rule outcomes, converted via ``_outcomes_to_checked``.

    Returns:
        An ``AuditResult`` on which ``compute_summary()`` has been called.
    """
    meta = {
        "filename": docx_path.name,
        "path": str(docx_path),
        # Falls back to 1 when the parser recorded no page count.
        "page_count": doc.meta.get("page_count", 1),
        "paragraph_count": len(doc.paragraphs),
    }
    # Audit id: "A-" followed by the first 8 hex digits of a random UUID4.
    audit_id = f"A-{uuid.uuid4().hex[:8]}"
    checked_rules = _outcomes_to_checked(outcomes)
    structure = build_structure(doc)
    outline = build_outline(doc)

    audit = AuditResult(
        audit_id=audit_id,
        document=meta,
        findings=findings,
        entities=entities,
        checked_rules=checked_rules,
        structure=structure,
        outline=outline,
    )
    audit.compute_summary()
    return audit
def _compute_missing_spec(
    entities: dict[str, SemanticEntity | None],
    custom_entities: list[EntitySpec],
) -> dict[str, dict]:
    """Work out which entities should be sent to the LLM for extraction.

    A built-in entity (a key of ``BUILTIN_LLM_DESCRIPTION``) is requested only
    when ``entities`` has no value for it (missing key or ``None``). Its type
    is ``"list"`` for ``"attachments"`` and ``"string"`` for everything else.
    Every custom entity is always requested; its description falls back to
    its name when empty. A custom entity replaces a built-in entry that has
    the same name.

    Args:
        entities: Entities resolved so far, keyed by entity name.
        custom_entities: Entity specs declared by the rule set.

    Returns:
        Mapping of entity name to ``{"description": ..., "type": ...}``.
    """
    requested: dict[str, dict] = {
        builtin_name: {
            "description": builtin_desc,
            "type": "list" if builtin_name == "attachments" else "string",
        }
        for builtin_name, builtin_desc in BUILTIN_LLM_DESCRIPTION.items()
        if entities.get(builtin_name) is None
    }
    # Custom specs are added after the built-ins so they win on a name clash.
    requested.update(
        (
            custom.name,
            {"description": custom.description or custom.name, "type": custom.type},
        )
        for custom in custom_entities
    )
    return requested
def _merge_llm_into_entities(
    entities: dict[str, SemanticEntity | None],
    llm_values: dict[str, Any],
) -> None:
    """Merge LLM-extracted values into ``entities`` in place.

    Values equal to ``None``, ``""`` or ``[]`` are ignored. Every other value
    replaces the entry stored under the same name with a new
    ``SemanticEntity`` that has source ``"llm"``, confidence ``0.7``, no
    paragraph indices and no primary role.

    List values are rendered as a ``"; "``-joined string: a dict item becomes
    ``"<序号>. <名称>"`` (the number defaults to the item's 1-based position,
    the name to an empty string), any other item becomes ``str(item)``. The
    raw list is kept under ``extra["items"]``. Non-list values are stored as
    ``str(value)`` with an empty ``extra``.

    Args:
        entities: Entity mapping to update (mutated).
        llm_values: Entity name to raw value as returned by the extractor.
    """
    empty_markers = (None, "", [])

    def _render_item(position: int, item) -> str:
        # Only dict items carry the numbered "序号. 名称" form.
        if not isinstance(item, dict):
            return str(item)
        return f"{item.get('序号', position + 1)}. {item.get('名称', '')}"

    for entity_name, raw in llm_values.items():
        if raw in empty_markers:
            continue

        if isinstance(raw, list):
            rendered = "; ".join(
                _render_item(position, item) for position, item in enumerate(raw)
            )
            extra = {"items": raw}
        else:
            rendered = str(raw)
            extra = {}

        entities[entity_name] = SemanticEntity(
            name=entity_name,
            text=rendered,
            paragraph_indices=[],
            primary_role=None,
            source="llm",
            confidence=0.7,
            extra=extra,
        )
# ── 实体构建 (同步,供 sync 入口使用) ──────────────────
def _build_entities(
    doc, ruleset: RuleSet, llm: LlmClient,
) -> dict[str, SemanticEntity | None]:
    """Build entities, then fill the gaps with a synchronous LLM extraction.

    Entities are first produced by ``EntityBuilder``. The extractor is called
    only when ``_compute_missing_spec`` reports something to request.

    Args:
        doc: Parsed document.
        ruleset: Rule set; its ``extract.entities`` supplies the custom specs.
        llm: LLM client handed to ``FieldExtractor``.

    Returns:
        The entity mapping, including any LLM-extracted entries.
    """
    resolved = EntityBuilder().build(doc)
    pending = _compute_missing_spec(resolved, ruleset.extract.entities)
    if not pending:
        # Nothing to request, so no extractor is created.
        return resolved

    extracted = FieldExtractor(llm).extract_missing(doc, pending)
    _merge_llm_into_entities(resolved, extracted)
    return resolved
# ── 实体构建 (异步,供 async 入口使用) ──────────────────
async def _build_entities_async(
    doc, ruleset: RuleSet, llm: LlmClient,
) -> dict[str, SemanticEntity | None]:
    """Build entities, then fill the gaps with an awaited LLM extraction.

    Async counterpart of ``_build_entities``: it awaits
    ``FieldExtractor.extract_missing_async`` where the sync version calls
    ``extract_missing``. The extractor is used only when
    ``_compute_missing_spec`` reports something to request.

    Args:
        doc: Parsed document.
        ruleset: Rule set; its ``extract.entities`` supplies the custom specs.
        llm: LLM client handed to ``FieldExtractor``.

    Returns:
        The entity mapping, including any LLM-extracted entries.
    """
    resolved = EntityBuilder().build(doc)
    pending = _compute_missing_spec(resolved, ruleset.extract.entities)
    if not pending:
        # Nothing to request, so no extractor is created.
        return resolved

    extracted = await FieldExtractor(llm).extract_missing_async(doc, pending)
    _merge_llm_into_entities(resolved, extracted)
    return resolved
# ── 同步入口 (保留兼容) ─────────────────────────────────
def audit_file(
    docx_path: str | Path,
    rules_path: str | Path,
    llm_client: LlmClient | None = None,
) -> AuditResult:
    """Audit a single official document synchronously.

    Steps, in order: parse the DOCX, tag paragraph roles, load the rules,
    build entities (with LLM extraction for the missing ones), evaluate all
    rules, and assemble the result.

    Args:
        docx_path: Path to the DOCX file.
        rules_path: Path to the YAML rules file.
        llm_client: Optional LLM client; a new ``LlmClient()`` is created
            when none is given.

    Returns:
        An ``AuditResult`` holding findings, entities, checked rules,
        summary and related data.
    """
    source = Path(docx_path)
    rules_file = Path(rules_path)
    client = llm_client if llm_client else LlmClient()

    document = parse_docx(source)
    tagger = RoleTagger(llm_client=client)
    tagger.tag(document)

    rules = load_rules(rules_file)
    entity_map = _build_entities(document, rules, client)

    runner = RuleRunner(llm_client=client)
    findings, outcomes = runner.evaluate(rules.all_rules(), document, entity_map)
    return _build_result(source, document, findings, entity_map, outcomes)
# ── 异步入口 (推荐,供 bridge 调用) ──────────────────────
async def run(
    file_path: str | Path,
    rules_path: str | Path,
    llm_client: LlmClient | None = None,
) -> AuditResult:
    """Audit a single official document asynchronously.

    This is the main entry point called by govdoc_bridge. It runs the same
    stages as ``audit_file`` (parse, tag roles, load rules, build entities,
    evaluate rules, assemble result) and logs progress after each stage.

    NOTE(review): the parameter was originally described as "DOCX or PDF",
    but the only parser invoked here is ``parse_docx`` — confirm whether PDF
    input is actually supported.
    NOTE(review): only entity building is awaited; parsing, role tagging and
    rule evaluation are plain calls inside this coroutine. If any of them
    blocks (e.g. on LLM requests), the event loop is held for that time —
    confirm this is acceptable for the caller.

    Args:
        file_path: Path to the document file.
        rules_path: Path to the YAML rules file.
        llm_client: Optional LLM client; a new ``LlmClient()`` is created
            when none is given.

    Returns:
        An ``AuditResult`` holding findings, entities, checked rules,
        summary and related data.
    """
    source = Path(file_path)
    rules_file = Path(rules_path)
    client = llm_client if llm_client else LlmClient()
    _log.info("Govdoc pipeline start: %s", source.name)

    # 1. Parse the document.
    document = parse_docx(source)
    paragraph_total = len(document.paragraphs)
    _log.info(" parsed: %d paragraphs", paragraph_total)

    # 2. Tag paragraph roles.
    tagger = RoleTagger(llm_client=client)
    tagger.tag(document)

    # 3. Load the rules.
    rules = load_rules(rules_file)
    _log.info(
        " rules: %d groups, %d rules",
        len(rules.groups),
        len(rules.all_rules()),
    )

    # 4. Build entities (LLM extraction only for the missing ones).
    entity_map = await _build_entities_async(document, rules, client)
    resolved_total = sum(1 for entity in entity_map.values() if entity)
    _log.info(" entities: %d/%d resolved", resolved_total, len(entity_map))

    # 5. Evaluate the rules.
    runner = RuleRunner(llm_client=client)
    findings, outcomes = runner.evaluate(rules.all_rules(), document, entity_map)
    _log.info(
        " evaluated: %d findings from %d rules", len(findings), len(outcomes)
    )

    # 6. Assemble the result.
    audit = _build_result(source, document, findings, entity_map, outcomes)
    _log.info(
        "Govdoc pipeline complete: score=%d, pass=%d, fail=%d, skip=%d",
        audit.summary.score,
        audit.summary.passed_count,
        audit.summary.failed_count,
        audit.summary.skipped_count,
    )
    return audit