Files
leaudit-platform-backend/docs/规则编辑/worker并发执行改造方案.md
T

337 lines
11 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
原生 LeAudit Worker 并发执行改造方案
这份文档不是纯方案稿,而是“已经落地到哪一步 + 当前系统为什么这么设计 + 后续接手时要看什么”的实施记录。后面如果忘了这套东西是干什么的,先看这份。
目标
- 把上传触发评查从 HTTP 同步执行切到 `Run -> dispatch -> worker`
- 平台侧只做:
- 上传入库
- OSS / 规则文件解析
- run 状态编排
- 结果持久化
- 原生 `/home/wren-dev/Porject/leaudit/src/leaudit` 继续负责:
- `AuditCtx`
- `AuditService.audit(ctx)`
- OCR / extract / evaluate / rescue 主流程
为什么要改成 worker
- HTTP 请求里同步跑全链路会把 OCR、LLM、VLM、补救流程都塞进 Web 线程
- 后续存在并发上传、多文档批量处理、多 worker 扩容场景
- 同步模式下容易遇到:
- 请求超时
- 事件循环混用
- 多线程/多请求互相阻塞
- 状态不可追踪
- 改造成 worker 后,API 只负责“受理 + 排队”,重任务交给 Celery worker 进程
当前已落地
- `fastapi_admin/celery_app.py`
- 新增 Celery appbroker/backend 复用当前 Redis
- `fastapi_admin/config/_settings.py`
- 新增 worker 队列、并发、超时、OCR 视觉并发、signature probe 限时等配置
- `fastapi_admin/config/__init__.pyi`
- 暴露新的 LEAUDIT 配置项
- `app.toml`
- 正式加入 `[LEAUDIT]` worker/并发相关配置
- 补齐 `[OCR].BASE_URL`
- `fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py`
- `Run()` 改为只创建 `leaudit_audit_runs` 并投递任务
- 相同文档已有活动 run 时直接复用,避免重复创建
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py`
- 改为 `runId` 驱动,而不是把整份文件内容直接塞到同步调用链
- 增加 Celery task 入口
- 增加 run 抢占,避免重复消费
- 增加队列路由
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py`
- OCR client 读取平台配置,避免 `base_url is required`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py`
- 原生服务装配读取平台并发配置
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py`
- 修复原生 `FieldValue` 新旧结构兼容
- 修复 JSONB 字段写库序列化
- `scripts/start_worker.sh`
- worker 启动脚本改为动态读取配置
- worker nodename 改成唯一,避免 Celery `DuplicateNodenameWarning`
- `docs/规则编辑/worker并发执行改造方案.md`
- 记录当前这套机制,避免后续自己忘记
执行链路
1. `POST /api/upload`
2. `DocumentServiceImpl.Upload()``leaudit_documents / leaudit_document_files`
3. 原始文件上传 OSS
4. `AuditServiceImpl.Run()` 创建 `leaudit_audit_runs`
5. `dispatch_leaudit_task(run_id)` 投递 Celery
6. worker 消费 `leaudit.process_document`
7. worker 按 `runId` 查库,取:
- 文档
- 活跃文件版本
- 当前规则版本
- 规则 OSS 地址
8. bridge 下载 OSS 文件 / 规则文件到本地临时文件
9. `NativeRunner` 构建原生 `AuditCtx` 并调用原生 audit
10. `StorageAdapter` 把结果写回 `leaudit_*`
当前实现涉及的关键文件
- 上传入口
- `fastapi_modules/fastapi_leaudit/controllers/documentController.py`
- `fastapi_modules/fastapi_leaudit/services/documentService.py`
- `fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py`
- bridge 层
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/fileSourceResolver.py`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/ruleVersionResolver.py`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditCtxBuilder.py`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py`
- `fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py`
- worker 与配置
- `fastapi_admin/celery_app.py`
- `fastapi_admin/config/_settings.py`
- `app.toml`
- `scripts/start_worker.sh`
并发配置正式化
- `LEAUDIT_LLM_MAX_CONCURRENCY`
- 单文档内 LLM 全局信号量上限
- `LEAUDIT_VLM_MAX_CONCURRENCY`
- 单文档内 VLM 主流程上限
- `LEAUDIT_OCR_VLM_CONCURRENCY`
- OCR 视觉增强 / 印章分类阶段的 VLM 并发上限
- `LEAUDIT_SIGNATURE_PROBE_CONCURRENCY`
- DOCX 签名候选区域补探测并发上限
- `LEAUDIT_SIGNATURE_PROBE_TIMEOUT`
- 单次签名 probe 的 VLM 超时,避免长时间卡住 OCR 尾部
- `LEAUDIT_WORKER_CONCURRENCY`
- 单 worker 进程可并发执行的任务数
- `LEAUDIT_WORKER_QUEUE_URGENT`
- 前端上传中的“紧急”队列
- `LEAUDIT_WORKER_QUEUE_NORMAL`
- 前端上传中的“普通”队列
- `LEAUDIT_TASK_SOFT_TIME_LIMIT`
- Celery 软超时
- `LEAUDIT_TASK_TIME_LIMIT`
- Celery 硬超时
- `LEAUDIT_RUN_LOCK_SECONDS`
- 为后续升级“租约锁”预留的锁超时配置
当前 `app.toml` 基线
```toml
[OCR]
BASE_URL = "http://i-2.gpushare.com:44112/"
[LEAUDIT]
LLM_MAX_CONCURRENCY = 5
VLM_MAX_CONCURRENCY = 3
OCR_VLM_CONCURRENCY = 3
SIGNATURE_PROBE_CONCURRENCY = 2
SIGNATURE_PROBE_TIMEOUT = 20
WORKER_QUEUE_URGENT = "leaudit.urgent"
WORKER_QUEUE_NORMAL = "leaudit.normal"
WORKER_CONCURRENCY = 2
RUN_LOCK_SECONDS = 1800
TASK_SOFT_TIME_LIMIT = 3300
TASK_TIME_LIMIT = 3600
```
当前队列路由策略
- 前端上传 `speed=urgent` -> `leaudit.urgent`
- 前端上传 `speed=normal` -> `leaudit.normal`
- `/audit/run` 也支持 `speed`
- 未识别值默认归到 `normal`
接口收口
- 上传接口:`POST /upload`
- 入参:
- `file`
- `typeId` / `typeCode`
- `region`
- `fileRole`
- `createdBy`
- `autoRun`
- `speed=normal|urgent`
- 已去掉旧系统 `bizDocumentId` 输入
- 上传返回:
- `documentId`
- `internalDocumentNo`
- `fileId`
- `typeId`
- `typeCode`
- `region`
- `fileName`
- `ossUrl`
- `speed`
- `processingStatus`
- `autoRunTriggered`
- `run`
- 手动触发:`POST /audit/run`
- 支持 `speed=normal|urgent`
- 状态查询:
- `GET /audit/run/{runId}`
- 结果查询:
- `GET /audit/result/{runId}`
当前业务收敛结论
- 不再继续扩 `high/default/batch` 三层方案
- 当前真实业务只有前端上传,并且只有“紧急 / 普通”两档速度需求
- 所以 worker 体系正式收敛成两级模型:
- `urgent`
- `normal`
- 这样既满足优先级需求,也避免过度设计
状态流转
- 新建 run
- `status=queued`
- `phase=dispatch`
- worker 抢占成功
- `status=running`
- `phase=prepare`
- 原生流程推进中
- `phase` 会继续流转为 OCR / extraction / evaluation / rescue / persist 等阶段
- 执行成功
- `status=completed`
- `result_status` 由 bridge 汇总写回
- 执行失败
- `status=failed`
- `phase` 停在失败发生阶段
- `leaudit_run_errors` 记录错误明细
避免重复消费
- API 触发 `Run()` 时:
- 若同一文档已有 `queued/running/retrying` 的 run,且 `Force=False`
- 直接返回当前活动 run,不重复创建
- worker 开始执行前:
- 先按 run 状态执行原子抢占
- 仅允许 `status in ('queued', 'pending', 'retrying')` 的 run 进入执行
- 抢占失败时:
- 当前 worker 直接跳过
- 返回 `already_claimed`
当前表语义
- `leaudit_documents`
- 平台内部文档主表
- 一次前端上传会创建一条新的 document
- 不再依赖旧系统 `documents.id`
- `biz_document_id` 仅作为内部追踪号保留,避免立刻改库
- `leaudit_document_files`
- 具体上传文件记录
- 保存文件名、后缀、MIME、SHA256、OSS 地址等文件级信息
- 当前仍保留单独文件表,为后续是否合表留余地
已踩过并修掉的坑
- 事件循环混用
- 同步 HTTP 链路里直接执行原生异步流程,容易出现 `Future attached to a different loop`
- 现在改成 worker 独立进程执行,HTTP 只排队
- OCR base_url 缺失
- 原因是平台 bridge 没把 `[OCR].BASE_URL` 正确喂给原生 client
- 现已在 `app.toml``client_factory.py` 补齐
- 原生 `FieldValue` 结构变化
- 新版本 `FieldValue` 不一定有 `.position`
- `storage_adapter.py` 已改为兼容:
- 优先读 `position`
- 没有时回退到 `metadata.match_position / bbox / page_num`
- asyncpg + JSONB 序列化
- 不能把 `list/dict` 直接按字符串列方式塞给 `CAST(:x AS JSONB)`
- 需要先 `json.dumps(..., ensure_ascii=False)` 再入库
- 当前已在这些路径处理:
- `leaudit_rule_results.stages`
- `leaudit_rule_results.extracted_fields`
- `leaudit_rule_results.field_positions`
- `leaudit_rule_results.remediation`
- `leaudit_rule_results.rule_meta`
- `leaudit_rescue_outcomes.payload`
当前运行现状
- `POST /api/upload` 已经可以快速返回 `queued`
- worker 已能真正执行到:
- OCR
- extraction
- evaluation
- rescue
- 已验证真实样本端到端成功:
- `run_id=10`
- `document_id=9`
- `phase=executed`
- `24 passed / 4 failed / 0 skipped`
- 当前主要耗时瓶颈明确在 OCR 阶段
- 一次验证中:
- `ocr=277.79s`
- `extraction=13.65s`
- `evaluation=11.81s`
- `total=303.25s`
- 已针对 OCR 长尾补第一轮治理:
- DOCX `signature probe` 增加单独超时
- DOCX `signature probe` 增加并发限制
- 避免某个候选框失败时串行拖住整个尾段
- 如果代码刚修过但日志仍报旧错,第一怀疑项不是代码没改,而是 worker 进程还没重启,仍在跑旧代码
- worker 日志会明确打印:
- `run_id=xx 已投递到 worker 队列: queue=..., speed=...`
- `run_id=xx worker开始执行: queue=..., speed=..., filename=...`
启动方式
后端 API
```bash
.venv/bin/python run.py
```
worker
```bash
bash scripts/start_worker.sh
```
脚本会自动读取:
- `LEAUDIT_WORKER_CONCURRENCY`
- `LEAUDIT_WORKER_QUEUE_URGENT`
- `LEAUDIT_WORKER_QUEUE_NORMAL`
并拼成:
```bash
celery -A fastapi_admin.celery_app:celery_app worker \
-Q leaudit.urgent,leaudit.normal \
--concurrency=2
```
同时 worker nodename 会带主机名和进程号,避免多个 worker 使用同名节点。
联调时的最小操作顺序
1. 启动 API`.venv/bin/python run.py`
2. 启动 worker`bash scripts/start_worker.sh`
3. 调用上传接口,确认返回 `processingStatus=queued`
4. 看 worker 日志是否出现:
- 抢占 run
- OCR result saved
- Extraction result saved
- Evaluation results saved
- finalize run
5. 如果日志还是旧报错,先重启 worker 再测一次
后续明确待办
- 给 run 抢占补上真正的租约锁字段:
- `worker_id`
- `lock_expires_at`
- `attempt_count`
- 如果后续紧急任务量上来,再考虑把 `urgent` 单独拆成专属 worker
-`GET /run/{id}/status` 或前端轮询说明
- 评估 OCR 阶段是否继续拆更细粒度并发,但前提是先确认外部 OCR / VLM 服务的限流上限