408 lines
9.6 KiB
Markdown
408 lines
9.6 KiB
Markdown
# 队列状态 API 文档
|
||
|
||
## 概述
|
||
|
||
队列状态 API 提供文档处理队列的实时状态查询功能,让用户能够了解:
|
||
- 当前队列中有多少文档在等待
|
||
- 正在处理的文档数量
|
||
- 自己的文档在队列中的位置
|
||
- 预估等待时间
|
||
|
||
**更新日志 (2026-01-29)**:
|
||
- `documents` 字段改为从数据库查询,不再依赖 Redis metrics
|
||
- 新增 `documents.waiting_ids` 字段
|
||
- 新增 `documents.processing_details` 字段
|
||
- 修复:即使 Worker 未启动,也能正确显示等待处理的文档数量
|
||
|
||
## API 端点
|
||
|
||
### 1. 查询队列状态
|
||
|
||
获取文档处理队列的整体状态。
|
||
|
||
**请求**
|
||
|
||
```http
|
||
GET /api/v2/system/queue/status
|
||
Authorization: Bearer <token>
|
||
```
|
||
|
||
**响应**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"timestamp": "2026-01-28T16:45:00.123456",
|
||
"queue": {
|
||
"pending_tasks": 5,
|
||
"processing_tasks": 2,
|
||
"available_slots": 2,
|
||
"max_concurrent": 4
|
||
},
|
||
"documents": {
|
||
"waiting": 3,
|
||
"waiting_ids": [101, 102, 103],
|
||
"processing": 2,
|
||
"processing_ids": [123, 456],
|
||
"processing_details": [
|
||
{"id": 123, "status": "Cutting"},
|
||
{"id": 456, "status": "Extractioning"}
|
||
]
|
||
}
|
||
}
|
||
```
|
||
|
||
**响应字段说明**
|
||
|
||
| 字段 | 类型 | 数据来源 | 说明 |
|
||
|------|------|----------|------|
|
||
| `success` | boolean | - | 请求是否成功 |
|
||
| `timestamp` | string | - | 查询时间(ISO 8601 格式) |
|
||
| `queue.pending_tasks` | integer | Redis | Celery 队列中等待的任务数 |
|
||
| `queue.processing_tasks` | integer | Redis | 正在处理的任务数(通过并发许可统计) |
|
||
| `queue.available_slots` | integer | Redis | 可用的处理槽位 |
|
||
| `queue.max_concurrent` | integer | 配置 | 最大并发数(默认 4) |
|
||
| `documents.waiting` | integer | **数据库** | 等待处理的文档数(status='Queued') |
|
||
| `documents.waiting_ids` | array | **数据库** | 等待中的文档 ID 列表 |
|
||
| `documents.processing` | integer | **数据库** | 正在处理的文档数 |
|
||
| `documents.processing_ids` | array | **数据库** | 正在处理的文档 ID 列表 |
|
||
| `documents.processing_details` | array | **数据库** | 处理中文档的详细状态 |
|
||
|
||
**处理中文档状态**
|
||
|
||
`processing_details` 中的 `status` 字段可能的值:
|
||
|
||
| 状态 | 说明 |
|
||
|------|------|
|
||
| `Cutting` | OCR 识别中 |
|
||
| `Extractioning` | AI 分析中 |
|
||
| `Evaluationing` | 规则检查中 |
|
||
|
||
---
|
||
|
||
### 2. 查询文档排队位置
|
||
|
||
查询指定文档在队列中的位置。
|
||
|
||
**请求**
|
||
|
||
```http
|
||
GET /api/v2/system/queue/position/{document_id}
|
||
Authorization: Bearer <token>
|
||
```
|
||
|
||
**路径参数**
|
||
|
||
| 参数 | 类型 | 必填 | 说明 |
|
||
|------|------|------|------|
|
||
| `document_id` | integer | 是 | 文档 ID |
|
||
|
||
**响应示例 - 排队中**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"document_id": 456,
|
||
"status": "Queued",
|
||
"position": 3,
|
||
"ahead_count": 2,
|
||
"total_in_queue": 5,
|
||
"estimated_wait_minutes": 4,
|
||
"message": "文档在队列中排第 3 位,前面有 2 个文档"
|
||
}
|
||
```
|
||
|
||
**响应示例 - OCR 识别中**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"document_id": 123,
|
||
"status": "Cutting",
|
||
"position": 0,
|
||
"ahead_count": 0,
|
||
"message": "OCR 识别中"
|
||
}
|
||
```
|
||
|
||
**响应示例 - AI 分析中**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"document_id": 123,
|
||
"status": "Extractioning",
|
||
"position": 0,
|
||
"ahead_count": 0,
|
||
"message": "AI 分析中"
|
||
}
|
||
```
|
||
|
||
**响应示例 - 规则检查中**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"document_id": 123,
|
||
"status": "Evaluationing",
|
||
"position": 0,
|
||
"ahead_count": 0,
|
||
"message": "规则检查中"
|
||
}
|
||
```
|
||
|
||
**响应示例 - 处理完成**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"document_id": 789,
|
||
"status": "Processed",
|
||
"position": null,
|
||
"ahead_count": 0,
|
||
"message": "文档处理完成"
|
||
}
|
||
```
|
||
|
||
**响应示例 - 处理失败**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"document_id": 789,
|
||
"status": "Failed",
|
||
"position": null,
|
||
"ahead_count": 0,
|
||
"message": "文档处理失败"
|
||
}
|
||
```
|
||
|
||
**响应字段说明**
|
||
|
||
| 字段 | 类型 | 说明 |
|
||
|------|------|------|
|
||
| `success` | boolean | 请求是否成功 |
|
||
| `document_id` | integer | 文档 ID |
|
||
| `status` | string | 文档状态(见下表) |
|
||
| `position` | integer | 在队列中的位置(从 1 开始) |
|
||
| `ahead_count` | integer | 前面排队的文档数 |
|
||
| `total_in_queue` | integer | 队列中的总文档数 |
|
||
| `estimated_wait_minutes` | integer | 预估等待时间(分钟) |
|
||
| `message` | string | 状态描述 |
|
||
|
||
**文档状态说明**
|
||
|
||
| 状态 | 说明 | 阶段 |
|
||
|------|------|------|
|
||
| `Queued` | 排队等待中 | 等待 |
|
||
| `Cutting` | OCR 识别中 | 处理 |
|
||
| `Extractioning` | AI 分析中 | 处理 |
|
||
| `Evaluationing` | 规则检查中 | 处理 |
|
||
| `Processed` | 处理完成 | 完成 |
|
||
| `Failed` | 处理失败 | 完成 |
|
||
|
||
---
|
||
|
||
### 3. 查询队列详情(管理员)
|
||
|
||
获取队列中任务的详细信息,用于管理和监控。
|
||
|
||
**请求**
|
||
|
||
```http
|
||
GET /api/v2/system/queue/details?limit=20
|
||
Authorization: Bearer <token>
|
||
```
|
||
|
||
**查询参数**
|
||
|
||
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|
||
|------|------|------|--------|------|
|
||
| `limit` | integer | 否 | 20 | 返回的最大任务数(1-100) |
|
||
|
||
**响应**
|
||
|
||
```json
|
||
{
|
||
"success": true,
|
||
"total_in_queue": 5,
|
||
"showing": 5,
|
||
"tasks": [
|
||
{
|
||
"position": 1,
|
||
"task_name": "document.ocr_processing_v2",
|
||
"task_id": "abc123-def456-...",
|
||
"document_id": 123
|
||
},
|
||
{
|
||
"position": 2,
|
||
"task_name": "extractionV3.extraction_processing",
|
||
"task_id": "ghi789-jkl012-...",
|
||
"document_id": 456
|
||
}
|
||
]
|
||
}
|
||
```
|
||
|
||
**响应字段说明**
|
||
|
||
| 字段 | 类型 | 说明 |
|
||
|------|------|------|
|
||
| `success` | boolean | 请求是否成功 |
|
||
| `total_in_queue` | integer | 队列中的总任务数 |
|
||
| `showing` | integer | 当前返回的任务数 |
|
||
| `tasks` | array | 任务列表 |
|
||
| `tasks[].position` | integer | 任务在队列中的位置 |
|
||
| `tasks[].task_name` | string | 任务名称 |
|
||
| `tasks[].task_id` | string | Celery 任务 ID |
|
||
| `tasks[].document_id` | integer | 关联的文档 ID |
|
||
|
||
---
|
||
|
||
## 错误响应
|
||
|
||
所有 API 在发生错误时返回统一格式:
|
||
|
||
```json
|
||
{
|
||
"detail": "错误描述信息"
|
||
}
|
||
```
|
||
|
||
**HTTP 状态码**
|
||
|
||
| 状态码 | 说明 |
|
||
|--------|------|
|
||
| 200 | 成功 |
|
||
| 401 | 未授权(Token 无效或过期) |
|
||
| 500 | 服务器内部错误 |
|
||
|
||
---
|
||
|
||
## 使用示例
|
||
|
||
### cURL
|
||
|
||
```bash
|
||
# 查询队列状态
|
||
curl -X GET "http://localhost:8000/api/v2/system/queue/status" \
|
||
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
|
||
|
||
# 查询文档位置
|
||
curl -X GET "http://localhost:8000/api/v2/system/queue/position/123" \
|
||
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
|
||
|
||
# 查询队列详情
|
||
curl -X GET "http://localhost:8000/api/v2/system/queue/details?limit=10" \
|
||
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
|
||
```
|
||
|
||
### JavaScript (Axios)
|
||
|
||
```javascript
|
||
import axios from 'axios';
|
||
|
||
const api = axios.create({
|
||
baseURL: 'http://localhost:8000/api/v2',
|
||
headers: {
|
||
'Authorization': `Bearer ${token}`
|
||
}
|
||
});
|
||
|
||
// 查询队列状态
|
||
const getQueueStatus = async () => {
|
||
const response = await api.get('/system/queue/status');
|
||
return response.data;
|
||
};
|
||
|
||
// 查询文档位置
|
||
const getDocumentPosition = async (documentId) => {
|
||
const response = await api.get(`/system/queue/position/${documentId}`);
|
||
return response.data;
|
||
};
|
||
|
||
// 轮询文档状态
|
||
const pollDocumentStatus = async (documentId, interval = 5000) => {
|
||
const poll = async () => {
|
||
const result = await getDocumentPosition(documentId);
|
||
|
||
if (result.status === 'Processed' || result.status === 'Failed') {
|
||
console.log('文档处理完成或失败');
|
||
return result;
|
||
}
|
||
|
||
if (result.status === 'Queued') {
|
||
console.log(`排队中,前面还有 ${result.ahead_count} 个文档`);
|
||
} else {
|
||
console.log(`处理中: ${result.status}`);
|
||
}
|
||
|
||
// 继续轮询
|
||
setTimeout(poll, interval);
|
||
};
|
||
|
||
return poll();
|
||
};
|
||
```
|
||
|
||
### Python
|
||
|
||
```python
|
||
import requests
|
||
|
||
BASE_URL = "http://localhost:8000/api/v2"
|
||
TOKEN = "your_jwt_token"
|
||
|
||
headers = {
|
||
"Authorization": f"Bearer {TOKEN}"
|
||
}
|
||
|
||
# 查询队列状态
|
||
def get_queue_status():
|
||
response = requests.get(f"{BASE_URL}/system/queue/status", headers=headers)
|
||
return response.json()
|
||
|
||
# 查询文档位置
|
||
def get_document_position(document_id: int):
|
||
response = requests.get(
|
||
f"{BASE_URL}/system/queue/position/{document_id}",
|
||
headers=headers
|
||
)
|
||
return response.json()
|
||
|
||
# 使用示例
|
||
status = get_queue_status()
|
||
print(f"队列中等待: {status['documents']['waiting']} 个文档")
|
||
print(f"正在处理: {status['documents']['processing']} 个文档")
|
||
|
||
if status['documents']['waiting'] > 0:
|
||
print(f"等待中的文档ID: {status['documents']['waiting_ids']}")
|
||
|
||
position = get_document_position(123)
|
||
if position['status'] == 'Queued':
|
||
print(f"文档排在第 {position['position']} 位")
|
||
print(f"预计等待 {position['estimated_wait_minutes']} 分钟")
|
||
```
|
||
|
||
---
|
||
|
||
## 注意事项
|
||
|
||
1. **预估等待时间**:基于每个文档平均处理 2 分钟计算,实际时间可能因文档大小和复杂度而异。
|
||
|
||
2. **轮询频率**:建议轮询间隔不低于 3-5 秒,避免对服务器造成过大压力。
|
||
|
||
3. **权限要求**:所有 API 都需要有效的 JWT Token。
|
||
|
||
4. **队列详情 API**:建议仅管理员使用,普通用户使用位置查询 API 即可。
|
||
|
||
5. **数据一致性**:
|
||
- `queue.pending_tasks` 反映 Celery 队列中的任务数
|
||
- `documents.waiting` 反映数据库中 `status='Queued'` 的文档数
|
||
- 两者可能不完全一致(一个文档可能对应多个任务)
|
||
|
||
6. **Worker 未启动时**:
|
||
- `documents.waiting` 会正确显示等待中的文档数
|
||
- `queue.processing_tasks` 和 `documents.processing` 为 0
|
||
- 这可以用来检测 Worker 是否正常运行
|