diff --git a/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py b/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py
index d67c1b0..d64955b 100644
--- a/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py
+++ b/fastapi_modules/fastapi_leaudit/controllers/crossReviewController.py
@@ -3,7 +3,7 @@
from typing import Any
from fastapi import Depends, File, Form, Query, UploadFile
-from fastapi.responses import JSONResponse
+from fastapi.responses import JSONResponse, Response
from fastapi_common.fastapi_common_security.security import verify_access_token
from fastapi_common.fastapi_common_web.controller import BaseController
@@ -221,6 +221,26 @@ class CrossReviewController(BaseController):
Data = await self.CrossReviewService.GetDocumentPendingVotes(CurrentUserId=int(payload["user_id"]), DocumentId=DocumentId)
return Result.success(data=Data)
+ @self.router.get("/documents/{DocumentId}/proposals/export")
+ async def ExportDocumentProposals(
+ DocumentId: int,
+ payload: dict[str, Any] = Depends(verify_access_token),
+ ):
+ """导出文档提案列表 Excel。"""
+ if not await self._check_permission(int(payload["user_id"]), [self._PERMISSIONS["proposal_read"]]):
+ return JSONResponse(status_code=403, content={"code": 403, "msg": "当前用户没有导出交叉评查提案权限", "data": None})
+ fileContent, fileName = await self.CrossReviewService.ExportDocumentProposals(
+ CurrentUserId=int(payload["user_id"]),
+ DocumentId=DocumentId,
+ )
+ return Response(
+ content=fileContent,
+ media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
+ headers={
+ "Content-Disposition": f"attachment; filename*=UTF-8''{fileName}",
+ },
+ )
+
async def _check_permission(self, user_id: int, permission_keys: list[str]) -> bool:
"""OR 逻辑权限校验。"""
for permission_key in permission_keys:
diff --git a/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py b/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py
index 5ff1766..d5c92ea 100644
--- a/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py
+++ b/fastapi_modules/fastapi_leaudit/domian/Dto/crossReviewDto.py
@@ -39,9 +39,12 @@ class CrossReviewTaskDocumentQueryDTO(BaseModel):
class CrossReviewProposalCreateDTO(BaseModel):
"""创建交叉评查提案。"""
- reviewPointResultId: int = Field(..., description="规则结果ID")
+ proposalType: str = Field("review_point", description="提案类型:review_point/supplement")
+ reviewPointResultId: int | None = Field(None, description="规则结果ID")
documentId: int = Field(..., description="文档ID")
evaluationPointId: int | None = Field(None, description="评查点ID")
+ evaluationPointName: str | None = Field(None, description="补充评查点名称")
+ extractionResultText: str | None = Field(None, description="补充抽取结果文本")
auditOpinion: str = Field(..., min_length=1, description="提案理由")
deductionScore: float = Field(..., description="分值调整量")
diff --git a/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py b/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py
index ed96061..7cfe4f0 100644
--- a/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py
+++ b/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py
@@ -146,7 +146,10 @@ class CrossReviewProposalItemVO(BaseModel):
"""提案列表项。"""
proposalId: int = Field(..., description="提案ID")
+ proposalType: str = Field("review_point", description="提案类型")
+ reviewPointResultId: int | None = Field(None, description="评查点结果ID")
evaluationPointName: str = Field("", description="评查点名称")
+ extractionResultText: str = Field("", description="抽取结果文本")
proposedScore: float = Field(..., description="调整分值")
reason: str = Field("", description="提案理由")
proposer: str = Field("", description="提案人姓名")
diff --git a/fastapi_modules/fastapi_leaudit/services/crossReviewService.py b/fastapi_modules/fastapi_leaudit/services/crossReviewService.py
index 59a93fa..1db3fba 100644
--- a/fastapi_modules/fastapi_leaudit/services/crossReviewService.py
+++ b/fastapi_modules/fastapi_leaudit/services/crossReviewService.py
@@ -99,6 +99,11 @@ class ICrossReviewService(ABC):
"""获取文档待投票信息。"""
...
+ @abstractmethod
+ async def ExportDocumentProposals(self, CurrentUserId: int, DocumentId: int) -> tuple[bytes, str]:
+ """导出文档交叉评查意见 Excel。"""
+ ...
+
@abstractmethod
async def UploadTaskDocument(
self,
diff --git a/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py
index e2175eb..fb65594 100644
--- a/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py
+++ b/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py
@@ -2,8 +2,13 @@
from __future__ import annotations
+import io
import logging
+from datetime import datetime
from math import floor
+from urllib.parse import quote
+from xml.sax.saxutils import escape
+from zipfile import ZIP_DEFLATED, ZipFile
from sqlalchemy import bindparam, text
@@ -49,6 +54,37 @@ from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import Do
class CrossReviewServiceImpl(ICrossReviewService):
"""交叉评查服务实现。"""
+ _PROPOSAL_EXPORT_HEADERS: tuple[str, ...] = (
+ "任务名称",
+ "文档名称",
+ "意见分类",
+ "评查点名称",
+ "问题依据/命中内容",
+ "原问题说明",
+ "评查意见",
+ "分数调整",
+ "提案状态",
+ "发起人",
+ "提出时间",
+ "同意人数",
+ "同意人员",
+ "不同意人数",
+ "不同意人员",
+ "未投人数",
+ "未投人员",
+ )
+
+ _VOTE_EXPORT_HEADERS: tuple[str, ...] = (
+ "任务名称",
+ "文档名称",
+ "意见分类",
+ "评查点名称",
+ "发起人",
+ "投票人",
+ "投票状态",
+ "投票时间",
+ )
+
_SCHEMA_BOOTSTRAP_STATEMENTS: tuple[str, ...] = (
"""
CREATE TABLE IF NOT EXISTS leaudit_cross_review_tasks (
@@ -110,8 +146,11 @@ class CrossReviewServiceImpl(ICrossReviewService):
id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
task_id BIGINT NOT NULL,
document_id BIGINT NOT NULL,
- rule_result_id BIGINT NOT NULL,
+ proposal_type VARCHAR(32) NOT NULL DEFAULT 'review_point',
+ rule_result_id BIGINT,
proposer_id BIGINT NOT NULL,
+ evaluation_point_name VARCHAR(255),
+ extraction_result_text TEXT,
proposed_score_delta NUMERIC(10, 2) NOT NULL,
reason TEXT NOT NULL,
status VARCHAR(32) NOT NULL DEFAULT 'pending',
@@ -564,6 +603,13 @@ class CrossReviewServiceImpl(ICrossReviewService):
WHERE rr.document_id = d.id
AND rr.run_id = d.current_run_id
) es ON TRUE
+ LEFT JOIN LATERAL (
+ SELECT COALESCE(SUM(proposed_score_delta), 0) AS approved_delta
+ FROM leaudit_cross_review_proposals p
+ WHERE p.document_id = d.id
+ AND p.status = 'approved'
+ AND p.delete_time IS NULL
+ ) pd ON TRUE
WHERE {whereSql}
ORDER BY d.created_at DESC, d.id DESC
LIMIT :limit OFFSET :offset
@@ -601,10 +647,16 @@ class CrossReviewServiceImpl(ICrossReviewService):
errorMessages=self._parse_text_array(row.get("error_messages")),
issueMessages=self._parse_text_array(row.get("issue_messages")),
manualMessages=self._parse_text_array(row.get("manual_messages")),
- finalScore=float(row.get("final_score") or 0),
+ finalScore=float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0),
fullScore=float(row.get("full_score") or 0),
- scoreSummary=str(row.get("score_summary") or ""),
- scorePercent=float(row.get("score_percent") or 0),
+ scoreSummary=self._build_score_summary(
+ float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0),
+ float(row.get("full_score") or 0),
+ ),
+ scorePercent=self._build_score_percent(
+ float(row.get("final_score") or 0) + float(row.get("approved_delta") or 0),
+ float(row.get("full_score") or 0),
+ ),
)
for row in rows
]
@@ -738,13 +790,33 @@ class CrossReviewServiceImpl(ICrossReviewService):
"""创建交叉评查提案。"""
async with GetAsyncSession() as session:
await self._ensure_tables_ready(session)
+ proposalType = (Body.proposalType or "review_point").strip().lower()
+ if proposalType not in {"review_point", "supplement"}:
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "proposalType 仅支持 review_point、supplement")
if Body.deductionScore == 0:
raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "不允许创建 0 分提案")
taskId = await self._resolve_task_id_for_document(session, Body.documentId, CurrentUserId)
await self._ensure_task_member(session, taskId, CurrentUserId)
await self._ensure_task_document(session, taskId, Body.documentId)
- ruleResult = await self._load_rule_result(session, Body.reviewPointResultId, Body.documentId)
+
+ ruleResult = None
+ reviewPointResultId: int | None = None
+ evaluationPointName = (Body.evaluationPointName or "").strip()
+ extractionResultText = (Body.extractionResultText or "").strip()
+
+ if proposalType == "review_point":
+ if Body.reviewPointResultId is None:
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "reviewPointResultId 不能为空")
+ reviewPointResultId = int(Body.reviewPointResultId)
+ ruleResult = await self._load_rule_result(session, reviewPointResultId, Body.documentId)
+ if not evaluationPointName:
+ evaluationPointName = str(ruleResult.get("rule_name") or ruleResult.get("rule_id") or "")
+ else:
+ if not evaluationPointName:
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "补充意见必须填写评查点名称")
+ if not extractionResultText:
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "补充意见必须填写抽取结果")
duplicateExists = bool(
await session.scalar(
@@ -754,29 +826,41 @@ class CrossReviewServiceImpl(ICrossReviewService):
FROM leaudit_cross_review_proposals
WHERE task_id = :task_id
AND document_id = :document_id
- AND rule_result_id = :rule_result_id
+ AND proposal_type = :proposal_type
AND proposer_id = :proposer_id
AND status IN ('pending', 'approved')
AND delete_time IS NULL
+ AND (
+ (:proposal_type = 'review_point' AND rule_result_id = :rule_result_id)
+ OR (
+ :proposal_type = 'supplement'
+ AND COALESCE(evaluation_point_name, '') = :evaluation_point_name
+ AND COALESCE(extraction_result_text, '') = :extraction_result_text
+ )
+ )
LIMIT 1
"""
),
{
"task_id": taskId,
"document_id": Body.documentId,
- "rule_result_id": Body.reviewPointResultId,
+ "proposal_type": proposalType,
+ "rule_result_id": reviewPointResultId,
+ "evaluation_point_name": evaluationPointName,
+ "extraction_result_text": extractionResultText,
"proposer_id": CurrentUserId,
},
)
)
if duplicateExists:
- raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前评查点已存在您的有效提案")
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前内容已存在您的有效提案")
- currentScore, fullScore = await self._calculate_current_score(session, Body.reviewPointResultId, Body.documentId)
- if Body.deductionScore < 0 and currentScore <= 0:
- raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已为 0,不能继续扣分")
- if Body.deductionScore > 0 and currentScore >= fullScore:
- raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已满分,不能继续加分")
+ if proposalType == "review_point":
+ currentScore, fullScore = await self._calculate_current_score(session, reviewPointResultId, Body.documentId)
+ if Body.deductionScore < 0 and currentScore <= 0:
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已为 0,不能继续扣分")
+ if Body.deductionScore > 0 and currentScore >= fullScore:
+ raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "当前分值已满分,不能继续加分")
await self._reset_transaction_for_write(session)
async with session.begin():
@@ -785,17 +869,42 @@ class CrossReviewServiceImpl(ICrossReviewService):
text(
"""
INSERT INTO leaudit_cross_review_proposals
- (task_id, document_id, rule_result_id, proposer_id, proposed_score_delta, reason, status)
+ (
+ task_id,
+ document_id,
+ proposal_type,
+ rule_result_id,
+ proposer_id,
+ evaluation_point_name,
+ extraction_result_text,
+ proposed_score_delta,
+ reason,
+ status
+ )
VALUES
- (:task_id, :document_id, :rule_result_id, :proposer_id, :proposed_score_delta, :reason, 'pending')
+ (
+ :task_id,
+ :document_id,
+ :proposal_type,
+ :rule_result_id,
+ :proposer_id,
+ :evaluation_point_name,
+ :extraction_result_text,
+ :proposed_score_delta,
+ :reason,
+ 'pending'
+ )
RETURNING id, create_time
"""
),
{
"task_id": taskId,
"document_id": Body.documentId,
- "rule_result_id": Body.reviewPointResultId,
+ "proposal_type": proposalType,
+ "rule_result_id": reviewPointResultId,
"proposer_id": CurrentUserId,
+ "evaluation_point_name": evaluationPointName or None,
+ "extraction_result_text": extractionResultText or None,
"proposed_score_delta": Body.deductionScore,
"reason": Body.auditOpinion.strip(),
},
@@ -965,6 +1074,74 @@ class CrossReviewServiceImpl(ICrossReviewService):
pendingProposals=pendingProposals,
)
+ async def ExportDocumentProposals(self, CurrentUserId: int, DocumentId: int) -> tuple[bytes, str]:
+ """导出文档交叉评查意见 Excel。"""
+ async with GetAsyncSession() as session:
+ await self._ensure_tables_ready(session)
+ taskId = await self._resolve_task_id_for_document(session, DocumentId, CurrentUserId)
+ await self._ensure_task_member(session, taskId, CurrentUserId)
+
+ documentMeta = await self._load_document_export_meta(session, taskId, DocumentId)
+ proposalPage = await self._build_document_proposals_page(session, CurrentUserId, taskId, DocumentId, 1, 1000)
+
+ defaultRows: list[dict[str, object]] = []
+ supplementRows: list[dict[str, object]] = []
+ voteRows: list[dict[str, object]] = []
+ taskMembers = await self._load_task_member_names(session, taskId)
+
+ for item in proposalPage.items:
+ row = self._build_export_summary_row(
+ taskId=taskId,
+ taskName=documentMeta["task_name"],
+ documentId=DocumentId,
+ documentName=documentMeta["document_name"],
+ item=item,
+ )
+ if item.proposalType == "supplement":
+ supplementRows.append(row)
+ else:
+ defaultRows.append(row)
+
+ votedNames = {vote.voter for vote in item.votes}
+ for vote in item.votes:
+ voteRows.append(
+ {
+ "任务名称": documentMeta["task_name"],
+ "文档名称": documentMeta["document_name"],
+ "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见",
+ "评查点名称": item.evaluationPointName,
+ "发起人": item.proposer,
+ "投票人": vote.voter,
+ "投票结果": self._translate_vote_type(vote.voteType),
+ "投票时间": "",
+ }
+ )
+ for _, memberName in taskMembers.items():
+ if memberName in votedNames:
+ continue
+ voteRows.append(
+ {
+ "任务名称": documentMeta["task_name"],
+ "文档名称": documentMeta["document_name"],
+ "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见",
+ "评查点名称": item.evaluationPointName,
+ "发起人": item.proposer,
+ "投票人": memberName,
+ "投票结果": "未投",
+ "投票时间": "",
+ }
+ )
+
+ workbook = self._build_simple_xlsx(
+ sheets=[
+ ("默认规则意见", list(self._PROPOSAL_EXPORT_HEADERS), defaultRows),
+ ("补充意见", list(self._PROPOSAL_EXPORT_HEADERS), supplementRows),
+ ("投票明细", list(self._VOTE_EXPORT_HEADERS), voteRows),
+ ]
+ )
+ safeName = quote(f"交叉评查意见_{documentMeta['document_name']}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx")
+ return workbook, safeName
+
async def UploadTaskDocument(
self,
CurrentUserId: int,
@@ -1105,8 +1282,11 @@ class CrossReviewServiceImpl(ICrossReviewService):
p.id,
p.task_id,
p.document_id,
+ p.proposal_type,
p.rule_result_id,
p.proposer_id,
+ p.evaluation_point_name,
+ p.extraction_result_text,
p.proposed_score_delta,
p.reason,
p.status,
@@ -1145,7 +1325,10 @@ class CrossReviewServiceImpl(ICrossReviewService):
items.append(
CrossReviewProposalItemVO(
proposalId=proposalId,
- evaluationPointName=str(row.get("rule_name") or ""),
+ proposalType=str(row.get("proposal_type") or "review_point"),
+ reviewPointResultId=self._to_int(row.get("rule_result_id")),
+ evaluationPointName=str(row.get("evaluation_point_name") or row.get("rule_name") or ""),
+ extractionResultText=str(row.get("extraction_result_text") or ""),
proposedScore=float(row["proposed_score_delta"] or 0),
reason=str(row.get("reason") or ""),
proposer=str(row.get("proposer_name") or ""),
@@ -1160,7 +1343,7 @@ class CrossReviewServiceImpl(ICrossReviewService):
disagreeVoters=disagreeVoters,
pendingVoters=pendingVoters,
canVote=canVote,
- problemMessage=str(row.get("fail_message") or row.get("rule_name") or ""),
+ problemMessage=str(row.get("fail_message") or row.get("extraction_result_text") or row.get("rule_name") or ""),
proposerId=int(row["proposer_id"]),
createdAt=row.get("create_time"),
status=str(row.get("status") or "pending"),
@@ -1288,6 +1471,38 @@ class CrossReviewServiceImpl(ICrossReviewService):
raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "交叉评查提案不存在")
return row
+ async def _load_document_export_meta(self, session, taskId: int, documentId: int) -> dict[str, str]:
+ row = (
+ await session.execute(
+ text(
+ """
+ SELECT
+ t.task_name,
+ COALESCE(f.file_name, d.normalized_name, CAST(d.id AS TEXT)) AS document_name
+ FROM leaudit_cross_review_tasks t
+ JOIN leaudit_documents d
+ ON d.id = :document_id
+ LEFT JOIN leaudit_document_files f
+ ON f.document_id = d.id
+ AND f.is_active = true
+ AND f.deleted_at IS NULL
+ AND f.file_role IN ('original', 'primary')
+ WHERE t.id = :task_id
+ AND t.delete_time IS NULL
+ AND d.deleted_at IS NULL
+ LIMIT 1
+ """
+ ),
+ {"task_id": taskId, "document_id": documentId},
+ )
+ ).mappings().first()
+ if not row:
+ raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "导出目标不存在")
+ return {
+ "task_name": str(row.get("task_name") or f"任务{taskId}"),
+ "document_name": str(row.get("document_name") or f"文档{documentId}"),
+ }
+
async def _load_task_member_names(self, session, taskId: int) -> dict[int, str]:
rows = (
await session.execute(
@@ -1469,6 +1684,41 @@ class CrossReviewServiceImpl(ICrossReviewService):
if not exists:
raise LeauditException(StatusCodeEnum.HTTP_500_INTERNAL_SERVER_ERROR, f"交叉评查表未初始化: {tableName}")
+ proposalColumns = await self._load_table_columns(session, "leaudit_cross_review_proposals")
+ if "proposal_type" not in proposalColumns:
+ await session.execute(
+ text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN proposal_type VARCHAR(32) NOT NULL DEFAULT 'review_point'")
+ )
+ if "evaluation_point_name" not in proposalColumns:
+ await session.execute(
+ text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN evaluation_point_name VARCHAR(255)")
+ )
+ if "extraction_result_text" not in proposalColumns:
+ await session.execute(
+ text("ALTER TABLE leaudit_cross_review_proposals ADD COLUMN extraction_result_text TEXT")
+ )
+ if proposalColumns:
+ isRuleResultNotNull = bool(
+ await session.scalar(
+ text(
+ """
+ SELECT 1
+ FROM information_schema.columns
+ WHERE table_schema = current_schema()
+ AND table_name = 'leaudit_cross_review_proposals'
+ AND column_name = 'rule_result_id'
+ AND is_nullable = 'NO'
+ LIMIT 1
+ """
+ )
+ )
+ )
+ if isRuleResultNotNull:
+ await session.execute(
+ text("ALTER TABLE leaudit_cross_review_proposals ALTER COLUMN rule_result_id DROP NOT NULL")
+ )
+ await session.commit()
+
async def _ensure_task_member(self, session, taskId: int, userId: int) -> None:
"""校验当前用户是否是任务成员。"""
exists = bool(
@@ -1522,3 +1772,283 @@ class CrossReviewServiceImpl(ICrossReviewService):
if isinstance(value, list):
return [str(v) for v in value]
return [str(value)]
+
+ def _build_score_summary(self, finalScore: float, fullScore: float) -> str:
+ if fullScore <= 0:
+ return "0/0"
+ return f"{round(finalScore, 1)}/{round(fullScore, 1)}"
+
+ def _build_score_percent(self, finalScore: float, fullScore: float) -> float:
+ if fullScore <= 0:
+ return 0.0
+ return round(finalScore / fullScore * 100, 1)
+
+ def _build_export_summary_row(
+ self,
+ taskId: int,
+ taskName: str,
+ documentId: int,
+ documentName: str,
+ item: CrossReviewProposalItemVO,
+ ) -> dict[str, object]:
+ return {
+ "任务名称": taskName,
+ "文档名称": documentName,
+ "意见分类": "补充意见" if item.proposalType == "supplement" else "默认规则意见",
+ "评查点名称": item.evaluationPointName,
+ "问题依据/命中内容": item.extractionResultText or item.problemMessage or "",
+ "原问题说明": item.problemMessage or "",
+ "评查意见": item.reason,
+ "分数调整": item.proposedScore,
+ "提案状态": self._translate_proposal_status(item.status),
+ "发起人": item.proposer,
+ "提出时间": self._format_datetime(item.createdAt),
+ "同意人数": len(item.agreeVoters),
+ "同意人员": "、".join(item.agreeVoters),
+ "不同意人数": len(item.disagreeVoters),
+ "不同意人员": "、".join(item.disagreeVoters),
+ "未投人数": len(item.pendingVoters),
+ "未投人员": "、".join(item.pendingVoters),
+ }
+
+ def _translate_vote_type(self, voteType: str) -> str:
+ return {
+ "agree": "同意",
+ "disagree": "不同意",
+ "cancel": "撤销",
+ }.get(voteType, voteType or "")
+
+ def _translate_proposal_status(self, status: str) -> str:
+ return {
+ "pending": "待定",
+ "approved": "已通过",
+ "rejected": "已驳回",
+ "cancelled": "已撤销",
+ }.get(status, status or "")
+
+ def _format_datetime(self, value) -> str:
+ if value is None:
+ return ""
+ if isinstance(value, datetime):
+ return value.strftime("%Y-%m-%d %H:%M:%S")
+ return str(value)
+
+ def _build_simple_xlsx(self, sheets: list[tuple[str, list[str], list[dict[str, object]]]]) -> bytes:
+ normalized_sheets: list[tuple[str, list[str], list[list[str]]]] = []
+ for sheetName, headers, rows in sheets:
+ tableRows = [[self._excel_cell_text(row.get(header)) for header in headers] for row in rows]
+ normalized_sheets.append((sheetName, headers, tableRows))
+
+ sharedStrings: list[str] = []
+ stringIndex: dict[str, int] = {}
+
+ def intern(value: str) -> int:
+ if value in stringIndex:
+ return stringIndex[value]
+ idx = len(sharedStrings)
+ stringIndex[value] = idx
+ sharedStrings.append(value)
+ return idx
+
+ for _, headers, rows in normalized_sheets:
+ for header in headers:
+ intern(header)
+ for row in rows:
+ for cell in row:
+ intern(cell)
+
+ buffer = io.BytesIO()
+ with ZipFile(buffer, "w", ZIP_DEFLATED) as zf:
+ zf.writestr("[Content_Types].xml", self._xlsx_content_types(len(normalized_sheets)))
+ zf.writestr("_rels/.rels", self._xlsx_root_rels())
+ zf.writestr("xl/workbook.xml", self._xlsx_workbook([sheet[0] for sheet in normalized_sheets]))
+ zf.writestr("xl/_rels/workbook.xml.rels", self._xlsx_workbook_rels(len(normalized_sheets)))
+ zf.writestr("xl/styles.xml", self._xlsx_styles())
+ zf.writestr("xl/sharedStrings.xml", self._xlsx_shared_strings(sharedStrings))
+ for index, (_, headers, rows) in enumerate(normalized_sheets, start=1):
+ zf.writestr(f"xl/worksheets/sheet{index}.xml", self._xlsx_sheet(headers, rows, intern))
+ zf.writestr("docProps/core.xml", self._xlsx_core_props())
+ zf.writestr("docProps/app.xml", self._xlsx_app_props([sheet[0] for sheet in normalized_sheets]))
+ return buffer.getvalue()
+
+ def _excel_cell_text(self, value: object) -> str:
+ if value is None:
+ return ""
+ if isinstance(value, float):
+ return str(round(value, 2))
+ return str(value)
+
+ def _xlsx_content_types(self, sheetCount: int) -> str:
+ overrides = [
+ '',
+ '',
+ '',
+ '',
+ '',
+ ]
+ overrides.extend(
+ f''
+ for index in range(1, sheetCount + 1)
+ )
+ return (
+ ''
+ ''
+ ''
+ ''
+ + "".join(overrides)
+ + ""
+ )
+
+ def _xlsx_root_rels(self) -> str:
+ return (
+ ''
+ ''
+ ''
+ ''
+ ''
+ ""
+ )
+
+ def _xlsx_workbook(self, sheetNames: list[str]) -> str:
+ sheets = "".join(
+ f''
+ for index, name in enumerate(sheetNames, start=1)
+ )
+ return (
+ ''
+ ''
+ f"{sheets}"
+ ""
+ )
+
+ def _xlsx_workbook_rels(self, sheetCount: int) -> str:
+ relationships = []
+ for index in range(1, sheetCount + 1):
+ relationships.append(
+ f''
+ )
+ relationships.append(
+ f''
+ )
+ relationships.append(
+ f''
+ )
+ return (
+ ''
+ ''
+ + "".join(relationships)
+ + ""
+ )
+
+ def _xlsx_styles(self) -> str:
+ return (
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ ''
+ )
+
+ def _xlsx_shared_strings(self, values: list[str]) -> str:
+ items = "".join(f"{escape(value)}" for value in values)
+ return (
+ ''
+ f'{items}'
+ )
+
+ def _xlsx_sheet(self, headers: list[str], rows: list[list[str]], intern) -> str:
+ all_rows: list[list[str]] = []
+ if headers:
+ all_rows.append(headers)
+ all_rows.extend(rows)
+ sheet_rows = []
+ for rowIndex, row in enumerate(all_rows, start=1):
+ cells = []
+ styleIndex = "1" if headers and rowIndex == 1 else "0"
+ for colIndex, value in enumerate(row, start=1):
+ colRef = self._xlsx_column_name(colIndex)
+ cells.append(f'{intern(value)}')
+ sheet_rows.append(f'{"".join(cells)}
')
+ return (
+ ''
+ ''
+ f'{"".join(sheet_rows)}'
+ ''
+ )
+
+ def _xlsx_column_name(self, columnIndex: int) -> str:
+ result = ""
+ current = columnIndex
+ while current > 0:
+ current, remainder = divmod(current - 1, 26)
+ result = chr(65 + remainder) + result
+ return result
+
+ def _xlsx_core_props(self) -> str:
+ now = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
+ return (
+ ''
+ ''
+ 'LeAudit'
+ 'LeAudit'
+ f'{now}'
+ f'{now}'
+ ''
+ )
+
+ def _xlsx_app_props(self, sheetNames: list[str]) -> str:
+ parts = "".join(f"{escape(name)}" for name in sheetNames)
+ return (
+ ''
+ ''
+ 'LeAudit'
+ f'{parts}'
+ ''
+ )
+
+ async def _load_table_columns(self, session, tableName: str) -> set[str]:
+ rows = (
+ await session.execute(
+ text(
+ """
+ SELECT column_name
+ FROM information_schema.columns
+ WHERE table_schema = current_schema()
+ AND table_name = :table_name
+ """
+ ),
+ {"table_name": tableName},
+ )
+ ).mappings().all()
+ return {str(row["column_name"]) for row in rows if row.get("column_name")}