9.6 KiB
9.6 KiB
队列状态 API 文档
概述
队列状态 API 提供文档处理队列的实时状态查询功能,让用户能够了解:
- 当前队列中有多少文档在等待
- 正在处理的文档数量
- 自己的文档在队列中的位置
- 预估等待时间
更新日志 (2026-01-29):
documents字段改为从数据库查询,不再依赖 Redis metrics- 新增
documents.waiting_ids字段 - 新增
documents.processing_details字段 - 修复:即使 Worker 未启动,也能正确显示等待处理的文档数量
API 端点
1. 查询队列状态
获取文档处理队列的整体状态。
请求
GET /api/v2/system/queue/status
Authorization: Bearer <token>
响应
{
"success": true,
"timestamp": "2026-01-28T16:45:00.123456",
"queue": {
"pending_tasks": 5,
"processing_tasks": 2,
"available_slots": 2,
"max_concurrent": 4
},
"documents": {
"waiting": 3,
"waiting_ids": [101, 102, 103],
"processing": 2,
"processing_ids": [123, 456],
"processing_details": [
{"id": 123, "status": "Cutting"},
{"id": 456, "status": "Extractioning"}
]
}
}
响应字段说明
| 字段 | 类型 | 数据来源 | 说明 |
|---|---|---|---|
success |
boolean | - | 请求是否成功 |
timestamp |
string | - | 查询时间(ISO 8601 格式) |
queue.pending_tasks |
integer | Redis | Celery 队列中等待的任务数 |
queue.processing_tasks |
integer | Redis | 正在处理的任务数(通过并发许可统计) |
queue.available_slots |
integer | Redis | 可用的处理槽位 |
queue.max_concurrent |
integer | 配置 | 最大并发数(默认 4) |
documents.waiting |
integer | 数据库 | 等待处理的文档数(status='Queued') |
documents.waiting_ids |
array | 数据库 | 等待中的文档 ID 列表 |
documents.processing |
integer | 数据库 | 正在处理的文档数 |
documents.processing_ids |
array | 数据库 | 正在处理的文档 ID 列表 |
documents.processing_details |
array | 数据库 | 处理中文档的详细状态 |
处理中文档状态
processing_details 中的 status 字段可能的值:
| 状态 | 说明 |
|---|---|
Cutting |
OCR 识别中 |
Extractioning |
AI 分析中 |
Evaluationing |
规则检查中 |
2. 查询文档排队位置
查询指定文档在队列中的位置。
请求
GET /api/v2/system/queue/position/{document_id}
Authorization: Bearer <token>
路径参数
| 参数 | 类型 | 必填 | 说明 |
|---|---|---|---|
document_id |
integer | 是 | 文档 ID |
响应示例 - 排队中
{
"success": true,
"document_id": 456,
"status": "Queued",
"position": 3,
"ahead_count": 2,
"total_in_queue": 5,
"estimated_wait_minutes": 4,
"message": "文档在队列中排第 3 位,前面有 2 个文档"
}
响应示例 - OCR 识别中
{
"success": true,
"document_id": 123,
"status": "Cutting",
"position": 0,
"ahead_count": 0,
"message": "OCR 识别中"
}
响应示例 - AI 分析中
{
"success": true,
"document_id": 123,
"status": "Extractioning",
"position": 0,
"ahead_count": 0,
"message": "AI 分析中"
}
响应示例 - 规则检查中
{
"success": true,
"document_id": 123,
"status": "Evaluationing",
"position": 0,
"ahead_count": 0,
"message": "规则检查中"
}
响应示例 - 处理完成
{
"success": true,
"document_id": 789,
"status": "Processed",
"position": null,
"ahead_count": 0,
"message": "文档处理完成"
}
响应示例 - 处理失败
{
"success": true,
"document_id": 789,
"status": "Failed",
"position": null,
"ahead_count": 0,
"message": "文档处理失败"
}
响应字段说明
| 字段 | 类型 | 说明 |
|---|---|---|
success |
boolean | 请求是否成功 |
document_id |
integer | 文档 ID |
status |
string | 文档状态(见下表) |
position |
integer | 在队列中的位置(从 1 开始) |
ahead_count |
integer | 前面排队的文档数 |
total_in_queue |
integer | 队列中的总文档数 |
estimated_wait_minutes |
integer | 预估等待时间(分钟) |
message |
string | 状态描述 |
文档状态说明
| 状态 | 说明 | 阶段 |
|---|---|---|
Queued |
排队等待中 | 等待 |
Cutting |
OCR 识别中 | 处理 |
Extractioning |
AI 分析中 | 处理 |
Evaluationing |
规则检查中 | 处理 |
Processed |
处理完成 | 完成 |
Failed |
处理失败 | 完成 |
3. 查询队列详情(管理员)
获取队列中任务的详细信息,用于管理和监控。
请求
GET /api/v2/system/queue/details?limit=20
Authorization: Bearer <token>
查询参数
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|---|---|---|---|---|
limit |
integer | 否 | 20 | 返回的最大任务数(1-100) |
响应
{
"success": true,
"total_in_queue": 5,
"showing": 5,
"tasks": [
{
"position": 1,
"task_name": "document.ocr_processing_v2",
"task_id": "abc123-def456-...",
"document_id": 123
},
{
"position": 2,
"task_name": "extractionV3.extraction_processing",
"task_id": "ghi789-jkl012-...",
"document_id": 456
}
]
}
响应字段说明
| 字段 | 类型 | 说明 |
|---|---|---|
success |
boolean | 请求是否成功 |
total_in_queue |
integer | 队列中的总任务数 |
showing |
integer | 当前返回的任务数 |
tasks |
array | 任务列表 |
tasks[].position |
integer | 任务在队列中的位置 |
tasks[].task_name |
string | 任务名称 |
tasks[].task_id |
string | Celery 任务 ID |
tasks[].document_id |
integer | 关联的文档 ID |
错误响应
所有 API 在发生错误时返回统一格式:
{
"detail": "错误描述信息"
}
HTTP 状态码
| 状态码 | 说明 |
|---|---|
| 200 | 成功 |
| 401 | 未授权(Token 无效或过期) |
| 500 | 服务器内部错误 |
使用示例
cURL
# 查询队列状态
curl -X GET "http://localhost:8000/api/v2/system/queue/status" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
# 查询文档位置
curl -X GET "http://localhost:8000/api/v2/system/queue/position/123" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
# 查询队列详情
curl -X GET "http://localhost:8000/api/v2/system/queue/details?limit=10" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
JavaScript (Axios)
import axios from 'axios';
const api = axios.create({
baseURL: 'http://localhost:8000/api/v2',
headers: {
'Authorization': `Bearer ${token}`
}
});
// 查询队列状态
const getQueueStatus = async () => {
const response = await api.get('/system/queue/status');
return response.data;
};
// 查询文档位置
const getDocumentPosition = async (documentId) => {
const response = await api.get(`/system/queue/position/${documentId}`);
return response.data;
};
// 轮询文档状态
const pollDocumentStatus = async (documentId, interval = 5000) => {
const poll = async () => {
const result = await getDocumentPosition(documentId);
if (result.status === 'Processed' || result.status === 'Failed') {
console.log('文档处理完成或失败');
return result;
}
if (result.status === 'Queued') {
console.log(`排队中,前面还有 ${result.ahead_count} 个文档`);
} else {
console.log(`处理中: ${result.status}`);
}
// 继续轮询
setTimeout(poll, interval);
};
return poll();
};
Python
import requests
BASE_URL = "http://localhost:8000/api/v2"
TOKEN = "your_jwt_token"
headers = {
"Authorization": f"Bearer {TOKEN}"
}
# 查询队列状态
def get_queue_status():
response = requests.get(f"{BASE_URL}/system/queue/status", headers=headers)
return response.json()
# 查询文档位置
def get_document_position(document_id: int):
response = requests.get(
f"{BASE_URL}/system/queue/position/{document_id}",
headers=headers
)
return response.json()
# 使用示例
status = get_queue_status()
print(f"队列中等待: {status['documents']['waiting']} 个文档")
print(f"正在处理: {status['documents']['processing']} 个文档")
if status['documents']['waiting'] > 0:
print(f"等待中的文档ID: {status['documents']['waiting_ids']}")
position = get_document_position(123)
if position['status'] == 'Queued':
print(f"文档排在第 {position['position']} 位")
print(f"预计等待 {position['estimated_wait_minutes']} 分钟")
注意事项
-
预估等待时间:基于每个文档平均处理 2 分钟计算,实际时间可能因文档大小和复杂度而异。
-
轮询频率:建议轮询间隔不低于 3-5 秒,避免对服务器造成过大压力。
-
权限要求:所有 API 都需要有效的 JWT Token。
-
队列详情 API:建议仅管理员使用,普通用户使用位置查询 API 即可。
-
数据一致性:
queue.pending_tasks反映 Celery 队列中的任务数documents.waiting反映数据库中status='Queued'的文档数- 两者可能不完全一致(一个文档可能对应多个任务)
-
Worker 未启动时:
documents.waiting会正确显示等待中的文档数queue.processing_tasks和documents.processing为 0- 这可以用来检测 Worker 是否正常运行