feat(govdoc): 新增内部公文模块全链路(后端58+前端11文件)

This commit is contained in:
wren
2026-05-13 14:37:12 +08:00
parent 99699e20e1
commit 5d777599bf
63 changed files with 7608 additions and 0 deletions
@@ -0,0 +1,152 @@
"""解析 .docx → Document 对象。
文档顺序遍历 body:顶级段落 + 表格内段落都纳入 paragraphs
后续 role tagging 与规则评估都能扫到表格内的内容。
"""
from __future__ import annotations
from pathlib import Path
from docx import Document as DocxDocument
from docx.oxml.ns import qn
from docx.text.paragraph import Paragraph as DocxParagraph
from lxml import etree
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Document, Paragraph, ParagraphStyle, Run, Table
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.style_resolver import StyleResolver
# Lookup from the integer value of the alignment read from a paragraph's
# ``paragraph_format.alignment`` to the alignment name stored on ParagraphStyle.
# Values missing from this table are treated as "left" by _read_paragraph_style.
_ALIGN_MAP = {0: "left", 1: "center", 2: "right", 3: "justify"}
def _read_run_style(run, p_elem, resolver: StyleResolver) -> ParagraphStyle:
    """Return a ParagraphStyle holding the resolved font attributes of *run*.

    The resolver is asked for the effective style of the run inside its
    paragraph element; only font names, size, bold and italic are copied.
    """
    resolved = resolver.resolve_run(p_elem, run._element)
    # bold / italic are tri-state on the resolver side: None means "unset"
    # and is reported as False here.
    is_bold = False if resolved.bold is None else bool(resolved.bold)
    is_italic = False if resolved.italic is None else bool(resolved.italic)
    return ParagraphStyle(
        font_eastasia=resolved.font_eastasia,
        font_ascii=resolved.font_ascii,
        font_size_pt=resolved.size_pt,
        bold=is_bold,
        italic=is_italic,
    )
def _read_paragraph_style(p, resolver: StyleResolver) -> ParagraphStyle:
    """Assemble the ParagraphStyle describing paragraph *p*.

    Font attributes come from the first run when the paragraph has runs,
    otherwise from the paragraph-level resolution. Alignment, line spacing
    and first-line indent are then read from ``p.paragraph_format`` and
    written onto that same style object, which is returned.
    """
    if p.runs:
        style = _read_run_style(p.runs[0], p._element, resolver)
    else:
        resolved = resolver.resolve_paragraph(p._element)
        style = ParagraphStyle(
            font_eastasia=resolved.font_eastasia,
            font_ascii=resolved.font_ascii,
            font_size_pt=resolved.size_pt,
            bold=False if resolved.bold is None else bool(resolved.bold),
            italic=False if resolved.italic is None else bool(resolved.italic),
        )

    fmt = p.paragraph_format

    # Unset or unmapped alignment is reported as "left".
    raw_alignment = fmt.alignment
    if raw_alignment is None:
        style.alignment = "left"
    else:
        style.alignment = _ALIGN_MAP.get(raw_alignment, "left")

    # NOTE(review): python-docx documents line_spacing as either a float
    # (multiple of line height) or a Length (fixed height); float() of a
    # Length would presumably be EMU, not points — confirm what consumers of
    # ParagraphStyle.line_spacing expect.
    raw_spacing = fmt.line_spacing
    style.line_spacing = None if raw_spacing is None else float(raw_spacing)

    first_line = fmt.first_line_indent
    style.first_line_indent_pt = 0.0 if first_line is None else float(first_line.pt)
    return style
def _is_in_table(p_elem) -> bool:
    """Report whether *p_elem* has an ancestor element whose local name is ``tbl``."""
    ancestor = p_elem.getparent()
    # Climb until a table element is found or the tree root is passed.
    while ancestor is not None and etree.QName(ancestor).localname != "tbl":
        ancestor = ancestor.getparent()
    return ancestor is not None
def _iter_body_paragraphs(docx):
    """Yield every ``w:p`` element under the document body in document order.

    Paragraphs nested inside tables are included because the whole body
    subtree is iterated.
    """
    yield from docx.element.body.iter(qn("w:p"))
def _iter_header_footer_paragraphs(docx):
    """Yield ``(paragraph, p_elem, in_header, in_footer)`` for header/footer text.

    Every section's default, first-page and even-page header and footer are
    visited. Parts that are linked to the previous section are skipped, and a
    paragraph element that was already yielded is not yielded again.
    """
    # Keep the elements themselves, not id(element): once a caller drops its
    # reference the id may be recycled by an unrelated element, which would
    # make a genuine paragraph look like a duplicate and be skipped.
    seen: set = set()
    for section in docx.sections:
        targets = (
            (True, section.header),
            (True, section.first_page_header),
            (True, section.even_page_header),
            (False, section.footer),
            (False, section.first_page_footer),
            (False, section.even_page_footer),
        )
        for in_header, part in targets:
            if part is None:
                continue
            try:
                linked = part.is_linked_to_previous
            except Exception:
                # Deliberate best effort: if the link state cannot be read,
                # still scan the part instead of failing the whole parse.
                linked = False
            if linked:
                continue
            for p in part.paragraphs:
                elem = p._element
                if elem in seen:
                    continue
                seen.add(elem)
                yield p, elem, in_header, not in_header
