249 lines
8.2 KiB
Python
249 lines
8.2 KiB
Python
"""Govdoc 引擎主编排入口。
|
|
|
|
将旧 govdoc-audit 的 audit_file() 函数适配为异步 Pipeline 接口,
|
|
供 govdoc_bridge.runner 调用。
|
|
|
|
迁移自: govdoc-audit/src/govdoc_audit/pipeline.py
|
|
移除依赖: RunRecorder, config.py (local file logging)
|
|
适配平台: 异步执行、直接返回 AuditResult
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import uuid
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.docx_parser import parse_docx
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.role_tagger import RoleTagger
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.extractor import FieldExtractor
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entity_builder import (
|
|
EntityBuilder,
|
|
BUILTIN_LLM_DESCRIPTION,
|
|
)
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.loader import load_rules
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.dsl.schema import EntitySpec, RuleSet
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.runner import RuleRunner
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.result import AuditResult, CheckedRule
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.engine.structure import build_outline, build_structure
|
|
from fastapi_modules.fastapi_leaudit.govdoc_engine.llm.client import LlmClient
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
# ── 辅助函数 ────────────────────────────────────────────
|
|
|
|
def _outcomes_to_checked(outcomes) -> list[CheckedRule]:
|
|
"""将规则执行结果汇总为 CheckedRule 列表。"""
|
|
rows: list[CheckedRule] = []
|
|
for o in outcomes:
|
|
if o.skipped:
|
|
status = "skipped"
|
|
elif o.findings:
|
|
status = "fail"
|
|
else:
|
|
status = "pass"
|
|
rows.append(
|
|
CheckedRule(
|
|
rule_id=o.rule.rule_id,
|
|
name=o.rule.name,
|
|
severity=o.rule.severity,
|
|
category=o.rule.category,
|
|
status=status,
|
|
skip_reason=o.skip_reason,
|
|
)
|
|
)
|
|
return rows
|
|
|
|
|
|
def _build_result(
    docx_path: Path, doc, findings, entities, outcomes,
) -> AuditResult:
    """Assemble an AuditResult from the artefacts produced by an audit run.

    The document metadata records the file name, the full path, the page count
    (taken from ``doc.meta``, defaulting to 1 when absent) and the number of
    paragraphs. The audit id is "A-" followed by the first 8 hex characters of
    a random UUID. ``compute_summary()`` is invoked on the result before it is
    returned.
    """
    meta = dict(
        filename=docx_path.name,
        path=str(docx_path),
        page_count=doc.meta.get("page_count", 1),
        paragraph_count=len(doc.paragraphs),
    )
    short_id = uuid.uuid4().hex[:8]
    audit = AuditResult(
        audit_id="A-" + short_id,
        document=meta,
        findings=findings,
        entities=entities,
        checked_rules=_outcomes_to_checked(outcomes),
        structure=build_structure(doc),
        outline=build_outline(doc),
    )
    audit.compute_summary()
    return audit
|
|
|
|
|
|
def _compute_missing_spec(
    entities: dict[str, SemanticEntity | None],
    custom_entities: list[EntitySpec],
) -> dict[str, dict]:
    """Work out which entities have to be sent to the LLM for extraction.

    A built-in entity (a key of BUILTIN_LLM_DESCRIPTION) is included only when
    ``entities`` holds no value for it (missing key or None). Its type is
    "list" for "attachments" and "string" for everything else. Every custom
    entity is always included; its description falls back to its name when
    empty. A custom entity sharing a name with a built-in one replaces that
    entry.
    """
    pending: dict[str, dict] = {
        builtin: {
            "description": hint,
            "type": "list" if builtin == "attachments" else "string",
        }
        for builtin, hint in BUILTIN_LLM_DESCRIPTION.items()
        if entities.get(builtin) is None
    }
    for custom in custom_entities:
        pending[custom.name] = {
            "description": custom.description or custom.name,
            "type": custom.type,
        }
    return pending
|
|
|
|
|
|
def _merge_llm_into_entities(
|
|
entities: dict[str, SemanticEntity | None],
|
|
llm_values: dict[str, Any],
|
|
) -> None:
|
|
"""将 LLM 抽取结果合并进 entities。"""
|
|
for name, val in llm_values.items():
|
|
if val in (None, "", []):
|
|
continue
|
|
if isinstance(val, list):
|
|
text = "; ".join(
|
|
f"{it.get('序号', i + 1)}. {it.get('名称', '')}"
|
|
if isinstance(it, dict) else str(it)
|
|
for i, it in enumerate(val)
|
|
)
|
|
extra = {"items": val}
|
|
else:
|
|
text = str(val)
|
|
extra = {}
|
|
entities[name] = SemanticEntity(
|
|
name=name,
|
|
text=text,
|
|
paragraph_indices=[],
|
|
primary_role=None,
|
|
source="llm",
|
|
confidence=0.7,
|
|
extra=extra,
|
|
)
|
|
|
|
|
|
# ── 实体构建 (同步,供 sync 入口使用) ──────────────────
|
|
|
|
def _build_entities(
    doc, ruleset: RuleSet, llm: LlmClient,
) -> dict[str, SemanticEntity | None]:
    """Build the entities, then fill the gaps with an LLM extraction (sync).

    EntityBuilder produces the initial entities. Only the entities reported by
    _compute_missing_spec are requested from the LLM, and the LLM call is
    skipped entirely when nothing is requested.
    """
    built = EntityBuilder().build(doc)
    wanted = _compute_missing_spec(built, ruleset.extract.entities)
    if not wanted:
        return built
    extracted = FieldExtractor(llm).extract_missing(doc, wanted)
    _merge_llm_into_entities(built, extracted)
    return built
|
|
|
|
|
|
# ── 实体构建 (异步,供 async 入口使用) ──────────────────
|
|
|
|
async def _build_entities_async(
    doc, ruleset: RuleSet, llm: LlmClient,
) -> dict[str, SemanticEntity | None]:
    """Build the entities, then fill the gaps with an LLM extraction (async).

    Same flow as the synchronous variant, except that the LLM extraction is
    awaited through FieldExtractor.extract_missing_async. The LLM call is
    skipped entirely when no entity needs to be requested.
    """
    built = EntityBuilder().build(doc)
    wanted = _compute_missing_spec(built, ruleset.extract.entities)
    if not wanted:
        return built
    extractor = FieldExtractor(llm)
    extracted = await extractor.extract_missing_async(doc, wanted)
    _merge_llm_into_entities(built, extracted)
    return built
|
|
|
|
|
|
# ── 同步入口 (保留兼容) ─────────────────────────────────
|
|
|
|
def audit_file(
    docx_path: str | Path,
    rules_path: str | Path,
    llm_client: LlmClient | None = None,
) -> AuditResult:
    """Audit a single official document synchronously.

    Args:
        docx_path: Path of the DOCX file.
        rules_path: Path of the YAML rules file.
        llm_client: Optional LLM client instance; a default LlmClient is
            created when none is supplied.

    Returns:
        An AuditResult holding findings, entities, checked_rules, summary, etc.
    """
    source = Path(docx_path)
    rules_file = Path(rules_path)
    llm = llm_client or LlmClient()

    # Parse the document and tag paragraph roles in place.
    doc = parse_docx(source)
    tagger = RoleTagger(llm_client=llm)
    tagger.tag(doc)

    # Load the rules, then resolve the entities they operate on.
    ruleset = load_rules(rules_file)
    entities = _build_entities(doc, ruleset, llm)

    # Evaluate every rule against the document and its entities.
    runner = RuleRunner(llm_client=llm)
    findings, outcomes = runner.evaluate(ruleset.all_rules(), doc, entities)

    return _build_result(source, doc, findings, entities, outcomes)
|
|
|
|
|
|
# ── 异步入口 (推荐,供 bridge 调用) ──────────────────────
|
|
|
|
async def run(
    file_path: str | Path,
    rules_path: str | Path,
    llm_client: LlmClient | None = None,
) -> AuditResult:
    """Audit a single official document asynchronously.

    This is the main entry point used by govdoc_bridge.

    The synchronous stages (document parsing, role tagging, rule loading and
    rule evaluation) are executed in a worker thread via ``asyncio.to_thread``
    so that they do not block the event loop. The stages still run strictly
    one after another; no two stages touch ``doc`` or the LLM client at the
    same time.

    Args:
        file_path: Path of the document file.
            NOTE(review): the previous docstring advertised "DOCX or PDF",
            but only parse_docx is invoked here -- confirm whether parse_docx
            accepts PDF input.
        rules_path: Path of the YAML rules file.
        llm_client: Optional LLM client instance; a default LlmClient is
            created when none is supplied.

    Returns:
        An AuditResult holding findings, entities, checked_rules, summary, etc.
    """
    # Local import keeps this block self-contained (requires Python 3.9+ for
    # asyncio.to_thread).
    import asyncio

    file_path = Path(file_path)
    rules_path = Path(rules_path)
    llm = llm_client or LlmClient()

    _log.info("Govdoc pipeline start: %s", file_path.name)

    # 1. Parse the document (blocking file I/O -> worker thread).
    doc = await asyncio.to_thread(parse_docx, file_path)
    _log.info(" parsed: %d paragraphs", len(doc.paragraphs))

    # 2. Tag paragraph roles. The tagger receives the LLM client, so it is
    #    treated as potentially blocking -> worker thread.
    await asyncio.to_thread(RoleTagger(llm_client=llm).tag, doc)

    # 3. Load the rules (blocking file I/O -> worker thread).
    ruleset = await asyncio.to_thread(load_rules, rules_path)
    # Fetch the flattened rule list once and reuse it for logging and evaluation.
    rules = ruleset.all_rules()
    _log.info(" rules: %d groups, %d rules", len(ruleset.groups), len(rules))

    # 4. Extract entities (including the differential LLM extraction).
    entities = await _build_entities_async(doc, ruleset, llm)
    _log.info(
        " entities: %d/%d resolved",
        sum(1 for v in entities.values() if v),
        len(entities),
    )

    # 5. Evaluate the rules. The runner receives the LLM client, so it is
    #    treated as potentially blocking -> worker thread.
    findings, outcomes = await asyncio.to_thread(
        RuleRunner(llm_client=llm).evaluate, rules, doc, entities
    )
    _log.info(" evaluated: %d findings from %d rules", len(findings), len(outcomes))

    # 6. Build the result.
    result = _build_result(file_path, doc, findings, entities, outcomes)
    _log.info(
        "Govdoc pipeline complete: score=%d, pass=%d, fail=%d, skip=%d",
        result.summary.score,
        result.summary.passed_count,
        result.summary.failed_count,
        result.summary.skipped_count,
    )

    return result
|