fix: harden cross-review task state handling

This commit is contained in:
wren
2026-05-12 11:30:56 +08:00
parent 2ca44f6312
commit 3823c9a2e4
10 changed files with 373 additions and 20 deletions
@@ -47,11 +47,13 @@ class CrossReviewTaskDocumentVO(BaseModel):
name: str = Field("", description="文档名称")
documentNumber: str | None = Field(None, description="文号")
typeId: int | None = Field(None, description="文档类型ID")
typeName: str | None = Field(None, description="文档类型名称")
processingStatus: str | None = Field(None, description="处理状态")
versionNo: int = Field(1, description="版本号")
isLatestVersion: bool = Field(True, description="是否最新版本")
auditStatus: int = Field(0, description="任务内完成状态")
createdAt: datetime | None = Field(None, description="创建时间")
fileSize: int = Field(0, description="文件大小(字节)")
class CrossReviewTaskDocumentPageVO(BaseModel):
@@ -359,7 +359,9 @@ class StorageAdapter:
"level": level,
"error_code": error_code,
"message": message,
"detail_json": detail_json,
"detail_json": json.dumps(detail_json, ensure_ascii=False)
if detail_json is not None
else None,
},
)
await session.commit()
@@ -17,6 +17,7 @@ from sqlalchemy import select
from fastapi_admin.celery_app import celery_app
from fastapi_admin.config import (
LEAUDIT_RULES_DIR,
LEAUDIT_STUCK_TIMEOUT_MINUTES,
LEAUDIT_WORKER_QUEUE_NORMAL,
LEAUDIT_WORKER_QUEUE_URGENT,
)
@@ -256,6 +257,22 @@ def leaudit_process_document_task(self, run_id: int, rules_path: str | None = No
)
@celery_app.task(bind=True, name="leaudit.scan_stuck_documents")
def leaudit_scan_stuck_documents_task(self) -> dict[str, Any]:
    """Scan for audit documents that have made no progress and auto-fail them.

    Synchronous Celery entry point: it drives the async scanner
    ``_scan_and_fail_stuck_documents`` to completion on a freshly created
    event loop and returns the scanner's summary dict unchanged.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        # Clamp the configured timeout to at least one minute.
        timeout_minutes = max(1, int(LEAUDIT_STUCK_TIMEOUT_MINUTES))
        scan = _scan_and_fail_stuck_documents(timeout_minutes=timeout_minutes)
        return event_loop.run_until_complete(scan)
    finally:
        # Always close the loop created above, whether the scan succeeded or raised.
        event_loop.close()
# type_id → rules directory mapping (only types with a fixed mapping).
# 行政许可 (administrative licensing, type_id=2) has 9 sub-types and is NOT mapped here;
# its rules must come from document metadata (rules_file_path) or content classification.
@@ -633,3 +650,146 @@ def _queue_label(queue_name: str | None) -> str:
if queue_name == LEAUDIT_WORKER_QUEUE_URGENT:
return "urgent"
return "normal"
async def _scan_and_fail_stuck_documents(*, timeout_minutes: int) -> dict[str, Any]:
    """Find documents whose current run has stalled and mark them as failed.

    A document is a candidate when it is the latest, non-deleted version and
    either its current run is in an active state (pending / queued / running /
    retrying) or its own processing status is still in flight (waiting /
    queued / running / processing). It is considered stuck when the relevant
    ``updated_at`` timestamp (the run's when the run is active, otherwise the
    document's) is older than ``timeout_minutes``.

    The candidate query runs under a PostgreSQL transaction-level advisory
    lock. If another session already holds the lock, the scan is skipped.

    Args:
        timeout_minutes: Idle time, in minutes, after which a document is
            treated as stuck.

    Returns:
        A summary dict. When the lock could not be acquired: ``status`` is
        ``"skipped"`` with ``reason``, ``timeout_minutes``, ``matched`` and
        ``failed`` (both 0). Otherwise ``status`` is ``"success"`` with
        ``timeout_minutes``, ``matched`` (rows selected), ``failed`` (documents
        successfully auto-failed) and ``items`` (one entry per auto-failed
        document).

    Raises:
        Exception: Any error raised while acquiring the lock or running the
            candidate query is propagated after the transaction is rolled
            back. Errors while failing an individual document are logged and
            do not abort the scan.
    """
    from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
    from sqlalchemy import text as sa_text

    lock_key = 20260512
    timed_out_rows: list[dict[str, Any]] = []

    async with GetAsyncSession() as session:
        # Transaction-level advisory lock: PostgreSQL releases it automatically
        # on commit or rollback, so there is no explicit unlock statement that
        # could fail inside an aborted transaction (masking the real error) or
        # leave the lock held on a pooled connection. It shares the lock space
        # with session-level advisory locks on the same key.
        lock_row = (
            await session.execute(
                sa_text("SELECT pg_try_advisory_xact_lock(:lock_key)"),
                {"lock_key": lock_key},
            )
        ).fetchone()
        if not (lock_row and lock_row[0]):
            log.info("stuck-scan skipped: advisory lock already held")
            return {
                "status": "skipped",
                "reason": "lock_not_acquired",
                "timeout_minutes": timeout_minutes,
                "matched": 0,
                "failed": 0,
            }

        try:
            rows = (
                await session.execute(
                    sa_text(
                        """
                        SELECT
                            d.id AS document_id,
                            d.processing_status,
                            d.current_run_id,
                            d.updated_at AS document_updated_at,
                            d.region,
                            d.normalized_name,
                            ar.id AS run_id,
                            ar.status AS run_status,
                            ar.phase,
                            ar.task_id,
                            ar.updated_at AS run_updated_at,
                            EXTRACT(EPOCH FROM (
                                NOW() - CASE
                                    WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')
                                    THEN COALESCE(ar.updated_at, d.updated_at)
                                    ELSE COALESCE(d.updated_at, ar.updated_at)
                                END
                            ))::bigint AS idle_seconds
                        FROM leaudit_documents d
                        JOIN leaudit_audit_runs ar
                            ON ar.id = d.current_run_id
                        WHERE d.deleted_at IS NULL
                            AND d.is_latest_version = true
                            AND (
                                LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')
                                OR LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'queued', 'running', 'processing')
                            )
                            AND CASE
                                WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')
                                THEN COALESCE(ar.updated_at, d.updated_at)
                                ELSE COALESCE(d.updated_at, ar.updated_at)
                            END < NOW() - make_interval(mins => :timeout_minutes)
                        ORDER BY CASE
                            WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')
                            THEN COALESCE(ar.updated_at, d.updated_at)
                            ELSE COALESCE(d.updated_at, ar.updated_at)
                        END ASC,
                        d.id ASC
                        """
                    ),
                    {"timeout_minutes": timeout_minutes},
                )
            ).mappings().all()
        except Exception:
            # End the failed transaction explicitly; this also drops the
            # advisory lock before the error propagates.
            await session.rollback()
            raise

        # Copy rows into plain dicts so they stay usable after the session ends.
        timed_out_rows = [dict(row) for row in rows]
        # Committing ends the transaction and thereby releases the advisory lock.
        await session.commit()

    # NOTE(review): the advisory lock only covers the candidate SELECT above,
    # not the per-document failure handling below, so two scans that run back
    # to back could both pick up the same rows. Confirm whether the lock should
    # also span this loop.
    storage = StorageAdapter()
    failed_items: list[dict[str, Any]] = []
    for row in timed_out_rows:
        document_id = int(row["document_id"])
        run_id = int(row["run_id"])
        idle_seconds = int(row.get("idle_seconds") or 0)
        run_phase = row.get("phase")
        run_status = row.get("run_status")
        processing_status = row.get("processing_status")
        file_name = row.get("normalized_name") or f"document-{document_id}"
        message = (
            f"文档处理长时间无进展,已自动终止:"
            f"document_id={document_id}, run_id={run_id}, "
            f"processing_status={processing_status}, run_status={run_status}, "
            f"phase={run_phase}, idle_seconds={idle_seconds}"
        )
        try:
            await _update_status_safe(document_id, "failed")
            await storage.fail_run(
                document_id,
                run_id=run_id,
                # Fall back to "dispatch" when the run has no recorded phase.
                phase=run_phase or "dispatch",
                message=message,
                detail_json={
                    "reason": "stuck_timeout",
                    "timeoutMinutes": timeout_minutes,
                    "idleSeconds": idle_seconds,
                    "taskId": row.get("task_id"),
                    "runStatus": run_status,
                    "processingStatus": processing_status,
                    "phase": run_phase,
                    "fileName": file_name,
                    "region": row.get("region"),
                },
            )
            failed_items.append(
                {
                    "document_id": document_id,
                    "run_id": run_id,
                    "phase": run_phase,
                    "idle_seconds": idle_seconds,
                }
            )
            log.warning("stuck document auto-failed: %s", message)
        except Exception:
            # Best effort per document: log and continue with the remaining rows.
            log.exception("stuck document auto-fail failed: document_id=%s run_id=%s", document_id, run_id)

    return {
        "status": "success",
        "timeout_minutes": timeout_minutes,
        "matched": len(timed_out_rows),
        "failed": len(failed_items),
        "items": failed_items,
    }
@@ -414,14 +414,35 @@ class CrossReviewServiceImpl(ICrossReviewService):
COALESCE(d.normalized_name, '') AS name,
CAST(d.biz_document_id AS TEXT) AS document_number,
d.type_id,
d.processing_status,
COALESCE(
CASE
WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying')
THEN ar.status
ELSE d.processing_status
END,
d.processing_status,
'waiting'
) AS processing_status,
d.version_no,
d.is_latest_version,
d.created_at,
td.audit_status
td.audit_status,
COALESCE(dt.name, '') AS type_name,
COALESCE(df.file_size, 0) AS file_size
FROM leaudit_cross_review_task_documents td
JOIN leaudit_documents d
ON d.id = td.document_id
LEFT JOIN leaudit_audit_runs ar
ON ar.id = d.current_run_id
LEFT JOIN leaudit_document_types dt
ON dt.id = d.type_id
LEFT JOIN LATERAL (
SELECT file_size
FROM leaudit_document_files
WHERE document_id = d.id
ORDER BY id ASC
LIMIT 1
) df ON TRUE
WHERE {whereSql}
ORDER BY d.created_at DESC, d.id DESC
LIMIT :limit OFFSET :offset
@@ -437,11 +458,13 @@ class CrossReviewServiceImpl(ICrossReviewService):
name=str(row["name"] or ""),
documentNumber=row.get("document_number"),
typeId=self._to_int(row.get("type_id")),
typeName=row.get("type_name"),
processingStatus=row.get("processing_status"),
versionNo=int(row.get("version_no") or 1),
isLatestVersion=bool(row.get("is_latest_version")),
auditStatus=int(row.get("audit_status") or 0),
createdAt=row.get("created_at"),
fileSize=int(row.get("file_size") or 0),
)
for row in rows
]