新增上传队列显示

This commit is contained in:
2026-01-29 16:54:45 +08:00
parent 7e6424e9ac
commit 0a436311c8
5 changed files with 920 additions and 34 deletions
+1
View File
@@ -113,6 +113,7 @@ function getDocumentTypeIdsFromSession(): number[] | null {
export enum DocumentStatus {
waiting = 'waiting',
WAITING = "Waiting",
QUEUED = "Queued", // 排队中
CUTTING = "Cutting",
EXTRACTIONING = "Extractioning",
EVALUATIONING = "Evaluationing",
+59
View File
@@ -0,0 +1,59 @@
// app/api/queue.ts
// 队列状态 API 客户端
import axios from 'axios';
import { API_BASE_URL } from '../config/api-config';
/**
* 队列状态响应接口
*/
export interface QueueStatus {
success: boolean;
timestamp: string;
queue: {
pending_tasks: number;
processing_tasks: number;
available_slots: number;
max_concurrent: number;
};
documents: {
waiting: number; // 排队中的文档数
processing: number; // 处理中的文档数
processing_ids: string[];
};
}
/**
* 获取队列整体状态
* @returns 队列状态信息
*/
export async function getQueueStatus(): Promise<{ data?: QueueStatus; error?: string }> {
try {
// 从 localStorage 获取 token
let token: string | null = null;
if (typeof window !== 'undefined') {
token = localStorage.getItem('access_token');
}
const headers: Record<string, string> = {
'Accept': 'application/json'
};
if (token) {
headers['Authorization'] = `Bearer ${token}`;
}
const response = await axios.get<QueueStatus>(
`${API_BASE_URL}/api/v2/system/queue/status`,
{ headers }
);
return { data: response.data };
} catch (error) {
console.error('【队列状态】获取队列状态失败:', error);
if (axios.isAxiosError(error)) {
return { error: error.response?.data?.detail || error.message };
}
return { error: error instanceof Error ? error.message : '获取队列状态失败' };
}
}
+86 -20
View File
@@ -26,6 +26,7 @@ import {
} from "~/api/files/files-upload";
import { updateDocumentAuditStatus } from "~/api/evaluation_points/rules-files";
import { links as fileTypeTagLinks } from "~/components/ui/FileTypeTag";
import { getQueueStatus, type QueueStatus } from "~/api/queue";
export function links() {
return [
@@ -375,6 +376,7 @@ export default function FilesUpload() {
const [uploadStage, setUploadStage] = useState<"idle" | "uploading" | "processing" | "completed" | "hadden">("idle");
const [processingSteps, setProcessingSteps] = useState<Step[]>([
{ title: "文件上传", description: "等待上传文件到服务器...", status: "waiting" },
{ title: "排队等待", description: "等待进入处理队列", status: "waiting" },
{ title: "文档转换拆分", description: "转换文档格式,拆分文档内容", status: "waiting" },
{ title: "评查点抽取", description: "DeepSeek 抽取中", status: "waiting" },
{ title: "评查点审核", description: "DeepSeek 评查中", status: "waiting" },
@@ -386,6 +388,9 @@ export default function FilesUpload() {
const [queueFiles, setQueueFiles] = useState<Document[]>([]);
const [documentTypesState, setDocumentTypesState] = useState<DocumentType[]>([]);
// 全局队列状态(用于显示排队统计)
const [globalQueueStatus, setGlobalQueueStatus] = useState<QueueStatus | null>(null);
// 在组件挂载时从 sessionStorage 获取 documentTypeIds
useEffect(() => {
try {
@@ -544,6 +549,34 @@ export default function FilesUpload() {
};
}, []);
// 全局队列状态轮询 - 每10秒获取一次队列统计
useEffect(() => {
// 立即获取一次队列状态
const fetchQueueStatus = async () => {
try {
const response = await getQueueStatus();
if (response.data && isMountedRef.current) {
setGlobalQueueStatus(response.data);
}
} catch (error) {
console.error('获取全局队列状态失败:', error);
}
};
fetchQueueStatus();
// 设置轮询定时器
const intervalId = setInterval(() => {
if (isMountedRef.current) {
fetchQueueStatus();
}
}, 10000);
return () => {
clearInterval(intervalId);
};
}, []);
// 启动状态检查定时器的函数
const startStatusChecker = (files: Document[]) => {
console.log('启动状态检查定时器,队列文件数量:', files.length);
@@ -1678,12 +1711,12 @@ export default function FilesUpload() {
// 更新上传阶段
setUploadStage("processing");
// 更新步骤状态
// 更新步骤状态:上传完成,进入排队等待
const updatedSteps = processingSteps.map(step => ({...step}));
updatedSteps[0].status = "done";
updatedSteps[0].description = "文件已成功上传到服务器";
updatedSteps[1].status = "active";
updatedSteps[1].description = "正在转换文档格式,拆分文档内容...";
updatedSteps[1].description = "文档正在排队等待处理...";
setProcessingSteps(updatedSteps);
@@ -1810,10 +1843,11 @@ export default function FilesUpload() {
}));
completedSteps[0].description = "文件已成功上传到服务器";
completedSteps[1].description = "文档格式转换完成,内容已拆分";
completedSteps[2].description = "评查点已成功抽取";
completedSteps[3].description = "文档评查已完成";
completedSteps[4].description = "文档已准备就绪,可以查看";
completedSteps[1].description = "已进入处理队列";
completedSteps[2].description = "文档格式转换完成,内容已拆分";
completedSteps[3].description = "评查点已成功抽取";
completedSteps[4].description = "文档评查已完成";
completedSteps[5].description = "文档已准备就绪,可以查看";
setProcessingSteps(completedSteps);
setUploadStage("completed");
@@ -1834,6 +1868,7 @@ export default function FilesUpload() {
};
// 更新处理步骤状态
// 步骤索引: 0-文件上传, 1-排队等待, 2-文档转换拆分, 3-评查点抽取, 4-评查点审核, 5-审核准备
const updateProcessingSteps = (status: DocumentStatus) => {
// console.log('更新处理步骤状态:', status);
@@ -1844,42 +1879,57 @@ export default function FilesUpload() {
step.status = "waiting";
});
// 第一步始终是完成的
// 第一步始终是完成的(文件上传)
updatedSteps[0].status = "done";
updatedSteps[0].description = "文件已成功上传到服务器";
// 根据状态更新步骤
switch (status) {
case DocumentStatus.CUTTING:
case DocumentStatus.QUEUED:
// 排队等待中
updatedSteps[1].status = "active";
updatedSteps[1].description = "正在转换文档格式,拆分文档内容...";
updatedSteps[1].description = "文档正在排队等待处理...";
break;
case DocumentStatus.CUTTING:
// 排队完成,开始转换
updatedSteps[1].status = "done";
updatedSteps[1].description = "已进入处理队列";
updatedSteps[2].status = "active";
updatedSteps[2].description = "正在转换文档格式,拆分文档内容...";
break;
case DocumentStatus.EXTRACTIONING:
updatedSteps[1].status = "done";
updatedSteps[1].description = "文档格式转换完成,内容已拆分";
updatedSteps[2].status = "active";
updatedSteps[2].description = "正在抽取评查点...";
updatedSteps[1].description = "已进入处理队列";
updatedSteps[2].status = "done";
updatedSteps[2].description = "文档格式转换完成,内容已拆分";
updatedSteps[3].status = "active";
updatedSteps[3].description = "正在抽取评查点...";
break;
case DocumentStatus.EVALUATIONING:
updatedSteps[1].status = "done";
updatedSteps[1].description = "文档格式转换完成,内容已拆分";
updatedSteps[1].description = "已进入处理队列";
updatedSteps[2].status = "done";
updatedSteps[2].description = "评查点已成功抽取";
updatedSteps[3].status = "active";
updatedSteps[3].description = "正在评查文档...";
updatedSteps[2].description = "文档格式转换完成,内容已拆分";
updatedSteps[3].status = "done";
updatedSteps[3].description = "评查点已成功抽取";
updatedSteps[4].status = "active";
updatedSteps[4].description = "正在评查文档...";
break;
case DocumentStatus.PROCESSED:
updatedSteps[1].status = "done";
updatedSteps[1].description = "文档格式转换完成,内容已拆分";
updatedSteps[1].description = "已进入处理队列";
updatedSteps[2].status = "done";
updatedSteps[2].description = "评查点已成功抽取";
updatedSteps[2].description = "文档格式转换完成,内容已拆分";
updatedSteps[3].status = "done";
updatedSteps[3].description = "文档评查已完成";
updatedSteps[3].description = "评查点已成功抽取";
updatedSteps[4].status = "done";
updatedSteps[4].description = "文档已准备就绪,可以查看";
updatedSteps[4].description = "文档评查已完成";
updatedSteps[5].status = "done";
updatedSteps[5].description = "文档已准备就绪,可以查看";
break;
}
@@ -1934,6 +1984,7 @@ export default function FilesUpload() {
// 重置步骤状态
setProcessingSteps([
{ title: "文件上传", description: "等待上传文件到服务器...", status: "waiting" },
{ title: "排队等待", description: "等待进入处理队列", status: "waiting" },
{ title: "文档转换拆分", description: "转换文档格式,拆分文档内容", status: "waiting" },
{ title: "评查点抽取", description: "DeepSeek 抽取中", status: "waiting" },
{ title: "评查点审核", description: "DeepSeek 评查中", status: "waiting" },
@@ -2212,7 +2263,22 @@ export default function FilesUpload() {
<div className="file-upload-page">
{/* 页面头部 */}
<div className="page-header">
<div className="flex items-center gap-4">
<h2 className="page-title"></h2>
{/* 全局队列状态显示 */}
{globalQueueStatus && (
<div className="flex items-center gap-3 text-sm">
<span className="flex items-center gap-1 text-orange-600">
<i className="ri-time-line"></i>
: {globalQueueStatus.documents.waiting}
</span>
<span className="flex items-center gap-1 text-blue-600">
<i className="ri-loader-4-line"></i>
: {globalQueueStatus.documents.processing}
</span>
</div>
)}
</div>
<button className="ant-btn ant-btn-default flex items-center my-2" onClick={()=>handleBackClick()}><i className="ri-arrow-left-line"></i>{isNavigating ? '返回中...' : '返回'}</button>
</div>
+353
View File
@@ -0,0 +1,353 @@
# 前端队列状态 API 接口文档
## 概述
本文档说明队列状态查询的 API 接口,前端可以通过这些接口获取队列统计信息和文档排队位置。
**重要更新 (2026-01-29)**
- `documents` 字段现在从**数据库**查询,而不是 Redis metrics
- 即使 Worker 未启动,也能正确显示等待处理的文档数量
- 新增 `waiting_ids``processing_details` 字段
---
## 接口 1:查询队列整体状态
**用途**:获取全局队列统计信息(排队数、处理数)
**请求**
```http
GET /api/v2/system/queue/status
Authorization: Bearer <your_jwt_token>
```
**响应**
```json
{
"success": true,
"timestamp": "2026-01-28T16:45:00.123456",
"queue": {
"pending_tasks": 5,
"processing_tasks": 2,
"available_slots": 2,
"max_concurrent": 4
},
"documents": {
"waiting": 3,
"waiting_ids": [101, 102, 103],
"processing": 2,
"processing_ids": [123, 456],
"processing_details": [
{"id": 123, "status": "Cutting"},
{"id": 456, "status": "Extractioning"}
]
}
}
```
**响应字段说明**
| 字段 | 类型 | 说明 |
|------|------|------|
| `success` | boolean | 请求是否成功 |
| `timestamp` | string | 查询时间 |
| `queue.pending_tasks` | number | Celery 队列中等待的任务数 |
| `queue.processing_tasks` | number | 正在处理的任务数(通过并发许可统计) |
| `queue.available_slots` | number | 可用的处理槽位 |
| `queue.max_concurrent` | number | 最大并发数 |
| `documents.waiting` | number | **等待处理的文档数**(数据库 status='Queued' |
| `documents.waiting_ids` | number[] | **等待中的文档ID列表** |
| `documents.processing` | number | **正在处理的文档数**(数据库 status in ['Cutting', 'Extractioning', 'Evaluationing'] |
| `documents.processing_ids` | number[] | 正在处理的文档ID列表 |
| `documents.processing_details` | object[] | **处理中文档的详细状态** |
**数据来源说明**
| 字段 | 数据来源 | 说明 |
|------|----------|------|
| `queue.*` | Redis | 从 Celery 队列和并发控制器获取 |
| `documents.*` | PostgreSQL | 从数据库 documents 表查询 |
> ⚠️ **注意**:即使 Worker 未启动,`documents.waiting` 也会正确显示等待处理的文档数量。
---
## 接口 2:查询单个文档位置
**用途**:查询指定文档在队列中的位置、预估等待时间、以及细分处理状态
**请求**
```http
GET /api/v2/system/queue/position/{document_id}
Authorization: Bearer <your_jwt_token>
```
**响应示例 - 排队中**
```json
{
"success": true,
"document_id": 456,
"status": "Queued",
"position": 3,
"ahead_count": 2,
"total_in_queue": 5,
"estimated_wait_minutes": 4,
"message": "文档在队列中排第 3 位,前面有 2 个文档"
}
```
**响应示例 - OCR 识别中**
```json
{
"success": true,
"document_id": 123,
"status": "Cutting",
"position": 0,
"ahead_count": 0,
"message": "OCR 识别中"
}
```
**响应示例 - AI 分析中**
```json
{
"success": true,
"document_id": 123,
"status": "Extractioning",
"position": 0,
"ahead_count": 0,
"message": "AI 分析中"
}
```
**响应示例 - 规则检查中**
```json
{
"success": true,
"document_id": 123,
"status": "Evaluationing",
"position": 0,
"ahead_count": 0,
"message": "规则检查中"
}
```
**响应示例 - 处理完成**
```json
{
"success": true,
"document_id": 789,
"status": "Processed",
"position": null,
"ahead_count": 0,
"message": "文档处理完成"
}
```
**响应示例 - 处理失败**
```json
{
"success": true,
"document_id": 789,
"status": "Failed",
"position": null,
"ahead_count": 0,
"message": "文档处理失败"
}
```
**响应字段说明**
| 字段 | 类型 | 说明 |
|------|------|------|
| `success` | boolean | 请求是否成功 |
| `document_id` | number | 文档ID |
| `status` | string | **细分状态**(见下表) |
| `position` | number | 在队列中的位置(从1开始,处理中或完成时为0或null) |
| `ahead_count` | number | 前面排队的文档数 |
| `total_in_queue` | number | 队列中的总文档数(仅排队时返回) |
| `estimated_wait_minutes` | number | 预估等待时间(分钟,仅排队时返回) |
| `message` | string | 状态描述 |
**status 字段可能的值**
| 状态值 | 说明 | 进度 | 建议显示 |
|--------|------|------|----------|
| `Queued` | 排队中 | 0% | 显示排队位置和预估时间 |
| `Cutting` | OCR 识别中 | 33% | 显示进度条 + "正在识别文档..." |
| `Extractioning` | AI 分析中 | 66% | 显示进度条 + "正在分析内容..." |
| `Evaluationing` | 规则检查中 | 90% | 显示进度条 + "正在检查规则..." |
| `Processed` | 处理完成 | 100% | 显示完成提示,可跳转查看结果 |
| `Failed` | 处理失败 | - | 显示错误提示,可查看详情 |
---
## 文档状态流转图
```
上传文档
┌─────────┐
│ Queued │ ← 文档初始状态(等待 Worker 处理)
└────┬────┘
│ Worker 开始处理
┌─────────┐
│ Cutting │ ← OCR 识别阶段
└────┬────┘
┌──────────────┐
│Extractioning │ ← AI 分析阶段
└──────┬───────┘
┌──────────────┐
│Evaluationing │ ← 规则检查阶段
└──────┬───────┘
├────────────┐
▼ ▼
┌──────────┐ ┌────────┐
│Processed │ │ Failed │
└──────────┘ └────────┘
```
---
## 前端集成建议
### 1. 上传后立即显示队列状态
```javascript
// 上传文档后
const uploadResult = await uploadDocument(file);
const documentId = uploadResult.result.id;
// 立即查询队列状态
const position = await api.get(`/system/queue/position/${documentId}`);
if (position.status === 'Queued') {
showNotification(`文档已加入队列,前面有 ${position.ahead_count} 个文档`);
}
```
### 2. 轮询文档处理状态
```javascript
const pollDocumentStatus = async (documentId) => {
const poll = async () => {
const result = await api.get(`/system/queue/position/${documentId}`);
switch (result.status) {
case 'Queued':
updateProgress(0, `排队中,前面还有 ${result.ahead_count} 个文档`);
break;
case 'Cutting':
updateProgress(33, 'OCR 识别中...');
break;
case 'Extractioning':
updateProgress(66, 'AI 分析中...');
break;
case 'Evaluationing':
updateProgress(90, '规则检查中...');
break;
case 'Processed':
updateProgress(100, '处理完成');
return result; // 停止轮询
case 'Failed':
showError('处理失败');
return result; // 停止轮询
}
// 继续轮询
setTimeout(poll, 5000);
};
return poll();
};
```
### 3. 显示全局队列状态
```javascript
// 在页面顶部或侧边栏显示队列状态
const QueueStatusBadge = () => {
const [status, setStatus] = useState(null);
useEffect(() => {
const fetchStatus = async () => {
const data = await api.get('/system/queue/status');
setStatus(data);
};
fetchStatus();
const interval = setInterval(fetchStatus, 10000); // 每 10 秒刷新
return () => clearInterval(interval);
}, []);
if (!status) return null;
return (
<div className="queue-status">
<span>等待: {status.documents.waiting}</span>
<span>处理中: {status.documents.processing}</span>
</div>
);
};
```
---
## 轮询建议
| 场景 | 推荐间隔 | 说明 |
|------|----------|------|
| 队列整体状态 | 10 秒 | 用于全局状态显示 |
| 单个文档位置 | 5 秒 | 用于进度跟踪 |
| 文档排队中 | 5 秒 | 显示排队位置变化 |
| 文档处理中 | 3 秒 | 更频繁更新进度 |
**停止轮询条件**:当 `status` 变为 `Processed``Failed` 时停止轮询
---
## 错误响应
所有接口在发生错误时返回:
```json
{
"detail": "错误描述信息"
}
```
**HTTP 状态码**
- `200` - 成功
- `401` - 未授权(Token 无效或过期)
- `500` - 服务器内部错误
---
## 常见问题
### Q1: 为什么 `queue.pending_tasks` 和 `documents.waiting` 数量不一致?
**A**:
- `queue.pending_tasks` 统计的是 Celery 队列中的**任务数**
- `documents.waiting` 统计的是数据库中 `status='Queued'` 的**文档数**
- 一个文档可能对应多个任务,或者有些任务不是文档处理任务
### Q2: Worker 未启动时会显示什么?
**A**:
- `queue.pending_tasks` 会显示队列中等待的任务数
- `documents.waiting` 会正确显示等待处理的文档数(从数据库查询)
- `queue.processing_tasks``documents.processing` 为 0
### Q3: 如何判断系统是否正常运行?
**A**:
- 如果 `documents.waiting > 0``queue.processing_tasks == 0` 且持续较长时间,可能 Worker 未启动
- 正常情况下,应该有文档在处理(`processing > 0`)或所有文档已处理完(`waiting == 0`
+407
View File
@@ -0,0 +1,407 @@
# 队列状态 API 文档
## 概述
队列状态 API 提供文档处理队列的实时状态查询功能,让用户能够了解:
- 当前队列中有多少文档在等待
- 正在处理的文档数量
- 自己的文档在队列中的位置
- 预估等待时间
**更新日志 (2026-01-29)**
- `documents` 字段改为从数据库查询,不再依赖 Redis metrics
- 新增 `documents.waiting_ids` 字段
- 新增 `documents.processing_details` 字段
- 修复:即使 Worker 未启动,也能正确显示等待处理的文档数量
## API 端点
### 1. 查询队列状态
获取文档处理队列的整体状态。
**请求**
```http
GET /api/v2/system/queue/status
Authorization: Bearer <token>
```
**响应**
```json
{
"success": true,
"timestamp": "2026-01-28T16:45:00.123456",
"queue": {
"pending_tasks": 5,
"processing_tasks": 2,
"available_slots": 2,
"max_concurrent": 4
},
"documents": {
"waiting": 3,
"waiting_ids": [101, 102, 103],
"processing": 2,
"processing_ids": [123, 456],
"processing_details": [
{"id": 123, "status": "Cutting"},
{"id": 456, "status": "Extractioning"}
]
}
}
```
**响应字段说明**
| 字段 | 类型 | 数据来源 | 说明 |
|------|------|----------|------|
| `success` | boolean | - | 请求是否成功 |
| `timestamp` | string | - | 查询时间(ISO 8601 格式) |
| `queue.pending_tasks` | integer | Redis | Celery 队列中等待的任务数 |
| `queue.processing_tasks` | integer | Redis | 正在处理的任务数(通过并发许可统计) |
| `queue.available_slots` | integer | Redis | 可用的处理槽位 |
| `queue.max_concurrent` | integer | 配置 | 最大并发数(默认 4) |
| `documents.waiting` | integer | **数据库** | 等待处理的文档数(status='Queued' |
| `documents.waiting_ids` | array | **数据库** | 等待中的文档 ID 列表 |
| `documents.processing` | integer | **数据库** | 正在处理的文档数 |
| `documents.processing_ids` | array | **数据库** | 正在处理的文档 ID 列表 |
| `documents.processing_details` | array | **数据库** | 处理中文档的详细状态 |
**处理中文档状态**
`processing_details` 中的 `status` 字段可能的值:
| 状态 | 说明 |
|------|------|
| `Cutting` | OCR 识别中 |
| `Extractioning` | AI 分析中 |
| `Evaluationing` | 规则检查中 |
---
### 2. 查询文档排队位置
查询指定文档在队列中的位置。
**请求**
```http
GET /api/v2/system/queue/position/{document_id}
Authorization: Bearer <token>
```
**路径参数**
| 参数 | 类型 | 必填 | 说明 |
|------|------|------|------|
| `document_id` | integer | 是 | 文档 ID |
**响应示例 - 排队中**
```json
{
"success": true,
"document_id": 456,
"status": "Queued",
"position": 3,
"ahead_count": 2,
"total_in_queue": 5,
"estimated_wait_minutes": 4,
"message": "文档在队列中排第 3 位,前面有 2 个文档"
}
```
**响应示例 - OCR 识别中**
```json
{
"success": true,
"document_id": 123,
"status": "Cutting",
"position": 0,
"ahead_count": 0,
"message": "OCR 识别中"
}
```
**响应示例 - AI 分析中**
```json
{
"success": true,
"document_id": 123,
"status": "Extractioning",
"position": 0,
"ahead_count": 0,
"message": "AI 分析中"
}
```
**响应示例 - 规则检查中**
```json
{
"success": true,
"document_id": 123,
"status": "Evaluationing",
"position": 0,
"ahead_count": 0,
"message": "规则检查中"
}
```
**响应示例 - 处理完成**
```json
{
"success": true,
"document_id": 789,
"status": "Processed",
"position": null,
"ahead_count": 0,
"message": "文档处理完成"
}
```
**响应示例 - 处理失败**
```json
{
"success": true,
"document_id": 789,
"status": "Failed",
"position": null,
"ahead_count": 0,
"message": "文档处理失败"
}
```
**响应字段说明**
| 字段 | 类型 | 说明 |
|------|------|------|
| `success` | boolean | 请求是否成功 |
| `document_id` | integer | 文档 ID |
| `status` | string | 文档状态(见下表) |
| `position` | integer | 在队列中的位置(从 1 开始) |
| `ahead_count` | integer | 前面排队的文档数 |
| `total_in_queue` | integer | 队列中的总文档数 |
| `estimated_wait_minutes` | integer | 预估等待时间(分钟) |
| `message` | string | 状态描述 |
**文档状态说明**
| 状态 | 说明 | 阶段 |
|------|------|------|
| `Queued` | 排队等待中 | 等待 |
| `Cutting` | OCR 识别中 | 处理 |
| `Extractioning` | AI 分析中 | 处理 |
| `Evaluationing` | 规则检查中 | 处理 |
| `Processed` | 处理完成 | 完成 |
| `Failed` | 处理失败 | 完成 |
---
### 3. 查询队列详情(管理员)
获取队列中任务的详细信息,用于管理和监控。
**请求**
```http
GET /api/v2/system/queue/details?limit=20
Authorization: Bearer <token>
```
**查询参数**
| 参数 | 类型 | 必填 | 默认值 | 说明 |
|------|------|------|--------|------|
| `limit` | integer | 否 | 20 | 返回的最大任务数(1-100) |
**响应**
```json
{
"success": true,
"total_in_queue": 5,
"showing": 5,
"tasks": [
{
"position": 1,
"task_name": "document.ocr_processing_v2",
"task_id": "abc123-def456-...",
"document_id": 123
},
{
"position": 2,
"task_name": "extractionV3.extraction_processing",
"task_id": "ghi789-jkl012-...",
"document_id": 456
}
]
}
```
**响应字段说明**
| 字段 | 类型 | 说明 |
|------|------|------|
| `success` | boolean | 请求是否成功 |
| `total_in_queue` | integer | 队列中的总任务数 |
| `showing` | integer | 当前返回的任务数 |
| `tasks` | array | 任务列表 |
| `tasks[].position` | integer | 任务在队列中的位置 |
| `tasks[].task_name` | string | 任务名称 |
| `tasks[].task_id` | string | Celery 任务 ID |
| `tasks[].document_id` | integer | 关联的文档 ID |
---
## 错误响应
所有 API 在发生错误时返回统一格式:
```json
{
"detail": "错误描述信息"
}
```
**HTTP 状态码**
| 状态码 | 说明 |
|--------|------|
| 200 | 成功 |
| 401 | 未授权(Token 无效或过期) |
| 500 | 服务器内部错误 |
---
## 使用示例
### cURL
```bash
# 查询队列状态
curl -X GET "http://localhost:8000/api/v2/system/queue/status" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
# 查询文档位置
curl -X GET "http://localhost:8000/api/v2/system/queue/position/123" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
# 查询队列详情
curl -X GET "http://localhost:8000/api/v2/system/queue/details?limit=10" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
```
### JavaScript (Axios)
```javascript
import axios from 'axios';
const api = axios.create({
baseURL: 'http://localhost:8000/api/v2',
headers: {
'Authorization': `Bearer ${token}`
}
});
// 查询队列状态
const getQueueStatus = async () => {
const response = await api.get('/system/queue/status');
return response.data;
};
// 查询文档位置
const getDocumentPosition = async (documentId) => {
const response = await api.get(`/system/queue/position/${documentId}`);
return response.data;
};
// 轮询文档状态
const pollDocumentStatus = async (documentId, interval = 5000) => {
const poll = async () => {
const result = await getDocumentPosition(documentId);
if (result.status === 'Processed' || result.status === 'Failed') {
console.log('文档处理完成或失败');
return result;
}
if (result.status === 'Queued') {
console.log(`排队中,前面还有 ${result.ahead_count} 个文档`);
} else {
console.log(`处理中: ${result.status}`);
}
// 继续轮询
setTimeout(poll, interval);
};
return poll();
};
```
### Python
```python
import requests
BASE_URL = "http://localhost:8000/api/v2"
TOKEN = "your_jwt_token"
headers = {
"Authorization": f"Bearer {TOKEN}"
}
# 查询队列状态
def get_queue_status():
response = requests.get(f"{BASE_URL}/system/queue/status", headers=headers)
return response.json()
# 查询文档位置
def get_document_position(document_id: int):
response = requests.get(
f"{BASE_URL}/system/queue/position/{document_id}",
headers=headers
)
return response.json()
# 使用示例
status = get_queue_status()
print(f"队列中等待: {status['documents']['waiting']} 个文档")
print(f"正在处理: {status['documents']['processing']} 个文档")
if status['documents']['waiting'] > 0:
print(f"等待中的文档ID: {status['documents']['waiting_ids']}")
position = get_document_position(123)
if position['status'] == 'Queued':
print(f"文档排在第 {position['position']}")
print(f"预计等待 {position['estimated_wait_minutes']} 分钟")
```
---
## 注意事项
1. **预估等待时间**:基于每个文档平均处理 2 分钟计算,实际时间可能因文档大小和复杂度而异。
2. **轮询频率**:建议轮询间隔不低于 3-5 秒,避免对服务器造成过大压力。
3. **权限要求**:所有 API 都需要有效的 JWT Token。
4. **队列详情 API**:建议仅管理员使用,普通用户使用位置查询 API 即可。
5. **数据一致性**
- `queue.pending_tasks` 反映 Celery 队列中的任务数
- `documents.waiting` 反映数据库中 `status='Queued'` 的文档数
- 两者可能不完全一致(一个文档可能对应多个任务)
6. **Worker 未启动时**
- `documents.waiting` 会正确显示等待中的文档数
- `queue.processing_tasks``documents.processing` 为 0
- 这可以用来检测 Worker 是否正常运行