From b45d61fa97b3846060b9d118dce778efa1d38c16 Mon Sep 17 00:00:00 2001 From: wren <“porlong@qq.com”> Date: Wed, 29 Apr 2026 11:48:50 +0800 Subject: [PATCH] feat: add document versioning and list API --- docs/leaudit/SYSTEM_OVERVIEW.md | 24 +- docs/leaudit/document_schema_design.md | 9 +- docs/接口/README.md | 21 + docs/接口/文档上传与评查接口.md | 692 ++++++++++++++++++ docs/接口/新系统版_documents_list接口.md | 307 ++++++++ .../fastapi_common_storage/oss_path_utils.py | 20 +- .../controllers/documentController.py | 28 +- .../fastapi_leaudit/domian/vo/documentVo.py | 65 +- .../leaudit_bridge/storage_adapter.py | 56 +- .../fastapi_leaudit/models/leauditDocument.py | 37 +- .../services/documentService.py | 18 +- .../services/impl/documentServiceImpl.py | 423 +++++++++-- .../migrate_20260429_document_versioning.sql | 75 ++ scripts/schema_v2_add_evaluation_tables.sql | 10 +- 14 files changed, 1693 insertions(+), 92 deletions(-) create mode 100644 docs/接口/README.md create mode 100644 docs/接口/文档上传与评查接口.md create mode 100644 docs/接口/新系统版_documents_list接口.md create mode 100644 scripts/migrate_20260429_document_versioning.sql diff --git a/docs/leaudit/SYSTEM_OVERVIEW.md b/docs/leaudit/SYSTEM_OVERVIEW.md index 077abb6..0cbc043 100644 --- a/docs/leaudit/SYSTEM_OVERVIEW.md +++ b/docs/leaudit/SYSTEM_OVERVIEW.md @@ -4,6 +4,10 @@ ## 一、目标架构 +补充文档: + +- 并发与重试参数:`docs/leaudit/并发与重试参数说明.md` + ``` ┌─ API ───────────────────────────────────────────────────────────┐ │ AuditController (/audit) RuleController (/rule-sets) │ @@ -166,11 +170,11 @@ finalize_run() ← 唯一写 result_status / finished_at / rescue_ POST /upload (multipart/form-data) file typeId / typeCode - bizDocumentId? region=default fileRole=primary createdBy? autoRun=false + speed=normal|urgent ``` 执行链: @@ -178,7 +182,7 @@ POST /upload (multipart/form-data) ```text Upload -> DocumentServiceImpl.Upload() - -> upsert leaudit_documents + -> create leaudit_documents -> 旧 active 文件失效 -> 上传原始文件到 OSS: bdocs/{region}/{type_code}/{document_id}/v{n}/{file_role}.{ext} @@ -186,6 +190,14 @@ Upload -> autoRun=true 时直接调用 AuditServiceImpl.Run() ``` +说明: + +- `leaudit_documents` 现阶段是平台内部文档主表,不再依赖旧系统 `documents.id` +- 每次前端上传都会新建一条 `leaudit_documents` +- `speed` + - `normal` -> `leaudit.normal` + - `urgent` -> `leaudit.urgent` + ## 四点六、最新补充:结果查询视图 当前 `GetRunStatus()` / `GetResult()` 已不再只返回 run 主表摘要。 @@ -204,6 +216,14 @@ Upload - `leaudit_run_metrics` - `leaudit_artifacts` +建议联调方式: + +- 上传后从 `run.runId` 轮询 `GET /audit/run/{runId}` +- 完成后调用 `GET /audit/result/{runId}` +- worker 日志里会明确打印: + - 已投递到哪个队列 + - worker 实际消费的是 `urgent` 还是 `normal` + ## 四点七、关键关系图:DocumentType -> Binding -> RuleSet -> RuleVersion 这一段是当前规则执行链里最关键的一层路由关系。 diff --git a/docs/leaudit/document_schema_design.md b/docs/leaudit/document_schema_design.md index 521a0ab..e8f1a24 100644 --- a/docs/leaudit/document_schema_design.md +++ b/docs/leaudit/document_schema_design.md @@ -52,7 +52,7 @@ LeAudit 有一套自己的 SQLAlchemy ORM 表(`storage/models/`)。**leaudit | # | 表名 | 用途 | 状态 | |---|------|------|------| -| 1 | `leaudit_documents` | LeAudit 域文档镜像,关联业务文档 | ✅ 已创建 | +| 1 | `leaudit_documents` | LeAudit 平台内部文档主表 | ✅ 已创建 | | 2 | `leaudit_document_files` | 文档文件版本管理 | ✅ 已创建 | | 3 | `leaudit_audit_runs` | 每次处理执行的主索引记录 | ✅ 已创建 | | 4 | `leaudit_artifacts` | OCR/normalize/manifest/markdown/图片等文件产物索引 | ✅ 已创建 | @@ -79,12 +79,12 @@ LeAudit 有一套自己的 SQLAlchemy ORM 表(`storage/models/`)。**leaudit ### 4.1 `leaudit_documents` -LeAudit 域文档镜像表。通过 `biz_document_id` 关联老系统 `documents.id`。 +LeAudit 平台内部文档主表。当前不再依赖旧系统文档表。 | 字段 | 类型 | 说明 | | --- | --- | --- | | `id` | bigint PK | 主键,自增 | -| `biz_document_id` | bigint UNIQUE | 关联老业务 `documents.id` | +| `biz_document_id` | bigint UNIQUE | 内部追踪号,沿用旧字段名以兼容历史库 | | `type_id` | bigint | 文档类型 ID → `leaudit_document_types.id` | | `processing_status` | varchar(64) | waiting / running / completed / failed | | `current_run_id` | bigint | 最新有效 `leaudit_audit_runs.id` | @@ -179,7 +179,7 @@ LeAudit 域文档镜像表。通过 `biz_document_id` 关联老系统 `documents ```text leaudit_entry_modules └── leaudit_document_types - ├── leaudit_documents ── biz_document_id → 老系统 documents + ├── leaudit_documents │ ├── leaudit_document_files │ └── leaudit_audit_runs │ ├── leaudit_artifacts (N) @@ -214,6 +214,7 @@ artifacts/{region}/{run_id}/{artifact_type}/{detail}.{ext} ## 9. 最终结论 - 所有表 `leaudit_*` 前缀,与老系统完全隔离 +- `leaudit_documents` 现阶段是平台内部文档主表,不再要求外部 `documents.id` - `leaudit_audit_runs` 是每次处理的唯一追踪单位 - `leaudit_artifacts` 统一管理所有文件产物,数据库只存索引 - `leaudit_rule_results` 粒度到逐条规则,结构与 LeAudit `RuleResult` 对齐 diff --git a/docs/接口/README.md b/docs/接口/README.md new file mode 100644 index 0000000..3bba197 --- /dev/null +++ b/docs/接口/README.md @@ -0,0 +1,21 @@ +# 接口文档目录 + +这个目录专门放当前 `leaudit-platform` 已落地接口的使用说明,重点记录: + +- 接口用途 +- 参数说明 +- 业务逻辑 +- 请求示例 +- 返回示例 + +当前已整理: + +- `文档上传与评查接口.md` +- `文档列表接口设计分析.md` + +建议阅读顺序: + +1. 先看 `文档上传与评查接口.md` +2. 再看 `文档列表接口设计分析.md` +3. 再结合 `docs/规则编辑/worker并发执行改造方案.md` +4. 如果要理解底层数据结构,再看 `docs/leaudit/document_schema_design.md` diff --git a/docs/接口/文档上传与评查接口.md b/docs/接口/文档上传与评查接口.md new file mode 100644 index 0000000..4034348 --- /dev/null +++ b/docs/接口/文档上传与评查接口.md @@ -0,0 +1,692 @@ +# 文档上传与评查接口 + +这份文档描述当前已经落地的文档上传、文档列表、自动评查、手动评查、状态查询、结果查询接口。 + +当前接口围绕以下业务语义设计: + +- 每次前端上传都会形成一个平台内部文档实例 +- 同名文档会尝试归入同一个版本组 +- 同名且内容相同: + - 不新建版本 + - `duplicateUpload=true` + - 如果 `autoRun=true`,仍然可以重新走一次评查流程 +- 同名但内容变化: + - 新建版本 + - 形成 `v2 / v3 / ...` +- 评查任务走 worker 异步执行 +- 队列只有两档: + - `urgent` + - `normal` + +--- + +## 1. 上传接口 + +### 路径 + +```http +POST /upload +``` + +### Content-Type + +```http +multipart/form-data +``` + +### 用途 + +- 上传文档 +- 创建或命中文档版本 +- 建立 `leaudit_documents / leaudit_document_files` +- 可选自动触发评查 + +### 请求参数 + +| 参数 | 类型 | 必填 | 说明 | +|---|---|---:|---| +| `file` | file | 是 | 上传文件 | +| `typeId` | int | 否 | 文档类型 ID,和 `typeCode` 二选一至少传一个 | +| `typeCode` | string | 否 | 文档类型编码,例如 `contract.sale` | +| `region` | string | 否 | 区域,默认 `default` | +| `fileRole` | string | 否 | 文件角色,默认 `primary` | +| `createdBy` | int | 否 | 上传用户 ID | +| `autoRun` | bool | 否 | 是否上传后自动触发评查,默认 `false` | +| `speed` | string | 否 | 执行速度档位:`normal` / `urgent`,默认 `normal` | + +### 版本匹配逻辑 + +上传时会先做版本候选匹配: + +1. 归一化文件名,得到 `normalized_name` +2. 按以下条件查找最新版本候选: + - `type_id` 相同 + - `region` 相同 + - `normalized_name` 相同 + - `is_latest_version = true` + - 主文件 `file_role = 'primary'` +3. 比较最新版本主文件的 `sha256` + +结果分三种: + +- 找不到候选 + - 新建版本组 + - 当前版本为 `v1` +- 找到候选且 `sha256` 相同 + - 视为重复上传 + - 不新建版本 + - `duplicateUpload=true` +- 找到候选但 `sha256` 不同 + - 新建版本 + - 当前版本为 `v2 / v3 / ...` + - 旧版本 `is_latest_version=false` + - 新版本 `is_latest_version=true` + +### 队列路由逻辑 + +- `speed=urgent` -> 投递 `leaudit.urgent` +- `speed=normal` -> 投递 `leaudit.normal` + +### 请求示例:普通上传,不自动评查 + +```bash +curl -X POST 'http://127.0.0.1:8096/api/upload' \ + -F 'file=@/path/to/合同.docx' \ + -F 'typeCode=contract.sale' \ + -F 'region=default' \ + -F 'fileRole=primary' \ + -F 'autoRun=false' \ + -F 'speed=normal' +``` + +### 返回示例:首次上传,命中 `v1` + +```json +{ + "code": 200, + "message": "ok", + "data": { + "documentId": 11, + "internalDocumentNo": 1777426812904262854, + "versionGroupKey": "4e02e455aa504cb9b75a254727f1bb4c", + "versionNo": 1, + "previousVersionId": null, + "rootVersionId": 11, + "duplicateUpload": false, + "fileId": 12, + "typeId": 9, + "typeCode": "contract.sale", + "region": "default", + "fileName": "版本归档验证合同.docx", + "ossUrl": "bdocs/default/contract.sale/2026/04/11/v1/primary__版本归档验证合同.docx", + "speed": "normal", + "processingStatus": "waiting", + "autoRunTriggered": false, + "run": null + } +} +``` + +### 返回示例:重复上传,不升版 + +```json +{ + "code": 200, + "message": "ok", + "data": { + "documentId": 11, + "internalDocumentNo": 1777426812904262854, + "versionGroupKey": "4e02e455aa504cb9b75a254727f1bb4c", + "versionNo": 1, + "previousVersionId": null, + "rootVersionId": 11, + "duplicateUpload": true, + "fileId": 12, + "typeId": 9, + "typeCode": "contract.sale", + "region": "default", + "fileName": "版本归档验证合同.docx", + "ossUrl": "bdocs/default/contract.sale/2026/04/11/v1/primary__版本归档验证合同.docx", + "speed": "normal", + "processingStatus": "waiting", + "autoRunTriggered": false, + "run": null + } +} +``` + +### 返回示例:同名但内容变化,自动形成 `v2` + +```json +{ + "code": 200, + "message": "ok", + "data": { + "documentId": 12, + "internalDocumentNo": 1777426813574315361, + "versionGroupKey": "4e02e455aa504cb9b75a254727f1bb4c", + "versionNo": 2, + "previousVersionId": 11, + "rootVersionId": 11, + "duplicateUpload": false, + "fileId": 13, + "typeId": 9, + "typeCode": "contract.sale", + "region": "default", + "fileName": "版本归档验证合同.docx", + "ossUrl": "bdocs/default/contract.sale/2026/04/12/v2/primary__版本归档验证合同.docx", + "speed": "normal", + "processingStatus": "waiting", + "autoRunTriggered": false, + "run": null + } +} +``` + +### 返回示例:重复上传但自动重新评查 + +```json +{ + "code": 200, + "message": "ok", + "data": { + "documentId": 13, + "internalDocumentNo": 1777427235286905027, + "versionGroupKey": "4e02e455aa504cb9b75a254727f1bb4c", + "versionNo": 3, + "previousVersionId": 12, + "rootVersionId": 11, + "duplicateUpload": true, + "fileId": 14, + "typeId": 9, + "typeCode": "contract.sale", + "region": "default", + "fileName": "版本归档验证合同.docx", + "ossUrl": "bdocs/default/contract.sale/2026/04/13/v3/primary__版本归档验证合同.docx", + "speed": "normal", + "processingStatus": "queued", + "autoRunTriggered": true, + "run": { + "runId": 13, + "documentId": 13, + "runNo": 2, + "documentFileId": 14, + "status": "queued", + "phase": "dispatch", + "resultStatus": null, + "ruleSetId": 29, + "ruleVersionId": 9, + "ruleTypeId": "contract.sale", + "rescueApplied": false, + "totalScore": null, + "passedCount": null, + "failedCount": null, + "skippedCount": null, + "startedAt": null, + "finishedAt": null + } + } +} +``` + +--- + +## 2. 文档列表接口 + +### 路径 + +```http +GET /documents/list +``` + +### 用途 + +- 返回文档主列表 +- 只返回每个版本组的最新版本 +- 每条记录附带历史版本摘要,前端可以直接做“展开历史版本” + +### 查询参数 + +| 参数 | 类型 | 必填 | 说明 | +|---|---|---:|---| +| `page` | int | 否 | 页码,从 `1` 开始,默认 `1` | +| `pageSize` | int | 否 | 每页数量,默认 `20`,最大 `100` | +| `keyword` | string | 否 | 文件名 / 归一化名称模糊搜索 | +| `typeCode` | string | 否 | 文档类型编码,例如 `contract.sale` | +| `region` | string | 否 | 区域过滤 | +| `processingStatus` | string | 否 | 文档处理状态过滤 | +| `resultStatus` | string | 否 | 最新 run 的结果状态过滤 | + +### 查询逻辑 + +- 主查询只看 `leaudit_documents.is_latest_version = true` +- 只关联主文件: + - `leaudit_document_files.is_active = true` + - `leaudit_document_files.file_role = 'primary'` +- 当前评查状态来自 `leaudit_audit_runs` +- 历史版本按 `version_group_key` 再查一次并挂到 `historyVersions` + +### 请求示例 + +```bash +curl 'http://127.0.0.1:8096/api/documents/list?page=1&pageSize=5' +``` + +### 带筛选请求示例 + +```bash +curl 'http://127.0.0.1:8096/api/documents/list?page=1&pageSize=2&keyword=版本归档&typeCode=contract.sale®ion=default' +``` + +### 返回示例 + +```json +{ + "code": 200, + "message": "ok", + "data": { + "total": 1, + "page": 1, + "pageSize": 2, + "totalPages": 1, + "documents": [ + { + "documentId": 13, + "internalDocumentNo": 1777427235286905027, + "versionGroupKey": "4e02e455aa504cb9b75a254727f1bb4c", + "versionNo": 3, + "rootVersionId": 11, + "previousVersionId": 12, + "typeId": 9, + "typeCode": "contract.sale", + "region": "default", + "normalizedName": "版本归档验证合同", + "fileId": 14, + "fileName": "版本归档验证合同.docx", + "fileExt": "docx", + "mimeType": "application/octet-stream", + "fileSize": 587279, + "ossUrl": "bdocs/default/contract.sale/2026/04/13/v3/primary__版本归档验证合同.docx", + "processingStatus": "completed", + "currentRunId": 13, + "runStatus": "completed", + "resultStatus": "review", + "totalScore": 92.0, + "passedCount": 25, + "failedCount": 3, + "skippedCount": 0, + "updatedAt": "2026-04-29T01:50:05.241397+00:00", + "hasHistory": true, + "totalVersions": 3, + "historyVersions": [ + { + "documentId": 12, + "fileId": 13, + "versionNo": 2, + "fileName": "版本归档验证合同.docx", + "fileExt": "docx", + "processingStatus": "waiting", + "runStatus": null, + "resultStatus": null, + "updatedAt": "2026-04-29T01:47:15.250697+00:00" + }, + { + "documentId": 11, + "fileId": 12, + "versionNo": 1, + "fileName": "版本归档验证合同.docx", + "fileExt": "docx", + "processingStatus": "waiting", + "runStatus": null, + "resultStatus": null, + "updatedAt": "2026-04-29T01:40:13.538839+00:00" + } + ] + } + ] + } +} +``` + +### 返回字段说明 + +| 字段 | 说明 | +|---|---| +| `documents[]` | 主列表,仅最新版本 | +| `versionGroupKey` | 同一版本链的归档组键 | +| `versionNo` | 当前版本号 | +| `rootVersionId` | 版本链根文档 ID | +| `previousVersionId` | 上一版本文档 ID | +| `hasHistory` | 是否存在历史版本 | +| `totalVersions` | 该版本组的总版本数 | +| `historyVersions[]` | 历史版本摘要,按 `versionNo DESC` 排序 | + +--- + +## 3. 手动触发评查 + +### 路径 + +```http +POST /audit/run +``` + +### 用途 + +- 对指定 `documentId` 手动触发一次新的评查 run +- 不改变文档版本 +- 只新增 `leaudit_audit_runs` + +### 请求体 + +```json +{ + "documentId": 13, + "ruleType": null, + "force": false, + "speed": "normal" +} +``` + +### 参数说明 + +| 字段 | 类型 | 必填 | 说明 | +|---|---|---:|---| +| `documentId` | int | 是 | 文档 ID | +| `ruleType` | string/null | 否 | 指定规则类型编码 | +| `force` | bool | 否 | 是否强制重跑 | +| `speed` | string | 否 | `normal` / `urgent` | + +### 请求示例 + +```bash +curl -X POST 'http://127.0.0.1:8096/api/audit/run' \ + -H 'Content-Type: application/json' \ + -d '{ + "documentId": 13, + "force": false, + "speed": "urgent" + }' +``` + +### 返回示例 + +```json +{ + "code": 200, + "message": "ok", + "data": { + "runId": 15, + "documentId": 13, + "runNo": 3, + "documentFileId": 14, + "status": "queued", + "phase": "dispatch", + "resultStatus": null, + "ruleSetId": 29, + "ruleVersionId": 9, + "ruleTypeId": "contract.sale", + "rescueApplied": false, + "totalScore": null, + "passedCount": null, + "failedCount": null, + "skippedCount": null, + "startedAt": null, + "finishedAt": null + } +} +``` + +--- + +## 4. 查询运行状态 + +### 路径 + +```http +GET /audit/run/{runId} +``` + +### 用途 + +- 查询 run 当前状态 +- 适合前端轮询 + +### 状态说明 + +常见状态: + +- `queued` +- `running` +- `completed` +- `failed` + +常见阶段: + +- `dispatch` +- `prepare` +- `ocr` +- `extract` +- `evaluate` +- `rescue` +- `persist` +- `executed` + +### 请求示例 + +```bash +curl 'http://127.0.0.1:8096/api/audit/run/11' +``` + +### 返回示例 + +```json +{ + "code": 200, + "message": "ok", + "data": { + "runId": 11, + "documentId": 10, + "runNo": 1, + "documentFileId": 11, + "status": "completed", + "phase": "executed", + "resultStatus": "review", + "ruleSetId": 29, + "ruleVersionId": 9, + "ruleTypeId": "contract.sale", + "rescueApplied": true, + "totalScore": 89.0, + "passedCount": 24, + "failedCount": 4, + "skippedCount": 0, + "startedAt": "2026-04-28T19:01:01.766352+08:00", + "finishedAt": "2026-04-28T19:03:11.044894+08:00" + } +} +``` + +--- + +## 5. 查询评查结果 + +### 路径 + +```http +GET /audit/result/{runId} +``` + +### 用途 + +- 查询本次 run 的完整结果 +- 包括: + - 规则结果 + - 抽取字段 + - 运行错误 + - rescue 结果 + - metrics + - artifacts + +### 请求示例 + +```bash +curl 'http://127.0.0.1:8096/api/audit/result/11' +``` + +### 返回结构说明 + +顶层字段: + +| 字段 | 说明 | +|---|---| +| `runId` | 运行 ID | +| `documentId` | 文档 ID | +| `documentFileId` | 本次锁定的文件 ID | +| `status` | 运行状态 | +| `totalScore` | 总分 | +| `passedCount` | 通过数 | +| `failedCount` | 失败数 | +| `skippedCount` | 跳过数 | +| `phase` | 当前阶段 | +| `resultStatus` | 总体结果 | +| `rescueApplied` | 是否执行 rescue | +| `ruleSetId` | 规则集 ID | +| `ruleVersionId` | 规则版本 ID | +| `startedAt` / `finishedAt` | 起止时间 | +| `rules` | 规则结果列表 | +| `fields` | 抽取字段列表 | +| `errors` | 错误列表 | +| `rescueOutcomes` | 补救结果列表 | +| `metrics` | 阶段指标 | +| `artifacts` | 产物列表 | + +### 返回示例(节选) + +```json +{ + "code": 200, + "message": "ok", + "data": { + "runId": 11, + "documentId": 10, + "documentFileId": 11, + "status": "completed", + "totalScore": 89.0, + "passedCount": 24, + "failedCount": 4, + "skippedCount": 0, + "phase": "executed", + "resultStatus": "review", + "rescueApplied": true, + "ruleSetId": 29, + "ruleVersionId": 9, + "startedAt": "2026-04-28T19:01:01.766352+08:00", + "finishedAt": "2026-04-28T19:03:11.044894+08:00", + "rules": [ + { + "ruleId": "MM-SALE-012", + "ruleName": "甲方信用代码校验", + "passed": false, + "status": "executed", + "risk": "medium", + "score": 3.0, + "failMessage": "甲方统一社会信用代码校验位错误" + } + ], + "fields": [ + { + "fieldName": "合同名称", + "valueText": "智慧法务平台建设采购项目合同", + "confidence": 0.9991 + } + ], + "rescueOutcomes": [ + { + "ruleId": "MM-SALE-012", + "status": "final_fail", + "finalStatus": "review", + "requiresHumanReview": true, + "failureReason": "Agent (4 iter, requires_human): token_budget_exhausted" + } + ], + "metrics": { + "ocrSeconds": 79.06, + "extractSeconds": 11.87, + "evaluateSeconds": 9.9, + "totalSeconds": 100.83, + "pageCount": 2, + "fieldCount": 35, + "ruleCount": 28, + "rescueRuleCount": 5, + "artifactCount": 8 + }, + "artifacts": [ + { + "artifactType": "ocr_json", + "fileName": "ocr_result.json", + "fileExt": "json", + "mimeType": "application/json" + } + ] + } +} +``` + +--- + +## 6. worker 日志怎么看 + +worker 关键日志已经做了可读化,重点看这两类: + +### 投递日志 + +```text +run_id=13 已投递到 worker 队列: queue=leaudit.normal, speed=normal, task_id=... +``` + +### 执行日志 + +```text +run_id=13 worker开始执行: queue=leaudit.normal, speed=normal, filename=版本归档验证合同.docx +``` + +结合状态查询接口可以快速判断: + +- 是否已经成功投递 +- 是否已被 worker 消费 +- 跑的是 `urgent` 还是 `normal` + +--- + +## 7. 前端建议接法 + +### 文档上传页 + +1. 调 `POST /upload` +2. 读取返回: + - `documentId` + - `versionGroupKey` + - `versionNo` + - `duplicateUpload` + - `run` + +### 自动评查场景 + +如果 `autoRun=true` 且返回里 `run != null`: + +1. 取 `run.runId` +2. 轮询 `GET /audit/run/{runId}` +3. `status=completed/failed` 后停止轮询 +4. 再调 `GET /audit/result/{runId}` + +### 列表页 + +列表页建议默认只展示: + +- `is_latest_version = true` 的 document + +点击某条后,再按: + +- `versionGroupKey` + +展开其历史版本。 diff --git a/docs/接口/新系统版_documents_list接口.md b/docs/接口/新系统版_documents_list接口.md new file mode 100644 index 0000000..a24c1d0 --- /dev/null +++ b/docs/接口/新系统版_documents_list接口.md @@ -0,0 +1,307 @@ +# 新系统版 `documents/list` 接口 + +这份文档专门说明当前 `leaudit-platform` 里已经落地的“新系统版文档列表接口”。 + +目标很明确: + +- 前端文档列表只看“最新版本” +- 同名文档的历史版本直接归到同一个版本链 +- 列表接口直接返回历史版本摘要,前端不用自己再拼版本关系 + +--- + +## 1. 接口路径 + +```http +GET /api/documents/list +``` + +--- + +## 2. 当前接口语义 + +这个接口不是“把所有上传记录平铺出来”。 + +它的语义是: + +- 每个 `version_group_key` 只返回一条“最新版本文档” +- 这条最新版本文档下面附带 `historyVersions` +- `historyVersions` 里放的是同组下更老的版本摘要 + +也就是说,前端主列表看到的是: + +- 当前版本 +- 是否有历史版本 +- 一共有多少版本 +- 历史版本有哪些 + +--- + +## 3. 请求参数 + +| 参数 | 类型 | 必填 | 说明 | +|---|---|---:|---| +| `page` | int | 否 | 页码,从 `1` 开始,默认 `1` | +| `pageSize` | int | 否 | 每页数量,默认 `20`,最大 `100` | +| `keyword` | string | 否 | 按文件名或归一化名称模糊搜索 | +| `typeCode` | string | 否 | 文档类型编码,例如 `contract.sale` | +| `region` | string | 否 | 区域 | +| `processingStatus` | string | 否 | 文档处理状态 | +| `resultStatus` | string | 否 | 最新 run 的结果状态 | + +请求示例: + +```bash +curl 'http://127.0.0.1:8096/api/documents/list?page=1&pageSize=5' +``` + +带筛选示例: + +```bash +curl 'http://127.0.0.1:8096/api/documents/list?page=1&pageSize=2&keyword=版本归档&typeCode=contract.sale®ion=default' +``` + +--- + +## 4. 返回结构 + +返回模型是分页结构: + +```json +{ + "code": 200, + "message": "ok", + "data": { + "total": 1, + "page": 1, + "pageSize": 20, + "totalPages": 1, + "documents": [] + } +} +``` + +其中 `documents[]` 的单条结构核心字段如下: + +| 字段 | 说明 | +|---|---| +| `documentId` | 当前最新版本文档 ID | +| `internalDocumentNo` | 平台内部追踪号 | +| `versionGroupKey` | 版本归档组键 | +| `versionNo` | 当前版本号 | +| `rootVersionId` | 版本链根文档 ID | +| `previousVersionId` | 上一版本文档 ID | +| `typeId` | 文档类型 ID | +| `typeCode` | 文档类型编码 | +| `region` | 区域 | +| `normalizedName` | 归一化后的名称 | +| `fileId` | 当前主文件 ID | +| `fileName` | 文件名 | +| `fileExt` | 文件扩展名 | +| `mimeType` | MIME 类型 | +| `fileSize` | 文件大小 | +| `ossUrl` | 对象存储路径 | +| `processingStatus` | 文档处理状态 | +| `currentRunId` | 当前 run ID | +| `runStatus` | 当前 run 状态 | +| `resultStatus` | 当前 run 结果状态 | +| `totalScore` | 总分 | +| `passedCount` | 通过数 | +| `failedCount` | 失败数 | +| `skippedCount` | 跳过数 | +| `updatedAt` | 更新时间 | +| `hasHistory` | 是否有历史版本 | +| `totalVersions` | 总版本数 | +| `historyVersions` | 历史版本摘要列表 | + +`historyVersions[]` 结构: + +| 字段 | 说明 | +|---|---| +| `documentId` | 历史版本文档 ID | +| `fileId` | 历史版本文件 ID | +| `versionNo` | 历史版本号 | +| `fileName` | 文件名 | +| `fileExt` | 文件扩展名 | +| `processingStatus` | 处理状态 | +| `runStatus` | 运行状态 | +| `resultStatus` | 结果状态 | +| `updatedAt` | 更新时间 | + +--- + +## 5. SQL 逻辑 + +### 5.1 主列表计数 SQL + +```sql +SELECT COUNT(*) +FROM leaudit_documents d +JOIN leaudit_document_files f + ON f.document_id = d.id +LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id +LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id +WHERE d.is_latest_version = true + AND d.deleted_at IS NULL + AND f.is_active = true + AND f.file_role = 'primary' + -- 可选过滤: + -- AND (f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword) + -- AND dt.code = :type_code + -- AND d.region = :region + -- AND d.processing_status = :processing_status + -- AND ar.result_status = :result_status +``` + +### 5.2 主列表分页 SQL + +```sql +SELECT + d.id AS document_id, + d.biz_document_id AS internal_document_no, + d.version_group_key, + d.version_no, + d.root_version_id, + d.previous_version_id, + d.type_id, + dt.code AS type_code, + d.region, + d.normalized_name, + d.processing_status, + d.current_run_id, + d.updated_at, + f.id AS file_id, + f.file_name, + f.file_ext, + f.mime_type, + f.file_size, + f.oss_url, + ar.status AS run_status, + ar.result_status, + ar.total_score, + ar.passed_count, + ar.failed_count, + ar.skipped_count, + vc.total_versions, + COALESCE(vc.total_versions, 1) > 1 AS has_history +FROM leaudit_documents d +JOIN leaudit_document_files f + ON f.document_id = d.id +LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id +LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id +LEFT JOIN ( + SELECT version_group_key, COUNT(*) AS total_versions + FROM leaudit_documents + WHERE deleted_at IS NULL + GROUP BY version_group_key +) vc + ON vc.version_group_key = d.version_group_key +WHERE d.is_latest_version = true + AND d.deleted_at IS NULL + AND f.is_active = true + AND f.file_role = 'primary' +ORDER BY d.updated_at DESC, d.id DESC +LIMIT :limit OFFSET :offset +``` + +### 5.3 历史版本摘要 SQL + +```sql +SELECT + d.version_group_key, + d.id AS document_id, + d.version_no, + d.processing_status, + d.updated_at, + f.id AS file_id, + f.file_name, + f.file_ext, + ar.status AS run_status, + ar.result_status +FROM leaudit_documents d +JOIN leaudit_document_files f + ON f.document_id = d.id + AND f.is_active = true + AND f.file_role = 'primary' +LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id +WHERE d.version_group_key = ANY(:group_keys) + AND d.is_latest_version = false + AND d.deleted_at IS NULL +ORDER BY d.version_group_key, d.version_no DESC, d.id DESC +``` + +--- + +## 6. 为什么新系统要这么做 + +因为现在我们已经不是老系统那种“文档记录 + 旁路版本信息”的模式了。 + +当前新系统已经有真正的版本链字段: + +- `version_group_key` +- `version_no` +- `previous_version_id` +- `root_version_id` +- `is_latest_version` +- `normalized_name` + +所以列表天然应该是: + +- 主列表 = 最新版本 +- 展开项 = 历史版本 + +而不是把所有版本平铺在一个列表里。 + +--- + +## 7. 当前代码落点 + +实现代码在这些文件: + +- 路由:`fastapi_modules/fastapi_leaudit/controllers/documentController.py` +- 服务接口:`fastapi_modules/fastapi_leaudit/services/documentService.py` +- VO:`fastapi_modules/fastapi_leaudit/domian/vo/documentVo.py` +- 具体实现:`fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py` + +--- + +## 8. 当前验证结果 + +已经实际验证通过: + +```bash +curl 'http://127.0.0.1:8096/api/documents/list?page=1&pageSize=5' +``` + +以及: + +```bash +curl 'http://127.0.0.1:8096/api/documents/list?page=1&pageSize=2&keyword=版本归档&typeCode=contract.sale®ion=default' +``` + +验证结论: + +- 接口正常返回 +- 主列表只返回最新版本 +- `historyVersions` 能正确带出历史版本摘要 +- 已验证版本组 `4e02e455aa504cb9b75a254727f1bb4c` + - `documentId=13` 是当前最新 `v3` + - `documentId=12` 是 `v2` + - `documentId=11` 是 `v1` + +--- + +## 9. 下一步建议 + +如果后面前端需要更完整的“版本展开页”,再补一个: + +```http +GET /api/documents/{documentId}/versions +``` + +但当前列表页场景下,`/documents/list` 已经够用了。 diff --git a/fastapi_common/fastapi_common_storage/oss_path_utils.py b/fastapi_common/fastapi_common_storage/oss_path_utils.py index 86234ed..c542198 100644 --- a/fastapi_common/fastapi_common_storage/oss_path_utils.py +++ b/fastapi_common/fastapi_common_storage/oss_path_utils.py @@ -2,6 +2,7 @@ from __future__ import annotations +import re from pathlib import Path @@ -16,10 +17,18 @@ class OssPathUtils: Version: str, FileRole: str, FileName: str, + Year: int | None = None, + Month: int | None = None, ) -> str: """生成业务文档 object key。""" Ext = Path(FileName).suffix or "" - return f"bdocs/{Region}/{TypeCode}/{DocumentId}/{Version}/{FileRole}{Ext}" + safe_stem = OssPathUtils.BuildSafeFileStem(FileName) + year_prefix = f"{Year:04d}" if Year is not None else "unknown-year" + month_prefix = f"{Month:02d}" if Month is not None else "unknown-month" + return ( + f"bdocs/{Region}/{TypeCode}/{year_prefix}/{month_prefix}/" + f"{DocumentId}/{Version}/{FileRole}__{safe_stem}{Ext}" + ) @staticmethod def BuildArtifactKey( @@ -42,3 +51,12 @@ class OssPathUtils: """生成规则校验报告 object key。""" prefix = f"{Region}/" if Region else "" return f"{prefix}rules/{RuleType}/{VersionNo}/validation_report.json" + + @staticmethod + def BuildSafeFileStem(FileName: str) -> str: + """生成适合放进 object key 的可读文件名主体。""" + stem = Path(FileName).stem.strip() or "upload" + stem = re.sub(r"[\\/:*?\"<>|]+", "_", stem) + stem = re.sub(r"\s+", "_", stem) + stem = re.sub(r"_+", "_", stem).strip("_") + return (stem or "upload")[:96] diff --git a/fastapi_modules/fastapi_leaudit/controllers/documentController.py b/fastapi_modules/fastapi_leaudit/controllers/documentController.py index 960d9e1..8539c4f 100644 --- a/fastapi_modules/fastapi_leaudit/controllers/documentController.py +++ b/fastapi_modules/fastapi_leaudit/controllers/documentController.py @@ -5,7 +5,7 @@ from fastapi import File, Form, UploadFile from fastapi_common.fastapi_common_web.controller import BaseController from fastapi_common.fastapi_common_web.domain.responses import Result -from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import DocumentUploadVO +from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import DocumentListPageVO, DocumentUploadVO from fastapi_modules.fastapi_leaudit.services import IDocumentService from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl @@ -22,11 +22,11 @@ class DocumentController(BaseController): file: UploadFile = File(..., description="上传文档"), typeId: int | None = Form(None, description="文档类型ID"), typeCode: str | None = Form(None, description="文档类型编码"), - bizDocumentId: int | None = Form(None, description="业务文档ID"), region: str = Form("default", description="所属地区"), fileRole: str = Form("primary", description="文件角色"), createdBy: int | None = Form(None, description="上传用户ID"), autoRun: bool = Form(False, description="是否上传后自动触发评查"), + speed: str = Form("normal", description="执行速度档位:urgent/normal"), ): """上传文档并建立评查输入。""" Content = await file.read() @@ -36,10 +36,32 @@ class DocumentController(BaseController): ContentType=file.content_type, TypeId=typeId, TypeCode=typeCode, - BizDocumentId=bizDocumentId, Region=region, FileRole=fileRole, CreatedBy=createdBy, AutoRun=autoRun, + Speed=speed, + ) + return Result.success(data=Data) + + @self.router.get("/documents/list", response_model=Result[DocumentListPageVO]) + async def ListDocuments( + page: int = 1, + pageSize: int = 20, + keyword: str | None = None, + typeCode: str | None = None, + region: str | None = None, + processingStatus: str | None = None, + resultStatus: str | None = None, + ): + """获取文档列表(仅返回最新版本,附历史版本摘要)。""" + Data = await self.DocumentService.ListDocuments( + Page=page, + PageSize=pageSize, + Keyword=keyword, + TypeCode=typeCode, + Region=region, + ProcessingStatus=processingStatus, + ResultStatus=resultStatus, ) return Result.success(data=Data) diff --git a/fastapi_modules/fastapi_leaudit/domian/vo/documentVo.py b/fastapi_modules/fastapi_leaudit/domian/vo/documentVo.py index ad6fec3..786080c 100644 --- a/fastapi_modules/fastapi_leaudit/domian/vo/documentVo.py +++ b/fastapi_modules/fastapi_leaudit/domian/vo/documentVo.py @@ -9,13 +9,76 @@ class DocumentUploadVO(BaseModel): """文档上传响应。""" documentId: int = Field(..., description="LeAudit 文档ID") - bizDocumentId: int = Field(..., description="业务文档ID") + internalDocumentNo: int = Field(..., description="平台内部追踪号(兼容旧字段)") + versionGroupKey: str = Field(..., description="文档版本归档组键") + versionNo: int = Field(..., description="当前文档版本号") + previousVersionId: int | None = Field(None, description="上一版本文档ID") + rootVersionId: int = Field(..., description="文档版本链根文档ID") + duplicateUpload: bool = Field(..., description="是否命中同名同内容的重复上传") fileId: int = Field(..., description="文档文件ID") typeId: int = Field(..., description="文档类型ID") typeCode: str = Field(..., description="文档类型编码") region: str = Field(..., description="所属地区") fileName: str = Field(..., description="文件名") ossUrl: str = Field(..., description="OSS 对象路径") + speed: str = Field(..., description="执行速度档位:urgent/normal") processingStatus: str = Field(..., description="文档处理状态") autoRunTriggered: bool = Field(..., description="是否已自动触发评查") run: AuditRunVO | None = Field(None, description="自动触发后的运行信息") + + +class DocumentHistoryVersionVO(BaseModel): + """历史版本摘要。""" + + documentId: int = Field(..., description="文档ID") + fileId: int | None = Field(None, description="文件ID") + versionNo: int = Field(..., description="版本号") + fileName: str | None = Field(None, description="文件名") + fileExt: str | None = Field(None, description="文件扩展名") + processingStatus: str | None = Field(None, description="处理状态") + runStatus: str | None = Field(None, description="最新运行状态") + resultStatus: str | None = Field(None, description="最新结果状态") + updatedAt: str | None = Field(None, description="更新时间") + + +class DocumentListItemVO(BaseModel): + """文档列表项。""" + + documentId: int = Field(..., description="文档ID") + internalDocumentNo: int = Field(..., description="平台内部追踪号") + versionGroupKey: str = Field(..., description="版本归档组键") + versionNo: int = Field(..., description="当前版本号") + rootVersionId: int = Field(..., description="根版本文档ID") + previousVersionId: int | None = Field(None, description="上一版本文档ID") + typeId: int | None = Field(None, description="文档类型ID") + typeCode: str | None = Field(None, description="文档类型编码") + region: str = Field(..., description="区域") + normalizedName: str | None = Field(None, description="归一化名称") + fileId: int | None = Field(None, description="文件ID") + fileName: str | None = Field(None, description="文件名") + fileExt: str | None = Field(None, description="文件扩展名") + mimeType: str | None = Field(None, description="MIME类型") + fileSize: int | None = Field(None, description="文件大小") + ossUrl: str | None = Field(None, description="OSS路径") + processingStatus: str | None = Field(None, description="处理状态") + currentRunId: int | None = Field(None, description="当前运行ID") + runStatus: str | None = Field(None, description="当前运行状态") + resultStatus: str | None = Field(None, description="当前结果状态") + totalScore: float | None = Field(None, description="总分") + passedCount: int | None = Field(None, description="通过数") + failedCount: int | None = Field(None, description="失败数") + skippedCount: int | None = Field(None, description="跳过数") + updatedAt: str | None = Field(None, description="更新时间") + hasHistory: bool = Field(False, description="是否存在历史版本") + totalVersions: int = Field(1, description="总版本数") + historyVersions: list[DocumentHistoryVersionVO] = Field(default_factory=list, description="历史版本摘要") + + +class DocumentListPageVO(BaseModel): + """文档列表分页结果。""" + + total: int = Field(..., description="总数") + page: int = Field(..., description="当前页") + pageSize: int = Field(..., description="每页数量") + totalPages: int = Field(..., description="总页数") + documents: list[DocumentListItemVO] = Field(default_factory=list, description="文档列表") diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py index 88f583f..85aa061 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py @@ -6,6 +6,7 @@ into leaudit_* table format and writes via SQLAlchemy async session. from __future__ import annotations +import json import logging import re from typing import Any @@ -157,11 +158,19 @@ class StorageAdapter: row = _rule_result_to_row(document_id, resolved_run_id, rule_result, rule, bundle) if rule_version_id is not None: row["rule_version_id"] = rule_version_id - columns = ", ".join(row.keys()) - placeholders = ", ".join(f":{k}" for k in row) + json_columns = {"stages", "extracted_fields", "field_positions", "remediation", "rule_meta"} + serialized_row = { + key: (json.dumps(value, ensure_ascii=False) if key in json_columns and value is not None else value) + for key, value in row.items() + } + columns = ", ".join(serialized_row.keys()) + placeholders = ", ".join( + f"CAST(:{k} AS JSONB)" if k in json_columns else f":{k}" + for k in serialized_row + ) await session.execute( text(f"INSERT INTO leaudit_rule_results ({columns}) VALUES ({placeholders})"), - row, + serialized_row, ) # Update audit_runs summary (scores only — terminal state set by finalize_run) @@ -371,7 +380,7 @@ class StorageAdapter: :vlm_calls, :duration_ms, :requires_human_review, - :payload, + CAST(:payload AS JSONB), :created_at, :updated_at ) @@ -390,7 +399,7 @@ class StorageAdapter: "vlm_calls": task.vlm_calls, "duration_ms": task.duration_ms, "requires_human_review": task.requires_human_review, - "payload": task.model_dump(mode="json"), + "payload": json.dumps(task.model_dump(mode="json"), ensure_ascii=False), "created_at": task.created_at, "updated_at": task.updated_at, }, @@ -527,8 +536,9 @@ def _bundle_to_extracted(bundle: ExtractionBundle) -> dict[str, Any]: "value": fv.value, "confidence": float(fv.confidence) if fv.confidence else 0.0, } - if fv.position is not None: - field_data["position"] = fv.position.model_dump(mode="json") + position_payload = _field_value_position_payload(fv) + if position_payload is not None: + field_data["position"] = position_payload fields[name] = field_data else: fields[name] = {"value": fv} @@ -637,11 +647,39 @@ def _extract_relevant_field_positions( if f in positions: continue fv = bundle.fields.get(f) - if fv is not None and isinstance(fv, FieldValue) and fv.position is not None: - positions[f] = fv.position.model_dump(mode="json") + if fv is not None and isinstance(fv, FieldValue): + position_payload = _field_value_position_payload(fv) + if position_payload is not None: + positions[f] = position_payload return positions +def _field_value_position_payload(fv: FieldValue) -> dict[str, Any] | None: + """兼容原生 leaudit 新旧 FieldValue 结构,提取可落库的位置线索。""" + position = getattr(fv, "position", None) + if position is not None: + if hasattr(position, "model_dump"): + return position.model_dump(mode="json") + if isinstance(position, dict): + return position + + metadata = fv.metadata if isinstance(fv.metadata, dict) else {} + payload: dict[str, Any] = {} + + if "match_position" in metadata: + payload["matchPosition"] = metadata.get("match_position") + if "matched_text" in metadata: + payload["matchedText"] = metadata.get("matched_text") + if "page_num" in metadata: + payload["pageNum"] = metadata.get("page_num") + if "page_nums" in metadata: + payload["pageNums"] = metadata.get("page_nums") + if "bbox" in metadata: + payload["bbox"] = metadata.get("bbox") + + return payload or None + + def _rule_result_to_row( document_id: int, run_id: int | None, diff --git a/fastapi_modules/fastapi_leaudit/models/leauditDocument.py b/fastapi_modules/fastapi_leaudit/models/leauditDocument.py index 540ad4a..50323b6 100644 --- a/fastapi_modules/fastapi_leaudit/models/leauditDocument.py +++ b/fastapi_modules/fastapi_leaudit/models/leauditDocument.py @@ -1,11 +1,12 @@ -"""LeAudit 域文档镜像模型 —— leaudit_documents 表。 +"""LeAudit 文档主模型 —— leaudit_documents 表。 -通过 biz_document_id 关联业务 documents 表,不复制业务字段。 +当前平台把它作为 LeAudit 自己的文档主表使用,不再依赖旧系统文档表。 +``biz_document_id`` 字段仅保留为内部追踪号,避免直接改库。 """ from __future__ import annotations -from sqlalchemy import BigInteger, String, ForeignKey +from sqlalchemy import BigInteger, Boolean, Integer, String from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import Mapped, mapped_column @@ -13,33 +14,27 @@ from fastapi_common.fastapi_common_web.models import BaseModel class LeauditDocument(BaseModel): - """LeAudit 文档镜像表。""" + """LeAudit 平台内部文档主表。""" __tablename__ = "leaudit_documents" Id: Mapped[int] = mapped_column("id", BigInteger, primary_key=True, autoincrement=True) - bizDocumentId: Mapped[int] = mapped_column("biz_document_id", BigInteger, unique=True, comment="关联业务 documents.id") + bizDocumentId: Mapped[int] = mapped_column("biz_document_id", BigInteger, unique=True, comment="内部追踪号(兼容旧字段名)") typeId: Mapped[int | None] = mapped_column("type_id", BigInteger, comment="文档类型ID") processingStatus: Mapped[str | None] = mapped_column("processing_status", String(64), default="waiting", comment="waiting/processing/completed/failed") currentRunId: Mapped[int | None] = mapped_column("current_run_id", BigInteger, comment="最新有效 run id") region: Mapped[str] = mapped_column(String(32), default="default", comment="所属地区: mz/yf/jy/cz/default") + versionGroupKey: Mapped[str | None] = mapped_column("version_group_key", String(64), comment="文档版本归档组键") + versionNo: Mapped[int] = mapped_column("version_no", Integer, default=1, comment="同一文档系列中的版本号") + previousVersionId: Mapped[int | None] = mapped_column("previous_version_id", BigInteger, comment="上一版本文档ID") + rootVersionId: Mapped[int | None] = mapped_column("root_version_id", BigInteger, comment="首版本文档ID") + isLatestVersion: Mapped[bool] = mapped_column("is_latest_version", Boolean, default=True, comment="是否当前最新版本") + normalizedName: Mapped[str | None] = mapped_column("normalized_name", String(512), comment="归一化文件名(不含扩展名)") @classmethod - async def get_by_biz_id(cls, session: AsyncSession, bizDocumentId: int) -> "LeauditDocument | None": - """按业务文档 ID 查询。""" - from sqlalchemy import select - return await session.scalar(select(cls).where(cls.bizDocumentId == bizDocumentId)) - - @classmethod - async def upsert_by_biz_id(cls, session: AsyncSession, bizDocumentId: int, **fields) -> "LeauditDocument": - """按业务文档 ID 创建或更新。""" - from sqlalchemy import select - doc = await session.scalar(select(cls).where(cls.bizDocumentId == bizDocumentId)) - if doc is None: - doc = cls(bizDocumentId=bizDocumentId, **fields) - session.add(doc) - else: - for k, v in fields.items(): - setattr(doc, k, v) + async def create_new(cls, session: AsyncSession, **fields) -> "LeauditDocument": + """Create a new platform-side document row for every upload.""" + doc = cls(**fields) + session.add(doc) await session.flush() return doc diff --git a/fastapi_modules/fastapi_leaudit/services/documentService.py b/fastapi_modules/fastapi_leaudit/services/documentService.py index 40b1ff9..c4e0ce3 100644 --- a/fastapi_modules/fastapi_leaudit/services/documentService.py +++ b/fastapi_modules/fastapi_leaudit/services/documentService.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod -from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import DocumentUploadVO +from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import DocumentListPageVO, DocumentUploadVO class IDocumentService(ABC): @@ -16,11 +16,25 @@ class IDocumentService(ABC): ContentType: str | None, TypeId: int | None = None, TypeCode: str | None = None, - BizDocumentId: int | None = None, Region: str = "default", FileRole: str = "primary", CreatedBy: int | None = None, AutoRun: bool = False, + Speed: str = "normal", ) -> DocumentUploadVO: """上传文档并建立 LeAudit document/file 记录。""" ... + + @abstractmethod + async def ListDocuments( + self, + Page: int = 1, + PageSize: int = 20, + Keyword: str | None = None, + TypeCode: str | None = None, + Region: str | None = None, + ProcessingStatus: str | None = None, + ResultStatus: str | None = None, + ) -> DocumentListPageVO: + """获取文档列表(仅最新版本,附历史版本摘要)。""" + ... diff --git a/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py index 1f28e62..11b50c9 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/documentServiceImpl.py @@ -2,9 +2,13 @@ from __future__ import annotations +from datetime import datetime import hashlib import mimetypes +import re import time +import unicodedata +import uuid from pathlib import Path from sqlalchemy import text @@ -14,7 +18,12 @@ from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_common.fastapi_common_storage.oss_path_utils import OssPathUtils -from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import DocumentUploadVO +from fastapi_modules.fastapi_leaudit.domian.vo.documentVo import ( + DocumentHistoryVersionVO, + DocumentListItemVO, + DocumentListPageVO, + DocumentUploadVO, +) from fastapi_modules.fastapi_leaudit.models import LeauditDocument, LeauditDocumentFile from fastapi_modules.fastapi_leaudit.services import IAuditService, IDocumentService, IOssService from fastapi_modules.fastapi_leaudit.services.impl.auditServiceImpl import AuditServiceImpl @@ -39,11 +48,11 @@ class DocumentServiceImpl(IDocumentService): ContentType: str | None, TypeId: int | None = None, TypeCode: str | None = None, - BizDocumentId: int | None = None, Region: str = "default", FileRole: str = "primary", CreatedBy: int | None = None, AutoRun: bool = False, + Speed: str = "normal", ) -> DocumentUploadVO: """上传文档并建立 LeAudit document/file 记录。""" if not FileName: @@ -59,6 +68,9 @@ class DocumentServiceImpl(IDocumentService): mimeType = ContentType or mimetypes.guess_type(FileName)[0] or "application/octet-stream" fileSha256 = hashlib.sha256(FileContent).hexdigest() fileSize = len(FileContent) + normalizedSpeed = _normalize_speed(Speed) + normalizedName = _normalize_document_name(FileName) + uploadedAt = datetime.now() async with GetAsyncSession() as Session: if TypeId is not None and TypeCode is not None: @@ -107,69 +119,386 @@ class DocumentServiceImpl(IDocumentService): resolvedTypeId = int(typeRow["id"]) resolvedTypeCode = str(typeRow["code"]) - resolvedBizDocumentId = BizDocumentId or int(time.time() * 1000) + duplicateUpload = False + previousVersionId: int | None = None + rootVersionId: int | None = None + versionGroupKey: str | None = None + versionNo = 1 - document = await LeauditDocument.upsert_by_biz_id( - Session, - bizDocumentId=resolvedBizDocumentId, - typeId=resolvedTypeId, - region=normalizedRegion, - processingStatus="waiting", - ) + latestCandidate = None + if normalizedFileRole == "primary": + latestCandidate = await _find_latest_version_candidate( + Session, + type_id=resolvedTypeId, + region=normalizedRegion, + normalized_name=normalizedName, + ) - versionCount = await LeauditDocumentFile.count_by_document(Session, document.Id) - versionNo = f"v{versionCount + 1}" - objectKey = OssPathUtils.BuildBusinessDocKey( - Region=normalizedRegion, - TypeCode=resolvedTypeCode, - DocumentId=document.Id, - Version=versionNo, - FileRole=normalizedFileRole, - FileName=FileName, - ) - ossUrl = await self.OssService.UploadBytes( - ObjectKey=objectKey, - Content=FileContent, - ContentType=mimeType, - ) + if latestCandidate and latestCandidate["sha256"] == fileSha256: + duplicateUpload = True + document = await Session.get(LeauditDocument, int(latestCandidate["document_id"])) + documentFile = await Session.get(LeauditDocumentFile, int(latestCandidate["file_id"])) + if document is None or documentFile is None: + raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, "重复上传版本定位失败") + await Session.commit() + else: + internalDocumentNo = time.time_ns() + if latestCandidate: + previousVersionId = int(latestCandidate["document_id"]) + rootVersionId = int(latestCandidate["root_version_id"] or latestCandidate["document_id"]) + versionGroupKey = str(latestCandidate["version_group_key"]) + versionNo = int(latestCandidate["version_no"]) + 1 + previousDocument = await Session.get(LeauditDocument, previousVersionId) + if previousDocument is not None: + previousDocument.isLatestVersion = False + else: + versionGroupKey = uuid.uuid4().hex - await LeauditDocumentFile.deactivate_active_by_document(Session, document.Id) - documentFile = LeauditDocumentFile( - documentId=document.Id, - fileRole=normalizedFileRole, - fileName=FileName, - fileExt=fileExt, - mimeType=mimeType, - fileSize=fileSize, - sha256=fileSha256, - localPath=None, - ossUrl=ossUrl, - storageProvider="minio", - isActive=True, - createdBy=CreatedBy, - ) - Session.add(documentFile) - await Session.flush() - await Session.commit() - await Session.refresh(document) - await Session.refresh(documentFile) + document = await LeauditDocument.create_new( + Session, + bizDocumentId=internalDocumentNo, + typeId=resolvedTypeId, + region=normalizedRegion, + processingStatus="waiting", + versionGroupKey=versionGroupKey, + versionNo=versionNo, + previousVersionId=previousVersionId, + rootVersionId=rootVersionId, + isLatestVersion=True, + normalizedName=normalizedName, + ) + if document.rootVersionId is None: + document.rootVersionId = document.Id + rootVersionId = document.Id + else: + rootVersionId = document.rootVersionId + + versionLabel = f"v{document.versionNo}" + objectKey = OssPathUtils.BuildBusinessDocKey( + Region=normalizedRegion, + TypeCode=resolvedTypeCode, + DocumentId=document.Id, + Version=versionLabel, + FileRole=normalizedFileRole, + FileName=FileName, + Year=uploadedAt.year, + Month=uploadedAt.month, + ) + ossUrl = await self.OssService.UploadBytes( + ObjectKey=objectKey, + Content=FileContent, + ContentType=mimeType, + ) + + versionCount = await LeauditDocumentFile.count_by_document(Session, document.Id) + _ = versionCount # single-version-per-document in current model; kept for future extension + await LeauditDocumentFile.deactivate_active_by_document(Session, document.Id) + documentFile = LeauditDocumentFile( + documentId=document.Id, + fileRole=normalizedFileRole, + fileName=FileName, + fileExt=fileExt, + mimeType=mimeType, + fileSize=fileSize, + sha256=fileSha256, + localPath=None, + ossUrl=ossUrl, + storageProvider="minio", + isActive=True, + createdBy=CreatedBy, + ) + Session.add(documentFile) + await Session.flush() + await Session.commit() + await Session.refresh(document) + await Session.refresh(documentFile) + + ossUrl = documentFile.ossUrl or "" run = None processingStatus = document.processingStatus or "waiting" if AutoRun: - run = await self.AuditService.Run(DocumentId=document.Id) + run = await self.AuditService.Run( + DocumentId=document.Id, + Speed=Speed, + Force=duplicateUpload, + ) processingStatus = "running" if run.status in {"pending", "running"} else run.status return DocumentUploadVO( documentId=document.Id, - bizDocumentId=document.bizDocumentId, + internalDocumentNo=document.bizDocumentId, + versionGroupKey=document.versionGroupKey or "", + versionNo=int(document.versionNo or 1), + previousVersionId=document.previousVersionId, + rootVersionId=int(document.rootVersionId or document.Id), + duplicateUpload=duplicateUpload, fileId=documentFile.Id, typeId=resolvedTypeId, typeCode=resolvedTypeCode, region=normalizedRegion, fileName=documentFile.fileName, ossUrl=ossUrl, + speed=normalizedSpeed, processingStatus=processingStatus, autoRunTriggered=AutoRun, run=run, ) + + async def ListDocuments( + self, + Page: int = 1, + PageSize: int = 20, + Keyword: str | None = None, + TypeCode: str | None = None, + Region: str | None = None, + ProcessingStatus: str | None = None, + ResultStatus: str | None = None, + ) -> DocumentListPageVO: + """获取文档列表(仅最新版本,附历史版本摘要)。""" + page = max(1, int(Page)) + page_size = max(1, min(int(PageSize), 100)) + offset = (page - 1) * page_size + + filters = ["d.is_latest_version = true", "d.deleted_at IS NULL", "f.is_active = true", "f.file_role = 'primary'"] + params: dict[str, object] = {"limit": page_size, "offset": offset} + + if Keyword: + filters.append("(f.file_name ILIKE :keyword OR d.normalized_name ILIKE :keyword)") + params["keyword"] = f"%{Keyword.strip()}%" + if TypeCode: + filters.append("dt.code = :type_code") + params["type_code"] = TypeCode.strip() + if Region: + filters.append("d.region = :region") + params["region"] = Region.strip() + if ProcessingStatus: + filters.append("d.processing_status = :processing_status") + params["processing_status"] = ProcessingStatus.strip() + if ResultStatus: + filters.append("ar.result_status = :result_status") + params["result_status"] = ResultStatus.strip() + + where_clause = " AND ".join(filters) + + count_sql = text( + f""" + SELECT COUNT(*) + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + WHERE {where_clause} + """ + ) + + list_sql = text( + f""" + SELECT + d.id AS document_id, + d.biz_document_id AS internal_document_no, + d.version_group_key, + d.version_no, + d.root_version_id, + d.previous_version_id, + d.type_id, + dt.code AS type_code, + d.region, + d.normalized_name, + d.processing_status, + d.current_run_id, + d.updated_at, + f.id AS file_id, + f.file_name, + f.file_ext, + f.mime_type, + f.file_size, + f.oss_url, + ar.status AS run_status, + ar.result_status, + ar.total_score, + ar.passed_count, + ar.failed_count, + ar.skipped_count, + vc.total_versions, + COALESCE(vc.total_versions, 1) > 1 AS has_history + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + LEFT JOIN ( + SELECT version_group_key, COUNT(*) AS total_versions + FROM leaudit_documents + WHERE deleted_at IS NULL + GROUP BY version_group_key + ) vc + ON vc.version_group_key = d.version_group_key + WHERE {where_clause} + ORDER BY d.updated_at DESC, d.id DESC + LIMIT :limit OFFSET :offset + """ + ) + + history_sql = text( + """ + SELECT + d.version_group_key, + d.id AS document_id, + d.version_no, + d.processing_status, + d.updated_at, + f.id AS file_id, + f.file_name, + f.file_ext, + ar.status AS run_status, + ar.result_status + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + AND f.is_active = true + AND f.file_role = 'primary' + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + WHERE d.version_group_key = ANY(:group_keys) + AND d.is_latest_version = false + AND d.deleted_at IS NULL + ORDER BY d.version_group_key, d.version_no DESC, d.id DESC + """ + ) + + async with GetAsyncSession() as Session: + total = int((await Session.execute(count_sql, params)).scalar_one()) + rows = (await Session.execute(list_sql, params)).mappings().all() + + history_by_group: dict[str, list[DocumentHistoryVersionVO]] = {} + group_keys = [str(row["version_group_key"]) for row in rows if row["version_group_key"]] + if group_keys: + history_rows = ( + await Session.execute(history_sql, {"group_keys": group_keys}) + ).mappings().all() + for row in history_rows: + history_by_group.setdefault(str(row["version_group_key"]), []).append( + DocumentHistoryVersionVO( + documentId=int(row["document_id"]), + fileId=int(row["file_id"]) if row["file_id"] is not None else None, + versionNo=int(row["version_no"]), + fileName=row["file_name"], + fileExt=row["file_ext"], + processingStatus=row["processing_status"], + runStatus=row["run_status"], + resultStatus=row["result_status"], + updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, + ) + ) + + documents: list[DocumentListItemVO] = [] + for row in rows: + group_key = str(row["version_group_key"] or "") + documents.append( + DocumentListItemVO( + documentId=int(row["document_id"]), + internalDocumentNo=int(row["internal_document_no"]), + versionGroupKey=group_key, + versionNo=int(row["version_no"] or 1), + rootVersionId=int(row["root_version_id"] or row["document_id"]), + previousVersionId=int(row["previous_version_id"]) if row["previous_version_id"] is not None else None, + typeId=int(row["type_id"]) if row["type_id"] is not None else None, + typeCode=row["type_code"], + region=row["region"], + normalizedName=row["normalized_name"], + fileId=int(row["file_id"]) if row["file_id"] is not None else None, + fileName=row["file_name"], + fileExt=row["file_ext"], + mimeType=row["mime_type"], + fileSize=int(row["file_size"]) if row["file_size"] is not None else None, + ossUrl=row["oss_url"], + processingStatus=row["processing_status"], + currentRunId=int(row["current_run_id"]) if row["current_run_id"] is not None else None, + runStatus=row["run_status"], + resultStatus=row["result_status"], + totalScore=float(row["total_score"]) if row["total_score"] is not None else None, + passedCount=int(row["passed_count"]) if row["passed_count"] is not None else None, + failedCount=int(row["failed_count"]) if row["failed_count"] is not None else None, + skippedCount=int(row["skipped_count"]) if row["skipped_count"] is not None else None, + updatedAt=row["updated_at"].isoformat() if row["updated_at"] else None, + hasHistory=bool(row["has_history"]), + totalVersions=int(row["total_versions"] or 1), + historyVersions=history_by_group.get(group_key, []), + ) + ) + + total_pages = (total + page_size - 1) // page_size if total else 0 + return DocumentListPageVO( + total=total, + page=page, + pageSize=page_size, + totalPages=total_pages, + documents=documents, + ) + + +async def _find_latest_version_candidate( + session, + *, + type_id: int, + region: str, + normalized_name: str, +) -> dict | None: + """Find the latest primary document version candidate by normalized name.""" + result = await session.execute( + text( + """ + SELECT + d.id AS document_id, + d.version_group_key, + d.version_no, + d.root_version_id, + f.id AS file_id, + f.sha256 + FROM leaudit_documents d + JOIN leaudit_document_files f + ON f.document_id = d.id + AND f.is_active = true + AND f.file_role = 'primary' + WHERE d.type_id = :type_id + AND d.region = :region + AND d.normalized_name = :normalized_name + AND d.is_latest_version = true + AND d.deleted_at IS NULL + ORDER BY d.version_no DESC, d.id DESC + LIMIT 1 + """ + ), + { + "type_id": type_id, + "region": region, + "normalized_name": normalized_name, + }, + ) + row = result.mappings().first() + return dict(row) if row else None + + +def _normalize_speed(speed: str | None) -> str: + """Normalize front-end speed selection to urgent/normal.""" + normalized = (speed or "").strip().lower() + if normalized in {"urgent", "high", "fast", "emergency", "紧急"}: + return "urgent" + return "normal" + + +def _normalize_document_name(file_name: str) -> str: + """Build a stable name key for same-name version matching.""" + stem = Path(file_name).stem + name = unicodedata.normalize("NFKC", stem).strip().lower() + name = re.sub(r"[\s_\-]+", " ", name) + name = re.sub(r"(?:\(|()\d+(?:\)|))$", "", name).strip() + name = re.sub(r"(?:[-_\s]*副本|[-_\s]*copy)$", "", name).strip() + name = re.sub(r"\s+", " ", name).strip() + return name or "untitled" diff --git a/scripts/migrate_20260429_document_versioning.sql b/scripts/migrate_20260429_document_versioning.sql new file mode 100644 index 0000000..7e4b178 --- /dev/null +++ b/scripts/migrate_20260429_document_versioning.sql @@ -0,0 +1,75 @@ +BEGIN; + +ALTER TABLE leaudit_documents + ADD COLUMN IF NOT EXISTS version_group_key VARCHAR(64), + ADD COLUMN IF NOT EXISTS version_no INTEGER NOT NULL DEFAULT 1, + ADD COLUMN IF NOT EXISTS previous_version_id BIGINT, + ADD COLUMN IF NOT EXISTS root_version_id BIGINT, + ADD COLUMN IF NOT EXISTS is_latest_version BOOLEAN NOT NULL DEFAULT true, + ADD COLUMN IF NOT EXISTS normalized_name VARCHAR(512); + +COMMENT ON COLUMN leaudit_documents.biz_document_id IS '平台内部追踪号(兼容旧字段名)'; +COMMENT ON COLUMN leaudit_documents.version_group_key IS '文档版本归档组键,同一名称版本链共用'; +COMMENT ON COLUMN leaudit_documents.version_no IS '同一文档系列中的版本号,从 1 开始'; +COMMENT ON COLUMN leaudit_documents.previous_version_id IS '上一版本文档ID'; +COMMENT ON COLUMN leaudit_documents.root_version_id IS '首版本文档ID'; +COMMENT ON COLUMN leaudit_documents.is_latest_version IS '是否当前最新版本'; +COMMENT ON COLUMN leaudit_documents.normalized_name IS '归一化文件名(不含扩展名),用于版本匹配'; + +UPDATE leaudit_documents d +SET + version_group_key = COALESCE(d.version_group_key, 'legacy-' || d.id::text), + version_no = COALESCE(d.version_no, 1), + root_version_id = COALESCE(d.root_version_id, d.id), + is_latest_version = COALESCE(d.is_latest_version, true), + normalized_name = COALESCE( + d.normalized_name, + NULLIF( + trim( + regexp_replace( + lower( + regexp_replace( + regexp_replace( + COALESCE( + ( + SELECT f.file_name + FROM leaudit_document_files f + WHERE f.document_id = d.id + ORDER BY f.is_active DESC, f.id DESC + LIMIT 1 + ), + '' + ), + '\.[^.]+$', + '', + '' + ), + '[[:space:]_-]+', + ' ', + 'g' + ) + ), + '(?:\(|()\d+(?:\)|))$|(?:[-_\s]*副本|[-_\s]*copy)$', + '', + 'g' + ) + ), + '' + ) + ) +WHERE d.version_group_key IS NULL + OR d.root_version_id IS NULL + OR d.normalized_name IS NULL; + +CREATE INDEX IF NOT EXISTS idx_leaudit_documents_version_group + ON leaudit_documents(version_group_key); + +CREATE INDEX IF NOT EXISTS idx_leaudit_documents_name_match + ON leaudit_documents(type_id, region, normalized_name, is_latest_version) + WHERE deleted_at IS NULL; + +CREATE UNIQUE INDEX IF NOT EXISTS uq_leaudit_documents_latest_version_group + ON leaudit_documents(version_group_key) + WHERE is_latest_version = true AND deleted_at IS NULL; + +COMMIT; diff --git a/scripts/schema_v2_add_evaluation_tables.sql b/scripts/schema_v2_add_evaluation_tables.sql index db9fae5..bc8d772 100644 --- a/scripts/schema_v2_add_evaluation_tables.sql +++ b/scripts/schema_v2_add_evaluation_tables.sql @@ -109,13 +109,19 @@ COMMENT ON COLUMN jwt_tokens.update_time IS '记录更新时间'; -- -------------------------------------------------------------------------- -- 5.2 leaudit_documents -- -------------------------------------------------------------------------- -COMMENT ON TABLE leaudit_documents IS '文档主表 — 每个上传的业务文档对应一条记录,跟踪评查处理状态'; +COMMENT ON TABLE leaudit_documents IS '文档主表 — LeAudit 平台内部文档实例,每次上传对应一条记录,可通过版本链归档历史版本'; COMMENT ON COLUMN leaudit_documents.id IS '主键,自增(leaudit内部文档ID)'; -COMMENT ON COLUMN leaudit_documents.biz_document_id IS '业务系统文档ID,关联旧系统 documents.id,用于跨系统追溯'; +COMMENT ON COLUMN leaudit_documents.biz_document_id IS '平台内部追踪号(兼容旧字段名)'; COMMENT ON COLUMN leaudit_documents.type_id IS '文档类型ID,外键引用 leaudit_document_types.id'; COMMENT ON COLUMN leaudit_documents.processing_status IS '处理状态: waiting(等待处理) | running(处理中) | completed(已完成) | failed(失败)'; COMMENT ON COLUMN leaudit_documents.current_run_id IS '当前活跃的评查运行ID,外键引用 leaudit_audit_runs.id'; +COMMENT ON COLUMN leaudit_documents.version_group_key IS '文档版本归档组键,同一名称版本链共用'; +COMMENT ON COLUMN leaudit_documents.version_no IS '同一文档系列中的版本号,从1开始'; +COMMENT ON COLUMN leaudit_documents.previous_version_id IS '上一版本文档ID'; +COMMENT ON COLUMN leaudit_documents.root_version_id IS '首版本文档ID'; +COMMENT ON COLUMN leaudit_documents.is_latest_version IS '是否当前最新版本'; +COMMENT ON COLUMN leaudit_documents.normalized_name IS '归一化文件名(不含扩展名),用于版本匹配'; COMMENT ON COLUMN leaudit_documents.create_time IS '记录创建时间'; COMMENT ON COLUMN leaudit_documents.update_time IS '记录更新时间';