原生 LeAudit Worker 并发执行改造方案 这份文档不是纯方案稿,而是“已经落地到哪一步 + 当前系统为什么这么设计 + 后续接手时要看什么”的实施记录。后面如果忘了这套东西是干什么的,先看这份。 目标 - 把上传触发评查从 HTTP 同步执行切到 `Run -> dispatch -> worker` - 平台侧只做: - 上传入库 - OSS / 规则文件解析 - run 状态编排 - 结果持久化 - 原生 `/home/wren-dev/Porject/leaudit/src/leaudit` 继续负责: - `AuditCtx` - `AuditService.audit(ctx)` - OCR / extract / evaluate / rescue 主流程 为什么要改成 worker - HTTP 请求里同步跑全链路会把 OCR、LLM、VLM、补救流程都塞进 Web 线程 - 后续存在并发上传、多文档批量处理、多 worker 扩容场景 - 同步模式下容易遇到: - 请求超时 - 事件循环混用 - 多线程/多请求互相阻塞 - 状态不可追踪 - 改造成 worker 后,API 只负责“受理 + 排队”,重任务交给 Celery worker 进程 当前已落地 - `fastapi_admin/celery_app.py` - 新增 Celery app,broker/backend 复用当前 Redis - `fastapi_admin/config/_settings.py` - 新增 worker 队列、并发、超时、OCR 视觉并发、signature probe 限时等配置 - `fastapi_admin/config/__init__.pyi` - 暴露新的 LEAUDIT 配置项 - `app.toml` - 正式加入 `[LEAUDIT]` worker/并发相关配置 - 补齐 `[OCR].BASE_URL` - `fastapi_modules/fastapi_leaudit/services/impl/auditServiceImpl.py` - `Run()` 改为只创建 `leaudit_audit_runs` 并投递任务 - 相同文档已有活动 run 时直接复用,避免重复创建 - `fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py` - 改为 `runId` 驱动,而不是把整份文件内容直接塞到同步调用链 - 增加 Celery task 入口 - 增加 run 抢占,避免重复消费 - 增加队列路由 - `fastapi_modules/fastapi_leaudit/leaudit_bridge/client_factory.py` - OCR client 读取平台配置,避免 `base_url is required` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py` - 原生服务装配读取平台并发配置 - `fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py` - 修复原生 `FieldValue` 新旧结构兼容 - 修复 JSONB 字段写库序列化 - `scripts/start_worker.sh` - worker 启动脚本改为动态读取配置 - worker nodename 改成唯一,避免 Celery `DuplicateNodenameWarning` - `docs/规则编辑/worker并发执行改造方案.md` - 记录当前这套机制,避免后续自己忘记 执行链路 1. `POST /api/upload` 2. `DocumentServiceImpl.Upload()` 写 `leaudit_documents / leaudit_document_files` 3. 原始文件上传 OSS 4. `AuditServiceImpl.Run()` 创建 `leaudit_audit_runs` 5. `dispatch_leaudit_task(run_id)` 投递 Celery 6. worker 消费 `leaudit.process_document` 7. worker 按 `runId` 查库,取: - 文档 - 活跃文件版本 - 当前规则版本 - 规则 OSS 地址 8. bridge 下载 OSS 文件 / 规则文件到本地临时文件 9. `NativeRunner` 构建原生 `AuditCtx` 并调用原生 audit 10. `StorageAdapter` 把结果写回 `leaudit_*` 当前实现涉及的关键文件 - 上传入口 - `fastapi_modules/fastapi_leaudit/controllers/documentController.py` - `fastapi_modules/fastapi_leaudit/services/documentService.py` - `fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py` - bridge 层 - `fastapi_modules/fastapi_leaudit/leaudit_bridge/fileSourceResolver.py` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/ruleVersionResolver.py` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditCtxBuilder.py` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/auditServiceFactory.py` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/nativeRunner.py` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py` - `fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py` - worker 与配置 - `fastapi_admin/celery_app.py` - `fastapi_admin/config/_settings.py` - `app.toml` - `scripts/start_worker.sh` 并发配置正式化 - `LEAUDIT_LLM_MAX_CONCURRENCY` - 单文档内 LLM 全局信号量上限 - `LEAUDIT_VLM_MAX_CONCURRENCY` - 单文档内 VLM 主流程上限 - `LEAUDIT_OCR_VLM_CONCURRENCY` - OCR 视觉增强 / 印章分类阶段的 VLM 并发上限 - `LEAUDIT_SIGNATURE_PROBE_CONCURRENCY` - DOCX 签名候选区域补探测并发上限 - `LEAUDIT_SIGNATURE_PROBE_TIMEOUT` - 单次签名 probe 的 VLM 超时,避免长时间卡住 OCR 尾部 - `LEAUDIT_WORKER_CONCURRENCY` - 单 worker 进程可并发执行的任务数 - `LEAUDIT_WORKER_QUEUE_URGENT` - 前端上传中的“紧急”队列 - `LEAUDIT_WORKER_QUEUE_NORMAL` - 前端上传中的“普通”队列 - `LEAUDIT_TASK_SOFT_TIME_LIMIT` - Celery 软超时 - `LEAUDIT_TASK_TIME_LIMIT` - Celery 硬超时 - `LEAUDIT_RUN_LOCK_SECONDS` - 为后续升级“租约锁”预留的锁超时配置 当前 `app.toml` 基线 ```toml [OCR] BASE_URL = "http://i-2.gpushare.com:44112/" [LEAUDIT] LLM_MAX_CONCURRENCY = 5 VLM_MAX_CONCURRENCY = 3 OCR_VLM_CONCURRENCY = 3 SIGNATURE_PROBE_CONCURRENCY = 2 SIGNATURE_PROBE_TIMEOUT = 20 WORKER_QUEUE_URGENT = "leaudit.urgent" WORKER_QUEUE_NORMAL = "leaudit.normal" WORKER_CONCURRENCY = 2 RUN_LOCK_SECONDS = 1800 TASK_SOFT_TIME_LIMIT = 3300 TASK_TIME_LIMIT = 3600 ``` 当前队列路由策略 - 前端上传 `speed=urgent` -> `leaudit.urgent` - 前端上传 `speed=normal` -> `leaudit.normal` - `/audit/run` 也支持 `speed` - 未识别值默认归到 `normal` 接口收口 - 上传接口:`POST /upload` - 入参: - `file` - `typeId` / `typeCode` - `region` - `fileRole` - `createdBy` - `autoRun` - `speed=normal|urgent` - 已去掉旧系统 `bizDocumentId` 输入 - 上传返回: - `documentId` - `internalDocumentNo` - `fileId` - `typeId` - `typeCode` - `region` - `fileName` - `ossUrl` - `speed` - `processingStatus` - `autoRunTriggered` - `run` - 手动触发:`POST /audit/run` - 支持 `speed=normal|urgent` - 状态查询: - `GET /audit/run/{runId}` - 结果查询: - `GET /audit/result/{runId}` 当前业务收敛结论 - 不再继续扩 `high/default/batch` 三层方案 - 当前真实业务只有前端上传,并且只有“紧急 / 普通”两档速度需求 - 所以 worker 体系正式收敛成两级模型: - `urgent` - `normal` - 这样既满足优先级需求,也避免过度设计 状态流转 - 新建 run - `status=queued` - `phase=dispatch` - worker 抢占成功 - `status=running` - `phase=prepare` - 原生流程推进中 - `phase` 会继续流转为 OCR / extraction / evaluation / rescue / persist 等阶段 - 执行成功 - `status=completed` - `result_status` 由 bridge 汇总写回 - 执行失败 - `status=failed` - `phase` 停在失败发生阶段 - `leaudit_run_errors` 记录错误明细 避免重复消费 - API 触发 `Run()` 时: - 若同一文档已有 `queued/running/retrying` 的 run,且 `Force=False` - 直接返回当前活动 run,不重复创建 - worker 开始执行前: - 先按 run 状态执行原子抢占 - 仅允许 `status in ('queued', 'pending', 'retrying')` 的 run 进入执行 - 抢占失败时: - 当前 worker 直接跳过 - 返回 `already_claimed` 当前表语义 - `leaudit_documents` - 平台内部文档主表 - 一次前端上传会创建一条新的 document - 不再依赖旧系统 `documents.id` - `biz_document_id` 仅作为内部追踪号保留,避免立刻改库 - `leaudit_document_files` - 具体上传文件记录 - 保存文件名、后缀、MIME、SHA256、OSS 地址等文件级信息 - 当前仍保留单独文件表,为后续是否合表留余地 已踩过并修掉的坑 - 事件循环混用 - 同步 HTTP 链路里直接执行原生异步流程,容易出现 `Future attached to a different loop` - 现在改成 worker 独立进程执行,HTTP 只排队 - OCR base_url 缺失 - 原因是平台 bridge 没把 `[OCR].BASE_URL` 正确喂给原生 client - 现已在 `app.toml` 和 `client_factory.py` 补齐 - 原生 `FieldValue` 结构变化 - 新版本 `FieldValue` 不一定有 `.position` - `storage_adapter.py` 已改为兼容: - 优先读 `position` - 没有时回退到 `metadata.match_position / bbox / page_num` - asyncpg + JSONB 序列化 - 不能把 `list/dict` 直接按字符串列方式塞给 `CAST(:x AS JSONB)` - 需要先 `json.dumps(..., ensure_ascii=False)` 再入库 - 当前已在这些路径处理: - `leaudit_rule_results.stages` - `leaudit_rule_results.extracted_fields` - `leaudit_rule_results.field_positions` - `leaudit_rule_results.remediation` - `leaudit_rule_results.rule_meta` - `leaudit_rescue_outcomes.payload` 当前运行现状 - `POST /api/upload` 已经可以快速返回 `queued` - worker 已能真正执行到: - OCR - extraction - evaluation - rescue - 已验证真实样本端到端成功: - `run_id=10` - `document_id=9` - `phase=executed` - `24 passed / 4 failed / 0 skipped` - 当前主要耗时瓶颈明确在 OCR 阶段 - 一次验证中: - `ocr=277.79s` - `extraction=13.65s` - `evaluation=11.81s` - `total=303.25s` - 已针对 OCR 长尾补第一轮治理: - DOCX `signature probe` 增加单独超时 - DOCX `signature probe` 增加并发限制 - 避免某个候选框失败时串行拖住整个尾段 - 如果代码刚修过但日志仍报旧错,第一怀疑项不是代码没改,而是 worker 进程还没重启,仍在跑旧代码 - worker 日志会明确打印: - `run_id=xx 已投递到 worker 队列: queue=..., speed=...` - `run_id=xx worker开始执行: queue=..., speed=..., filename=...` 启动方式 后端 API: ```bash .venv/bin/python run.py ``` worker: ```bash bash scripts/start_worker.sh ``` 脚本会自动读取: - `LEAUDIT_WORKER_CONCURRENCY` - `LEAUDIT_WORKER_QUEUE_URGENT` - `LEAUDIT_WORKER_QUEUE_NORMAL` 并拼成: ```bash celery -A fastapi_admin.celery_app:celery_app worker \ -Q leaudit.urgent,leaudit.normal \ --concurrency=2 ``` 同时 worker nodename 会带主机名和进程号,避免多个 worker 使用同名节点。 联调时的最小操作顺序 1. 启动 API:`.venv/bin/python run.py` 2. 启动 worker:`bash scripts/start_worker.sh` 3. 调用上传接口,确认返回 `processingStatus=queued` 4. 看 worker 日志是否出现: - 抢占 run - OCR result saved - Extraction result saved - Evaluation results saved - finalize run 5. 如果日志还是旧报错,先重启 worker 再测一次 后续明确待办 - 给 run 抢占补上真正的租约锁字段: - `worker_id` - `lock_expires_at` - `attempt_count` - 如果后续紧急任务量上来,再考虑把 `urgent` 单独拆成专属 worker - 补 `GET /run/{id}/status` 或前端轮询说明 - 评估 OCR 阶段是否继续拆更细粒度并发,但前提是先确认外部 OCR / VLM 服务的限流上限