def parse_docx(path: str | Path) -> Document:
    """Parse the .docx file at *path* into a Document.

    Body paragraphs (including those inside tables) come first in document
    order; header and footer paragraphs are appended after them so later
    stages can scan them as well. Paragraph indices are assigned
    consecutively across both groups. Tables are taken from ``docx.tables``
    as rows of cell text.
    """
    path = Path(path)
    docx = DocxDocument(path)
    resolver = StyleResolver(docx)
    collected: list[Paragraph] = []

    def _append(p, p_elem, **placement) -> None:
        # Shared construction for body and header/footer paragraphs; the next
        # index is always the number of paragraphs collected so far.
        run_models = []
        for r in p.runs:
            run_models.append(
                Run(text=r.text, style=_read_run_style(r, p_elem, resolver))
            )
        para_style = _read_paragraph_style(p, resolver)
        collected.append(Paragraph(
            index=len(collected),
            text=p.text,
            runs=run_models,
            style=para_style,
            in_table=_is_in_table(p_elem),
            **placement,
        ))

    # 1) body, including paragraphs inside tables
    for p_elem in _iter_body_paragraphs(docx):
        _append(DocxParagraph(p_elem, docx.part), p_elem)

    # 2) headers / footers, appended at the end
    for p, p_elem, in_header, in_footer in _iter_header_footer_paragraphs(docx):
        _append(p, p_elem, in_header=in_header, in_footer=in_footer)

    tables = []
    for tidx, t in enumerate(docx.tables):
        cell_rows = []
        for row in t.rows:
            cell_rows.append([cell.text for cell in row.cells])
        tables.append(Table(index=tidx, rows=cell_rows))

    # NOTE(review): "page_count" is filled with the number of sections, not a
    # rendered page count — confirm that consumers expect this.
    meta = {"path": str(path), "page_count": len(docx.sections)}
    return Document(meta=meta, paragraphs=collected, tables=tables)
@@ -0,0 +1,27 @@
"""语义实体:把段落 + 字段值 + 样式合在一起。"""
from __future__ import annotations
from typing import Any, Literal
from pydantic import BaseModel, Field
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import ParagraphStyle
# Allowed values of SemanticEntity.source.
EntitySource = Literal["structural", "llm", "derived"]


class SemanticEntity(BaseModel):
    """One semantic unit of an official document (title / document number / main recipient / ...).

    The meaning of ``source``:

    - structural: ``name`` corresponds one-to-one with a role;
      ``paragraph_indices`` is non-empty and ``style`` is available.
    - derived: inferred from another entity (e.g. wenzhong from the end of the
      title); ``paragraph_indices`` borrows the source paragraphs.
    - llm: used only when both the structural and derived paths fail;
      ``paragraph_indices`` may be empty.
    """

    # Entity name, e.g. "title" or "doc_number".
    name: str
    # Extracted text of the entity.
    text: str = ""
    # Indices of the paragraphs the entity was taken from.
    paragraph_indices: list[int] = Field(default_factory=list)
    # Role of the paragraph(s) the entity is based on, when there is one.
    primary_role: str | None = None
    # Style of the source paragraph, when available.
    style: ParagraphStyle | None = None
    # Free-form additional data attached to the entity.
    extra: dict[str, Any] = Field(default_factory=dict)
    # How the entity was obtained; see the class docstring.
    source: EntitySource = "structural"
    # Confidence attached to the entity; defaults to 1.0.
    confidence: float = 1.0
@@ -0,0 +1,195 @@
"""从已 tag 的 Document 抽取语义实体(结构化优先)。"""
from __future__ import annotations
import re
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Document
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.entities import SemanticEntity
# The 8 built-in entity names (also used to detect conflicts during schema validation).
BUILTIN_ENTITY_NAMES: frozenset[str] = frozenset({
    "title", "doc_number", "recipient", "date",
    "signature", "attachments", "wenzhong", "issuer",
})

# Prompt descriptions for the LLM fallback of each built-in entity (used in Phase B).
BUILTIN_LLM_DESCRIPTION: dict[str, str] = {
    "title": "公文主标题(不含发文字号)",
    "doc_number": "X发〔YYYYN号 形式的发文字号",
    "recipient": "公文抬头的接收机关名称",
    "date": "末尾的成文日期原文",
    "signature": "末尾的发文机关署名",
    "attachments": "附件清单(数组,每项含 序号 与 名称)",
    "wenzhong": "公文文种(决议/决定/通知/通报/请示/批复 等 15 种之一)",
    "issuer": "发文机关全称",
}

# 1:1 mapping from paragraph role to entity name.
_ROLE_ENTITY_MAP = {
    "title": "title",
    "doc_number": "doc_number",
    "recipient": "recipient",
    "date": "date",
    "signature": "signature",
}

# Leading "附件" / "附件N" marker followed by a colon; both the ASCII and the
# full-width colon are accepted because Chinese documents normally use ":".
_ATTACHMENT_HEAD_RE = re.compile(r"^附件\d*[::]\s*")
# "<number><separator><name>" attachment line; ASCII and full-width
# separators are both accepted.
_ATTACHMENT_ITEM_RE = re.compile(r"^\s*(\d+)[\..、)).]\s*(.+)$")

# The 15 statutory document types (per the Regulations on the Handling of
# Official Documents of Party and Government Organs). The list previously
# held an empty string in place of "函", which made _WENZHONG_RE match an
# empty suffix on every title.
_WENZHONG_LIST = (
    "决议", "决定", "命令", "公报", "公告", "通告",
    "意见", "通知", "通报", "报告", "请示", "批复",
    "议案", "函", "纪要",
)
# Empty entries are filtered defensively: an empty alternative would match
# at the end of any string.
_WENZHONG_RE = re.compile(
    "(" + "|".join(re.escape(w) for w in _WENZHONG_LIST if w) + ")$"
)

