"""Govdoc Bridge — Celery 任务入口。 将 govdoc 审查执行投递到 Celery worker 队列。 """ from __future__ import annotations import asyncio from typing import Any from fastapi_common.fastapi_common_logger import logger from fastapi_admin.celery_app import celery_app from fastapi_modules.fastapi_leaudit.govdoc_bridge.runner import GovdocRunner from fastapi_modules.fastapi_leaudit.govdoc_bridge.storage_adapter import StorageAdapter log = logger GOVDOC_WORKER_QUEUE = "govdoc" GOVDOC_WORKER_QUEUE_URGENT = "govdoc_urgent" def resolve_govdoc_queue(speed: str = "normal") -> str: """根据优先级返回对应的 worker 队列名。""" if (speed or "").strip().lower() in {"urgent", "high", "fast", "紧急"}: return GOVDOC_WORKER_QUEUE_URGENT return GOVDOC_WORKER_QUEUE def dispatch_govdoc_task( documentId: int, runId: int, triggerUserId: int | None = None, speed: str = "normal", ) -> Any: """投递 govdoc 审查任务到 Celery 队列。 Args: documentId: 文档 ID。 runId: 已创建的 govdoc_runs.id。 triggerUserId: 触发人。 speed: 优先级 ('normal' / 'urgent')。 Returns: Celery AsyncResult。 """ queue = resolve_govdoc_queue(speed) log.info( f"[Govdoc] Dispatching task: runId={runId}, documentId={documentId}, queue={queue}" ) return govdoc_execute_task.apply_async( kwargs={ "documentId": documentId, "runId": runId, "triggerUserId": triggerUserId, "speed": speed, }, queue=queue, ) @celery_app.task( bind=True, name="govdoc_execute_task", max_retries=0, default_retry_delay=60, acks_late=True, reject_on_worker_lost=True, task_time_limit=30 * 60, # 单次执行 30 分钟超时 task_soft_time_limit=25 * 60, ) def govdoc_execute_task( self, documentId: int, runId: int, triggerUserId: int | None = None, speed: str = "normal", ) -> dict[str, Any]: """Celery 任务:执行一次 govdoc 公文格式审查。 此任务由 dispatch_govdoc_task 投递,worker 消费后执行完整审查链路。 """ taskId = self.request.id or "unknown" log.info(f"[Govdoc] Task started: taskId={taskId}, runId={runId}, documentId={documentId}") storage = StorageAdapter() loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: # 更新 run 状态 → running loop.run_until_complete(storage.UpdateRunStatus(runId, "processing", phase="parsing")) # 执行完整审查链路 runner = GovdocRunner() result = loop.run_until_complete( runner.Execute( DocumentId=documentId, RunId=runId, TriggerUserId=triggerUserId, Speed=speed, ) ) log.info(f"[Govdoc] Task completed: taskId={taskId}, runId={runId}") return result except Exception as exc: errorMessage = str(exc)[:2000] log.exception(f"[Govdoc] Task failed: taskId={taskId}, runId={runId}, error={errorMessage[:200]}") loop.run_until_complete(storage.UpdateRunError(runId, errorMessage)) loop.run_until_complete(storage.UpdateDocumentStatus(documentId, "failed", runId)) raise finally: loop.close()