Files
leaudit-platform-frontend/docs/QUEUE_STATUS_API.md
T
2026-01-29 16:54:45 +08:00

9.6 KiB
Raw Blame History

队列状态 API 文档

概述

队列状态 API 提供文档处理队列的实时状态查询功能,让用户能够了解:

  • 当前队列中有多少文档在等待
  • 正在处理的文档数量
  • 自己的文档在队列中的位置
  • 预估等待时间

更新日志 (2026-01-29)

  • documents 字段改为从数据库查询,不再依赖 Redis metrics
  • 新增 documents.waiting_ids 字段
  • 新增 documents.processing_details 字段
  • 修复:即使 Worker 未启动,也能正确显示等待处理的文档数量

API 端点

1. 查询队列状态

获取文档处理队列的整体状态。

请求

GET /api/v2/system/queue/status
Authorization: Bearer <token>

响应

{
  "success": true,
  "timestamp": "2026-01-28T16:45:00.123456",
  "queue": {
    "pending_tasks": 5,
    "processing_tasks": 2,
    "available_slots": 2,
    "max_concurrent": 4
  },
  "documents": {
    "waiting": 3,
    "waiting_ids": [101, 102, 103],
    "processing": 2,
    "processing_ids": [123, 456],
    "processing_details": [
      {"id": 123, "status": "Cutting"},
      {"id": 456, "status": "Extractioning"}
    ]
  }
}

响应字段说明

