# Source file: leaudit-platform-backend/fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py
# (repository viewer listing: 841 lines, 31 KiB, Python)

"""LeAudit 任务入口。"""
from __future__ import annotations
import asyncio
import os
from pathlib import Path
import tempfile
import time
from typing import Any, Dict, Optional
import fitz
from fastapi_common.fastapi_common_logger import logger
from leaudit.converters import doc2pdf
from leaudit.ocr.chandra_client import ChandraOCRError
from sqlalchemy import select
from fastapi_admin.celery_app import celery_app
from fastapi_admin.config import (
LEAUDIT_RULES_DIR,
LEAUDIT_STUCK_TIMEOUT_MINUTES,
LEAUDIT_WORKER_QUEUE_NORMAL,
LEAUDIT_WORKER_QUEUE_URGENT,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.nativeRunner import (
NativeRunRequest,
NativeRunner,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.auditCtxBuilder import (
NativeAuditMetadata,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.fileSourceResolver import (
FileSourceResolver,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleVersionResolver import (
RuleVersionResolver,
)
from fastapi_modules.fastapi_leaudit.leaudit_bridge.rules_loader import RulesLoader
from fastapi_modules.fastapi_leaudit.leaudit_bridge.storage_adapter import StorageAdapter
from fastapi_modules.fastapi_leaudit.models import (
LeauditAuditRun,
LeauditDocument,
LeauditDocumentFile,
)
# Module-level alias for the shared project logger imported above.
log = logger
def _classify_run_failure(exc: Exception) -> tuple[str, str, dict[str, Any]]:
    """Normalize a low-level exception into failure info that can be stored and shown.

    Returns:
        A ``(error_code, user_message, detail)`` tuple. ``detail`` always carries
        ``rawMessage`` and ``errorType``; OCR client errors additionally carry
        ``service`` and ``reason``.
    """
    message = str(exc).strip() or exc.__class__.__name__
    base_detail: dict[str, Any] = {
        "rawMessage": message,
        "errorType": type(exc).__name__,
    }

    # Anything that is not an OCR client error is reported with its raw message.
    if not isinstance(exc, ChandraOCRError):
        return ("AUDIT_RUN_FAILED", message, base_detail)

    base_detail["service"] = "ocr"
    lowered = message.lower()

    # Ordered (needles, error_code, user_message, reason) rules; the first match wins,
    # so a message mentioning both "connect" and "timeout" is a connection failure.
    ocr_rules = (
        (
            ("all connection attempts failed", "connect"),
            "OCR_SERVICE_UNAVAILABLE",
            "OCR服务暂时不可用,文档未完成识别,请稍后重试。",
            "connection_failed",
        ),
        (
            ("timeout",),
            "OCR_SERVICE_TIMEOUT",
            "OCR服务处理超时,文档未完成识别,请稍后重试。",
            "timeout",
        ),
        (
            ("ocr api returned",),
            "OCR_SERVICE_BAD_RESPONSE",
            "OCR服务响应异常,文档未完成识别,请稍后重试。",
            "bad_status",
        ),
    )
    for needles, error_code, user_message, reason in ocr_rules:
        if any(needle in lowered for needle in needles):
            return (error_code, user_message, {**base_detail, "reason": reason})

    # OCR error that matched none of the known patterns.
    return (
        "OCR_PROCESSING_FAILED",
        "OCR处理失败,文档未完成识别,请稍后重试。",
        {**base_detail, "reason": "ocr_failed"},
    )
def leaudit_process_document(
    document_id: int,
    file_content: bytes,
    filename: str,
    upload_info: Optional[Dict[str, Any]] = None,
    source_port: Optional[int] = None,
    rules_path: Optional[str] = None,
):
    """Run the LeAudit pipeline for a single document.

    The function creates its own event loop, resolves the run id and the rules
    source, materializes the main file (plus attachments, merged into one PDF)
    as temp files, executes ``NativeRunner`` and persists its result.

    Args:
        document_id: Id of the document being audited.
        file_content: Raw bytes of the main file.
        filename: Original file name; its suffix is kept for the temp file.
        upload_info: Optional execution metadata. Keys read here: ``run_id`` /
            ``runId``, ``rule_version_id`` / ``ruleVersionId`` and ``attachments``.
        source_port: Optional source port, only logged.
        rules_path: Optional explicit rules path that bypasses rule resolution.

    Returns:
        A dict with ``status``, ``document_id``, ``run_id``, ``phase``,
        ``timing`` and ``errors``.

    Raises:
        Exception: Any pipeline failure is re-raised after the document and run
            have been marked as failed on a best-effort basis.
    """
    task_id = os.urandom(8).hex()
    log.info(f"[任务ID: {task_id}] leaudit管线开始处理: {filename}")
    # New platform: region is passed as a parameter instead of being switched via os.environ.
    if source_port:
        log.info(f"[任务ID: {task_id}] 来源端口: {source_port}")
    if upload_info is None:
        upload_info = {}

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    temp_paths: list[str] = []
    storage = StorageAdapter()
    # Initialised up front so the failure handler can test them directly
    # instead of introspecting locals().
    run_id: int | None = None
    native_result = None
    try:
        run_id = _resolve_run_id(document_id, upload_info, loop)
        rules_resolution = _resolve_rules_runtime(document_id, run_id, rules_path, loop)

        # Register the downloaded rule file for cleanup before anything can fail,
        # otherwise a loader error would leak it.
        temp_rule_path = rules_resolution.get("temp_rule_path")
        if isinstance(temp_rule_path, str) and temp_rule_path:
            temp_paths.append(temp_rule_path)

        loop.run_until_complete(_update_run_status_safe(run_id, "running"))
        rules_path_resolved = rules_resolution["rules_path"]
        attachment_inputs = list(upload_info.get("attachments") or [])

        rules_file = None
        if rules_path_resolved:
            loader = RulesLoader()
            rules_file = loader.load(rules_path_resolved)
            log.info(
                f"[任务ID: {task_id}] RulesFile pre-loaded: {rules_path_resolved} "
                f"({len(rules_file.flat_rules)} rules, {len(rules_file.flat_extract)} fields)"
            )
        else:
            log.info(
                f"[任务ID: {task_id}] No fixed rules_path — "
                "will classify from document content after OCR"
            )

        # Metadata forwarded to the runner; never includes the attachment bytes.
        attachment_metadata: list[dict[str, Any]] = [
            {
                "fileId": attachment.get("file_id"),
                "fileName": attachment.get("file_name") or attachment.get("filename"),
                "sourceType": attachment.get("source_type"),
                "sourcePath": attachment.get("source_path"),
            }
            for attachment in attachment_inputs
        ]

        temp_path, merged_attachment_metadata = _prepare_execution_input(
            filename=filename,
            file_content=file_content,
            attachments=attachment_inputs,
            temp_paths=temp_paths,
        )
        if merged_attachment_metadata:
            # Copy the local/merged paths onto the metadata entry with the same fileId.
            for item in attachment_metadata:
                for merged in merged_attachment_metadata:
                    if item["fileId"] == merged["fileId"]:
                        item["localPath"] = merged["localPath"]
                        item["mergedPdfPath"] = merged["mergedPdfPath"]
                        break
            log.info("[任务ID: %s] 已装配并合并附件 %s 个到单输入文件", task_id, len(merged_attachment_metadata))

        runner = NativeRunner()
        t0 = time.time()
        native_result = loop.run_until_complete(
            runner.run(
                NativeRunRequest(
                    metadata=NativeAuditMetadata(
                        run_id=run_id,
                        document_id=document_id,
                        rule_version_id=_optional_int(upload_info, "rule_version_id", "ruleVersionId"),
                        extras={
                            "taskId": task_id,
                            "attachments": attachment_metadata,
                            "mergedInput": bool(merged_attachment_metadata),
                        },
                    ),
                    local_file_path=temp_path,
                    rules_file=rules_file,
                    rule_source_path=rules_path_resolved,
                )
            )
        )
        loop.run_until_complete(runner.persist_result(native_result))
        elapsed = round(time.time() - t0, 2)

        ctx = native_result.ctx
        loop.run_until_complete(_update_run_phase_safe(run_id, ctx.phase))
        loop.run_until_complete(_update_run_status_safe(run_id, "completed"))
        loop.run_until_complete(_update_status_safe(document_id, "completed"))
        log.info(
            f"[任务ID: {task_id}] leaudit管线完成: phase={ctx.phase}, "
            f"timing={dict(ctx.timing)}, 总耗时={elapsed:.1f}s"
        )
        return {
            "status": "success",
            "document_id": document_id,
            "run_id": run_id,
            "phase": ctx.phase,
            "timing": dict(ctx.timing),
            "errors": list(ctx.extraction.all_errors) if ctx.extraction is not None else list(ctx.extraction_errors),
        }
    except Exception as e:
        log.error(f"[任务ID: {task_id}] leaudit管线失败: {e}", exc_info=True)
        error_code, user_message, error_detail = _classify_run_failure(e)
        try:
            loop.run_until_complete(_update_status_safe(document_id, "failed"))
            if run_id is not None:
                # NOTE(review): without a native result (failure before or during
                # runner.run) the phase is still reported as "persist" — confirm intended.
                failed_phase = "persist"
                if native_result is not None:
                    failed_phase = native_result.ctx.phase or failed_phase
                loop.run_until_complete(
                    storage.fail_run(
                        document_id,
                        run_id=run_id,
                        phase=failed_phase,
                        message=user_message,
                        error_code=error_code,
                        detail_json={
                            "taskId": task_id,
                            "filename": filename,
                            **error_detail,
                        },
                    )
                )
        except Exception:
            # Best-effort bookkeeping: never mask the original pipeline error,
            # but leave a trace instead of swallowing silently.
            log.error(f"[任务ID: {task_id}] 失败状态落库异常", exc_info=True)
        raise
    finally:
        try:
            for leftover_path in temp_paths:
                try:
                    if Path(leftover_path).exists():
                        os.remove(leftover_path)
                except OSError as cleanup_error:
                    log.warning(f"[任务ID: {task_id}] 临时文件清理失败: {leftover_path}: {cleanup_error}")
        finally:
            # Always release the loop, even if cleanup itself blew up.
            loop.close()
def leaudit_process_document_by_run(
    run_id: int,
    *,
    task_id: str | None = None,
    rules_path: str | None = None,
    queue_name: str | None = None,
) -> dict[str, Any]:
    """Load the execution context for ``run_id`` and run native leaudit on it.

    The run is claimed first; when the claim fails the call returns a
    ``skipped`` result instead of processing the run a second time.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        if not loop.run_until_complete(_claim_run_safe(run_id, task_id)):
            log.warning("run_id=%s 未抢占成功,跳过重复消费", run_id)
            return {"status": "skipped", "run_id": run_id, "reason": "already_claimed"}

        context = loop.run_until_complete(_load_run_context(run_id))

        # Queue used for logging only: the delivery queue if known, otherwise
        # the one derived from the run's trigger source.
        effective_queue = queue_name or resolve_worker_queue(context.get("trigger_source"))
        log.info(
            "run_id=%s worker开始执行: queue=%s, speed=%s, filename=%s",
            run_id,
            effective_queue,
            _queue_label(effective_queue),
            context["filename"],
        )

        run_upload_info = {
            "run_id": run_id,
            "rule_version_id": context["rule_version_id"],
            "rule_source_oss_url": context["rule_source_oss_url"],
            "source_type": context["source_type"],
            "source_path": context["source_path"],
            "trigger_source": context["trigger_source"],
            "attachments": context.get("attachments") or [],
        }
        return leaudit_process_document(
            document_id=context["document_id"],
            file_content=context["file_content"],
            filename=context["filename"],
            upload_info=run_upload_info,
            rules_path=rules_path,
        )
    finally:
        loop.close()
@celery_app.task(bind=True, name="leaudit.process_document", acks_late=True)
def leaudit_process_document_task(self, run_id: int, rules_path: str | None = None) -> dict[str, Any]:
    """Celery worker entry point: execute the audit identified by ``run_id``."""
    # delivery_info may be absent or None on the request; fall back to an empty mapping.
    delivery = getattr(self.request, "delivery_info", {}) or {}
    source_queue = delivery.get("routing_key") or delivery.get("queue")
    return leaudit_process_document_by_run(
        run_id=run_id,
        task_id=self.request.id,
        rules_path=rules_path,
        queue_name=source_queue,
    )
@celery_app.task(bind=True, name="leaudit.scan_stuck_documents")
def leaudit_scan_stuck_documents_task(self) -> dict[str, Any]:
    """Periodically scan for audit documents without progress and mark them failed."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # The configured timeout is clamped to at least one minute.
        effective_timeout = max(1, int(LEAUDIT_STUCK_TIMEOUT_MINUTES))
        scan = _scan_and_fail_stuck_documents(timeout_minutes=effective_timeout)
        return loop.run_until_complete(scan)
    finally:
        loop.close()
# type_id → rules directory mapping (only types with a fixed mapping).
# Administrative licensing (行政许可, type_id=2) has 9 sub-types and is NOT mapped here —
# its rules must come from document metadata (rules_file_path) or content classification.
_TYPE_ID_RULES_MAP: dict[int, str] = {
    3: "行政处罚",  # administrative penalty
}
def _resolve_rules_path(document_id: int, loop: asyncio.AbstractEventLoop) -> str | None:
    """Resolve the rules path: config override first, then the document's type_id mapping.

    Returns ``None`` when there is no override and the document's type has no
    fixed mapping, or when the database lookup fails (the failure is logged).
    """
    from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
    from sqlalchemy import text as sa_text

    # 1. Config override (when explicitly set in app.toml)
    if LEAUDIT_RULES_DIR:
        return LEAUDIT_RULES_DIR

    # 2. Fixed mapping keyed by the document's type_id.
    async def _lookup_mapped_rules():
        async with GetAsyncSession() as session:
            result = await session.execute(
                sa_text("SELECT type_id FROM leaudit_documents WHERE id = :did"),
                {"did": document_id},
            )
            row = result.fetchone()
            type_id = row[0] if row else None
            if type_id and type_id in _TYPE_ID_RULES_MAP:
                return f"{_TYPE_ID_RULES_MAP[type_id]}/rules.yaml"
            return None

    try:
        return loop.run_until_complete(_lookup_mapped_rules())
    except Exception as e:
        log.warning(f"Failed to resolve rules_path from document: {e}")
        return None
def _resolve_rules_runtime(
document_id: int,
run_id: int,
explicit_rules_path: str | None,
loop: asyncio.AbstractEventLoop,
) -> dict[str, str | None]:
"""解析本次执行使用的规则来源。"""
if explicit_rules_path:
return {
"rules_path": explicit_rules_path,
"temp_rule_path": None,
"source_type": "explicit",
"source_path": explicit_rules_path,
}
resolver = RuleVersionResolver()
try:
payload = loop.run_until_complete(resolver.ResolveForRun(run_id))
if payload:
log.info(
f"run_id={run_id} 规则来源已解析: sourceType={payload.sourceType}, "
f"sourcePath={payload.sourcePath}, localPath={payload.localPath}"
)
return {
"rules_path": payload.localPath,
"temp_rule_path": payload.localPath if payload.sourceType == "oss" else None,
"source_type": payload.sourceType,
"source_path": payload.sourcePath,
}
except Exception as e:
log.warning(f"Failed to resolve rule version from run: run_id={run_id}, error={e}")
fallback_rules_path = _resolve_rules_path(document_id, loop)
return {
"rules_path": fallback_rules_path,
"temp_rule_path": None,
"source_type": "legacy_fallback",
"source_path": fallback_rules_path,
}
def _resolve_run_id(
document_id: int,
upload_info: Dict[str, Any] | None,
loop: asyncio.AbstractEventLoop,
) -> int:
"""解析本次任务对应的运行 ID。"""
if upload_info:
for key in ("run_id", "runId"):
value = upload_info.get(key)
if isinstance(value, int):
return value
from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
from sqlalchemy import text as sa_text
async def _fetch() -> int:
async with GetAsyncSession() as session:
result = await session.execute(
sa_text("SELECT id FROM leaudit_audit_runs WHERE document_id = :did ORDER BY id DESC LIMIT 1"),
{"did": document_id},
)
row = result.fetchone()
if not row:
raise ValueError(f"未找到 document_id={document_id} 对应的 run 记录")
return int(row[0])
return loop.run_until_complete(_fetch())
def _optional_int(payload: Dict[str, Any] | None, *keys: str) -> int | None:
"""从字典中按顺序取可用整数。"""
if not payload:
return None
for key in keys:
value = payload.get(key)
if isinstance(value, int):
return value
return None
async def _update_status_safe(document_id: int, status: str) -> None:
    """Best-effort update of a document's ``processing_status``.

    Any failure (import, connection, SQL) is logged and swallowed so that a
    status write can never abort the caller.
    """
    try:
        from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
        from sqlalchemy import text as sa_text

        async with GetAsyncSession() as session:
            await session.execute(
                sa_text("UPDATE leaudit_documents SET processing_status = :s, updated_at = now() WHERE id = :did"),
                {"s": status, "did": document_id},
            )
            await session.commit()
    except Exception:
        # Still best-effort, but leave a trace instead of failing silently.
        log.exception("failed to update document status: document_id=%s status=%s", document_id, status)
async def _update_run_status_safe(run_id: int, status: str) -> None:
    """Best-effort update of an audit run's ``status``.

    Any failure (import, connection, SQL) is logged and swallowed so that a
    status write can never abort the caller.
    """
    try:
        from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
        from sqlalchemy import text as sa_text

        async with GetAsyncSession() as session:
            await session.execute(
                sa_text("UPDATE leaudit_audit_runs SET status = :s, updated_at = now() WHERE id = :rid"),
                {"s": status, "rid": run_id},
            )
            await session.commit()
    except Exception:
        # Still best-effort, but leave a trace instead of failing silently.
        log.exception("failed to update run status: run_id=%s status=%s", run_id, status)
async def _update_run_phase_safe(run_id: int, phase: str | None) -> None:
    """Best-effort update of an audit run's ``phase``.

    Any failure (import, connection, SQL) is logged and swallowed so that a
    phase write can never abort the caller.
    """
    try:
        from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
        from sqlalchemy import text as sa_text

        async with GetAsyncSession() as session:
            await session.execute(
                sa_text("UPDATE leaudit_audit_runs SET phase = :p, updated_at = now() WHERE id = :rid"),
                {"p": phase, "rid": run_id},
            )
            await session.commit()
    except Exception:
        # Still best-effort, but leave a trace instead of failing silently.
        log.exception("failed to update run phase: run_id=%s phase=%s", run_id, phase)
async def _claim_run_safe(run_id: int, task_id: str | None) -> bool:
    """Claim a queued/pending/retrying run with a single UPDATE to avoid double consumption.

    Returns ``True`` only when the UPDATE returned the run's row. Any exception
    is logged and reported as "not claimed".
    """
    # Assembled line by line; the resulting text is a plain multi-line statement.
    claim_sql = (
        "\n"
        "UPDATE leaudit_audit_runs\n"
        "SET status = 'running',\n"
        "phase = 'prepare',\n"
        "task_id = COALESCE(:task_id, task_id),\n"
        "started_at = COALESCE(started_at, now()),\n"
        "updated_at = now()\n"
        "WHERE id = :rid\n"
        "AND status IN ('queued', 'pending', 'retrying')\n"
        "RETURNING id\n"
    )
    try:
        from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
        from sqlalchemy import text as sa_text

        async with GetAsyncSession() as session:
            outcome = await session.execute(
                sa_text(claim_sql),
                {"rid": run_id, "task_id": task_id},
            )
            claimed_row = outcome.fetchone()
            await session.commit()
            return claimed_row is not None
    except Exception:
        log.exception("run_id=%s 抢占执行权失败", run_id)
        return False
async def _load_run_context(run_id: int) -> dict[str, Any]:
    """Load the document/file context needed to execute the run ``run_id``.

    Returns:
        A dict with the main file (``filename``, ``file_content``,
        ``source_type``, ``source_path``), run fields (``document_id``,
        ``rule_version_id``, ``rule_source_oss_url``, ``trigger_source``) and an
        ``attachments`` list built from the document's active attachment files.

    Raises:
        ValueError: If the run, its document or its document file is missing.
    """
    from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession

    async with GetAsyncSession() as session:
        run = await session.get(LeauditAuditRun, run_id)
        if not run:
            raise ValueError(f"未找到 run_id={run_id} 对应的运行记录")

        document = await session.get(LeauditDocument, run.documentId)
        if not document:
            raise ValueError(f"未找到 document_id={run.documentId} 对应的文档记录")

        document_file = await session.get(LeauditDocumentFile, run.documentFileId)
        if not document_file:
            raise ValueError(f"未找到 document_file_id={run.documentFileId} 对应的文件记录")

        resolver = FileSourceResolver()
        main_payload = await resolver.ResolvePayload(document_file)

        # Active attachment files of the document, in ascending id order.
        attachment_query = (
            select(LeauditDocumentFile)
            .where(
                LeauditDocumentFile.documentId == document.Id,
                LeauditDocumentFile.isActive.is_(True),
                LeauditDocumentFile.fileRole == "attachment",
            )
            .order_by(LeauditDocumentFile.Id.asc())
        )
        attachment_rows = await session.execute(attachment_query)
        attachment_files = list(attachment_rows.scalars().all())
        if attachment_files:
            attachment_payloads = await resolver.ResolvePayloads(attachment_files)
        else:
            attachment_payloads = []

        # Files and payloads are paired positionally.
        attachments = []
        for file_row, file_payload in zip(attachment_files, attachment_payloads):
            attachments.append(
                {
                    "file_id": file_row.Id,
                    "file_name": file_payload.fileName,
                    "file_content": file_payload.fileContent,
                    "source_type": file_payload.sourceType,
                    "source_path": file_payload.sourcePath,
                }
            )

        return {
            "document_id": document.Id,
            "filename": main_payload.fileName,
            "file_content": main_payload.fileContent,
            "source_type": main_payload.sourceType,
            "source_path": main_payload.sourcePath,
            "rule_version_id": run.ruleVersionId,
            "rule_source_oss_url": run.ruleSourceOssUrl,
            "trigger_source": run.triggerSource,
            "attachments": attachments,
        }
def _get_suffix(filename: str) -> str:
"""Extract file suffix from filename."""
_, ext = os.path.splitext(filename)
return ext if ext else ".pdf"
def _write_temp_file(*, filename: str, content: bytes, temp_paths: list[str]) -> str:
    """Write ``content`` to a new temp file that keeps the suffix of ``filename``.

    The file is created with ``delete=False``; its path is appended to
    ``temp_paths`` so the caller can remove it later.

    Returns:
        The path of the temp file.
    """
    with tempfile.NamedTemporaryFile(suffix=_get_suffix(filename), delete=False) as temp:
        # Register the path before writing: if the write fails, the caller's
        # cleanup still removes the file instead of leaking it.
        temp_paths.append(temp.name)
        temp.write(content)
        return temp.name
def _convert_to_pdf_path(*, source_path: str, temp_paths: list[str]) -> str:
"""Convert a local source file to PDF temp path when needed."""
source = Path(source_path)
if source.suffix.lower() == ".pdf":
return source_path
pdf_temp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False)
pdf_temp.close()
temp_paths.append(pdf_temp.name)
doc2pdf.convert(source, pdf_temp.name, soffice="auto", pdfa=False, force=True, verify=False)
return pdf_temp.name
def _merge_pdf_paths(*, pdf_paths: list[str], temp_paths: list[str]) -> str:
    """Concatenate the given PDFs, in order, into one new temp PDF and return its path.

    The output path is recorded in ``temp_paths`` before any merging starts.
    """
    # Reserve the output path; the empty placeholder file is closed on leaving the block.
    with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as placeholder:
        merged_path = placeholder.name
    temp_paths.append(merged_path)

    combined = fitz.open()
    try:
        for part_path in pdf_paths:
            part = fitz.open(part_path)
            try:
                combined.insert_pdf(part)
            finally:
                part.close()
        combined.save(merged_path)
    finally:
        combined.close()
    return merged_path
def _prepare_execution_input(
    *,
    filename: str,
    file_content: bytes,
    attachments: list[dict[str, Any]],
    temp_paths: list[str],
) -> tuple[str, list[dict[str, Any]]]:
    """Materialize the single file the pipeline will actually run on.

    Without attachments the main file is written to a temp file and used as is.
    With attachments the main file and every attachment are converted to PDF
    and merged, main file first, into one PDF.

    Returns:
        ``(input_path, merged_attachment_metadata)``; the metadata list is empty
        when there were no attachments.
    """
    main_path = _write_temp_file(filename=filename, content=file_content, temp_paths=temp_paths)
    if not attachments:
        return main_path, []

    # The main document always comes first in the merged PDF.
    pdf_inputs: list[str] = [_convert_to_pdf_path(source_path=main_path, temp_paths=temp_paths)]
    merged_metadata: list[dict[str, Any]] = []

    for entry in attachments:
        entry_name = str(entry.get("file_name") or entry.get("filename") or "attachment.bin")
        entry_bytes = entry["file_content"]
        local_path = _write_temp_file(filename=entry_name, content=entry_bytes, temp_paths=temp_paths)
        pdf_path = _convert_to_pdf_path(source_path=local_path, temp_paths=temp_paths)
        pdf_inputs.append(pdf_path)
        merged_metadata.append(
            {
                "fileId": entry.get("file_id"),
                "fileName": entry_name,
                "localPath": local_path,
                "mergedPdfPath": pdf_path,
            }
        )

    merged_path = _merge_pdf_paths(pdf_paths=pdf_inputs, temp_paths=temp_paths)
    return merged_path, merged_metadata
def dispatch_leaudit_task(
    run_id: int,
    *,
    queue_name: str | None = None,
    rules_path: Optional[str] = None,
) -> str:
    """Enqueue ``run_id`` on a Celery worker queue and return the Celery task id.

    Falls back to the normal queue when ``queue_name`` is not given.
    """
    target_queue = queue_name if queue_name else LEAUDIT_WORKER_QUEUE_NORMAL
    task_kwargs = {"run_id": run_id, "rules_path": rules_path}
    async_result = leaudit_process_document_task.apply_async(kwargs=task_kwargs, queue=target_queue)
    log.info(
        "run_id=%s 已投递到 worker 队列: queue=%s, speed=%s, task_id=%s",
        run_id,
        target_queue,
        _queue_label(target_queue),
        async_result.id,
    )
    return async_result.id
def resolve_worker_queue(trigger_source: str | None) -> str:
    """Pick the worker queue for a trigger source.

    A source containing "urgent" or "high" (case-insensitive) goes to the
    urgent queue; everything else, including ``None``, goes to the normal queue.
    """
    source = (trigger_source or "").strip().lower()
    wants_urgent = any(marker in source for marker in ("urgent", "high"))
    return LEAUDIT_WORKER_QUEUE_URGENT if wants_urgent else LEAUDIT_WORKER_QUEUE_NORMAL
def _queue_label(queue_name: str | None) -> str:
    """Return the speed label used in logs: "urgent" for the urgent queue, else "normal"."""
    return "urgent" if queue_name == LEAUDIT_WORKER_QUEUE_URGENT else "normal"
async def _scan_and_fail_stuck_documents(*, timeout_minutes: int) -> dict[str, Any]:
    """Find documents whose current run has been idle too long and mark them failed.

    A PostgreSQL advisory lock guards the selection so that concurrent scanners
    skip instead of selecting the same rows. The lock is transaction-level: it
    is released when the transaction ends, whether by commit or by rollback, so
    a failing query cannot leave it held on a pooled connection.

    Args:
        timeout_minutes: Idle time, in minutes, after which a document counts as stuck.

    Returns:
        ``{"status": "skipped", ...}`` when the lock is held elsewhere, otherwise
        a ``success`` dict with ``matched``, ``failed`` and the failed ``items``.
    """
    from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
    from sqlalchemy import text as sa_text

    lock_key = 20260512

    # Idle time is measured from the run's updated_at while the run is still
    # active, otherwise from the document's updated_at. Assembled line by line;
    # the resulting text is a plain multi-line statement.
    stuck_query = (
        "\n"
        "SELECT\n"
        "d.id AS document_id,\n"
        "d.processing_status,\n"
        "d.current_run_id,\n"
        "d.updated_at AS document_updated_at,\n"
        "d.region,\n"
        "d.normalized_name,\n"
        "ar.id AS run_id,\n"
        "ar.status AS run_status,\n"
        "ar.phase,\n"
        "ar.task_id,\n"
        "ar.updated_at AS run_updated_at,\n"
        "EXTRACT(EPOCH FROM (\n"
        "NOW() - CASE\n"
        "WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')\n"
        "THEN COALESCE(ar.updated_at, d.updated_at)\n"
        "ELSE COALESCE(d.updated_at, ar.updated_at)\n"
        "END\n"
        "))::bigint AS idle_seconds\n"
        "FROM leaudit_documents d\n"
        "LEFT JOIN leaudit_audit_runs ar\n"
        "ON ar.id = d.current_run_id\n"
        "WHERE d.deleted_at IS NULL\n"
        "AND (\n"
        "LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')\n"
        "OR LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'queued', 'running', 'processing')\n"
        ")\n"
        "AND CASE\n"
        "WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')\n"
        "THEN COALESCE(ar.updated_at, d.updated_at)\n"
        "ELSE COALESCE(d.updated_at, ar.updated_at)\n"
        "END < NOW() - make_interval(mins => :timeout_minutes)\n"
        "ORDER BY CASE\n"
        "WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')\n"
        "THEN COALESCE(ar.updated_at, d.updated_at)\n"
        "ELSE COALESCE(d.updated_at, ar.updated_at)\n"
        "END ASC,\n"
        "d.id ASC\n"
    )

    timed_out_rows: list[dict[str, Any]] = []
    async with GetAsyncSession() as session:
        try:
            lock_row = (
                await session.execute(
                    sa_text("SELECT pg_try_advisory_xact_lock(:lock_key)"),
                    {"lock_key": lock_key},
                )
            ).fetchone()
            has_lock = bool(lock_row[0]) if lock_row else False
            if not has_lock:
                log.info("stuck-scan skipped: advisory lock already held")
                return {
                    "status": "skipped",
                    "reason": "lock_not_acquired",
                    "timeout_minutes": timeout_minutes,
                    "matched": 0,
                    "failed": 0,
                }

            rows = (
                await session.execute(
                    sa_text(stuck_query),
                    {"timeout_minutes": timeout_minutes},
                )
            ).mappings().all()
            timed_out_rows = [dict(row) for row in rows]
            # Ending the transaction releases the transaction-level advisory lock.
            await session.commit()
        except Exception:
            # Roll back explicitly so the lock is released even if the session
            # context manager does not do so on exit.
            await session.rollback()
            raise

    # NOTE(review): the lock only covers the selection above, not the updates
    # below, so a scanner starting right after the commit may select rows that
    # are still being failed here — confirm whether fail_run tolerates repeats.
    storage = StorageAdapter()
    failed_items: list[dict[str, Any]] = []
    for row in timed_out_rows:
        document_id = int(row["document_id"])
        # The LEFT JOIN yields no run columns for documents without a current run.
        run_id = int(row["run_id"]) if row.get("run_id") is not None else None
        idle_seconds = int(row.get("idle_seconds") or 0)
        run_phase = row.get("phase")
        run_status = row.get("run_status")
        processing_status = row.get("processing_status")
        file_name = row.get("normalized_name") or f"document-{document_id}"
        message = (
            f"文档处理长时间无进展,已自动终止:"
            f"document_id={document_id}, run_id={run_id}, "
            f"processing_status={processing_status}, run_status={run_status}, "
            f"phase={run_phase}, idle_seconds={idle_seconds}"
        )
        try:
            await _update_status_safe(document_id, "failed")
            if run_id is not None:
                await storage.fail_run(
                    document_id,
                    run_id=run_id,
                    phase=run_phase or "dispatch",
                    message=message,
                    detail_json={
                        "reason": "stuck_timeout",
                        "timeoutMinutes": timeout_minutes,
                        "idleSeconds": idle_seconds,
                        "taskId": row.get("task_id"),
                        "runStatus": run_status,
                        "processingStatus": processing_status,
                        "phase": run_phase,
                        "fileName": file_name,
                        "region": row.get("region"),
                    },
                )
            failed_items.append(
                {
                    "document_id": document_id,
                    "run_id": run_id,
                    "phase": run_phase,
                    "idle_seconds": idle_seconds,
                }
            )
            log.warning("stuck document auto-failed: %s", message)
        except Exception:
            # One bad row must not stop the rest of the scan.
            log.exception("stuck document auto-fail failed: document_id=%s run_id=%s", document_id, run_id)

    return {
        "status": "success",
        "timeout_minutes": timeout_minutes,
        "matched": len(timed_out_rows),
        "failed": len(failed_items),
        "items": failed_items,
    }