# LeAudit Platform — 基础设施深度重设计方案 > 基于老项目 docauditai 深度逆向分析,对标新平台 leaudit-platform 重新规划。 --- ## 一、文件存储 OSS 路径设计 ### 1.1 老项目路径模式 ``` documents/{instance_name}/{doc_type_name}/{year}/{中文日期}/{doc_dir}/{filename} 实例: documents/mz/行政许可卷宗/2026/04月27日/采购合同_14时30分25秒/采购合同.pdf ``` **老项目核心问题:** - 中文路径(区域名、文档类型名、日期)— URL 编码后不可读,程序解析困难 - `instance_name` 用区域缩写(mz/yf/jy/cz/sj),耦合 `INSTANCE_NAME` 环境变量 - 纯时间戳区分版本,无语义化版本号,查找历史版本全靠 DB 反查 - 业务文档和评查产物混在一个路径空间,无类型区分 - 无文件级权限元数据,拿到 presigned URL 即可访问 ### 1.2 新平台路径设计 #### 两级路径体系 ``` ┌── 业务文档 (Business Documents) ── 用户上传的原始文件 │ bdocs/{region}/{type_code}/{doc_id}/{version}/{file_role}.{ext} │ bdocs/gd-mz/contract.entrust/10042/v1/primary.pdf │ bdocs/gd-mz/contract.entrust/10042/v1/attachment_a.docx │ └── 评查产物 (Audit Artifacts) ── 引擎产出的中间/最终文件 artifacts/{region}/{run_id}/{artifact_type}/{detail}.{ext} artifacts/gd-mz/5801/ocr_result/ocr.json artifacts/gd-mz/5801/render_png/page_003.png artifacts/gd-mz/5801/final_report/report.pdf ``` #### 路径段规范 | 段 | 含义 | 格式 | 示例 | |---|---|---|---| | `bdocs` / `artifacts` | 顶层命名空间 | 固定 | `bdocs` = 业务文档, `artifacts` = 评查产物 | | `{region}` | 区域代码 | `{province}-{city}` | `gd-mz` (广东-梅州), `gd-yf` (云浮), `gd-jy` (揭阳), `gd-cz` (潮州), `gd-sj` (省级) | | `{type_code}` | 文档类型编码 | DSL type_id | `contract.entrust`, `admin_license.new` | | `{doc_id}` | 文档 ID | DB 主键 | `10042` | | `{version}` | 版本号 | `v{n}` | `v1`, `v2`, `v3` | | `{file_role}` | 文件角色 | 枚举 | `primary` / `attachment_a` / `scan` / `ocr_text` | | `{run_id}` | 评查运行 ID | DB 主键 | `5801` | | `{artifact_type}` | 产物类型 | 枚举(20种) | `ocr_result`, `extract_json`, `evaluate_json`, `final_report` | | `{detail}` | 产物详情 | 自由格式 | `page_003.png`, `rule_R001.json` | #### 关键设计决策 1. **全英文路径** — 无 URL 编码问题,日志/调试可直接阅读 2. **区域用 `{province}-{city}` 代码** — 比旧系统 `mz`/`yf` 更明确,未来跨省扩展无歧义 3. **`doc_id` 入路径** — 路径即自描述,无需查 DB 即可知道文件归属 4. **显式 `{version}` 段** — 版本号在路径中可见,支持 v1/v2/v3 并存 5. **产物按 `run_id` 组织** — 一次评查的所有产物在同一目录下,清理/归档方便 --- ## 二、同文件多版本机制 ### 2.1 老项目做法 - 每次上传用时间戳生成新目录 → 物理隔离 - DB 中按 `(name, type_id)` 分组,`create_time DESC` 取最新 - 版本号在查询时动态计算,不存储 - 旧版本永久保留,无清理策略 ### 2.2 新平台版本设计 #### 版本存储模型 ``` bdocs/gd-mz/contract.entrust/10042/ ├── v1/primary.pdf ← 首次上传 ├── v2/primary.pdf ← 第二次上传(修正版) └── v3/primary.pdf ← 第三次上传(最终版) ``` #### 版本元数据 在 `leaudit_document_files` 表中记录: ```python class LeauditDocumentFile(Base): document_id: int # 文档 ID version_no: int # 版本号 (1, 2, 3...) version_seq: str # 语义版本 "v1", "v2" file_role: str # primary / attachment / ... oss_url: str # 完整 OSS 路径 sha256: str # 文件哈希 is_current: bool # 是否当前活跃版本 replaced_by_id: int # 被哪个新版本取代(版本链) upload_user_id: int # 上传者 change_note: str # 变更说明 ``` #### 版本生命周期 ``` upload v1 → v1.is_current = True upload v2 → v1.is_current = False, v1.replaced_by_id = v2.id v2.is_current = True upload v3 → v2.is_current = False, v2.replaced_by_id = v3.id v3.is_current = True ``` - 所有旧版本文件保留在 OSS(不物理删除) - 版本链可在前端展示为"历史版本"列表 - 回滚 = 将指定旧版本标记为 `is_current = True`(无需复制文件) #### 与评查运行的关联 每个 `leaudit_audit_runs` 记录锁定使用的版本: ``` audit_runs.document_file_id → 指向具体版本的 leaudit_document_files.id ``` 这样即使文档后来更新到 v3,历史评查记录仍然精确指向当时的 v1 文件。 --- ## 三、多地区文件查看权限 & 区域隔离 ### 3.1 老项目做法 - 单一 bucket,路径前缀 `{instance_name}` 区分区域 - 文件访问无权限校验(拿到 presigned URL 即访问) - 隔离依赖 `INSTANCE_NAME` 环境变量 → 只在 API 层有效 ### 3.2 新平台权限模型 #### 三层权限控制 ``` Layer 1: 区域隔离 (Region Isolation) └── 用户所属区域决定可见文档范围 省级 (gd-sj) 用户可看所有区域 地市级 (gd-mz) 用户只能看本区域 Layer 2: 文件级权限 (Document-Level) └── 基于 RBAC 的文档访问控制 document:read:own → 本人上传的 document:read:all → 本区域全部的 document:read:cross → 跨区域查看 Layer 3: 产物级权限 (Artifact-Level) └── 评查产物按 run_id 隔离 产物继承其文档的权限策略 临时产物 (rescue debug log) 仅内部系统可读 ``` #### 权限检查流程 ``` 请求: GET /api/v2/documents/10042/files/v1/primary.pdf 1. JWT 解析 → 获取 user_id, user_role, user_region 2. 区域检查: user_region == 'gd-sj' OR user_region == 文档的区域 3. 权限检查: CheckPermission(user_id, "document:read:own") 或通过 GRANT/DENY 通配符匹配 4. 通过 → 生成 presigned URL (TTL 10分钟) 5. 拒绝 → 返回 403 ``` #### 跨区域访问 ```python # 省级用户发起跨区域评查 POST /api/v2/documents/cross-review { "document_id": 10042, # gd-mz 的文档 "reviewer_region": "gd-yf", # 让云浮审核员查看 "permission": "document:read:cross" } → 系统为 gd-yf 区域的审核员创建临时访问授权 → 记录到 leaudit_cross_access_logs → 临时授权在评查完成后自动过期 ``` --- ## 四、队列机制重设计 ### 4.1 老项目分析 **架构:** ``` ┌─────────────┐ ┌──────────────┐ ┌──────────────┐ │ API Server │────▶│ Redis Queue │────▶│ Celery Worker│ │ (8000-8873) │ │ (单队列) │ │ (8线程, 4并发)│ └─────────────┘ └──────────────┘ └──────────────┘ │ │ └── source_port ────────────────────────▶│ os.environ 切换 │ 线程级隔离 ``` **关键机制:** - 所有区域共享一个 Redis 队列 - `source_port` 参数 → worker 在任务执行时切换环境变量 - Redis 信号量限制全局并发为 4 - Thread pool (8 线程) → 4 个实际并发 + 4 个 I/O 等待 **问题:** - 环境变量切换是脆弱的状态管理(线程安全问题,需 threading.local 补偿) - 单一队列无优先级区分(紧急任务和批处理同队列) - 信号量修复依赖定时任务(有窗口期泄漏风险) ### 4.2 新平台队列设计 #### 多队列架构 ``` ┌──────────────────────────────────────────────┐ │ Redis │ │ │ │ leaudit:queue:high (优先级高) │ │ leaudit:queue:default (普通) │ │ leaudit:queue:batch (批量/低优先级) │ │ leaudit:queue:system (系统维护) │ │ │ │ leaudit:semaphore:global (并发控制) │ │ leaudit:semaphore:vlm (VLM并发) │ └──────────────────────────────────────────────┘ ``` #### 任务路由(不再用 source_port) ```python # 新方案:在任务提交时直接带上下文,而非运行时切换环境变量 @celery_app.task( bind=True, queue="leaudit:queue:default", time_limit=1800, soft_time_limit=1500, max_retries=3, default_retry_delay=60, ) async def leaudit_process_document( self, document_id: int, run_id: int, region: str, # gd-mz, gd-yf... (替代 source_port) config: dict, # 运行时配置快照 user_id: int | None = None, ): """文档评查任务 - 上下文通过参数传递,不依赖环境变量""" ... ``` **改进点:** 1. **显式参数替代环境变量** — `region` + `config` 直接传参,线程安全,可测试 2. **优先级队列** — 用户手动触发的走 high,API 自动触发的走 default,批量导入走 batch 3. **去 source_port** — 不再需要 `set_instance_environment` / `restore_instance_environment` 这种脆弱模式 4. **配置快照** — 任务创建时拍下完整配置(LLM model、OCR endpoint 等),即使配置后续变更也不影响已提交任务 #### 任务类型与路由 | 任务 | 队列 | 优先级 | 并发限制 | 超时 | |---|---|---|---|---| | 用户手动评查 | `high` | 8 | 全局 4 | 30min | | API 自动评查 | `default` | 5 | 全局 4 | 30min | | 批量导入 | `batch` | 3 | 全局 2 | 60min | | 交叉评查 | `default` | 5 | 全局 4 | 30min | | 信号量修复 | `system` | 10 | 无限制 | 10s | | 健康检查 | `system` | 10 | 无限制 | 5s | | 统计更新 | `batch` | 1 | 全局 1 | 5min | #### 并发控制改进 ```python # 新方案:上下文管理器 + 自动释放 class TaskConcurrencyLimiter: """基于 Redis 的并发限制器,使用 Lua 脚本保证原子性""" async def acquire(self, semaphore_key: str, max_concurrency: int, timeout: float) -> str: """原子获取许可 → 返回 permit_id""" ... async def release(self, semaphore_key: str, permit_id: str): """原子释放许可""" ... @asynccontextmanager async def limit(self, semaphore_key: str, max_concurrency: int): permit_id = await self.acquire(semaphore_key, max_concurrency, timeout=300) try: yield finally: await self.release(semaphore_key, permit_id) ``` - Redis Lua 脚本替代 WATCH/MULTI/EXEC(减少乐观锁冲突) - 上下文管理器替代手动 acquire/release(防泄漏) - 无定时修复任务(Lua 原子操作 → 不会出现不一致状态) --- ## 五、缓存机制重设计 ### 5.1 老项目分析 **三层 Redis 使用:** | 层 | 用途 | 数据 | TTL | |---|---|---|---| | 权限缓存 | RBAC 鉴权加速 | user:permissions:*, rbac:routes:* | 5-30 min | | Token 黑名单 | JWT 吊销 | token:revoked:{jti} | 最长 24h | | 并发控制 | 信号量 | semaphore:* | 1800s 许可 TTL | **问题:** - 5+ 个独立 Redis 连接(无统一连接池) - `fastapi_cache2` 已初始化但从未使用 - 区域隔离依赖 Redis DB 号切换(配置复杂,易出错) - 无 API 响应缓存(每次请求都查 DB) - 统计缓存 TTL=20min,与权限 TTL 不一致 ### 5.2 新平台缓存架构 #### 统一连接池 ```python # fastapi_common/fastapi_common_cache/redis_pool.py class RedisPool: """全局 Redis 连接池 — 所有模块共用""" _instance: 'RedisPool' = None def __init__(self, config: dict): # 异步连接池 (主) self.async_pool = aioredis.ConnectionPool( max_connections=50, socket_connect_timeout=5, socket_keepalive=True, retry_on_timeout=True, ) # 同步客户端 (Celery worker 用) self.sync_client = redis.Redis(...) ``` **关键决策:不再用 Redis DB 号做区域隔离** — 改用 key prefix: ``` # 旧: REDIS_DB=12 (gd-mz), REDIS_DB=13 (gd-yf) # 新: 所有区域共享 DB 0, 用 prefix 区分 leaudit:gd-mz:permission:user:12345 leaudit:gd-yf:permission:user:12345 leaudit:gd-sj:cache:stats:homepage ``` 好处:统一监控、可在单次 SCAN 中查询跨区域数据、配置简单(只有一个 DB 号)。 #### 缓存分层策略 ``` L1: 进程内存缓存 (Python dict / lru_cache) ├── 配置数据 (规则集列表、引擎版本) — 启动加载,手动刷新 └── DSL 规则文件内容 — 本地文件缓存 + mtime 检测 L2: Redis 缓存 (TTL 驱动) ├── 权限数据: 5 min TTL (变化频率低) ├── Token 黑名单: 按 JWT exp 计算,最长 24h ├── 统计聚合: 10-30 min TTL (计算开销大) └── API 响应: 1-5 min TTL (热点接口) L3: 分布式锁 └── 并发控制信号量 (Lua 原子操作) ``` #### Cache Key 命名规范 ``` leaudit:{region}:{domain}:{entity_type}:{entity_id} └── 空 = 全局共享 # 权限 leaudit:gd-mz:perm:user:12345 # 用户权限集 leaudit:gd-mz:perm:user:12345:doc:read # 单条权限检查 # Token leaudit:global:token:revoked:{jti} # 全局共享 # 统计 leaudit:gd-mz:stats:homepage:{user_id} # 首页统计 leaudit:gd-sj:stats:daily:2026-04-27 # 日报 # 并发 leaudit:global:sem:task:permits # 任务并发信号量 leaudit:global:sem:vlm:permits # VLM 并发信号量 # API 响应缓存 leaudit:gd-mz:api:documents:list:{hash} # 文档列表 ``` #### 缓存失效策略 | 触发器 | 清理动作 | 粒度 | |---|---|---| | 用户角色变更 | `SCAN leaudit:*:perm:user:{uid}` → DEL | 精确 | | 规则集发布新版本 | `DEL leaudit:*:perm:role:*` (所有角色) | 全局 | | Token 吊销 | `SETEX token:revoked:{jti} TTL 1` | 精确 | | 文档状态变更 | `DEL leaudit:{region}:stats:*` | 区域 | | 配置变更 | `DEL leaudit:global:config:*` | 全局 | #### 不缓存的数据 - **评查结果详情** — 每次评查结果不同,缓存无意义 - **文档文件内容** — 在 OSS,不在缓存 - **实时队列状态** — 直接读 Redis List,不做额外缓存 - **DSL 规则执行中间结果** — 一次性的,不需要缓存 --- ## 六、整体基础设施拓扑 ``` ┌──────────────────────────┐ │ PostgreSQL │ │ nas.7bm.co:54302 │ │ leaudit_platform │ │ (17 tables) │ └──────────┬───────────────┘ │ ┌──────────────┐ ┌──────────┴───────────────┐ ┌──────────────┐ │ MinIO OSS │ │ FastAPI App │ │ Redis │ │ │◀───│ (port 8000-8873) │───▶│ │ │ bdocs/ │ │ │ │ Queue │ │ artifacts/ │ │ Controller→Service→Model │ │ Cache │ │ │ │ Bridge→LeAudit Engine │ │ Semaphore │ └──────────────┘ └──────────────────────────┘ └──────────────┘ │ ┌──────────┴───────────────┐ │ Celery Worker(s) │ │ (thread pool) │ │ Pipeline run │ └──────────────────────────┘ ``` --- ## 七、实施优先级 | 优先级 | 任务 | 依赖 | |---|---|---| | **P0** | 配置文件补充:Redis/OSS/LLM/VLM/OCR 真实值 | app.toml | | **P0** | OSS 路径工具类 `oss_path_utils.py` | 上述路径规范 | | **P0** | Redis 统一连接池 `redis_pool.py` | Redis 配置 | | **P1** | 文件上传流程适配新 OSS 路径 | oss_path_utils | | **P1** | Celery 任务路由改造(参数化替代 source_port) | Redis 连接池 | | **P1** | 并发控制器 Lua 脚本实现 | Redis | | **P1** | 权限缓存对接 PermissionService | Redis 缓存层 | | **P2** | API 响应缓存(热点接口) | Redis 缓存层 | | **P2** | 多版本文件管理 | 文件上传流程 | | **P3** | 跨区域访问审计日志 | 权限系统 |