字段 类型 数据来源 说明
success boolean - 请求是否成功
timestamp string - 查询时间(ISO 8601 格式)
queue.pending_tasks integer Redis Celery 队列中等待的任务数
queue.processing_tasks integer Redis 正在处理的任务数(通过并发许可统计)
queue.available_slots integer Redis 可用的处理槽位
queue.max_concurrent integer 配置 最大并发数(默认 4
documents.waiting integer 数据库 等待处理的文档数(status='Queued'
documents.waiting_ids array 数据库 等待中的文档 ID 列表
documents.processing integer 数据库 正在处理的文档数
documents.processing_ids array 数据库 正在处理的文档 ID 列表
documents.processing_details array 数据库 处理中文档的详细状态

处理中文档状态

processing_details 中的 status 字段可能的值:

状态 说明
Cutting OCR 识别中
Extractioning AI 分析中
Evaluationing 规则检查中

2. 查询文档排队位置

查询指定文档在队列中的位置。

请求

GET /api/v2/system/queue/position/{document_id}
Authorization: Bearer <token>

路径参数

参数 类型 必填 说明
document_id integer 文档 ID

响应示例 - 排队中

{
  "success": true,
  "document_id": 456,
  "status": "Queued",
  "position": 3,
  "ahead_count": 2,
  "total_in_queue": 5,
  "estimated_wait_minutes": 4,
  "message": "文档在队列中排第 3 位,前面有 2 个文档"
}

响应示例 - OCR 识别中

{
  "success": true,
  "document_id": 123,
  "status": "Cutting",
  "position": 0,
  "ahead_count": 0,
  "message": "OCR 识别中"
}

响应示例 - AI 分析中

{
  "success": true,
  "document_id": 123,
  "status": "Extractioning",
  "position": 0,
  "ahead_count": 0,
  "message": "AI 分析中"
}

响应示例 - 规则检查中

{
  "success": true,
  "document_id": 123,
  "status": "Evaluationing",
  "position": 0,
  "ahead_count": 0,
  "message": "规则检查中"
}

响应示例 - 处理完成

{
  "success": true,
  "document_id": 789,
  "status": "Processed",
  "position": null,
  "ahead_count": 0,
  "message": "文档处理完成"
}

响应示例 - 处理失败

{
  "success": true,
  "document_id": 789,
  "status": "Failed",
  "position": null,
  "ahead_count": 0,
  "message": "文档处理失败"
}

响应字段说明

字段 类型 说明
success boolean 请求是否成功
document_id integer 文档 ID
status string 文档状态(见下表)
position integer 在队列中的位置(从 1 开始)
ahead_count integer 前面排队的文档数
total_in_queue integer 队列中的总文档数
estimated_wait_minutes integer 预估等待时间(分钟)
message string 状态描述

文档状态说明

状态 说明 阶段
Queued 排队等待中 等待
Cutting OCR 识别中 处理
Extractioning AI 分析中 处理
Evaluationing 规则检查中 处理
Processed 处理完成 完成
Failed 处理失败 完成

3. 查询队列详情(管理员)

获取队列中任务的详细信息,用于管理和监控。

请求

GET /api/v2/system/queue/details?limit=20
Authorization: Bearer <token>

查询参数

参数 类型 必填 默认值 说明
limit integer 20 返回的最大任务数(1-100

响应

{
  "success": true,
  "total_in_queue": 5,
  "showing": 5,
  "tasks": [
    {
      "position": 1,
      "task_name": "document.ocr_processing_v2",
      "task_id": "abc123-def456-...",
      "document_id": 123
    },
    {
      "position": 2,
      "task_name": "extractionV3.extraction_processing",
      "task_id": "ghi789-jkl012-...",
      "document_id": 456
    }
  ]
}

响应字段说明

字段 类型 说明
success boolean 请求是否成功
total_in_queue integer 队列中的总任务数
showing integer 当前返回的任务数
tasks array 任务列表
tasks[].position integer 任务在队列中的位置
tasks[].task_name string 任务名称
tasks[].task_id string Celery 任务 ID
tasks[].document_id integer 关联的文档 ID

错误响应

所有 API 在发生错误时返回统一格式:

{
  "detail": "错误描述信息"
}

HTTP 状态码

状态码 说明
200 成功
401 未授权(Token 无效或过期)
500 服务器内部错误

使用示例

cURL

# 查询队列状态
curl -X GET "http://localhost:8000/api/v2/system/queue/status" \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

# 查询文档位置
curl -X GET "http://localhost:8000/api/v2/system/queue/position/123" \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

# 查询队列详情
curl -X GET "http://localhost:8000/api/v2/system/queue/details?limit=10" \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."

JavaScript (Axios)

import axios from 'axios';

const api = axios.create({
  baseURL: 'http://localhost:8000/api/v2',
  headers: {
    'Authorization': `Bearer ${token}`
  }
});

// 查询队列状态
const getQueueStatus = async () => {
  const response = await api.get('/system/queue/status');
  return response.data;
};

// 查询文档位置
const getDocumentPosition = async (documentId) => {
  const response = await api.get(`/system/queue/position/${documentId}`);
  return response.data;
};

// 轮询文档状态
const pollDocumentStatus = async (documentId, interval = 5000) => {
  const poll = async () => {
    const result = await getDocumentPosition(documentId);

    if (result.status === 'Processed' || result.status === 'Failed') {
      console.log('文档处理完成或失败');
      return result;
    }

    if (result.status === 'Queued') {
      console.log(`排队中,前面还有 ${result.ahead_count} 个文档`);
    } else {
      console.log(`处理中: ${result.status}`);
    }

    // 继续轮询
    setTimeout(poll, interval);
  };

  return poll();
};

Python

import requests

BASE_URL = "http://localhost:8000/api/v2"
TOKEN = "your_jwt_token"

headers = {
    "Authorization": f"Bearer {TOKEN}"
}

# 查询队列状态
def get_queue_status():
    response = requests.get(f"{BASE_URL}/system/queue/status", headers=headers)
    return response.json()

# 查询文档位置
def get_document_position(document_id: int):
    response = requests.get(
        f"{BASE_URL}/system/queue/position/{document_id}",
        headers=headers
    )
    return response.json()

# 使用示例
status = get_queue_status()
print(f"队列中等待: {status['documents']['waiting']} 个文档")
print(f"正在处理: {status['documents']['processing']} 个文档")

if status['documents']['waiting'] > 0:
    print(f"等待中的文档ID: {status['documents']['waiting_ids']}")

position = get_document_position(123)
if position['status'] == 'Queued':
    print(f"文档排在第 {position['position']} 位")
    print(f"预计等待 {position['estimated_wait_minutes']} 分钟")

注意事项

  1. 预估等待时间:基于每个文档平均处理 2 分钟计算,实际时间可能因文档大小和复杂度而异。

  2. 轮询频率:建议轮询间隔不低于 3-5 秒,避免对服务器造成过大压力。

  3. 权限要求:所有 API 都需要有效的 JWT Token。

  4. 队列详情 API:建议仅管理员使用,普通用户使用位置查询 API 即可。

  5. 数据一致性

    • queue.pending_tasks 反映 Celery 队列中的任务数
    • documents.waiting 反映数据库中 status='Queued' 的文档数
    • 两者可能不完全一致(一个文档可能对应多个任务)
  6. Worker 未启动时

    • documents.waiting 会正确显示等待中的文档数
    • queue.processing_tasksdocuments.processing 为 0
    • 这可以用来检测 Worker 是否正常运行