# "XX关于...的YY" -> issuer = XX
_ISSUER_PREFIX_RE = re.compile(r"^(.+?)关于")
class EntityBuilder:
    """Extract the 8 built-in semantic entities from an already role-tagged Document."""

    def build(self, doc: Document) -> dict[str, SemanticEntity | None]:
        """Return a mapping from every built-in entity name to its entity or None.

        Entities with a 1:1 role mapping are read directly from the tagged
        paragraphs, attachments are collected from the attachment marker and
        the lines after it, and wenzhong / issuer are derived from the title
        (issuer falls back to the signature).
        """
        entities: dict[str, SemanticEntity | None] = {
            name: None for name in BUILTIN_ENTITY_NAMES
        }

        # (1) one-to-one role -> entity
        for role, name in _ROLE_ENTITY_MAP.items():
            paras = [p for p in doc.paragraphs if p.role == role]
            if not paras:
                continue
            # The signature is the last matching paragraph; everything else
            # uses the first one.
            target = paras[-1] if name == "signature" else paras[0]
            entities[name] = SemanticEntity(
                name=name,
                text=target.text.strip(),
                paragraph_indices=[target.index],
                primary_role=role,
                style=target.style,
                source="structural",
                confidence=target.role_confidence,
            )

        # (2) attachments: attachment_marker + following lines
        entities["attachments"] = self._build_attachments(doc)

        # (3) derived: wenzhong / issuer
        title_e = entities.get("title")
        signature_e = entities.get("signature")
        if title_e is not None:
            entities["wenzhong"] = self._derive_wenzhong(title_e)
        if title_e is not None or signature_e is not None:
            entities["issuer"] = self._derive_issuer(title_e, signature_e)
        return entities

    # ---------- attachments ----------
    def _build_attachments(self, doc: Document) -> SemanticEntity | None:
        """Collect the attachment list starting at the first attachment marker.

        Returns None when there is no marker paragraph or no item could be
        read. Each item is a dict with the keys "序号" and "名称".
        """
        m = next(
            (pos for pos, p in enumerate(doc.paragraphs)
             if p.role == "attachment_marker"),
            None,
        )
        if m is None:
            return None
        marker = doc.paragraphs[m]

        items: list[dict] = []
        # Record the marker's own Paragraph.index (not its list position) so
        # every entry of paragraph_indices uses the same numbering.
        para_idxs: list[int] = [marker.index]

        # Text on the marker line after the "附件:" head is the first item.
        head = _ATTACHMENT_HEAD_RE.sub("", marker.text.strip())
        if head:
            mt = _ATTACHMENT_ITEM_RE.match(head)
            if mt:
                items.append(
                    {"序号": int(mt.group(1)), "名称": mt.group(2).strip()}
                )
            else:
                items.append({"序号": 1, "名称": head})

        # Following lines in order, until a paragraph with another role or a
        # non-empty line that does not look like a numbered item.
        for p in doc.paragraphs[m + 1:]:
            if p.role and p.role not in ("body", "unknown", "attachment_marker"):
                break
            t = p.text.strip()
            if not t:
                continue
            mt = _ATTACHMENT_ITEM_RE.match(t)
            if not mt:
                break
            items.append(
                {"序号": int(mt.group(1)), "名称": mt.group(2).strip()}
            )
            para_idxs.append(p.index)

        if not items:
            return None
        text = "; ".join(f"{it['序号']}. {it['名称']}" for it in items)
        return SemanticEntity(
            name="attachments",
            text=text,
            paragraph_indices=para_idxs,
            primary_role="attachment_marker",
            style=marker.style,
            extra={"items": items},
            source="structural",
            confidence=0.9,
        )

    # ---------- derived ----------
    def _derive_wenzhong(
        self, title: SemanticEntity
    ) -> SemanticEntity | None:
        """Derive the document type from the suffix of the title, or None."""
        m = _WENZHONG_RE.search(title.text)
        # An empty capture is not a document type; never emit an entity with
        # empty text even if the pattern allows an empty alternative.
        if m is None or not m.group(1):
            return None
        return SemanticEntity(
            name="wenzhong",
            text=m.group(1),
            paragraph_indices=list(title.paragraph_indices),
            primary_role="title",
            extra={"derived_from": "title.suffix"},
            source="derived",
            confidence=0.95,
        )

    def _derive_issuer(
        self,
        title: SemanticEntity | None,
        signature: SemanticEntity | None,
    ) -> SemanticEntity | None:
        """Derive the issuer from the title prefix, else from the signature.

        Returns None when neither source yields an issuer.
        """
        if title is not None:
            m = _ISSUER_PREFIX_RE.match(title.text)
            if m:
                return SemanticEntity(
                    name="issuer",
                    text=m.group(1),
                    paragraph_indices=list(title.paragraph_indices),
                    primary_role="title",
                    extra={"derived_from": "title.prefix"},
                    source="derived",
                    confidence=0.9,
                )
        if signature is not None:
            return SemanticEntity(
                name="issuer",
                text=signature.text,
                paragraph_indices=list(signature.paragraph_indices),
                primary_role="signature",
                style=signature.style,
                extra={"derived_from": "signature"},
                source="derived",
                confidence=0.8,
            )
        return None
@@ -0,0 +1,104 @@
"""LLM 字段抽取:差量模式(仅对未知字段构造 prompt)。"""
from __future__ import annotations
import logging
from typing import Any
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Document
from fastapi_modules.fastapi_leaudit.govdoc_engine.llm.client import LlmClient, _format_exc
# Module-level logger; used to report failed extraction calls.
_log = logging.getLogger(__name__)
# Prompt template for differential field extraction. It is filled with three
# placeholders: ``text`` (the numbered document paragraphs), ``spec_block``
# (one line per requested field) and ``example`` (the JSON skeleton to return).
_PROMPT_HEAD = """从下面的公文中抽取以下指定字段,仅以 JSON 输出。
【公文内容(顺序段落)】
{text}
【需要抽取的字段】
{spec_block}
【输出格式】
仅 JSON{{{example}}}
未识别的字段填 ""list 类型填 [])。
"""
def _build_doc_text(doc: Document) -> str:
    """Render every paragraph as ``[index] text``, one paragraph per line."""
    rendered = []
    for para in doc.paragraphs:
        rendered.append(f"[{para.index}] {para.text}")
    return "\n".join(rendered)
def _example_for(spec: dict[str, dict]) -> str:
    """Build the comma-separated ``"name": <empty value>`` pairs of the JSON example.

    Fields whose ``type`` is ``"list"`` get ``[]``; every other field
    (including those without a ``type``) gets an empty string.
    """
    return ", ".join(
        f'"{field}": []' if info.get("type", "string") == "list" else f'"{field}": ""'
        for field, info in spec.items()
    )
