From 9e1b7a6de7c532bfd6117630ff0dacce22d2d36c Mon Sep 17 00:00:00 2001 From: wren <“porlong@qq.com”> Date: Thu, 30 Apr 2026 12:32:37 +0800 Subject: [PATCH] feat: add queue status endpoint for upload page GET /api/v2/system/queue/status returns counts of documents by processing_status (waiting/processing) from leaudit_documents, plus processing document IDs for the frontend progress display. --- .../controllers/documentController.py | 76 +++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/fastapi_modules/fastapi_leaudit/controllers/documentController.py b/fastapi_modules/fastapi_leaudit/controllers/documentController.py index a47bcb1..a42ad32 100644 --- a/fastapi_modules/fastapi_leaudit/controllers/documentController.py +++ b/fastapi_modules/fastapi_leaudit/controllers/documentController.py @@ -3,7 +3,10 @@ from typing import Any from fastapi import File, Form, Query, UploadFile +from pydantic import BaseModel, Field +from sqlalchemy import text +from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.controller import BaseController from fastapi_common.fastapi_common_web.domain.responses import Result @@ -12,6 +15,13 @@ from fastapi_modules.fastapi_leaudit.services import IDocumentService from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl +class QueueStatusVO(BaseModel): + success: bool = Field(True) + timestamp: str = Field("") + queue: dict[str, int] = Field(default_factory=lambda: {"pending_tasks": 0, "processing_tasks": 0, "available_slots": 4, "max_concurrent": 4}) + documents: dict[str, object] = Field(default_factory=lambda: {"waiting": 0, "processing": 0, "processing_ids": []}) + + class DocumentController(BaseController): """文档控制器。""" @@ -84,3 +94,69 @@ class DocumentController(BaseController): idList = [int(x.strip()) for x in ids.split(",") if x.strip().isdigit()] Data = await self.DocumentService.ListDocumentTypes(Ids=idList) return Result.success(data=Data) + + @self.router.get("/v2/system/queue/status", response_model=Result[QueueStatusVO]) + async def GetQueueStatus(): + """获取文档处理队列状态。""" + from datetime import datetime + + async with GetAsyncSession() as Session: + statusRows = ( + await Session.execute( + text( + """ + SELECT processing_status, COUNT(*) AS cnt + FROM leaudit_documents + WHERE deleted_at IS NULL AND is_latest_version = true + GROUP BY processing_status + """ + ) + ) + ).mappings().all() + + waiting = 0 + processing = 0 + for row in statusRows: + s = str(row["processing_status"] or "") + c = int(row["cnt"] or 0) + if s == "waiting": + waiting = c + elif s in ("processing", "running"): + processing += c + + processingIdsRows: list[int] = [] + if processing > 0: + async with GetAsyncSession() as Session: + idRows = ( + await Session.execute( + text( + """ + SELECT id FROM leaudit_documents + WHERE deleted_at IS NULL + AND is_latest_version = true + AND processing_status IN ('processing', 'running') + ORDER BY updated_at DESC + LIMIT 50 + """ + ) + ) + ).fetchall() + processingIdsRows = [int(r[0]) for r in idRows] + + return Result.success( + data=QueueStatusVO( + success=True, + timestamp=datetime.now().isoformat(), + queue={ + "pending_tasks": waiting, + "processing_tasks": processing, + "available_slots": max(0, 4 - processing), + "max_concurrent": 4, + }, + documents={ + "waiting": waiting, + "processing": processing, + "processing_ids": processingIdsRows, + }, + ) + )