# 队列状态 API 文档 ## 概述 队列状态 API 提供文档处理队列的实时状态查询功能,让用户能够了解: - 当前队列中有多少文档在等待 - 正在处理的文档数量 - 自己的文档在队列中的位置 - 预估等待时间 **更新日志 (2026-01-29)**: - `documents` 字段改为从数据库查询,不再依赖 Redis metrics - 新增 `documents.waiting_ids` 字段 - 新增 `documents.processing_details` 字段 - 修复:即使 Worker 未启动,也能正确显示等待处理的文档数量 ## API 端点 ### 1. 查询队列状态 获取文档处理队列的整体状态。 **请求** ```http GET /api/v2/system/queue/status Authorization: Bearer ``` **响应** ```json { "success": true, "timestamp": "2026-01-28T16:45:00.123456", "queue": { "pending_tasks": 5, "processing_tasks": 2, "available_slots": 2, "max_concurrent": 4 }, "documents": { "waiting": 3, "waiting_ids": [101, 102, 103], "processing": 2, "processing_ids": [123, 456], "processing_details": [ {"id": 123, "status": "Cutting"}, {"id": 456, "status": "Extractioning"} ] } } ``` **响应字段说明** | 字段 | 类型 | 数据来源 | 说明 | |------|------|----------|------| | `success` | boolean | - | 请求是否成功 | | `timestamp` | string | - | 查询时间(ISO 8601 格式) | | `queue.pending_tasks` | integer | Redis | Celery 队列中等待的任务数 | | `queue.processing_tasks` | integer | Redis | 正在处理的任务数(通过并发许可统计) | | `queue.available_slots` | integer | Redis | 可用的处理槽位 | | `queue.max_concurrent` | integer | 配置 | 最大并发数(默认 4) | | `documents.waiting` | integer | **数据库** | 等待处理的文档数(status='Queued') | | `documents.waiting_ids` | array | **数据库** | 等待中的文档 ID 列表 | | `documents.processing` | integer | **数据库** | 正在处理的文档数 | | `documents.processing_ids` | array | **数据库** | 正在处理的文档 ID 列表 | | `documents.processing_details` | array | **数据库** | 处理中文档的详细状态 | **处理中文档状态** `processing_details` 中的 `status` 字段可能的值: | 状态 | 说明 | |------|------| | `Cutting` | OCR 识别中 | | `Extractioning` | AI 分析中 | | `Evaluationing` | 规则检查中 | --- ### 2. 查询文档排队位置 查询指定文档在队列中的位置。 **请求** ```http GET /api/v2/system/queue/position/{document_id} Authorization: Bearer ``` **路径参数** | 参数 | 类型 | 必填 | 说明 | |------|------|------|------| | `document_id` | integer | 是 | 文档 ID | **响应示例 - 排队中** ```json { "success": true, "document_id": 456, "status": "Queued", "position": 3, "ahead_count": 2, "total_in_queue": 5, "estimated_wait_minutes": 4, "message": "文档在队列中排第 3 位,前面有 2 个文档" } ``` **响应示例 - OCR 识别中** ```json { "success": true, "document_id": 123, "status": "Cutting", "position": 0, "ahead_count": 0, "message": "OCR 识别中" } ``` **响应示例 - AI 分析中** ```json { "success": true, "document_id": 123, "status": "Extractioning", "position": 0, "ahead_count": 0, "message": "AI 分析中" } ``` **响应示例 - 规则检查中** ```json { "success": true, "document_id": 123, "status": "Evaluationing", "position": 0, "ahead_count": 0, "message": "规则检查中" } ``` **响应示例 - 处理完成** ```json { "success": true, "document_id": 789, "status": "Processed", "position": null, "ahead_count": 0, "message": "文档处理完成" } ``` **响应示例 - 处理失败** ```json { "success": true, "document_id": 789, "status": "Failed", "position": null, "ahead_count": 0, "message": "文档处理失败" } ``` **响应字段说明** | 字段 | 类型 | 说明 | |------|------|------| | `success` | boolean | 请求是否成功 | | `document_id` | integer | 文档 ID | | `status` | string | 文档状态(见下表) | | `position` | integer | 在队列中的位置(从 1 开始) | | `ahead_count` | integer | 前面排队的文档数 | | `total_in_queue` | integer | 队列中的总文档数 | | `estimated_wait_minutes` | integer | 预估等待时间(分钟) | | `message` | string | 状态描述 | **文档状态说明** | 状态 | 说明 | 阶段 | |------|------|------| | `Queued` | 排队等待中 | 等待 | | `Cutting` | OCR 识别中 | 处理 | | `Extractioning` | AI 分析中 | 处理 | | `Evaluationing` | 规则检查中 | 处理 | | `Processed` | 处理完成 | 完成 | | `Failed` | 处理失败 | 完成 | --- ### 3. 查询队列详情(管理员) 获取队列中任务的详细信息,用于管理和监控。 **请求** ```http GET /api/v2/system/queue/details?limit=20 Authorization: Bearer ``` **查询参数** | 参数 | 类型 | 必填 | 默认值 | 说明 | |------|------|------|--------|------| | `limit` | integer | 否 | 20 | 返回的最大任务数(1-100) | **响应** ```json { "success": true, "total_in_queue": 5, "showing": 5, "tasks": [ { "position": 1, "task_name": "document.ocr_processing_v2", "task_id": "abc123-def456-...", "document_id": 123 }, { "position": 2, "task_name": "extractionV3.extraction_processing", "task_id": "ghi789-jkl012-...", "document_id": 456 } ] } ``` **响应字段说明** | 字段 | 类型 | 说明 | |------|------|------| | `success` | boolean | 请求是否成功 | | `total_in_queue` | integer | 队列中的总任务数 | | `showing` | integer | 当前返回的任务数 | | `tasks` | array | 任务列表 | | `tasks[].position` | integer | 任务在队列中的位置 | | `tasks[].task_name` | string | 任务名称 | | `tasks[].task_id` | string | Celery 任务 ID | | `tasks[].document_id` | integer | 关联的文档 ID | --- ## 错误响应 所有 API 在发生错误时返回统一格式: ```json { "detail": "错误描述信息" } ``` **HTTP 状态码** | 状态码 | 说明 | |--------|------| | 200 | 成功 | | 401 | 未授权(Token 无效或过期) | | 500 | 服务器内部错误 | --- ## 使用示例 ### cURL ```bash # 查询队列状态 curl -X GET "http://localhost:8000/api/v2/system/queue/status" \ -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." # 查询文档位置 curl -X GET "http://localhost:8000/api/v2/system/queue/position/123" \ -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." # 查询队列详情 curl -X GET "http://localhost:8000/api/v2/system/queue/details?limit=10" \ -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." ``` ### JavaScript (Axios) ```javascript import axios from 'axios'; const api = axios.create({ baseURL: 'http://localhost:8000/api/v2', headers: { 'Authorization': `Bearer ${token}` } }); // 查询队列状态 const getQueueStatus = async () => { const response = await api.get('/system/queue/status'); return response.data; }; // 查询文档位置 const getDocumentPosition = async (documentId) => { const response = await api.get(`/system/queue/position/${documentId}`); return response.data; }; // 轮询文档状态 const pollDocumentStatus = async (documentId, interval = 5000) => { const poll = async () => { const result = await getDocumentPosition(documentId); if (result.status === 'Processed' || result.status === 'Failed') { console.log('文档处理完成或失败'); return result; } if (result.status === 'Queued') { console.log(`排队中,前面还有 ${result.ahead_count} 个文档`); } else { console.log(`处理中: ${result.status}`); } // 继续轮询 setTimeout(poll, interval); }; return poll(); }; ``` ### Python ```python import requests BASE_URL = "http://localhost:8000/api/v2" TOKEN = "your_jwt_token" headers = { "Authorization": f"Bearer {TOKEN}" } # 查询队列状态 def get_queue_status(): response = requests.get(f"{BASE_URL}/system/queue/status", headers=headers) return response.json() # 查询文档位置 def get_document_position(document_id: int): response = requests.get( f"{BASE_URL}/system/queue/position/{document_id}", headers=headers ) return response.json() # 使用示例 status = get_queue_status() print(f"队列中等待: {status['documents']['waiting']} 个文档") print(f"正在处理: {status['documents']['processing']} 个文档") if status['documents']['waiting'] > 0: print(f"等待中的文档ID: {status['documents']['waiting_ids']}") position = get_document_position(123) if position['status'] == 'Queued': print(f"文档排在第 {position['position']} 位") print(f"预计等待 {position['estimated_wait_minutes']} 分钟") ``` --- ## 注意事项 1. **预估等待时间**:基于每个文档平均处理 2 分钟计算,实际时间可能因文档大小和复杂度而异。 2. **轮询频率**:建议轮询间隔不低于 3-5 秒,避免对服务器造成过大压力。 3. **权限要求**:所有 API 都需要有效的 JWT Token。 4. **队列详情 API**:建议仅管理员使用,普通用户使用位置查询 API 即可。 5. **数据一致性**: - `queue.pending_tasks` 反映 Celery 队列中的任务数 - `documents.waiting` 反映数据库中 `status='Queued'` 的文档数 - 两者可能不完全一致(一个文档可能对应多个任务) 6. **Worker 未启动时**: - `documents.waiting` 会正确显示等待中的文档数 - `queue.processing_tasks` 和 `documents.processing` 为 0 - 这可以用来检测 Worker 是否正常运行