class FieldExtractor:
    """Differential LLM field extraction.

    ``extract_missing(doc, spec)``: *spec* names the fields to extract; an
    empty spec (or a missing document) does not call the LLM at all.
    """

    def __init__(self, llm_client: LlmClient):
        self.client = llm_client

    @staticmethod
    def _label(spec: dict[str, dict]) -> str:
        """Label passed to the LLM client for this request."""
        return "extract_missing__" + ",".join(spec.keys())

    def _build_messages_for_spec(
        self, doc: Document, spec: dict[str, dict]
    ) -> list[dict[str, str]]:
        """Build the single user message asking for the fields in *spec*."""
        # Keep the type visibly separated from the description; gluing them
        # together produced lines such as "- title: <description>string".
        spec_lines = [
            f"- {name}: {meta.get('description', name)}"
            f" ({meta.get('type', 'string')})"
            for name, meta in spec.items()
        ]
        prompt = _PROMPT_HEAD.format(
            text=_build_doc_text(doc),
            spec_block="\n".join(spec_lines) or "(无)",
            example=_example_for(spec),
        )
        return [{"role": "user", "content": prompt}]

    def _shape_missing(
        self, spec: dict[str, dict], resp: dict
    ) -> dict[str, Any]:
        """Normalise the LLM response to exactly the fields named in *spec*.

        Missing or falsy values become ``[]`` for list fields and ``""`` for
        all others. A response that is not a JSON object is treated as empty.
        """
        if not isinstance(resp, dict):
            resp = {}
        out: dict[str, Any] = {}
        for name, meta in spec.items():
            empty: Any = [] if meta.get("type") == "list" else ""
            out[name] = resp.get(name) or empty
        return out

    def extract_missing(
        self, doc: Document | None, spec: dict[str, dict]
    ) -> dict[str, Any]:
        """Synchronously extract the fields in *spec*.

        Returns ``{}`` without calling the LLM when *spec* is empty or *doc*
        is None. An LLM failure is logged and yields empty values for every
        requested field.
        """
        if not spec or doc is None:
            return {}
        try:
            resp = self.client.chat_json(
                self._build_messages_for_spec(doc, spec),
                label=self._label(spec),
            )
        except Exception as e:
            # Best effort: extraction is a fallback path, so failures degrade
            # to empty values instead of propagating.
            _log.warning("Differential extraction failed: %s", _format_exc(e))
            resp = {}
        return self._shape_missing(spec, resp)

    async def extract_missing_async(
        self, doc: Document | None, spec: dict[str, dict]
    ) -> dict[str, Any]:
        """Async counterpart of extract_missing with the same contract."""
        if not spec or doc is None:
            return {}
        try:
            resp = await self.client.chat_json_async(
                self._build_messages_for_spec(doc, spec),
                label=self._label(spec),
            )
        except Exception as e:
            _log.warning("Differential extraction failed: %s", _format_exc(e))
            resp = {}
        return self._shape_missing(spec, resp)
@@ -0,0 +1,83 @@
"""doc / wps → docx 转换。"""
from __future__ import annotations
import shutil
import subprocess
from pathlib import Path
from fastapi_modules.fastapi_leaudit.govdoc_engine.config import get_settings
class UnsupportedFormat(Exception):
    """Raised by load_to_docx for a file extension it neither accepts nor converts."""
    pass


class ConversionError(Exception):
    """Raised when soffice cannot be found or the conversion to .docx fails."""
    pass
# Extensions returned unchanged by load_to_docx.
_SUPPORTED_DIRECT = {".docx"}
# Extensions that load_to_docx converts through soffice.
_SUPPORTED_CONVERT = {".doc", ".wps"}
# Locations probed, in order, after the configured soffice path.
_SOFFICE_FALLBACK_PATHS = (
    "/opt/homebrew/bin/soffice",
    "/usr/local/bin/soffice",
    "/Applications/LibreOffice.app/Contents/MacOS/soffice",
    "/usr/bin/soffice",
)
def load_to_docx(src: Path) -> Path:
    """Return a .docx path for *src*; .doc / .wps files are converted via soffice.

    Raises:
        UnsupportedFormat: the (lower-cased) extension is not supported.
    """
    suffix = src.suffix.lower()
    if suffix in _SUPPORTED_CONVERT:
        return _convert_via_soffice(src)
    if suffix not in _SUPPORTED_DIRECT:
        raise UnsupportedFormat(f"unsupported file type: {suffix}")
    return src
def _convert_via_soffice(src: Path) -> Path:
    """Convert *src* to .docx with headless soffice and return the output path.

    The output is written next to *src* as ``<stem>.docx``.

    Raises:
        ConversionError: soffice cannot be found or started, times out after
            60 seconds, exits with a non-zero status, or produces no output.
    """
    soffice = _resolve_soffice_path(get_settings().soffice_path)
    out_dir = src.parent
    cmd = [
        soffice, "--headless", "--convert-to", "docx",
        "--outdir", str(out_dir), str(src),
    ]
    try:
        result = subprocess.run(
            cmd, capture_output=True, timeout=60,
        )
    except subprocess.TimeoutExpired as e:
        raise ConversionError("soffice timeout") from e
    except OSError as e:
        # e.g. the resolved path is not executable; report it through the
        # same exception type as every other conversion failure.
        raise ConversionError(f"soffice could not be started: {e}") from e
    if result.returncode != 0:
        raise ConversionError(
            f"soffice exit {result.returncode}: {result.stderr.decode(errors='ignore')}"
        )
    out = out_dir / (src.stem + ".docx")
    if not out.exists():
        raise ConversionError(f"expected output not found: {out}")
    return out
def _resolve_soffice_path(configured: str) -> str:
    """Return a usable soffice executable path.

    The configured value is tried first, then the built-in fallback
    locations. Each candidate is looked up with ``shutil.which`` and, failing
    that, accepted if it names an existing file.

    Raises:
        ConversionError: no candidate could be resolved.
    """
    checked: list[str] = []

    def _probe(candidate):
        # Skip unset values: Path("") is the current directory, which exists
        # and would otherwise be returned as the "executable".
        if not candidate or candidate in checked:
            return None
        checked.append(candidate)
        found = shutil.which(candidate)
        if found:
            return found
        # Require a file; a directory with a matching name is not soffice.
        return candidate if Path(candidate).is_file() else None

    hit = _probe(configured)
    if hit:
        return hit
    for fallback in _SOFFICE_FALLBACK_PATHS:
        hit = _probe(fallback)
        if hit:
            return hit
    raise ConversionError(
        f"soffice not found; checked: {', '.join(checked)}. "
        "Install LibreOffice or set SOFFICE_PATH."
    )
