From f3b83c9979ce14797b429301b97249fdddf9b7bb Mon Sep 17 00:00:00 2001 From: wren <“porlong@qq.com”> Date: Wed, 29 Apr 2026 11:48:09 +0800 Subject: [PATCH] feat: add async worker queues and retry controls --- app.toml | 19 + docs/leaudit/并发与重试参数说明.md | 149 ++++++++ docs/规则编辑/worker并发执行改造方案.md | 336 ++++++++++++++++++ fastapi_admin/celery_app.py | 50 +++ fastapi_admin/config/__init__.pyi | 19 + fastapi_admin/config/_settings.py | 19 + .../controllers/auditController.py | 1 + .../fastapi_leaudit/domian/Dto/auditDto.py | 1 + .../leaudit_bridge/auditServiceFactory.py | 7 +- .../leaudit_bridge/client_factory.py | 62 +++- .../leaudit_bridge/ocr_bridge.py | 98 +++-- .../leaudit_bridge/resilient_clients.py | 322 +++++++++++++++++ .../fastapi_leaudit/leaudit_bridge/tasks.py | 197 ++++++++-- .../fastapi_leaudit/services/auditService.py | 8 +- .../services/impl/auditServiceImpl.py | 96 +++-- scripts/start_worker.sh | 28 ++ 16 files changed, 1316 insertions(+), 96 deletions(-) create mode 100644 docs/leaudit/并发与重试参数说明.md create mode 100644 docs/规则编辑/worker并发执行改造方案.md create mode 100644 fastapi_admin/celery_app.py create mode 100644 fastapi_modules/fastapi_leaudit/leaudit_bridge/resilient_clients.py create mode 100755 scripts/start_worker.sh diff --git a/app.toml b/app.toml index bd4c0cc..07e8d3b 100644 --- a/app.toml +++ b/app.toml @@ -51,3 +51,22 @@ RULES_DIR = "rules" RESCUE_MODE = "auto" LLM_MAX_CONCURRENCY = 5 VLM_MAX_CONCURRENCY = 3 +LLM_REQUEST_TIMEOUT = 120 +LLM_RETRY_MAX_ATTEMPTS = 3 +LLM_RETRY_BACKOFF_BASE_SECONDS = 1 +VLM_REQUEST_TIMEOUT = 45 +VLM_RETRY_MAX_ATTEMPTS = 2 +VLM_RETRY_BACKOFF_BASE_SECONDS = 1 +OCR_VLM_CONCURRENCY = 32 +OCR_RETRY_MAX_ATTEMPTS = 3 +OCR_RETRY_BACKOFF_BASE_SECONDS = 1 +SIGNATURE_PROBE_CONCURRENCY = 2 +SIGNATURE_PROBE_TIMEOUT = 20 +SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS = 2 +SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS = 0.5 +WORKER_QUEUE_URGENT = "leaudit.urgent" +WORKER_QUEUE_NORMAL = "leaudit.normal" +WORKER_CONCURRENCY = 2 +RUN_LOCK_SECONDS = 
1800 +TASK_SOFT_TIME_LIMIT = 3300 +TASK_TIME_LIMIT = 3600 diff --git a/docs/leaudit/并发与重试参数说明.md b/docs/leaudit/并发与重试参数说明.md new file mode 100644 index 0000000..d2bc62e --- /dev/null +++ b/docs/leaudit/并发与重试参数说明.md @@ -0,0 +1,149 @@ +# LeAudit 并发与重试参数说明 + +这份文档记录当前 `leaudit-platform` 已经落地的并发与重试参数,避免后面再忘。 + +--- + +## 1. 为什么没有用 `RETRY_BACKOFF_SECONDS = [1, 2, 4]` + +这次我没有把退避时间设计成数组,而是改成了: + +- `*_RETRY_MAX_ATTEMPTS` +- `*_RETRY_BACKOFF_BASE_SECONDS` + +原因很简单: + +- 配置更短,更容易看懂 +- 不用每次都手写 `[1, 2, 4]` +- 代码统一按“指数退避”算等待时间 + +计算方式: + +```text +第 1 次重试等待 = base +第 2 次重试等待 = base * 2 +第 3 次重试等待 = base * 4 +... +``` + +例如: + +```toml +LLM_RETRY_MAX_ATTEMPTS = 3 +LLM_RETRY_BACKOFF_BASE_SECONDS = 1 +``` + +表示: + +- 第 1 次请求失败后,等 `1s` +- 第 2 次再失败后,等 `2s` +- 总共最多尝试 `3` 次 + +--- + +## 2. 当前已正式化的参数 + +当前 `app.toml` 中已启用: + +```toml +[LEAUDIT] +LLM_MAX_CONCURRENCY = 5 +VLM_MAX_CONCURRENCY = 3 + +LLM_REQUEST_TIMEOUT = 120 +LLM_RETRY_MAX_ATTEMPTS = 3 +LLM_RETRY_BACKOFF_BASE_SECONDS = 1 + +VLM_REQUEST_TIMEOUT = 45 +VLM_RETRY_MAX_ATTEMPTS = 2 +VLM_RETRY_BACKOFF_BASE_SECONDS = 1 + +OCR_VLM_CONCURRENCY = 32 +OCR_RETRY_MAX_ATTEMPTS = 3 +OCR_RETRY_BACKOFF_BASE_SECONDS = 1 + +SIGNATURE_PROBE_CONCURRENCY = 2 +SIGNATURE_PROBE_TIMEOUT = 20 +SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS = 2 +SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS = 0.5 +``` + +--- + +## 3. 
各链路当前策略 + +### 3.1 LLM + +- 客户端:`ResilientOpenAICompatibleClient` +- 超时:`LLM_REQUEST_TIMEOUT` +- 最大尝试次数:`LLM_RETRY_MAX_ATTEMPTS` +- 退避基数:`LLM_RETRY_BACKOFF_BASE_SECONDS` +- 重试条件: + - 超时 + - 连接错误 + - `5xx` + - `408` + - `429` + +### 3.2 VLM + +- 客户端:`ResilientQwenVLMClient` +- 超时:`VLM_REQUEST_TIMEOUT` +- 最大尝试次数:`VLM_RETRY_MAX_ATTEMPTS` +- 退避基数:`VLM_RETRY_BACKOFF_BASE_SECONDS` +- 重试条件: + - 超时 + - 连接错误 + - `5xx` + - `408` + - `429` + +### 3.3 OCR + +- 客户端:`ResilientChandraOCRClient` +- OCR HTTP 超时:沿用 `[OCR].TIMEOUT` +- 最大尝试次数:`OCR_RETRY_MAX_ATTEMPTS` +- 退避基数:`OCR_RETRY_BACKOFF_BASE_SECONDS` +- 重试条件: + - 超时 + - 连接错误 + - `5xx` + - `408` + - `429` + +### 3.4 Signature Probe + +这是 DOCX 签名候选补识别,不是主 OCR。 + +- 并发:`SIGNATURE_PROBE_CONCURRENCY` +- 单次超时:`SIGNATURE_PROBE_TIMEOUT` +- 最大尝试次数:`SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS` +- 退避基数:`SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS` +- 第 1 次失败后,会用 fresh VLM client 再试 + +--- + +## 4. 为什么把 signature probe 调小 + +之前这里容易长时间卡住,根因是: + +- 它是补探测 +- 但单次超时太长 +- 且失败后还会再试一次 + +所以现在把它收成: + +- `SIGNATURE_PROBE_CONCURRENCY = 2` +- `SIGNATURE_PROBE_TIMEOUT = 20` + +这样它仍然能补识别,但不会把整条 OCR 长尾拖得太夸张。 + +--- + +## 5. 
当前代码位置 + +- 配置定义:`fastapi_admin/config/_settings.py` +- 客户端工厂:`fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py` +- 重试封装:`fastapi_modules/fastapi_leaudit/leaudit_bridge/resilient_clients.py` +- signature probe:`fastapi_modules/fastapi_leaudit/leaudit_bridge/ocr_bridge.py` + diff --git a/docs/规则编辑/worker并发执行改造方案.md b/docs/规则编辑/worker并发执行改造方案.md new file mode 100644 index 0000000..79402ce --- /dev/null +++ b/docs/规则编辑/worker并发执行改造方案.md @@ -0,0 +1,336 @@ +原生 LeAudit Worker 并发执行改造方案 + +这份文档不是纯方案稿,而是“已经落地到哪一步 + 当前系统为什么这么设计 + 后续接手时要看什么”的实施记录。后面如果忘了这套东西是干什么的,先看这份。 + +目标 + +- 把上传触发评查从 HTTP 同步执行切到 `Run -> dispatch -> worker` +- 平台侧只做: + - 上传入库 + - OSS / 规则文件解析 + - run 状态编排 + - 结果持久化 +- 原生 `/home/wren-dev/Porject/leaudit/src/leaudit` 继续负责: + - `AuditCtx` + - `AuditService.audit(ctx)` + - OCR / extract / evaluate / rescue 主流程 + +为什么要改成 worker + +- HTTP 请求里同步跑全链路会把 OCR、LLM、VLM、补救流程都塞进 Web 线程 +- 后续存在并发上传、多文档批量处理、多 worker 扩容场景 +- 同步模式下容易遇到: + - 请求超时 + - 事件循环混用 + - 多线程/多请求互相阻塞 + - 状态不可追踪 +- 改造成 worker 后,API 只负责“受理 + 排队”,重任务交给 Celery worker 进程 + +当前已落地 + +- `fastapi_admin/celery_app.py` + - 新增 Celery app,broker/backend 复用当前 Redis +- `fastapi_admin/config/_settings.py` + - 新增 worker 队列、并发、超时、OCR 视觉并发、signature probe 限时等配置 +- `fastapi_admin/config/__init__.pyi` + - 暴露新的 LEAUDIT 配置项 +- `app.toml` + - 正式加入 `[LEAUDIT]` worker/并发相关配置 + - 补齐 `[OCR].BASE_URL` +- `fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py` + - `Run()` 改为只创建 `leaudit_audit_runs` 并投递任务 + - 相同文档已有活动 run 时直接复用,避免重复创建 +- `fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py` + - 改为 `runId` 驱动,而不是把整份文件内容直接塞到同步调用链 + - 增加 Celery task 入口 + - 增加 run 抢占,避免重复消费 + - 增加队列路由 +- `fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py` + - OCR client 读取平台配置,避免 `base_url is required` +- `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py` + - 原生服务装配读取平台并发配置 +- `fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py` + - 修复原生 `FieldValue` 新旧结构兼容 + - 修复 JSONB 
字段写库序列化 +- `scripts/start_worker.sh` + - worker 启动脚本改为动态读取配置 + - worker nodename 改成唯一,避免 Celery `DuplicateNodenameWarning` +- `docs/规则编辑/worker并发执行改造方案.md` + - 记录当前这套机制,避免后续自己忘记 + +执行链路 + +1. `POST /api/upload` +2. `DocumentServiceImpl.Upload()` 写 `leaudit_documents / leaudit_document_files` +3. 原始文件上传 OSS +4. `AuditServiceImpl.Run()` 创建 `leaudit_audit_runs` +5. `dispatch_leaudit_task(run_id)` 投递 Celery +6. worker 消费 `leaudit.process_document` +7. worker 按 `runId` 查库,取: + - 文档 + - 活跃文件版本 + - 当前规则版本 + - 规则 OSS 地址 +8. bridge 下载 OSS 文件 / 规则文件到本地临时文件 +9. `NativeRunner` 构建原生 `AuditCtx` 并调用原生 audit +10. `StorageAdapter` 把结果写回 `leaudit_*` + +当前实现涉及的关键文件 + +- 上传入口 + - `fastapi_modules/fastapi_leaudit/controllers/documentController.py` + - `fastapi_modules/fastapi_leaudit/services/documentService.py` + - `fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py` +- bridge 层 + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/fileSourceResolver.py` + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/ruleVersionResolver.py` + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditCtxBuilder.py` + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py` + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py` + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py` + - `fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py` +- worker 与配置 + - `fastapi_admin/celery_app.py` + - `fastapi_admin/config/_settings.py` + - `app.toml` + - `scripts/start_worker.sh` + +并发配置正式化 + +- `LEAUDIT_LLM_MAX_CONCURRENCY` + - 单文档内 LLM 全局信号量上限 +- `LEAUDIT_VLM_MAX_CONCURRENCY` + - 单文档内 VLM 主流程上限 +- `LEAUDIT_OCR_VLM_CONCURRENCY` + - OCR 视觉增强 / 印章分类阶段的 VLM 并发上限 +- `LEAUDIT_SIGNATURE_PROBE_CONCURRENCY` + - DOCX 签名候选区域补探测并发上限 +- `LEAUDIT_SIGNATURE_PROBE_TIMEOUT` + - 单次签名 probe 的 VLM 超时,避免长时间卡住 OCR 尾部 +- `LEAUDIT_WORKER_CONCURRENCY` + - 单 worker 进程可并发执行的任务数 +- `LEAUDIT_WORKER_QUEUE_URGENT` + - 前端上传中的“紧急”队列 +- `LEAUDIT_WORKER_QUEUE_NORMAL` + - 
前端上传中的“普通”队列 +- `LEAUDIT_TASK_SOFT_TIME_LIMIT` + - Celery 软超时 +- `LEAUDIT_TASK_TIME_LIMIT` + - Celery 硬超时 +- `LEAUDIT_RUN_LOCK_SECONDS` + - 为后续升级“租约锁”预留的锁超时配置 + +当前 `app.toml` 基线 + +```toml +[OCR] +BASE_URL = "http://i-2.gpushare.com:44112/" + +[LEAUDIT] +LLM_MAX_CONCURRENCY = 5 +VLM_MAX_CONCURRENCY = 3 +OCR_VLM_CONCURRENCY = 32 +SIGNATURE_PROBE_CONCURRENCY = 2 +SIGNATURE_PROBE_TIMEOUT = 20 +WORKER_QUEUE_URGENT = "leaudit.urgent" +WORKER_QUEUE_NORMAL = "leaudit.normal" +WORKER_CONCURRENCY = 2 +RUN_LOCK_SECONDS = 1800 +TASK_SOFT_TIME_LIMIT = 3300 +TASK_TIME_LIMIT = 3600 +``` + +当前队列路由策略 + +- 前端上传 `speed=urgent` -> `leaudit.urgent` +- 前端上传 `speed=normal` -> `leaudit.normal` +- `/audit/run` 也支持 `speed` +- 未识别值默认归到 `normal` + +接口收口 + +- 上传接口:`POST /upload` + - 入参: + - `file` + - `typeId` / `typeCode` + - `region` + - `fileRole` + - `createdBy` + - `autoRun` + - `speed=normal|urgent` + - 已去掉旧系统 `bizDocumentId` 输入 +- 上传返回: + - `documentId` + - `internalDocumentNo` + - `fileId` + - `typeId` + - `typeCode` + - `region` + - `fileName` + - `ossUrl` + - `speed` + - `processingStatus` + - `autoRunTriggered` + - `run` +- 手动触发:`POST /audit/run` + - 支持 `speed=normal|urgent` +- 状态查询: + - `GET /audit/run/{runId}` +- 结果查询: + - `GET /audit/result/{runId}` + +当前业务收敛结论 + +- 不再继续扩 `high/default/batch` 三层方案 +- 当前真实业务只有前端上传,并且只有“紧急 / 普通”两档速度需求 +- 所以 worker 体系正式收敛成两级模型: + - `urgent` + - `normal` +- 这样既满足优先级需求,也避免过度设计 + +状态流转 + +- 新建 run + - `status=queued` + - `phase=dispatch` +- worker 抢占成功 + - `status=running` + - `phase=prepare` +- 原生流程推进中 + - `phase` 会继续流转为 OCR / extraction / evaluation / rescue / persist 等阶段 +- 执行成功 + - `status=completed` + - `result_status` 由 bridge 汇总写回 +- 执行失败 + - `status=failed` + - `phase` 停在失败发生阶段 + - `leaudit_run_errors` 记录错误明细 + +避免重复消费 + +- API 触发 `Run()` 时: + - 若同一文档已有 `queued/running/retrying` 的 run,且 `Force=False` + - 直接返回当前活动 run,不重复创建 +- worker 开始执行前: + - 先按 run 状态执行原子抢占 + - 仅允许 `status in ('queued', 'pending', 'retrying')` 的 run 进入执行 +- 抢占失败时: + - 当前 
worker 直接跳过 + - 返回 `already_claimed` + +当前表语义 + +- `leaudit_documents` + - 平台内部文档主表 + - 一次前端上传会创建一条新的 document + - 不再依赖旧系统 `documents.id` + - `biz_document_id` 仅作为内部追踪号保留,避免立刻改库 +- `leaudit_document_files` + - 具体上传文件记录 + - 保存文件名、后缀、MIME、SHA256、OSS 地址等文件级信息 + - 当前仍保留单独文件表,为后续是否合表留余地 + +已踩过并修掉的坑 + +- 事件循环混用 + - 同步 HTTP 链路里直接执行原生异步流程,容易出现 `Future attached to a different loop` + - 现在改成 worker 独立进程执行,HTTP 只排队 +- OCR base_url 缺失 + - 原因是平台 bridge 没把 `[OCR].BASE_URL` 正确喂给原生 client + - 现已在 `app.toml` 和 `client_factory.py` 补齐 +- 原生 `FieldValue` 结构变化 + - 新版本 `FieldValue` 不一定有 `.position` + - `storage_adapter.py` 已改为兼容: + - 优先读 `position` + - 没有时回退到 `metadata.match_position / bbox / page_num` +- asyncpg + JSONB 序列化 + - 不能把 `list/dict` 直接按字符串列方式塞给 `CAST(:x AS JSONB)` + - 需要先 `json.dumps(..., ensure_ascii=False)` 再入库 + - 当前已在这些路径处理: + - `leaudit_rule_results.stages` + - `leaudit_rule_results.extracted_fields` + - `leaudit_rule_results.field_positions` + - `leaudit_rule_results.remediation` + - `leaudit_rule_results.rule_meta` + - `leaudit_rescue_outcomes.payload` + +当前运行现状 + +- `POST /api/upload` 已经可以快速返回 `queued` +- worker 已能真正执行到: + - OCR + - extraction + - evaluation + - rescue +- 已验证真实样本端到端成功: + - `run_id=10` + - `document_id=9` + - `phase=executed` + - `24 passed / 4 failed / 0 skipped` +- 当前主要耗时瓶颈明确在 OCR 阶段 + - 一次验证中: + - `ocr=277.79s` + - `extraction=13.65s` + - `evaluation=11.81s` + - `total=303.25s` +- 已针对 OCR 长尾补第一轮治理: + - DOCX `signature probe` 增加单独超时 + - DOCX `signature probe` 增加并发限制 + - 避免某个候选框失败时串行拖住整个尾段 +- 如果代码刚修过但日志仍报旧错,第一怀疑项不是代码没改,而是 worker 进程还没重启,仍在跑旧代码 +- worker 日志会明确打印: + - `run_id=xx 已投递到 worker 队列: queue=..., speed=...` + - `run_id=xx worker开始执行: queue=..., speed=..., filename=...` + +启动方式 + +后端 API: + +```bash +.venv/bin/python run.py +``` + +worker: + +```bash +bash scripts/start_worker.sh +``` + +脚本会自动读取: + +- `LEAUDIT_WORKER_CONCURRENCY` +- `LEAUDIT_WORKER_QUEUE_URGENT` +- `LEAUDIT_WORKER_QUEUE_NORMAL` + +并拼成: + +```bash +celery -A 
fastapi_admin.celery_app:celery_app worker \ + -Q leaudit.urgent,leaudit.normal \ + --concurrency=2 +``` + +同时 worker nodename 会带主机名和进程号,避免多个 worker 使用同名节点。 + +联调时的最小操作顺序 + +1. 启动 API:`.venv/bin/python run.py` +2. 启动 worker:`bash scripts/start_worker.sh` +3. 调用上传接口,确认返回 `processingStatus=queued` +4. 看 worker 日志是否出现: + - 抢占 run + - OCR result saved + - Extraction result saved + - Evaluation results saved + - finalize run +5. 如果日志还是旧报错,先重启 worker 再测一次 + +后续明确待办 + +- 给 run 抢占补上真正的租约锁字段: + - `worker_id` + - `lock_expires_at` + - `attempt_count` +- 如果后续紧急任务量上来,再考虑把 `urgent` 单独拆成专属 worker +- 补 `GET /run/{id}/status` 或前端轮询说明 +- 评估 OCR 阶段是否继续拆更细粒度并发,但前提是先确认外部 OCR / VLM 服务的限流上限 diff --git a/fastapi_admin/celery_app.py b/fastapi_admin/celery_app.py new file mode 100644 index 0000000..51b865c --- /dev/null +++ b/fastapi_admin/celery_app.py @@ -0,0 +1,50 @@ +"""Celery 应用入口。""" + +from __future__ import annotations + +from celery import Celery +from kombu import Queue + +from fastapi_admin.config import ( + LEAUDIT_TASK_SOFT_TIME_LIMIT, + LEAUDIT_TASK_TIME_LIMIT, + LEAUDIT_WORKER_QUEUE_NORMAL, + LEAUDIT_WORKER_QUEUE_URGENT, + REDIS_DB, + REDIS_HOST, + REDIS_PASSWORD, + REDIS_PORT, +) + + +def _build_redis_url() -> str: + """拼接 Redis broker / backend 连接串。""" + auth = f":{REDIS_PASSWORD}@" if REDIS_PASSWORD else "" + return f"redis://{auth}{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}" + + +celery_app = Celery( + "leaudit_platform", + broker=_build_redis_url(), + backend=_build_redis_url(), +) + +celery_app.conf.update( + task_default_queue=LEAUDIT_WORKER_QUEUE_NORMAL, + task_queues=( + Queue(LEAUDIT_WORKER_QUEUE_URGENT), + Queue(LEAUDIT_WORKER_QUEUE_NORMAL), + ), + task_track_started=True, + task_acks_late=True, + worker_prefetch_multiplier=1, + broker_connection_retry_on_startup=True, + task_soft_time_limit=LEAUDIT_TASK_SOFT_TIME_LIMIT, + task_time_limit=LEAUDIT_TASK_TIME_LIMIT, +) + +celery_app.autodiscover_tasks( + [ + "fastapi_modules.fastapi_leaudit.leaudit_bridge", + ] +) diff 
--git a/fastapi_admin/config/__init__.pyi b/fastapi_admin/config/__init__.pyi index 390742d..39ba803 100644 --- a/fastapi_admin/config/__init__.pyi +++ b/fastapi_admin/config/__init__.pyi @@ -54,6 +54,25 @@ LEAUDIT_RULES_DIR: str LEAUDIT_RESCUE_MODE: str LEAUDIT_LLM_MAX_CONCURRENCY: int LEAUDIT_VLM_MAX_CONCURRENCY: int +LEAUDIT_LLM_REQUEST_TIMEOUT: int +LEAUDIT_LLM_RETRY_MAX_ATTEMPTS: int +LEAUDIT_LLM_RETRY_BACKOFF_BASE_SECONDS: float +LEAUDIT_VLM_REQUEST_TIMEOUT: int +LEAUDIT_VLM_RETRY_MAX_ATTEMPTS: int +LEAUDIT_VLM_RETRY_BACKOFF_BASE_SECONDS: float +LEAUDIT_OCR_VLM_CONCURRENCY: int +LEAUDIT_OCR_RETRY_MAX_ATTEMPTS: int +LEAUDIT_OCR_RETRY_BACKOFF_BASE_SECONDS: float +LEAUDIT_SIGNATURE_PROBE_CONCURRENCY: int +LEAUDIT_SIGNATURE_PROBE_TIMEOUT: int +LEAUDIT_SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS: int +LEAUDIT_SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS: float +LEAUDIT_WORKER_QUEUE_URGENT: str +LEAUDIT_WORKER_QUEUE_NORMAL: str +LEAUDIT_WORKER_CONCURRENCY: int +LEAUDIT_RUN_LOCK_SECONDS: int +LEAUDIT_TASK_SOFT_TIME_LIMIT: int +LEAUDIT_TASK_TIME_LIMIT: int # 常量 ROOT_PATH: object diff --git a/fastapi_admin/config/_settings.py b/fastapi_admin/config/_settings.py index 5a7bce4..eb08283 100644 --- a/fastapi_admin/config/_settings.py +++ b/fastapi_admin/config/_settings.py @@ -94,6 +94,25 @@ class LeauditSettings(_Base): LEAUDIT_RESCUE_MODE: str = "auto" LEAUDIT_LLM_MAX_CONCURRENCY: int = 5 LEAUDIT_VLM_MAX_CONCURRENCY: int = 3 + LEAUDIT_LLM_REQUEST_TIMEOUT: int = 120 + LEAUDIT_LLM_RETRY_MAX_ATTEMPTS: int = 3 + LEAUDIT_LLM_RETRY_BACKOFF_BASE_SECONDS: float = 1.0 + LEAUDIT_VLM_REQUEST_TIMEOUT: int = 90 + LEAUDIT_VLM_RETRY_MAX_ATTEMPTS: int = 2 + LEAUDIT_VLM_RETRY_BACKOFF_BASE_SECONDS: float = 1.0 + LEAUDIT_OCR_VLM_CONCURRENCY: int = 3 + LEAUDIT_OCR_RETRY_MAX_ATTEMPTS: int = 3 + LEAUDIT_OCR_RETRY_BACKOFF_BASE_SECONDS: float = 1.0 + LEAUDIT_SIGNATURE_PROBE_CONCURRENCY: int = 2 + LEAUDIT_SIGNATURE_PROBE_TIMEOUT: int = 20 + LEAUDIT_SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS: int = 2 + 
LEAUDIT_SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS: float = 0.5 + LEAUDIT_WORKER_QUEUE_URGENT: str = "leaudit.urgent" + LEAUDIT_WORKER_QUEUE_NORMAL: str = "leaudit.normal" + LEAUDIT_WORKER_CONCURRENCY: int = 2 + LEAUDIT_RUN_LOCK_SECONDS: int = 1800 + LEAUDIT_TASK_SOFT_TIME_LIMIT: int = 3300 + LEAUDIT_TASK_TIME_LIMIT: int = 3600 # 实例化所有 Settings diff --git a/fastapi_modules/fastapi_leaudit/controllers/auditController.py b/fastapi_modules/fastapi_leaudit/controllers/auditController.py index 9ff19d8..92c5ebc 100644 --- a/fastapi_modules/fastapi_leaudit/controllers/auditController.py +++ b/fastapi_modules/fastapi_leaudit/controllers/auditController.py @@ -26,6 +26,7 @@ class AuditController(BaseController): DocumentId=body.documentId, RuleType=body.ruleType, Force=body.force, + Speed=body.speed, ) return Result.success(data=run) diff --git a/fastapi_modules/fastapi_leaudit/domian/Dto/auditDto.py b/fastapi_modules/fastapi_leaudit/domian/Dto/auditDto.py index c281e6c..81e7921 100644 --- a/fastapi_modules/fastapi_leaudit/domian/Dto/auditDto.py +++ b/fastapi_modules/fastapi_leaudit/domian/Dto/auditDto.py @@ -9,3 +9,4 @@ class AuditRunDTO(BaseModel): documentId: int = Field(..., description="文档ID") ruleType: str | None = Field(None, description="指定规则类型编码") force: bool = Field(False, description="是否强制重跑") + speed: str = Field("normal", description="执行速度档位:urgent/normal") diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py index ff6c900..a007338 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py @@ -25,6 +25,7 @@ from leaudit.services.evaluation_service import EvaluationService from leaudit.services.extraction_service import ExtractionService from leaudit.services.rescue_service import RescueService +from fastapi_admin.config import LEAUDIT_OCR_VLM_CONCURRENCY from 
fastapi_modules.fastapi_leaudit.leaudit_bridge.client_factory import ( create_llm_client, create_ocr_client, @@ -118,7 +119,11 @@ class AuditServiceFactory: vlm_client=vlm_client, force_rules_path=rules_path, ) - ocr_client = BridgeOCRClient(adapter, vlm_client=vlm_client) + ocr_client = BridgeOCRClient( + adapter, + vlm_client=vlm_client, + vlm_concurrency=LEAUDIT_OCR_VLM_CONCURRENCY, + ) normalization_service = DocNormalizationService(ocr_client) audit_services = AuditServices( llm_client=llm_client, diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py index 94bffbd..3e2c045 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py @@ -8,6 +8,15 @@ from typing import TYPE_CHECKING from fastapi_admin.config import ( OCR_BASE_URL, OCR_TIMEOUT, + LEAUDIT_LLM_REQUEST_TIMEOUT, + LEAUDIT_LLM_RETRY_BACKOFF_BASE_SECONDS, + LEAUDIT_LLM_RETRY_MAX_ATTEMPTS, + LEAUDIT_OCR_VLM_CONCURRENCY, + LEAUDIT_OCR_RETRY_BACKOFF_BASE_SECONDS, + LEAUDIT_OCR_RETRY_MAX_ATTEMPTS, + LEAUDIT_VLM_REQUEST_TIMEOUT, + LEAUDIT_VLM_RETRY_BACKOFF_BASE_SECONDS, + LEAUDIT_VLM_RETRY_MAX_ATTEMPTS, LLM_BASE_URL, LLM_MODEL, LLM_API_KEY, @@ -15,6 +24,11 @@ from fastapi_admin.config import ( VLM_MODEL, VLM_API_KEY, ) +from fastapi_modules.fastapi_leaudit.leaudit_bridge.resilient_clients import ( + ResilientChandraOCRClient, + ResilientOpenAICompatibleClient, + ResilientQwenVLMClient, +) if TYPE_CHECKING: from leaudit.llm.base import BaseLLMClient @@ -27,44 +41,58 @@ log = logging.getLogger(__name__) def create_ocr_client() -> BaseOCRClient: """Create a leaudit ChandraOCRClient from LEAUDIT_OCR_URL config.""" import os - from leaudit.ocr.chandra_client import ChandraOCRClient base_url = os.getenv("LEAUDIT_OCR_URL", "").rstrip("/") if not base_url: base_url = OCR_BASE_URL.rstrip("/") timeout = float(OCR_TIMEOUT) - client = 
ChandraOCRClient( + client = ResilientChandraOCRClient( base_url=base_url, timeout=timeout, include_images=True, + vlm_concurrency=LEAUDIT_OCR_VLM_CONCURRENCY, + retry_max_attempts=LEAUDIT_OCR_RETRY_MAX_ATTEMPTS, + retry_backoff_base_seconds=LEAUDIT_OCR_RETRY_BACKOFF_BASE_SECONDS, + ) + log.info( + "leaudit OCR client created: %s (timeout=%ss, vlm_concurrency=%s, retry_max_attempts=%s, retry_backoff_base_seconds=%s)", + base_url, + timeout, + LEAUDIT_OCR_VLM_CONCURRENCY, + LEAUDIT_OCR_RETRY_MAX_ATTEMPTS, + LEAUDIT_OCR_RETRY_BACKOFF_BASE_SECONDS, ) - log.info("leaudit OCR client created: %s (timeout=%ss)", base_url, timeout) return client def create_llm_client() -> BaseLLMClient: """Create a leaudit OpenAICompatibleClient from docauditai's LLM config.""" - from leaudit.llm.openai_client import OpenAICompatibleClient - base_url = LLM_BASE_URL model = LLM_MODEL api_key = LLM_API_KEY or "no-key" - client = OpenAICompatibleClient( + client = ResilientOpenAICompatibleClient( api_key=api_key, base_url=base_url, default_model=model, - timeout=120.0, + timeout=float(LEAUDIT_LLM_REQUEST_TIMEOUT), + retry_max_attempts=LEAUDIT_LLM_RETRY_MAX_ATTEMPTS, + retry_backoff_base_seconds=LEAUDIT_LLM_RETRY_BACKOFF_BASE_SECONDS, + ) + log.info( + "leaudit LLM client created: %s (model=%s, timeout=%ss, retry_max_attempts=%s, retry_backoff_base_seconds=%s)", + base_url, + model, + LEAUDIT_LLM_REQUEST_TIMEOUT, + LEAUDIT_LLM_RETRY_MAX_ATTEMPTS, + LEAUDIT_LLM_RETRY_BACKOFF_BASE_SECONDS, ) - log.info("leaudit LLM client created: %s (model=%s)", base_url, model) return client def create_vlm_client() -> BaseVLMClient | None: """Create a leaudit QwenVLMClient from docauditai's VLM config.""" - from leaudit.llm.qwen_vlm_client import QwenVLMClient - base_url = VLM_BASE_URL model = VLM_MODEL api_key = VLM_API_KEY or LLM_API_KEY or "no-key" @@ -73,10 +101,20 @@ def create_vlm_client() -> BaseVLMClient | None: log.info("leaudit VLM client skipped: no VLM config") return None - client = QwenVLMClient( 
+ client = ResilientQwenVLMClient( base_url=base_url, api_key=api_key, model=model, + timeout=float(LEAUDIT_VLM_REQUEST_TIMEOUT), + retry_max_attempts=LEAUDIT_VLM_RETRY_MAX_ATTEMPTS, + retry_backoff_base_seconds=LEAUDIT_VLM_RETRY_BACKOFF_BASE_SECONDS, + ) + log.info( + "leaudit VLM client created: %s (model=%s, timeout=%ss, retry_max_attempts=%s, retry_backoff_base_seconds=%s)", + base_url, + model, + LEAUDIT_VLM_REQUEST_TIMEOUT, + LEAUDIT_VLM_RETRY_MAX_ATTEMPTS, + LEAUDIT_VLM_RETRY_BACKOFF_BASE_SECONDS, ) - log.info("leaudit VLM client created: %s (model=%s)", base_url, model) return client diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/ocr_bridge.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/ocr_bridge.py index b7d49ff..4a16ce3 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/ocr_bridge.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/ocr_bridge.py @@ -9,10 +9,17 @@ Keeps docauditai-specific fixes outside ``services/leaudit/**``: from __future__ import annotations +import asyncio import logging from io import BytesIO from pathlib import Path +from fastapi_admin.config import ( + LEAUDIT_SIGNATURE_PROBE_CONCURRENCY, + LEAUDIT_SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS, + LEAUDIT_SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS, + LEAUDIT_SIGNATURE_PROBE_TIMEOUT, +) from leaudit.ocr.base import BaseOCRClient from leaudit.ocr.models import OcrResult, VisualManifestItem @@ -120,18 +127,20 @@ async def _inject_docx_signature_candidates( if parent_key: parent_to_items.setdefault(parent_key, []).append(item) - for parent_key, items in parent_to_items.items(): + sem = asyncio.Semaphore(max(1, int(LEAUDIT_SIGNATURE_PROBE_CONCURRENCY))) + + async def _probe_parent(parent_key: str, items: list[VisualManifestItem]) -> None: if any((it.label or "") == "signature" for it in items): - continue + return parent_bytes = ocr_result.get_image_bytes(parent_key) if not parent_bytes: - continue + return try: image = Image.open(BytesIO(parent_bytes)) except 
Exception as exc: log.warning("failed to open parent image %s: %s", parent_key, exc) - continue + return width, height = image.size for candidate_bbox in _signature_candidate_boxes(items, width, height): @@ -139,11 +148,13 @@ async def _inject_docx_signature_candidates( crop = image.crop(tuple(candidate_bbox)) buf = BytesIO() crop.save(buf, format="PNG") - result = await _classify_signature_candidate( - vlm_client, - buf.getvalue(), - "这是合同签章页里疑似法人签名的候选区域,请优先判断是否为手写签名。", - ) + async with sem: + result = await _classify_signature_candidate( + vlm_client, + buf.getvalue(), + "这是合同签章页里疑似法人签名的候选区域,请优先判断是否为手写签名。", + parent_key=parent_key, + ) except Exception as exc: log.warning("signature probe failed for %s: %s", parent_key, exc) continue @@ -166,33 +177,66 @@ async def _inject_docx_signature_candidates( ) break + if parent_to_items: + await asyncio.gather( + *(_probe_parent(parent_key, items) for parent_key, items in parent_to_items.items()), + return_exceptions=False, + ) + async def _classify_signature_candidate( vlm_client: object, image_bytes: bytes, user_hint: str, + *, + parent_key: str | None = None, ) -> object: - """Classify with one retry using a fresh VLM client when needed.""" - try: - return await vlm_client.classify_visual(image_bytes, user_hint=user_hint) - except Exception as exc: - log.warning("signature probe primary VLM failed, retrying fresh client: %s", exc) + """Classify with configurable retry using a fresh VLM client when needed.""" + timeout = max(1, int(LEAUDIT_SIGNATURE_PROBE_TIMEOUT)) + max_attempts = max(1, int(LEAUDIT_SIGNATURE_PROBE_RETRY_MAX_ATTEMPTS)) + backoff_base = max(0.0, float(LEAUDIT_SIGNATURE_PROBE_RETRY_BACKOFF_BASE_SECONDS)) + last_error: Exception | None = None - try: - from leaudit.llm.qwen_vlm_client import QwenVLMClient - - fresh = QwenVLMClient( - base_url=getattr(vlm_client, "base_url"), - api_key=getattr(vlm_client, "api_key", ""), - model=getattr(vlm_client, "model"), - timeout=getattr(vlm_client, "timeout", 90.0), - 
) + for attempt in range(max_attempts): + current_client = vlm_client + fresh = None try: - return await fresh.classify_visual(image_bytes, user_hint=user_hint) + if attempt > 0: + from fastapi_modules.fastapi_leaudit.leaudit_bridge.resilient_clients import ResilientQwenVLMClient + + fresh = ResilientQwenVLMClient( + base_url=getattr(vlm_client, "base_url"), + api_key=getattr(vlm_client, "api_key", ""), + model=getattr(vlm_client, "model"), + timeout=getattr(vlm_client, "timeout", 90.0), + retry_max_attempts=1, + retry_backoff_base_seconds=0.0, + ) + current_client = fresh + + return await asyncio.wait_for( + current_client.classify_visual(image_bytes, user_hint=user_hint), + timeout=timeout, + ) + except Exception as exc: + last_error = exc + if attempt < max_attempts - 1: + log.warning( + "signature probe attempt %s/%s failed for %s, retrying after %.2fs (timeout=%ss): %s", + attempt + 1, + max_attempts, + parent_key or "-", + backoff_base * (2 ** attempt), + timeout, + exc, + ) + await asyncio.sleep(backoff_base * (2 ** attempt)) + continue finally: - await fresh.close() - except Exception as exc: - raise RuntimeError(exc) from exc + if fresh is not None: + await fresh.close() + + raise RuntimeError(last_error) from last_error def _signature_candidate_boxes( diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/resilient_clients.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/resilient_clients.py new file mode 100644 index 0000000..e1992e4 --- /dev/null +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/resilient_clients.py @@ -0,0 +1,322 @@ +"""Bridge-side resilient client wrappers. + +Keep retry policy configurable in leaudit-platform without changing the +upstream leaudit package. 
+""" + +from __future__ import annotations + +import asyncio +import base64 +import logging +import time +from pathlib import Path +from typing import Any + +import httpx + +from leaudit.llm.base import LLMRequest, LLMResponse +from leaudit.llm.openai_client import OpenAIClientError, OpenAICompatibleClient, _parse_response +from leaudit.llm.qwen_vlm_client import QwenVLMClient, VisualClassification, _SYSTEM_PROMPT, _parse_json_loose +from leaudit.ocr.chandra_client import ChandraOCRError, ChandraOCRClient, _guess_mime, _parse_response as parse_ocr_response + +log = logging.getLogger(__name__) + +_RETRYABLE_HTTP_STATUSES = {408, 429, 500, 502, 503, 504} + + +def _retry_delay(base_seconds: float, attempt: int) -> float: + """Return exponential backoff delay for retry attempt index.""" + return max(0.0, float(base_seconds)) * (2 ** max(0, int(attempt))) + + +def _should_retry_status(status_code: int) -> bool: + return status_code in _RETRYABLE_HTTP_STATUSES or status_code >= 500 + + +class ResilientOpenAICompatibleClient(OpenAICompatibleClient): + """OpenAI-compatible client with configurable retry count/backoff.""" + + def __init__( + self, + *, + retry_max_attempts: int = 3, + retry_backoff_base_seconds: float = 1.0, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + self.retry_max_attempts = max(1, int(retry_max_attempts)) + self.retry_backoff_base_seconds = max(0.0, float(retry_backoff_base_seconds)) + + async def complete(self, request: LLMRequest) -> LLMResponse: + self.call_history.append(request) + + model = request.model if request.model != "default" else self.default_model + payload: dict[str, Any] = { + "model": model, + "messages": [{"role": m.role, "content": m.content} for m in request.messages], + "temperature": request.temperature, + "max_tokens": request.max_tokens, + } + if request.return_logprobs: + payload["logprobs"] = True + payload["top_logprobs"] = 3 + if request.response_format: + payload["response_format"] = request.response_format 
class ResilientQwenVLMClient(QwenVLMClient):
    """Qwen VLM client that retries transient failures with exponential backoff.

    Both public calls are best-effort: transport errors, non-200 responses and
    malformed response bodies are reported through the return value
    (``kind="other"`` / ``{}``) instead of being raised.

    Args:
        retry_max_attempts: Total number of attempts per request; values below
            1 are clamped to 1.
        retry_backoff_base_seconds: Delay before the first retry; it doubles
            for each further retry. Negative values are clamped to 0.
        **kwargs: Forwarded unchanged to ``QwenVLMClient.__init__``.
    """

    # Transport-level failures that trigger another attempt.
    _RETRYABLE_EXCEPTIONS = (
        httpx.ConnectError,
        httpx.RemoteProtocolError,
        httpx.ReadError,
        httpx.ReadTimeout,
        httpx.TimeoutException,
    )

    def __init__(
        self,
        *,
        retry_max_attempts: int = 2,
        retry_backoff_base_seconds: float = 1.0,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.retry_max_attempts = max(1, int(retry_max_attempts))
        self.retry_backoff_base_seconds = max(0.0, float(retry_backoff_base_seconds))

    async def _sleep_before_retry(self, attempt: int, reason: str) -> None:
        """Log the failed attempt and sleep for its backoff delay."""
        delay = _retry_delay(self.retry_backoff_base_seconds, attempt)
        log.warning(
            "VLM request attempt %d/%d failed (%s); retrying in %.2fs",
            attempt + 1,
            self.retry_max_attempts,
            reason,
            delay,
        )
        await asyncio.sleep(delay)

    async def _post_with_retry(self, payload: dict[str, Any]) -> httpx.Response:
        """POST *payload* to ``/chat/completions``, retrying transient failures.

        Returns:
            The first response that is not retryable, or the last response
            once the attempts are used up (it may still be a non-200).

        Raises:
            httpx.HTTPError: One of ``_RETRYABLE_EXCEPTIONS`` when the final
                attempt fails at the transport level.
        """
        last_error: Exception | None = None
        final_attempt = self.retry_max_attempts - 1

        for attempt in range(self.retry_max_attempts):
            try:
                response = await self._client.post(
                    f"{self.base_url}/chat/completions",
                    json=payload,
                    # Drop empty header values (e.g. a missing API key).
                    headers={k: v for k, v in self._headers.items() if v},
                )
            except self._RETRYABLE_EXCEPTIONS as exc:
                last_error = exc
                if attempt < final_attempt:
                    await self._sleep_before_retry(attempt, type(exc).__name__)
                    continue
                raise

            if _should_retry_status(response.status_code) and attempt < final_attempt:
                await self._sleep_before_retry(attempt, f"HTTP {response.status_code}")
                continue
            return response

        # Defensive: the loop always returns or raises because
        # retry_max_attempts is clamped to >= 1.
        raise last_error or RuntimeError("VLM request failed after retries")

    @staticmethod
    def _message_content(response: httpx.Response) -> Any:
        """Return ``choices[0].message.content`` from a chat response.

        Returns ``""`` when the body is not valid JSON, does not have the
        expected shape, or carries a null content, so callers can treat every
        malformed reply like an empty one.
        """
        try:
            body = response.json()
            content = (body.get("choices") or [{}])[0].get("message", {}).get("content", "")
        except (ValueError, AttributeError, TypeError, KeyError, IndexError) as exc:
            log.warning("VLM response body could not be decoded: %s", exc)
            return ""
        return "" if content is None else content

    async def classify_visual(
        self,
        image_bytes: bytes,
        user_hint: str | None = None,
        max_tokens: int = 300,
    ) -> VisualClassification:
        """Classify an image region as seal, signature or other.

        Args:
            image_bytes: Raw image bytes, sent as a base64 data URL.
            user_hint: Optional context appended to the user prompt.
            max_tokens: Completion token limit sent to the model.

        Returns:
            The parsed classification. Any failure (transport error, non-200
            status, unparsable reply) yields ``kind="other"`` with
            ``confidence=0.0`` and the failure detail in ``raw``.
        """
        b64 = base64.b64encode(image_bytes).decode()
        # NOTE(review): the MIME type is fixed to image/png regardless of the
        # actual bytes; confirm callers always pass PNG data.
        url = f"data:image/png;base64,{b64}"

        user_text = "请判断此区域内容并输出 JSON。"
        if user_hint:
            user_text += f"\n上下文:{user_hint}"

        payload: dict[str, Any] = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": _SYSTEM_PROMPT},
                {
                    "role": "user",
                    "content": [
                        {"type": "image_url", "image_url": {"url": url}},
                        {"type": "text", "text": user_text},
                    ],
                },
            ],
            "max_tokens": max_tokens,
            "temperature": 0.0,
        }

        try:
            response = await self._post_with_retry(payload)
        except Exception as exc:  # noqa: BLE001 - best-effort by design
            return VisualClassification(kind="other", confidence=0.0, raw=f"retry_failed: {exc}")

        if response.status_code != 200:
            return VisualClassification(
                kind="other",
                confidence=0.0,
                raw=f"HTTP {response.status_code}: {response.text[:300]}",
            )

        content = self._message_content(response)
        parsed = _parse_json_loose(content)
        if parsed is None:
            return VisualClassification(kind="other", confidence=0.0, raw=content)

        kind = parsed.get("kind", "other")
        if kind not in {"seal", "signature", "other"}:
            kind = "other"

        return VisualClassification(
            kind=kind,
            seal_type=parsed.get("seal_type"),
            text=parsed.get("text"),
            is_complete=parsed.get("is_complete"),
            confidence=0.9 if kind != "other" else 0.3,
            raw=content,
        )

    async def extract_multifield(
        self,
        *,
        prompt: str,
        images_data_urls: list[str],
        max_tokens: int = 800,
    ) -> dict:
        """Ask the VLM to extract several fields from one or more images.

        Args:
            prompt: Text instruction placed before the images.
            images_data_urls: Image URLs (data URLs) attached to the message.
            max_tokens: Completion token limit sent to the model.

        Returns:
            The parsed JSON object, or ``{}`` when the request fails, the
            status is not 200, or the reply is not a JSON object.
        """
        content: list[dict] = [{"type": "text", "text": prompt}]
        content.extend({"type": "image_url", "image_url": {"url": url}} for url in images_data_urls)

        payload: dict[str, Any] = {
            "model": self.model,
            "messages": [{"role": "user", "content": content}],
            "max_tokens": max_tokens,
            "temperature": 0.0,
        }

        try:
            response = await self._post_with_retry(payload)
        except Exception as exc:  # noqa: BLE001 - best-effort by design
            log.warning("VLM multi-field extraction failed: %s", exc)
            return {}

        if response.status_code != 200:
            log.warning("VLM multi-field extraction returned HTTP %s", response.status_code)
            return {}

        parsed = _parse_json_loose(self._message_content(response))
        return parsed if isinstance(parsed, dict) else {}
async def ocr(self, file_path: Path | str):
    """Run OCR on *file_path* and optionally refine visuals with the VLM.

    The raw OCR body is parsed first. When a VLM client is configured and the
    parsed manifest lists seals, signatures or cross-page seals, the manifest
    is refined in place. A refinement failure is logged and the unrefined
    result is returned.
    """
    parsed = parse_ocr_response(await self._post_ocr(file_path))
    if self.vlm_client is None:
        return parsed

    manifest = parsed.visual_manifest
    has_visuals = bool(manifest.seals or manifest.signatures or manifest.cross_page_seals)
    if not has_visuals:
        return parsed

    try:
        # Imported lazily so an import failure is also treated as a
        # refinement failure.
        from leaudit.ocr.visual_classifier import refine_visual_manifest

        await refine_visual_manifest(parsed, self.vlm_client, concurrency=self.vlm_concurrency)
    except Exception as exc:  # noqa: BLE001
        log.warning("VLM refinement failed, keeping geometric manifest: %s", exc)
    return parsed


async def ocr_raw(self, file_path: Path | str) -> dict:
    """Return the raw OCR response body without parsing or VLM refinement."""
    raw_body = await self._post_ocr(file_path)
    return raw_body
def leaudit_process_document_by_run(
    run_id: int,
    *,
    task_id: str | None = None,
    rules_path: str | None = None,
    queue_name: str | None = None,
) -> dict[str, Any]:
    """Claim a run by id, load its execution context and run the native audit.

    Args:
        run_id: Primary key of the audit run to execute.
        task_id: Celery task id recorded on the run when it is claimed.
        rules_path: Optional rules path forwarded to ``leaudit_process_document``.
        queue_name: Queue the task was delivered on; used only for logging.
            When omitted it is derived from the run's trigger source.

    Returns:
        The result of ``leaudit_process_document``, or a
        ``{"status": "skipped", ...}`` dict when the run could not be claimed.

    NOTE(review): if loading the context raises after a successful claim, the
    run appears to stay in status ``running``; nothing in this function moves
    it to a failed state. Confirm this is handled elsewhere.
    """
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        if not event_loop.run_until_complete(_claim_run_safe(run_id, task_id)):
            log.warning("run_id=%s 未抢占成功,跳过重复消费", run_id)
            return {"status": "skipped", "run_id": run_id, "reason": "already_claimed"}

        ctx = event_loop.run_until_complete(_load_run_context(run_id))
        effective_queue = queue_name or resolve_worker_queue(ctx.get("trigger_source"))
        log.info(
            "run_id=%s worker开始执行: queue=%s, speed=%s, filename=%s",
            run_id,
            effective_queue,
            _queue_label(effective_queue),
            ctx["filename"],
        )

        upload_info = {
            "run_id": run_id,
            "rule_version_id": ctx["rule_version_id"],
            "rule_source_oss_url": ctx["rule_source_oss_url"],
            "source_type": ctx["source_type"],
            "source_path": ctx["source_path"],
            "trigger_source": ctx["trigger_source"],
        }
        return leaudit_process_document(
            document_id=ctx["document_id"],
            file_content=ctx["file_content"],
            filename=ctx["filename"],
            upload_info=upload_info,
            rules_path=rules_path,
        )
    finally:
        event_loop.close()


@celery_app.task(
    bind=True,
    name="leaudit.process_document",
    acks_late=True,
)
def leaudit_process_document_task(self, run_id: int, rules_path: str | None = None) -> dict[str, Any]:
    """Celery worker entry point: execute the audit identified by *run_id*.

    The queue name is taken from the delivery info (routing key first, then
    queue) and passed along for logging.
    """
    info = getattr(self.request, "delivery_info", {}) or {}
    routed_queue = info.get("routing_key") or info.get("queue")
    return leaudit_process_document_by_run(
        run_id=run_id,
        task_id=self.request.id,
        rules_path=rules_path,
        queue_name=routed_queue,
    )
async def _claim_run_safe(run_id: int, task_id: str | None) -> bool:
    """Atomically claim a queued/pending/retrying run to avoid double consumption.

    A single ``UPDATE ... RETURNING`` moves the run to ``running`` /
    ``prepare`` and records the task id, so only one worker can win.

    Returns:
        True when this call claimed the run. False when no row matched or
        when any exception occurred (the exception is logged).

    NOTE(review): a database error is indistinguishable from "already
    claimed" for the caller; confirm that is intended.
    """
    claim_sql = """
                    UPDATE leaudit_audit_runs
                    SET status = 'running',
                        phase = 'prepare',
                        task_id = COALESCE(:task_id, task_id),
                        started_at = COALESCE(started_at, now()),
                        updated_at = now()
                    WHERE id = :rid
                      AND status IN ('queued', 'pending', 'retrying')
                    RETURNING id
                    """
    try:
        from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession
        from sqlalchemy import text as sa_text

        async with GetAsyncSession() as session:
            outcome = await session.execute(
                sa_text(claim_sql),
                {"rid": run_id, "task_id": task_id},
            )
            claimed_row = outcome.fetchone()
            await session.commit()
            return claimed_row is not None
    except Exception:
        log.exception("run_id=%s 抢占执行权失败", run_id)
        return False


async def _load_run_context(run_id: int) -> dict[str, Any]:
    """Load the document and file context needed to execute a run.

    Returns:
        A dict with the document id, resolved file name and content, source
        type and path, and the run's rule version, rule source URL and
        trigger source.

    Raises:
        ValueError: If the run, its document or its document file is missing.
    """
    from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession

    async with GetAsyncSession() as session:
        run = await session.get(LeauditAuditRun, run_id)
        if not run:
            raise ValueError(f"未找到 run_id={run_id} 对应的运行记录")

        document = await session.get(LeauditDocument, run.documentId)
        if not document:
            raise ValueError(f"未找到 document_id={run.documentId} 对应的文档记录")

        document_file = await session.get(LeauditDocumentFile, run.documentFileId)
        if not document_file:
            raise ValueError(f"未找到 document_file_id={run.documentFileId} 对应的文件记录")

        file_payload = await FileSourceResolver().ResolvePayload(document_file)

        context: dict[str, Any] = {
            "document_id": document.Id,
            "filename": file_payload.fileName,
            "file_content": file_payload.fileContent,
            "source_type": file_payload.sourceType,
            "source_path": file_payload.sourcePath,
            "rule_version_id": run.ruleVersionId,
            "rule_source_oss_url": run.ruleSourceOssUrl,
            "trigger_source": run.triggerSource,
        }
        return context
def dispatch_leaudit_task(
    run_id: int,
    *,
    queue_name: str | None = None,
    rules_path: Optional[str] = None,
) -> str:
    """Enqueue *run_id* on a Celery worker queue and return the task id.

    Args:
        run_id: Primary key of the audit run to execute.
        queue_name: Target queue; falls back to the normal queue when empty.
        rules_path: Optional rules path forwarded to the worker task.
    """
    destination = queue_name if queue_name else LEAUDIT_WORKER_QUEUE_NORMAL
    async_result = leaudit_process_document_task.apply_async(
        kwargs={"run_id": run_id, "rules_path": rules_path},
        queue=destination,
    )
    log.info(
        "run_id=%s 已投递到 worker 队列: queue=%s, speed=%s, task_id=%s",
        run_id,
        destination,
        _queue_label(destination),
        async_result.id,
    )
    return async_result.id


def resolve_worker_queue(trigger_source: str | None) -> str:
    """Pick the worker queue for a trigger source.

    The urgent queue is chosen when the trimmed, lower-cased source contains
    ``"urgent"`` or ``"high"``; anything else goes to the normal queue.
    """
    source = (trigger_source or "").strip().lower()
    is_urgent = any(marker in source for marker in ("urgent", "high"))
    return LEAUDIT_WORKER_QUEUE_URGENT if is_urgent else LEAUDIT_WORKER_QUEUE_NORMAL


def _queue_label(queue_name: str | None) -> str:
    """Translate a queue name into the speed label used in log lines."""
    return "urgent" if queue_name == LEAUDIT_WORKER_QUEUE_URGENT else "normal"
b/fastapi_modules/fastapi_leaudit/services/auditService.py index fd1fef6..deb48b4 100644 --- a/fastapi_modules/fastapi_leaudit/services/auditService.py +++ b/fastapi_modules/fastapi_leaudit/services/auditService.py @@ -9,7 +9,13 @@ class IAuditService(ABC): """评查服务接口。""" @abstractmethod - async def Run(self, DocumentId: int, RuleType: str | None = None, Force: bool = False) -> AuditRunVO: + async def Run( + self, + DocumentId: int, + RuleType: str | None = None, + Force: bool = False, + Speed: str = "normal", + ) -> AuditRunVO: """触发文档评查。""" ... diff --git a/fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py index f0594bc..f1fdbdd 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py @@ -21,8 +21,10 @@ from fastapi_modules.fastapi_leaudit.domian.vo.auditVo import ( AuditRunErrorVO, AuditRunVO, ) -from fastapi_modules.fastapi_leaudit.leaudit_bridge.fileSourceResolver import FileSourceResolver -from fastapi_modules.fastapi_leaudit.leaudit_bridge.tasks import dispatch_leaudit_task +from fastapi_modules.fastapi_leaudit.leaudit_bridge.tasks import ( + dispatch_leaudit_task, + resolve_worker_queue, +) from fastapi_modules.fastapi_leaudit.models import ( LeauditAuditRun, LeauditDocument, @@ -31,20 +33,67 @@ from fastapi_modules.fastapi_leaudit.models import ( from fastapi_modules.fastapi_leaudit.services import IAuditService +def _normalize_speed(speed: str | None) -> str: + """Normalize front-end speed selection to urgent/normal.""" + normalized = (speed or "").strip().lower() + if normalized in {"urgent", "high", "fast", "emergency", "紧急"}: + return "urgent" + return "normal" + + class AuditServiceImpl(IAuditService): """评查服务实现。""" - async def Run(self, DocumentId: int, RuleType: str | None = None, Force: bool = False) -> AuditRunVO: + async def Run( + self, + DocumentId: int, + RuleType: str 
| None = None, + Force: bool = False, + Speed: str = "normal", + ) -> AuditRunVO: """触发文档评查。 - 当前阶段同步触发 bridge 执行链,后续再切换为 Celery 异步分发。 + 当前阶段只负责创建 run 并投递 worker,不在 HTTP 请求内同步执行。 """ async with GetAsyncSession() as session: logger.info(f"触发评查: documentId={DocumentId}, ruleType={RuleType}") + normalizedSpeed = _normalize_speed(Speed) document = await session.get(LeauditDocument, DocumentId) if not document: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "评查文档不存在") + if not Force: + activeRunResult = await session.execute( + select(LeauditAuditRun) + .where( + LeauditAuditRun.documentId == DocumentId, + LeauditAuditRun.status.in_(("queued", "running", "retrying")), + ) + .order_by(LeauditAuditRun.Id.desc()) + .limit(1) + ) + activeRun = activeRunResult.scalar_one_or_none() + if activeRun: + return AuditRunVO( + runId=activeRun.Id, + documentId=activeRun.documentId, + runNo=activeRun.runNo, + documentFileId=activeRun.documentFileId, + status=activeRun.status, + phase=activeRun.phase, + resultStatus=activeRun.resultStatus, + ruleSetId=activeRun.ruleSetId, + ruleVersionId=activeRun.ruleVersionId, + ruleTypeId=activeRun.ruleTypeId, + rescueApplied=activeRun.rescueApplied or False, + totalScore=float(activeRun.totalScore) if activeRun.totalScore else None, + passedCount=activeRun.passedCount, + failedCount=activeRun.failedCount, + skippedCount=activeRun.skippedCount, + startedAt=activeRun.startedAt, + finishedAt=activeRun.finishedAt, + ) + fileResult = await session.execute( select(LeauditDocumentFile) .where( @@ -91,49 +140,48 @@ class AuditServiceImpl(IAuditService): if not binding or not binding["rule_set_id"] or not binding["rule_version_id"]: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前文档类型未绑定可用规则版本") + triggerSource = f"{'retry' if Force else 'upload'}:{normalizedSpeed}" + run = LeauditAuditRun( documentId=DocumentId, documentFileId=documentFile.Id, runNo=int(latestRunNo) + 1, - triggerSource="manual" if not Force else "retry", - 
#!/usr/bin/env bash
# Start a Celery worker that consumes the LeAudit urgent and normal queues.
# Queue names and concurrency are read from fastapi_admin.config so this
# script and the application share one source of truth.
set -euo pipefail

ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)"
cd "$ROOT_DIR"

source .venv/bin/activate

# Capture the generated assignments before evaluating them: a plain variable
# assignment carries the exit status of its command substitution, so `set -e`
# aborts here when the config import fails. `eval "$(...)"` would ignore that
# failure and continue with unset variables.
WORKER_ENV="$(
  .venv/bin/python - <<'PY'
import shlex

from fastapi_admin.config import (
    LEAUDIT_WORKER_CONCURRENCY,
    LEAUDIT_WORKER_QUEUE_NORMAL,
    LEAUDIT_WORKER_QUEUE_URGENT,
)

# shlex.quote keeps config values from being interpreted as shell syntax.
queues = f"{LEAUDIT_WORKER_QUEUE_URGENT},{LEAUDIT_WORKER_QUEUE_NORMAL}"
print(f"WORKER_CONCURRENCY={shlex.quote(str(LEAUDIT_WORKER_CONCURRENCY))}")
print(f"WORKER_QUEUES={shlex.quote(queues)}")
PY
)"
eval "$WORKER_ENV"

# The PID suffix keeps node names unique when several workers share a host;
# %h is expanded by Celery to the host name.
WORKER_NODE_NAME="leaudit_worker_${HOSTNAME:-local}_$$@%h"

celery -A fastapi_admin.celery_app:celery_app worker \
  -Q "${WORKER_QUEUES}" \
  -n "${WORKER_NODE_NAME}" \
  --concurrency="${WORKER_CONCURRENCY}" \
  --loglevel=INFO