"""Main orchestration entry point of the Govdoc engine.

Adapts the ``audit_file()`` function of the legacy govdoc-audit project to an
asynchronous pipeline interface that is called by ``govdoc_bridge.runner``.

Migrated from: govdoc-audit/src/govdoc_audit/pipeline.py
Dependencies removed: RunRecorder, config.py (local file logging)
Platform adaptation: asynchronous execution, returns AuditResult directly
"""

from __future__ import annotations

import asyncio
import contextvars
import functools
import logging
import uuid
from pathlib import Path
from typing import Any, Callable

from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.docx_parser import parse_docx
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.role_tagger import RoleTagger
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.extractor import FieldExtractor
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entity_builder import (
    EntityBuilder,
    BUILTIN_LLM_DESCRIPTION,
)
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.loader import load_rules
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.schema import EntitySpec, RuleSet
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.runner import RuleRunner
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import AuditResult, CheckedRule
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.structure import build_outline, build_structure
from fastapi_modules.fastapi_leaudit.govdoc_engine.llm.client import LlmClient

_log = logging.getLogger(__name__)


# ── Helpers ─────────────────────────────────────────────


async def _run_blocking(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a synchronous callable on the default executor and await its result.

    Used by the async entry point so that synchronous pipeline steps do not
    stall the event loop. The caller's ``contextvars`` context is copied into
    the worker thread so context-local state stays visible to ``func``.

    Args:
        func: Synchronous callable to execute.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        Whatever ``func`` returns. Exceptions raised by ``func`` propagate to
        the awaiting caller.
    """
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()
    call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, call)


def _outcomes_to_checked(outcomes) -> list[CheckedRule]:
    """Summarise rule execution outcomes as a list of ``CheckedRule`` rows.

    Each outcome is mapped to one status: ``"skipped"`` when the outcome is
    flagged as skipped, otherwise ``"fail"`` when it carries findings, and
    ``"pass"`` when it has none.

    Args:
        outcomes: Iterable of outcome objects exposing ``skipped``,
            ``findings``, ``skip_reason`` and ``rule``.

    Returns:
        One ``CheckedRule`` per outcome, in input order.
    """
    rows: list[CheckedRule] = []
    for o in outcomes:
        # ``skipped`` takes precedence over the presence of findings.
        if o.skipped:
            status = "skipped"
        elif o.findings:
            status = "fail"
        else:
            status = "pass"
        rows.append(
            CheckedRule(
                rule_id=o.rule.rule_id,
                name=o.rule.name,
                severity=o.rule.severity,
                category=o.rule.category,
                status=status,
                skip_reason=o.skip_reason,
            )
        )
    return rows


def _build_result(
    docx_path: Path,
    doc,
    findings,
    entities,
    outcomes,
) -> AuditResult:
    """Assemble an ``AuditResult`` from the artefacts of one audit run.

    Args:
        docx_path: Path of the audited file; its name and string form are
            recorded in the document metadata.
        doc: Parsed document; ``doc.meta`` and ``doc.paragraphs`` are read
            here, and the object is passed to the structure/outline builders.
        findings: Findings produced by rule evaluation.
        entities: Mapping of entity name to the extracted entity (or ``None``).
        outcomes: Per-rule outcomes, converted via ``_outcomes_to_checked``.

    Returns:
        The populated result, after ``compute_summary()`` has been called.
    """
    document_meta = {
        "filename": docx_path.name,
        "path": str(docx_path),
        # Falls back to 1 when the parser recorded no page count.
        "page_count": doc.meta.get("page_count", 1),
        "paragraph_count": len(doc.paragraphs),
    }
    result = AuditResult(
        # Short random identifier: "A-" plus the first 8 hex digits of a UUID4.
        audit_id=f"A-{uuid.uuid4().hex[:8]}",
        document=document_meta,
        findings=findings,
        entities=entities,
        checked_rules=_outcomes_to_checked(outcomes),
        structure=build_structure(doc),
        outline=build_outline(doc),
    )
    result.compute_summary()
    return result


def _compute_missing_spec(
    entities: dict[str, SemanticEntity | None],
    custom_entities: list[EntitySpec],
) -> dict[str, dict]:
    """Work out which entities have to be sent to the LLM for extraction.

    Built-in entities are requested only when they are absent from
    ``entities`` or mapped to ``None``. Custom entities are always requested,
    and a custom entry replaces a built-in entry of the same name.

    Args:
        entities: Entities already extracted, keyed by name.
        custom_entities: Extra entity specs declared by the rule set.

    Returns:
        Mapping of entity name to ``{"description": ..., "type": ...}``;
        empty when nothing needs to be requested.
    """
    spec: dict[str, dict] = {}
    for name, desc in BUILTIN_LLM_DESCRIPTION.items():
        if entities.get(name) is None:
            spec[name] = {
                "description": desc,
                # "attachments" is the only built-in requested as a list.
                "type": "list" if name == "attachments" else "string",
            }
    for s in custom_entities:
        spec[s.name] = {"description": s.description or s.name, "type": s.type}
    return spec


def _merge_llm_into_entities(
    entities: dict[str, SemanticEntity | None],
    llm_values: dict[str, Any],
) -> None:
    """Merge LLM extraction results into ``entities`` in place.

    Values that are ``None``, an empty string or an empty list are ignored.
    List values are rendered as ``"<no>. <name>; ..."`` text and the raw list
    is kept under ``extra["items"]``; every other value is stringified.

    Args:
        entities: Entity mapping to update; existing entries with the same
            name are overwritten.
        llm_values: Values returned by the LLM, keyed by entity name. A falsy
            value (for example ``None``) is treated as "nothing to merge"
            instead of raising on ``.items()``.
    """
    for name, val in (llm_values or {}).items():
        if val in (None, "", []):
            continue
        if isinstance(val, list):
            # Dict items are rendered from their '序号' / '名称' keys, with the
            # 1-based position as the fallback number; other items use str().
            text = "; ".join(
                f"{it.get('序号', i + 1)}. {it.get('名称', '')}"
                if isinstance(it, dict)
                else str(it)
                for i, it in enumerate(val)
            )
            extra = {"items": val}
        else:
            text = str(val)
            extra = {}
        entities[name] = SemanticEntity(
            name=name,
            text=text,
            paragraph_indices=[],
            primary_role=None,
            source="llm",
            confidence=0.7,
            extra=extra,
        )


def _seed_entities(
    doc,
    ruleset: RuleSet,
) -> tuple[dict[str, SemanticEntity | None], dict[str, dict]]:
    """Build the initial entities and the spec of what is still missing.

    Shared by the sync and async entity builders so both follow the same
    steps before the LLM is consulted.

    Returns:
        ``(entities, spec)`` where ``spec`` is the output of
        ``_compute_missing_spec`` for this rule set's custom entities.
    """
    entities = EntityBuilder().build(doc)
    spec = _compute_missing_spec(entities, ruleset.extract.entities)
    return entities, spec


# ── Entity building (sync, used by the sync entry point) ─


def _build_entities(
    doc,
    ruleset: RuleSet,
    llm: LlmClient,
) -> dict[str, SemanticEntity | None]:
    """Build entities, then fill the gaps with a synchronous LLM extraction.

    The LLM is only called when ``_compute_missing_spec`` reports something
    to extract.
    """
    entities, spec = _seed_entities(doc, ruleset)
    if spec:
        llm_vals = FieldExtractor(llm).extract_missing(doc, spec)
        _merge_llm_into_entities(entities, llm_vals)
    return entities


# ── Entity building (async, used by the async entry point) ─


async def _build_entities_async(
    doc,
    ruleset: RuleSet,
    llm: LlmClient,
) -> dict[str, SemanticEntity | None]:
    """Build entities, then fill the gaps with an awaited LLM extraction.

    Async counterpart of ``_build_entities``; the LLM is only called when
    ``_compute_missing_spec`` reports something to extract.
    """
    entities, spec = _seed_entities(doc, ruleset)
    if spec:
        llm_vals = await FieldExtractor(llm).extract_missing_async(doc, spec)
        _merge_llm_into_entities(entities, llm_vals)
    return entities


# ── Sync entry point (kept for compatibility) ────────────


def audit_file(
    docx_path: str | Path,
    rules_path: str | Path,
    llm_client: LlmClient | None = None,
) -> AuditResult:
    """Audit a single official document synchronously.

    Args:
        docx_path: Path to the DOCX file.
        rules_path: Path to the YAML rules file.
        llm_client: Optional LLM client instance; a default ``LlmClient()``
            is created when omitted.

    Returns:
        AuditResult containing findings, entities, checked_rules, summary, etc.
    """
    docx_path = Path(docx_path)
    rules_path = Path(rules_path)
    llm = llm_client or LlmClient()

    doc = parse_docx(docx_path)
    # The return value is not used; tagging is presumably applied to ``doc``
    # in place.
    RoleTagger(llm_client=llm).tag(doc)
    ruleset = load_rules(rules_path)
    entities = _build_entities(doc, ruleset, llm)
    findings, outcomes = RuleRunner(llm_client=llm).evaluate(
        ruleset.all_rules(), doc, entities
    )
    return _build_result(docx_path, doc, findings, entities, outcomes)


# ── Async entry point (recommended, called by the bridge) ─


async def run(
    file_path: str | Path,
    rules_path: str | Path,
    llm_client: LlmClient | None = None,
) -> AuditResult:
    """Audit a single official document asynchronously.

    This is the main entry point used by govdoc_bridge.

    The synchronous steps (parsing, role tagging, rule loading and rule
    evaluation) are executed on the default executor so that the event loop
    stays responsive while they run; only entity extraction has a native
    async implementation.

    Args:
        file_path: Path to the document file. It is parsed with
            ``parse_docx``. NOTE(review): the original documentation says
            "DOCX or PDF" — confirm that ``parse_docx`` accepts PDF input.
        rules_path: Path to the YAML rules file.
        llm_client: Optional LLM client instance; a default ``LlmClient()``
            is created when omitted.

    Returns:
        AuditResult containing findings, entities, checked_rules, summary, etc.
    """
    file_path = Path(file_path)
    rules_path = Path(rules_path)
    llm = llm_client or LlmClient()

    _log.info("Govdoc pipeline start: %s", file_path.name)

    # 1. Parse the document (file I/O, kept off the event loop).
    doc = await _run_blocking(parse_docx, file_path)
    _log.info(" parsed: %d paragraphs", len(doc.paragraphs))

    # 2. Tag paragraph roles (synchronous call that is handed the LLM client).
    await _run_blocking(RoleTagger(llm_client=llm).tag, doc)

    # 3. Load the rules.
    ruleset = await _run_blocking(load_rules, rules_path)
    rules = ruleset.all_rules()
    _log.info(" rules: %d groups, %d rules", len(ruleset.groups), len(rules))

    # 4. Extract entities (including the incremental LLM extraction).
    entities = await _build_entities_async(doc, ruleset, llm)
    _log.info(
        " entities: %d/%d resolved",
        # Unresolved entities are stored as None (see _compute_missing_spec).
        sum(1 for v in entities.values() if v is not None),
        len(entities),
    )

    # 5. Evaluate the rules (synchronous call that is handed the LLM client).
    findings, outcomes = await _run_blocking(
        RuleRunner(llm_client=llm).evaluate, rules, doc, entities
    )
    _log.info(" evaluated: %d findings from %d rules", len(findings), len(outcomes))

    # 6. Build the result.
    result = _build_result(file_path, doc, findings, entities, outcomes)

    _log.info(
        "Govdoc pipeline complete: score=%d, pass=%d, fail=%d, skip=%d",
        result.summary.score,
        result.summary.passed_count,
        result.summary.failed_count,
        result.summary.skipped_count,
    )
    return result