@@ -0,0 +1,50 @@
"""组合规则 tagger + LLM tagger 的总入口。"""
from __future__ import annotations
import asyncio
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Document
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.role_tagger_rule import RuleBasedTagger
from fastapi_modules.fastapi_leaudit.govdoc_engine.parser.role_tagger_llm import LlmTagger
from fastapi_modules.fastapi_leaudit.govdoc_engine.llm.client import LlmClient
class RoleTagger:
    """Two-stage tagging: rules first, then the LLM for paragraphs whose
    confidence is below ``threshold``."""

    def __init__(
        self,
        llm_client: LlmClient | None = None,
        threshold: float = 0.8,
    ):
        self.rule = RuleBasedTagger()
        self.llm = LlmTagger(llm_client) if llm_client else None
        self.threshold = threshold

    def _low_conf_indices(self, doc: Document) -> list[int]:
        """List positions of the paragraphs whose confidence is below the threshold."""
        pending: list[int] = []
        for position, para in enumerate(doc.paragraphs):
            if para.role_confidence < self.threshold:
                pending.append(position)
        return pending

    def tag(self, doc: Document) -> None:
        """Tag *doc* in place, consulting the LLM one paragraph at a time."""
        self.rule.tag(doc)
        if self.llm is None:
            return
        for position in self._low_conf_indices(doc):
            verdict = self.llm.disambiguate(doc, position)
            para = doc.paragraphs[position]
            para.role, para.role_confidence = verdict

    async def tag_async(self, doc: Document) -> None:
        """Tag *doc* in place, sending all low-confidence paragraphs to the LLM at once."""
        self.rule.tag(doc)
        if self.llm is None:
            return
        pending = self._low_conf_indices(doc)
        if not pending:
            return
        verdicts = await asyncio.gather(
            *(self.llm.disambiguate_async(doc, position) for position in pending)
        )
        for position, (role, conf) in zip(pending, verdicts):
            para = doc.paragraphs[position]
            para.role = role
            para.role_confidence = conf
@@ -0,0 +1,90 @@
"""LLM 兜底打 role:对低置信段落做二次确认。"""
from __future__ import annotations
import logging
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Document, Role
from fastapi_modules.fastapi_leaudit.govdoc_engine.llm.client import LlmClient, _format_exc
# Module-level logger; used to report skipped disambiguation calls.
_log = logging.getLogger(__name__)

# Roles the LLM is allowed to answer with; anything else becomes "unknown".
VALID_ROLES = [
    "title", "doc_number", "recipient",
    "heading_1", "heading_2", "heading_3", "heading_4",
    "body", "attachment_marker", "signature", "date",
    "no_text_marker", "unknown",
]

# Prompt template; placeholders: context, idx, text, current_role, conf, roles.
_PROMPT = """你是公文格式专家。下面是一份公文的段落列表,请为指定的"待定段落"判断其角色。
【全文段落(带索引和当前规则推测)】
{context}
【待定段落 idx={idx}
文本: {text}
当前推测角色: {current_role}(置信度 {conf:.2f}
【角色取值范围】
{roles}
请综合公文结构判断该段落最可能的角色。
仅以 JSON 输出:
{{"role": "<角色>", "confidence": <0-1 浮点数>, "reason": "<简短理由>"}}
"""


class LlmTagger:
    """LLM fallback that re-judges the role of a single low-confidence paragraph."""

    def __init__(self, client: LlmClient):
        self.client = client

    def _build_prompt(self, doc: Document, target_idx: int) -> tuple[str, "object"]:
        """Return ``(prompt, target_paragraph)`` for the paragraph at *target_idx*.

        The context lists every paragraph with its current role and the first
        60 characters of its text; the target paragraph is flagged.
        """
        ctx_lines = []
        for p in doc.paragraphs:
            tag = "← 待定" if p.index == target_idx else ""
            ctx_lines.append(f"[{p.index}] role={p.role} text={p.text[:60]} {tag}")
        target = doc.paragraphs[target_idx]
        prompt = _PROMPT.format(
            context="\n".join(ctx_lines),
            idx=target_idx,
            text=target.text,
            current_role=target.role or "unknown",
            conf=target.role_confidence,
            roles=", ".join(VALID_ROLES),
        )
        return prompt, target

    def _interpret(self, resp: dict, target) -> tuple[Role, float]:
        """Turn the LLM response into ``(role, confidence)``.

        Unknown or missing roles become "unknown". A missing, non-numeric or
        NaN confidence becomes 0.5, and the result is clamped to [0, 1].
        A response that is not a JSON object yields ("unknown", 0.5).
        """
        if not isinstance(resp, dict):
            return "unknown", 0.5  # type: ignore[return-value]
        role = resp.get("role", "unknown")
        if role not in VALID_ROLES:
            role = "unknown"
        # The model may answer with words ("high") or null; that must not
        # raise, because this runs outside the callers' try block.
        try:
            conf = float(resp.get("confidence", 0.5))
        except (TypeError, ValueError):
            conf = 0.5
        if conf != conf:  # NaN
            conf = 0.5
        conf = min(1.0, max(0.0, conf))
        return role, conf  # type: ignore[return-value]

    def disambiguate(self, doc: Document, target_idx: int) -> tuple[Role, float]:
        """Ask the LLM for the role of one paragraph.

        On any LLM error the paragraph's current role and confidence are
        returned unchanged and a warning is logged.
        """
        prompt, target = self._build_prompt(doc, target_idx)
        label = f"role_tag_p{target_idx}"
        try:
            resp = self.client.chat_json(
                [{"role": "user", "content": prompt}], label=label,
            )
        except Exception as e:
            _log.warning("Role disambiguation skipped (LLM error): %s", _format_exc(e))
            return target.role or "unknown", target.role_confidence  # type: ignore[return-value]
        return self._interpret(resp, target)

    async def disambiguate_async(
        self, doc: Document, target_idx: int
    ) -> tuple[Role, float]:
        """Async counterpart of disambiguate with the same contract."""
        prompt, target = self._build_prompt(doc, target_idx)
        label = f"role_tag_p{target_idx}"
        try:
            resp = await self.client.chat_json_async(
                [{"role": "user", "content": prompt}], label=label,
            )
        except Exception as e:
            _log.warning("Role disambiguation skipped (LLM error): %s", _format_exc(e))
            return target.role or "unknown", target.role_confidence  # type: ignore[return-value]
        return self._interpret(resp, target)
