"""LeAudit 任务入口。""" from __future__ import annotations import asyncio import os from pathlib import Path import tempfile import time from typing import Any, Dict, Optional import fitz from fastapi_common.fastapi_common_logger import logger from leaudit.converters import doc2pdf from leaudit.ocr.chandra_client import ChandraOCRError from sqlalchemy import select from fastapi_admin.celery_app import celery_app from fastapi_admin.config import ( LEAUDIT_RULES_DIR, LEAUDIT_STUCK_TIMEOUT_MINUTES, LEAUDIT_WORKER_QUEUE_NORMAL, LEAUDIT_WORKER_QUEUE_URGENT, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.nativeRunner import ( NativeRunRequest, NativeRunner, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.auditCtxBuilder import ( NativeAuditMetadata, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.fileSourceResolver import ( FileSourceResolver, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.ruleVersionResolver import ( RuleVersionResolver, ) from fastapi_modules.fastapi_leaudit.leaudit_bridge.rules_loader import RulesLoader from fastapi_modules.fastapi_leaudit.leaudit_bridge.storage_adapter import StorageAdapter from fastapi_modules.fastapi_leaudit.models import ( LeauditAuditRun, LeauditDocument, LeauditDocumentFile, ) log = logger def _classify_run_failure(exc: Exception) -> tuple[str, str, dict[str, Any]]: """将底层异常归一化为可落库、可展示的失败信息。""" raw_message = str(exc).strip() or exc.__class__.__name__ detail: dict[str, Any] = { "rawMessage": raw_message, "errorType": type(exc).__name__, } if isinstance(exc, ChandraOCRError): lower_message = raw_message.lower() detail["service"] = "ocr" if "all connection attempts failed" in lower_message or "connect" in lower_message: return ( "OCR_SERVICE_UNAVAILABLE", "OCR服务暂时不可用,文档未完成识别,请稍后重试。", {**detail, "reason": "connection_failed"}, ) if "timeout" in lower_message: return ( "OCR_SERVICE_TIMEOUT", "OCR服务处理超时,文档未完成识别,请稍后重试。", {**detail, "reason": "timeout"}, ) if "ocr api returned" in lower_message: return ( "OCR_SERVICE_BAD_RESPONSE", "OCR服务响应异常,文档未完成识别,请稍后重试。", {**detail, "reason": "bad_status"}, ) return ( "OCR_PROCESSING_FAILED", "OCR处理失败,文档未完成识别,请稍后重试。", {**detail, "reason": "ocr_failed"}, ) return ( "AUDIT_RUN_FAILED", raw_message, detail, ) def leaudit_process_document( document_id: int, file_content: bytes, filename: str, upload_info: Optional[Dict[str, Any]] = None, source_port: Optional[int] = None, rules_path: Optional[str] = None, ): """处理单个文档的 LeAudit 任务。""" task_id = os.urandom(8).hex() log.info(f"[任务ID: {task_id}] leaudit管线开始处理: {filename}") # 新平台:region 通过参数传递,不再依赖 os.environ 切换 if source_port: log.info(f"[任务ID: {task_id}] 来源端口: {source_port}") if upload_info is None: upload_info = {} loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) temp_paths: list[str] = [] storage = StorageAdapter() try: run_id = _resolve_run_id(document_id, upload_info, loop) rules_resolution = _resolve_rules_runtime(document_id, run_id, rules_path, loop) loop.run_until_complete(_update_run_status_safe(run_id, "running")) rules_path_resolved = rules_resolution["rules_path"] attachment_inputs = list((upload_info or {}).get("attachments") or []) rules_file = None if rules_path_resolved: loader = RulesLoader() rules_file = loader.load(rules_path_resolved) temp_rule_path = rules_resolution.get("temp_rule_path") if isinstance(temp_rule_path, str): temp_paths.append(temp_rule_path) log.info( f"[任务ID: {task_id}] RulesFile pre-loaded: {rules_path_resolved} " f"({len(rules_file.flat_rules)} rules, {len(rules_file.flat_extract)} fields)" ) else: log.info( f"[任务ID: {task_id}] No fixed rules_path — " "will classify from document content after OCR" ) attachment_metadata: list[dict[str, Any]] = [] for attachment in attachment_inputs: attachment_metadata.append( { "fileId": attachment.get("file_id"), "fileName": attachment.get("file_name") or attachment.get("filename"), "sourceType": attachment.get("source_type"), "sourcePath": attachment.get("source_path"), } ) temp_path, merged_attachment_metadata = _prepare_execution_input( filename=filename, file_content=file_content, attachments=attachment_inputs, temp_paths=temp_paths, ) if merged_attachment_metadata: for item in attachment_metadata: for merged in merged_attachment_metadata: if item["fileId"] == merged["fileId"]: item["localPath"] = merged["localPath"] item["mergedPdfPath"] = merged["mergedPdfPath"] break log.info("[任务ID: %s] 已装配并合并附件 %s 个到单输入文件", task_id, len(merged_attachment_metadata)) runner = NativeRunner() t0 = time.time() native_result = loop.run_until_complete( runner.run( NativeRunRequest( metadata=NativeAuditMetadata( run_id=run_id, document_id=document_id, rule_version_id=_optional_int(upload_info, "rule_version_id", "ruleVersionId"), extras={ "taskId": task_id, "attachments": attachment_metadata, "mergedInput": bool(merged_attachment_metadata), }, ), local_file_path=temp_path, rules_file=rules_file, rule_source_path=rules_path_resolved, ) ) ) loop.run_until_complete(runner.persist_result(native_result)) elapsed = round(time.time() - t0, 2) ctx = native_result.ctx loop.run_until_complete(_update_run_phase_safe(run_id, ctx.phase)) loop.run_until_complete(_update_run_status_safe(run_id, "completed")) loop.run_until_complete(_update_status_safe(document_id, "completed")) log.info( f"[任务ID: {task_id}] leaudit管线完成: phase={ctx.phase}, " f"timing={dict(ctx.timing)}, 总耗时={elapsed:.1f}s" ) return { "status": "success", "document_id": document_id, "run_id": run_id, "phase": ctx.phase, "timing": dict(ctx.timing), "errors": list(ctx.extraction.all_errors) if ctx.extraction is not None else list(ctx.extraction_errors), } except Exception as e: log.error(f"[任务ID: {task_id}] leaudit管线失败: {e}", exc_info=True) error_code, user_message, error_detail = _classify_run_failure(e) try: loop.run_until_complete(_update_status_safe(document_id, "failed")) if 'run_id' in locals(): failed_phase = "persist" if "native_result" in locals(): failed_phase = native_result.ctx.phase or failed_phase loop.run_until_complete( storage.fail_run( document_id, run_id=run_id, phase=failed_phase, message=user_message, error_code=error_code, detail_json={ "taskId": task_id, "filename": filename, **error_detail, }, ) ) except Exception: pass raise finally: for temp_path in temp_paths: try: if Path(temp_path).exists(): os.remove(temp_path) except OSError: pass loop.close() def leaudit_process_document_by_run( run_id: int, *, task_id: str | None = None, rules_path: str | None = None, queue_name: str | None = None, ) -> dict[str, Any]: """按 runId 加载执行上下文并执行原生 leaudit。""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: claimed = loop.run_until_complete(_claim_run_safe(run_id, task_id)) if not claimed: log.warning("run_id=%s 未抢占成功,跳过重复消费", run_id) return {"status": "skipped", "run_id": run_id, "reason": "already_claimed"} context = loop.run_until_complete(_load_run_context(run_id)) log.info( "run_id=%s worker开始执行: queue=%s, speed=%s, filename=%s", run_id, queue_name or resolve_worker_queue(context.get("trigger_source")), _queue_label(queue_name or resolve_worker_queue(context.get("trigger_source"))), context["filename"], ) return leaudit_process_document( document_id=context["document_id"], file_content=context["file_content"], filename=context["filename"], upload_info={ "run_id": run_id, "rule_version_id": context["rule_version_id"], "rule_source_oss_url": context["rule_source_oss_url"], "source_type": context["source_type"], "source_path": context["source_path"], "trigger_source": context["trigger_source"], "attachments": context.get("attachments") or [], }, rules_path=rules_path, ) finally: loop.close() @celery_app.task( bind=True, name="leaudit.process_document", acks_late=True, ) def leaudit_process_document_task(self, run_id: int, rules_path: str | None = None) -> dict[str, Any]: """Celery worker 入口 —— 按 runId 执行评查。""" delivery_info = getattr(self.request, "delivery_info", {}) or {} queue_name = delivery_info.get("routing_key") or delivery_info.get("queue") return leaudit_process_document_by_run( run_id=run_id, task_id=self.request.id, rules_path=rules_path, queue_name=queue_name, ) @celery_app.task( bind=True, name="leaudit.scan_stuck_documents", ) def leaudit_scan_stuck_documents_task(self) -> dict[str, Any]: """周期扫描长时间无进展的评查文档,并自动标记失败。""" loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: return loop.run_until_complete( _scan_and_fail_stuck_documents(timeout_minutes=max(1, int(LEAUDIT_STUCK_TIMEOUT_MINUTES))) ) finally: loop.close() # type_id → rules directory mapping (only fixed-mapping types) # 行政许可 (type_id=2) has 9 sub-types, NOT mapped here — # must come from document metadata (rules_file_path) or content classification. _TYPE_ID_RULES_MAP: dict[int, str] = { 3: "行政处罚", } def _resolve_rules_path(document_id: int, loop: asyncio.AbstractEventLoop) -> str | None: """Resolve rules_path: config override → document metadata → type_id mapping.""" from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text # 1. Config override (when explicitly set in app.toml) if LEAUDIT_RULES_DIR: return LEAUDIT_RULES_DIR try: async def _fetch(): async with GetAsyncSession() as session: result = await session.execute( sa_text("SELECT type_id FROM leaudit_documents WHERE id = :did"), {"did": document_id}, ) row = result.fetchone() if row and row[0] and row[0] in _TYPE_ID_RULES_MAP: return f"{_TYPE_ID_RULES_MAP[row[0]]}/rules.yaml" return None return loop.run_until_complete(_fetch()) except Exception as e: log.warning(f"Failed to resolve rules_path from document: {e}") return None def _resolve_rules_runtime( document_id: int, run_id: int, explicit_rules_path: str | None, loop: asyncio.AbstractEventLoop, ) -> dict[str, str | None]: """解析本次执行使用的规则来源。""" if explicit_rules_path: return { "rules_path": explicit_rules_path, "temp_rule_path": None, "source_type": "explicit", "source_path": explicit_rules_path, } resolver = RuleVersionResolver() try: payload = loop.run_until_complete(resolver.ResolveForRun(run_id)) if payload: log.info( f"run_id={run_id} 规则来源已解析: sourceType={payload.sourceType}, " f"sourcePath={payload.sourcePath}, localPath={payload.localPath}" ) return { "rules_path": payload.localPath, "temp_rule_path": payload.localPath if payload.sourceType == "oss" else None, "source_type": payload.sourceType, "source_path": payload.sourcePath, } except Exception as e: log.warning(f"Failed to resolve rule version from run: run_id={run_id}, error={e}") fallback_rules_path = _resolve_rules_path(document_id, loop) return { "rules_path": fallback_rules_path, "temp_rule_path": None, "source_type": "legacy_fallback", "source_path": fallback_rules_path, } def _resolve_run_id( document_id: int, upload_info: Dict[str, Any] | None, loop: asyncio.AbstractEventLoop, ) -> int: """解析本次任务对应的运行 ID。""" if upload_info: for key in ("run_id", "runId"): value = upload_info.get(key) if isinstance(value, int): return value from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text async def _fetch() -> int: async with GetAsyncSession() as session: result = await session.execute( sa_text("SELECT id FROM leaudit_audit_runs WHERE document_id = :did ORDER BY id DESC LIMIT 1"), {"did": document_id}, ) row = result.fetchone() if not row: raise ValueError(f"未找到 document_id={document_id} 对应的 run 记录") return int(row[0]) return loop.run_until_complete(_fetch()) def _optional_int(payload: Dict[str, Any] | None, *keys: str) -> int | None: """从字典中按顺序取可用整数。""" if not payload: return None for key in keys: value = payload.get(key) if isinstance(value, int): return value return None async def _update_status_safe(document_id: int, status: str) -> None: """Safely update document status via SQLAlchemy, ignoring errors.""" try: from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text async with GetAsyncSession() as session: await session.execute( sa_text("UPDATE leaudit_documents SET processing_status = :s, updated_at = now() WHERE id = :did"), {"s": status, "did": document_id}, ) await session.commit() except Exception: pass async def _update_run_status_safe(run_id: int, status: str) -> None: """安全更新运行状态。""" try: from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text async with GetAsyncSession() as session: await session.execute( sa_text("UPDATE leaudit_audit_runs SET status = :s, updated_at = now() WHERE id = :rid"), {"s": status, "rid": run_id}, ) await session.commit() except Exception: pass async def _update_run_phase_safe(run_id: int, phase: str | None) -> None: """安全更新运行阶段。""" try: from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text async with GetAsyncSession() as session: await session.execute( sa_text("UPDATE leaudit_audit_runs SET phase = :p, updated_at = now() WHERE id = :rid"), {"p": phase, "rid": run_id}, ) await session.commit() except Exception: pass async def _claim_run_safe(run_id: int, task_id: str | None) -> bool: """原子抢占 queued/pending 运行,避免重复消费。""" try: from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text async with GetAsyncSession() as session: result = await session.execute( sa_text( """ UPDATE leaudit_audit_runs SET status = 'running', phase = 'prepare', task_id = COALESCE(:task_id, task_id), started_at = COALESCE(started_at, now()), updated_at = now() WHERE id = :rid AND status IN ('queued', 'pending', 'retrying') RETURNING id """ ), {"rid": run_id, "task_id": task_id}, ) row = result.fetchone() await session.commit() return row is not None except Exception: log.exception("run_id=%s 抢占执行权失败", run_id) return False async def _load_run_context(run_id: int) -> dict[str, Any]: """按 runId 加载执行所需文档文件上下文。""" from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession async with GetAsyncSession() as session: run = await session.get(LeauditAuditRun, run_id) if not run: raise ValueError(f"未找到 run_id={run_id} 对应的运行记录") document = await session.get(LeauditDocument, run.documentId) if not document: raise ValueError(f"未找到 document_id={run.documentId} 对应的文档记录") document_file = await session.get(LeauditDocumentFile, run.documentFileId) if not document_file: raise ValueError(f"未找到 document_file_id={run.documentFileId} 对应的文件记录") resolver = FileSourceResolver() payload = await resolver.ResolvePayload(document_file) attachmentResult = await session.execute( select(LeauditDocumentFile) .where( LeauditDocumentFile.documentId == document.Id, LeauditDocumentFile.isActive.is_(True), LeauditDocumentFile.fileRole == "attachment", ) .order_by(LeauditDocumentFile.Id.asc()) ) attachmentFiles = list(attachmentResult.scalars().all()) attachmentPayloads = await resolver.ResolvePayloads(attachmentFiles) if attachmentFiles else [] return { "document_id": document.Id, "filename": payload.fileName, "file_content": payload.fileContent, "source_type": payload.sourceType, "source_path": payload.sourcePath, "rule_version_id": run.ruleVersionId, "rule_source_oss_url": run.ruleSourceOssUrl, "trigger_source": run.triggerSource, "attachments": [ { "file_id": attachmentFile.Id, "file_name": attachmentPayload.fileName, "file_content": attachmentPayload.fileContent, "source_type": attachmentPayload.sourceType, "source_path": attachmentPayload.sourcePath, } for attachmentFile, attachmentPayload in zip(attachmentFiles, attachmentPayloads) ], } def _get_suffix(filename: str) -> str: """Extract file suffix from filename.""" _, ext = os.path.splitext(filename) return ext if ext else ".pdf" def _write_temp_file(*, filename: str, content: bytes, temp_paths: list[str]) -> str: """Write bytes to a temp file preserving original suffix.""" suffix = _get_suffix(filename) with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as temp: temp.write(content) temp_path = temp.name temp_paths.append(temp_path) return temp_path def _convert_to_pdf_path(*, source_path: str, temp_paths: list[str]) -> str: """Convert a local source file to PDF temp path when needed.""" source = Path(source_path) if source.suffix.lower() == ".pdf": return source_path pdf_temp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) pdf_temp.close() temp_paths.append(pdf_temp.name) doc2pdf.convert(source, pdf_temp.name, soffice="auto", pdfa=False, force=True, verify=False) return pdf_temp.name def _merge_pdf_paths(*, pdf_paths: list[str], temp_paths: list[str]) -> str: """Merge many pdf files into one temp pdf.""" merged_temp = tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) merged_temp.close() temp_paths.append(merged_temp.name) output = fitz.open() try: for pdf_path in pdf_paths: src = fitz.open(pdf_path) try: output.insert_pdf(src) finally: src.close() output.save(merged_temp.name) finally: output.close() return merged_temp.name def _prepare_execution_input( *, filename: str, file_content: bytes, attachments: list[dict[str, Any]], temp_paths: list[str], ) -> tuple[str, list[dict[str, Any]]]: """Prepare the actual execution input file. Without attachments: use the main file directly. With attachments: convert main file and attachments to PDFs, merge into one PDF. """ main_local_path = _write_temp_file(filename=filename, content=file_content, temp_paths=temp_paths) if not attachments: return main_local_path, [] main_pdf_path = _convert_to_pdf_path(source_path=main_local_path, temp_paths=temp_paths) merged_attachment_metadata: list[dict[str, Any]] = [] attachment_pdf_paths: list[str] = [] for attachment in attachments: attachment_name = str(attachment.get("file_name") or attachment.get("filename") or "attachment.bin") attachment_content = attachment["file_content"] attachment_local_path = _write_temp_file( filename=attachment_name, content=attachment_content, temp_paths=temp_paths, ) attachment_pdf_path = _convert_to_pdf_path(source_path=attachment_local_path, temp_paths=temp_paths) attachment_pdf_paths.append(attachment_pdf_path) merged_attachment_metadata.append( { "fileId": attachment.get("file_id"), "fileName": attachment_name, "localPath": attachment_local_path, "mergedPdfPath": attachment_pdf_path, } ) merged_pdf_path = _merge_pdf_paths( pdf_paths=[main_pdf_path, *attachment_pdf_paths], temp_paths=temp_paths, ) return merged_pdf_path, merged_attachment_metadata def dispatch_leaudit_task( run_id: int, *, queue_name: str | None = None, rules_path: Optional[str] = None, ) -> str: """投递 runId 到 Celery worker 队列。""" target_queue = queue_name or LEAUDIT_WORKER_QUEUE_NORMAL task = leaudit_process_document_task.apply_async( kwargs={"run_id": run_id, "rules_path": rules_path}, queue=target_queue, ) log.info( "run_id=%s 已投递到 worker 队列: queue=%s, speed=%s, task_id=%s", run_id, target_queue, _queue_label(target_queue), task.id, ) return task.id def resolve_worker_queue(trigger_source: str | None) -> str: """按触发来源选择 worker 队列。""" normalized = (trigger_source or "").strip().lower() if "urgent" in normalized or "high" in normalized: return LEAUDIT_WORKER_QUEUE_URGENT return LEAUDIT_WORKER_QUEUE_NORMAL def _queue_label(queue_name: str | None) -> str: """Map queue name to a user-facing speed label for logs.""" if queue_name == LEAUDIT_WORKER_QUEUE_URGENT: return "urgent" return "normal" async def _scan_and_fail_stuck_documents(*, timeout_minutes: int) -> dict[str, Any]: """扫描当前 run 长时间无更新时间的文档,并将其标记为失败。""" from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from sqlalchemy import text as sa_text lock_key = 20260512 timed_out_rows: list[dict[str, Any]] = [] async with GetAsyncSession() as session: lock_row = ( await session.execute( sa_text("SELECT pg_try_advisory_lock(:lock_key)"), {"lock_key": lock_key}, ) ).fetchone() has_lock = bool(lock_row[0]) if lock_row else False if not has_lock: log.info("stuck-scan skipped: advisory lock already held") return { "status": "skipped", "reason": "lock_not_acquired", "timeout_minutes": timeout_minutes, "matched": 0, "failed": 0, } try: rows = ( await session.execute( sa_text( """ SELECT d.id AS document_id, d.processing_status, d.current_run_id, d.updated_at AS document_updated_at, d.region, d.normalized_name, ar.id AS run_id, ar.status AS run_status, ar.phase, ar.task_id, ar.updated_at AS run_updated_at, EXTRACT(EPOCH FROM ( NOW() - CASE WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') THEN COALESCE(ar.updated_at, d.updated_at) ELSE COALESCE(d.updated_at, ar.updated_at) END ))::bigint AS idle_seconds FROM leaudit_documents d LEFT JOIN leaudit_audit_runs ar ON ar.id = d.current_run_id WHERE d.deleted_at IS NULL AND ( LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') OR LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'queued', 'running', 'processing') ) AND CASE WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') THEN COALESCE(ar.updated_at, d.updated_at) ELSE COALESCE(d.updated_at, ar.updated_at) END < NOW() - make_interval(mins => :timeout_minutes) ORDER BY CASE WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') THEN COALESCE(ar.updated_at, d.updated_at) ELSE COALESCE(d.updated_at, ar.updated_at) END ASC, d.id ASC """ ), {"timeout_minutes": timeout_minutes}, ) ).mappings().all() timed_out_rows = [dict(row) for row in rows] finally: await session.execute( sa_text("SELECT pg_advisory_unlock(:lock_key)"), {"lock_key": lock_key}, ) await session.commit() storage = StorageAdapter() failed_items: list[dict[str, Any]] = [] for row in timed_out_rows: document_id = int(row["document_id"]) run_id = int(row["run_id"]) if row.get("run_id") is not None else None idle_seconds = int(row.get("idle_seconds") or 0) run_phase = row.get("phase") run_status = row.get("run_status") processing_status = row.get("processing_status") file_name = row.get("normalized_name") or f"document-{document_id}" message = ( f"文档处理长时间无进展,已自动终止:" f"document_id={document_id}, run_id={run_id}, " f"processing_status={processing_status}, run_status={run_status}, " f"phase={run_phase}, idle_seconds={idle_seconds}" ) try: await _update_status_safe(document_id, "failed") if run_id is not None: await storage.fail_run( document_id, run_id=run_id, phase=run_phase or "dispatch", message=message, detail_json={ "reason": "stuck_timeout", "timeoutMinutes": timeout_minutes, "idleSeconds": idle_seconds, "taskId": row.get("task_id"), "runStatus": run_status, "processingStatus": processing_status, "phase": run_phase, "fileName": file_name, "region": row.get("region"), }, ) failed_items.append( { "document_id": document_id, "run_id": run_id, "phase": run_phase, "idle_seconds": idle_seconds, } ) log.warning("stuck document auto-failed: %s", message) except Exception: log.exception("stuck document auto-fail failed: document_id=%s run_id=%s", document_id, run_id) return { "status": "success", "timeout_minutes": timeout_minutes, "matched": len(timed_out_rows), "failed": len(failed_items), "items": failed_items, }