@@ -0,0 +1,132 @@
"""基于位置 + 文字模式 + 字体样式的段落角色识别。"""
from __future__ import annotations
import re
from fastapi_modules.fastapi_leaudit.govdoc_engine.models import Document, Paragraph, Role
# Heading numbering follows the official-document hierarchy
# "一、" / "(一)" / "1." / "(1)". Bracketed levels accept ASCII and full-width
# brackets; without the brackets levels 2 and 4 matched ordinary sentences
# such as "一是..." or "2026年...".
HEADING_1_RE = re.compile(r"^[一二三四五六七八九十百]+、")
HEADING_2_RE = re.compile(r"^[((][一二三四五六七八九十]+[))]")
HEADING_3_RE = re.compile(r"^\d+[\..]")
HEADING_4_RE = re.compile(r"^[((]\d+[))]")
# Document number such as "X发〔2026〕12号"; hexagonal, square and round
# brackets around the year are all accepted.
DOC_NUMBER_RE = re.compile(r"[一-龥]+[〔\[((]\d{4}[〕\]))]第?\d+号")
# Arabic ("2026年5月13日") or Chinese-numeral dates; the Arabic form needs
# "年" between year and month just like the Chinese-numeral form.
DATE_RE = re.compile(
    r"^\d{4}年\d{1,2}月\d{1,2}日$"
    r"|^[一二三四五六七八九十○〇零]+年[一二三四五六七八九十○〇零]+月[一二三四五六七八九十○〇零]+日$"
)
# Colons are accepted in both ASCII and full-width form.
ATTACHMENT_RE = re.compile(r"^附件[::1-9]")
NO_TEXT_RE = re.compile(r"^[\((]\s*此页无正文\s*[\))]")
RECIPIENT_TAIL_RE = re.compile(r"[::]\s*$")

# Keyword tables must not contain empty strings: "" is a substring of every
# text, which makes a hint check always pass and a blocklist always reject.
# NOTE(review): several single-character entries appear to have been lost
# from these tables — restore them from the author's original list.
RECIPIENT_HINTS = (
    "公司", "处室", "委员会",
)
RECIPIENT_BLOCKLIST = (
    "现将", "经研究", "为做好", "为深入", "为进一步",
    "根据", "如下", "汇报", "通知如下", "请示如下",
)
_SIGNATURE_HINTS = ("公司", "委员会", "人民政府", "办公厅", "办公室")

# Used to reduce an attachment marker line to the bare attachment name.
_ATTACHMENT_HEAD_RE = re.compile(r"^附件\d*[::]\s*")
_ATTACHMENT_ORDINAL_RE = re.compile(r"^\d+[\..、)).]\s*")


def _contains_any(text: str, keywords) -> bool:
    """Return True when *text* contains at least one non-empty keyword."""
    return any(kw and kw in text for kw in keywords)


class RuleBasedTagger:
    """Assign a role and confidence to every paragraph from position, text
    patterns and font style."""

    def tag(self, doc: Document) -> None:
        """Classify every paragraph of *doc* in order, writing role and confidence in place."""
        n = len(doc.paragraphs)
        for i, p in enumerate(doc.paragraphs):
            role, conf = self._classify(p, i, n, doc)
            p.role = role
            p.role_confidence = conf

    def _classify(
        self, p: Paragraph, idx: int, total: int, doc: Document
    ) -> tuple[Role, float]:
        """Return ``(role, confidence)`` for paragraph *p* at position *idx* of *total*.

        Checks run in a fixed order and the first match wins.
        """
        text = p.text.strip()
        if not text:
            return ("unknown", 0.5)
        if NO_TEXT_RE.match(text):
            return ("no_text_marker", 1.0)
        if ATTACHMENT_RE.match(text):
            return ("attachment_marker", 0.95)
        if DATE_RE.match(text):
            return ("date", 0.9)
        # A document number only counts near the top of the document.
        if DOC_NUMBER_RE.search(text) and idx <= 5:
            return ("doc_number", 0.95)
        if idx == 0 or (
            idx <= 2
            and p.style.alignment == "center"
            and (p.style.font_size_pt or 0) >= 18
        ):
            return ("title", 0.95)

        font = (p.style.font_eastasia or "").strip()
        size = p.style.font_size_pt or 0

        if self._is_attachment_title(p, idx, doc):
            return ("attachment_title", 0.9)

        # Headings: the expected typeface raises the confidence.
        if HEADING_1_RE.match(text):
            return ("heading_1", 0.95 if "黑体" in font else 0.7)
        if HEADING_2_RE.match(text):
            return ("heading_2", 0.95 if "楷体" in font else 0.7)
        if HEADING_3_RE.match(text):
            return ("heading_3", 0.9 if "仿宋" in font else 0.65)
        if HEADING_4_RE.match(text):
            return ("heading_4", 0.85)

        # Main recipient: a short line near the top that ends with a colon,
        # names an organisation and does not read like body text.
        if (
            idx <= 6
            and 3 <= len(text) <= 50
            and RECIPIENT_TAIL_RE.search(text)
            and _contains_any(text, RECIPIENT_HINTS)
            and not _contains_any(text, RECIPIENT_BLOCKLIST)
        ):
            return ("recipient", 0.9)

        # Signature: a short organisation name within the last three paragraphs.
        if (
            total - idx <= 3
            and 5 <= len(text) <= 30
            and _contains_any(text, _SIGNATURE_HINTS)
        ):
            return ("signature", 0.7)

        if size >= 14 or font:
            return ("body", 0.85)
        return ("unknown", 0.4)

    @staticmethod
    def _is_attachment_title(p: Paragraph, idx: int, doc: Document) -> bool:
        """Detect the title on the first page of an attachment body, so it is
        not treated as ordinary body text under GW-F-004.

        The paragraph must be centred, at least 18 pt and set in a "小标宋"
        face, and an attachment marker must occur within the 12 preceding
        indices. When the marker names the attachment, the paragraph text and
        that name must contain one another.
        """
        if idx <= 0:
            return False
        text = p.text.strip()
        font = (p.style.font_eastasia or "").strip()
        if (
            p.style.alignment != "center"
            or (p.style.font_size_pt or 0) < 18
            or "小标宋" not in font
        ):
            return False

        # Nearest preceding attachment marker, by role or by text pattern.
        marker_index = None
        marker_text = ""
        for prev in reversed(doc.paragraphs[:idx]):
            if prev.role == "attachment_marker" or ATTACHMENT_RE.match(prev.text.strip()):
                marker_index = prev.index
                marker_text = prev.text.strip()
                break
        if marker_index is None or idx - marker_index > 12:
            return False

        attachment_name = _ATTACHMENT_HEAD_RE.sub("", marker_text).strip()
        attachment_name = _ATTACHMENT_ORDINAL_RE.sub("", attachment_name).strip()
        return (
            not attachment_name
            or text in attachment_name
            or attachment_name in text
        )
@@ -0,0 +1,241 @@
"""OOXML 字体解析:处理样式继承链 + 主题字体。
Word 把字体属性分散在四个层级:
1. 直接 run rPr`<w:r><w:rPr><w:rFonts/></w:rPr>...`
2. 段落 rPr(段落标记字体):`<w:p><w:pPr><w:rPr><w:rFonts/></w:rPr></w:pPr>`
3. 段落引用样式:`<w:p><w:pPr><w:pStyle w:val="Heading1"/></w:pPr>`
样式定义在 styles.xml,可经 `<w:basedOn>` 链向上继承
4. 全局默认:styles.xml 的 `<w:docDefaults>`
此外 `<w:rFonts>` 的 `*Theme` 属性指向 theme1.xml 中的字体方案
majorEastAsia / minorEastAsia 等),需要做二次解析。
"""
from __future__ import annotations
from dataclasses import dataclass
from docx.oxml.ns import qn
from lxml import etree
# Namespace of theme1.xml (DrawingML main), used when querying the font scheme.
_DML_NS = "http://schemas.openxmlformats.org/drawingml/2006/main"
@dataclass
class ResolvedRunStyle:
    # Run formatting collected by StyleResolver. Every field defaults to
    # None, which the resolver treats as "not set yet" when merging layers.
    font_eastasia: str | None = None  # East Asian font name
    font_ascii: str | None = None  # ASCII font name
    size_pt: float | None = None  # font size in points (half of the w:sz value)
    bold: bool | None = None
    italic: bool | None = None
def _empty_to_none(s: str | None) -> str | None:
    """Strip *s* and return it, or None when *s* is None or blank."""
    return None if s is None else (s.strip() or None)
class StyleResolver:
    """Resolve the effective run formatting of a document.

    The style sheet and the theme are parsed once at construction; each
    later ``resolve_run()`` call costs O(length of the style chain).
    """

    # Spellings of w:val that switch an on/off (toggle) property off.
    _OFF_VALUES = frozenset({"0", "false", "off"})

    def __init__(self, docx):
        self._theme = self._load_theme(docx)
        self._styles, self._doc_defaults = self._load_styles(docx)
        self._default_sid = self._find_default_paragraph_style()

    # ---- theme --------------------------------------------------------
    def _load_theme(self, docx) -> dict[tuple[str, str], str | None]:
        """Return ``{(axis, scheme_attr): font_name}`` read from theme1.xml.

        ``scheme_attr`` looks like 'majorEastAsia' / 'minorAscii'; ``axis``
        is the rFonts axis. An empty dict is returned when the theme part is
        missing or cannot be parsed.
        """
        out: dict[tuple[str, str], str | None] = {}
        theme_part = next(
            (
                p for p in docx.part.package.parts
                if p.partname.endswith("/theme/theme1.xml")
            ),
            None,
        )
        if theme_part is None:
            return out
        try:
            root = etree.fromstring(theme_part.blob)
        except etree.XMLSyntaxError:
            return out
        ns = {"a": _DML_NS}

        def typeface(elem) -> str | None:
            return _empty_to_none(elem.get("typeface")) if elem is not None else None

        for kind, font_tag in (("major", "majorFont"), ("minor", "minorFont")):
            font_elem = root.find(f".//a:fontScheme/a:{font_tag}", ns)
            if font_elem is None:
                continue
            latin_val = typeface(font_elem.find("a:latin", ns))
            cs_val = typeface(font_elem.find("a:cs", ns))
            ea_val = typeface(font_elem.find("a:ea", ns))
            if ea_val is None:
                # An empty ea entry falls back to the Simplified Chinese
                # (Hans) script font.
                ea_val = typeface(font_elem.find('a:font[@script="Hans"]', ns))

            out[("ascii", f"{kind}Ascii")] = latin_val
            # asciiTheme=majorHAnsi (and the reverse) may also occur.
            out[("ascii", f"{kind}HAnsi")] = latin_val
            out[("hAnsi", f"{kind}HAnsi")] = latin_val
            out[("hAnsi", f"{kind}Ascii")] = latin_val
            out[("eastAsia", f"{kind}EastAsia")] = ea_val
            out[("cs", f"{kind}Bidi")] = cs_val
        return out

    # ---- style sheet --------------------------------------------------
    def _load_styles(
        self, docx
    ) -> tuple[dict[str, dict], ResolvedRunStyle | None]:
        """Return ``(styles_by_id, doc_defaults)`` read from styles.xml.

        Each style entry keeps its run properties, the run properties nested
        in its paragraph properties, its basedOn and link targets, and
        whether it is flagged as the default paragraph style.
        """
        out: dict[str, dict] = {}
        defaults: ResolvedRunStyle | None = None
        try:
            styles_root = docx.part._styles_part.element
        except (AttributeError, KeyError):
            return out, defaults
        if styles_root is None:
            return out, defaults

        # docDefaults
        ddef = styles_root.find(qn("w:docDefaults"))
        if ddef is not None:
            rdef = ddef.find(qn("w:rPrDefault"))
            if rdef is not None:
                defaults = self._read_rpr(rdef.find(qn("w:rPr")))

        # individual styles
        for style in styles_root.findall(qn("w:style")):
            sid = style.get(qn("w:styleId"))
            if not sid:
                continue
            ppr = style.find(qn("w:pPr"))
            bo = style.find(qn("w:basedOn"))
            link = style.find(qn("w:link"))
            default_attr = style.get(qn("w:default"))
            out[sid] = {
                "rpr": style.find(qn("w:rPr")),
                "ppr_rpr": ppr.find(qn("w:rPr")) if ppr is not None else None,
                "based_on": bo.get(qn("w:val")) if bo is not None else None,
                "link": link.get(qn("w:val")) if link is not None else None,
                "default_paragraph": (
                    style.get(qn("w:type")) == "paragraph"
                    and default_attr is not None
                    and self._toggle_state(default_attr)
                ),
            }
        return out, defaults

    def _find_default_paragraph_style(self) -> str | None:
        """Return the id of the default paragraph style, if any.

        The style flagged as default wins, because localized documents do not
        necessarily use the id "Normal" for it; "Normal" is kept as the
        fallback when no style carries the flag.
        """
        for sid, info in self._styles.items():
            if info.get("default_paragraph"):
                return sid
        return "Normal" if "Normal" in self._styles else None

    # ---- reading rPr --------------------------------------------------
    @classmethod
    def _toggle_state(cls, val: str | None) -> bool:
        """Interpret the w:val of a present on/off property.

        A missing value means "on"; "0" / "false" / "off" mean "off".
        """
        if val is None:
            return True
        return val.strip().lower() not in cls._OFF_VALUES

    def _read_toggle(self, rpr, tag: str) -> bool | None:
        """Return the state of on/off property *tag*, or None when absent."""
        elem = rpr.find(qn(tag))
        if elem is None:
            return None
        return self._toggle_state(elem.get(qn("w:val")))

    def _read_rpr(self, rpr) -> ResolvedRunStyle | None:
        """Read fonts, size, bold and italic from one rPr element (None in, None out)."""
        if rpr is None:
            return None
        rs = ResolvedRunStyle()
        rfonts = rpr.find(qn("w:rFonts"))
        if rfonts is not None:
            rs.font_eastasia = self._resolve_font_axis(rfonts, "eastAsia")
            rs.font_ascii = self._resolve_font_axis(rfonts, "ascii")
        sz = rpr.find(qn("w:sz"))
        if sz is not None and sz.get(qn("w:val")):
            try:
                # w:sz is expressed in half-points.
                rs.size_pt = float(sz.get(qn("w:val"))) / 2.0
            except ValueError:
                pass
        # An element such as w:b with w:val="0" explicitly turns the property
        # off, so presence alone must not be read as "on".
        rs.bold = self._read_toggle(rpr, "w:b")
        rs.italic = self._read_toggle(rpr, "w:i")
        return rs

    def _resolve_font_axis(self, rfonts, axis: str) -> str | None:
        """On a single rFonts element an explicit font beats the theme reference."""
        explicit = _empty_to_none(rfonts.get(qn(f"w:{axis}")))
        if explicit:
            return explicit
        theme_attr = "cstheme" if axis == "cs" else f"{axis}Theme"
        theme = _empty_to_none(rfonts.get(qn(f"w:{theme_attr}")))
        if theme:
            return self._theme.get((axis, theme))
        return None

    # ---- merging ------------------------------------------------------
    @staticmethod
    def _fill(target: ResolvedRunStyle, source: ResolvedRunStyle | None) -> None:
        """Keep the fields *target* already has; take missing ones from *source*."""
        if source is None:
            return
        for field in ("font_eastasia", "font_ascii", "size_pt", "bold", "italic"):
            if getattr(target, field) is None:
                setattr(target, field, getattr(source, field))

    def _resolve_style_chain(
        self, sid: str | None, _seen: set[str] | None = None
    ) -> ResolvedRunStyle | None:
        """Accumulate run properties for style *sid*, its linked style and its
        basedOn ancestors; nearer definitions win.

        Returns None for an unknown id or when *sid* was already visited.
        """
        if sid is None:
            return None
        seen = _seen or set()
        if sid in seen:
            # Guards against basedOn / link cycles.
            return None
        seen = seen | {sid}
        info = self._styles.get(sid)
        if info is None:
            return None

        rs = ResolvedRunStyle()
        # the style's own two rPr elements
        self._fill(rs, self._read_rpr(info.get("rpr")))
        self._fill(rs, self._read_rpr(info.get("ppr_rpr")))
        # linked character style, if any
        if info.get("link"):
            self._fill(rs, self._resolve_style_chain(info["link"], seen))
        # parent style
        if info.get("based_on"):
            self._fill(rs, self._resolve_style_chain(info["based_on"], seen))
        return rs

    def _fill_paragraph_layers(self, rs: ResolvedRunStyle, p_elem) -> None:
        """Fill *rs* from the paragraph mark rPr, the pStyle chain, the
        default paragraph style and finally docDefaults."""
        if p_elem is not None:
            ppr = p_elem.find(qn("w:pPr"))
            if ppr is not None:
                self._fill(rs, self._read_rpr(ppr.find(qn("w:rPr"))))
                pstyle = ppr.find(qn("w:pStyle"))
                if pstyle is not None and pstyle.get(qn("w:val")):
                    self._fill(rs, self._resolve_style_chain(pstyle.get(qn("w:val"))))
        if self._default_sid is not None:
            self._fill(rs, self._resolve_style_chain(self._default_sid))
        self._fill(rs, self._doc_defaults)

    # ---- entry points -------------------------------------------------
    def resolve_run(self, p_elem, run_elem) -> ResolvedRunStyle:
        """Resolve the final style of one run; *p_elem* may be None."""
        rs = ResolvedRunStyle()
        # Direct run rPr has the highest priority.
        if run_elem is not None:
            self._fill(rs, self._read_rpr(run_elem.find(qn("w:rPr"))))
        self._fill_paragraph_layers(rs, p_elem)
        return rs

    def resolve_paragraph(self, p_elem) -> ResolvedRunStyle:
        """Resolve the paragraph-level style (no run; pPr / style / defaults only)."""
        rs = ResolvedRunStyle()
        self._fill_paragraph_layers(rs, p_elem)
        return rs