"""页级图片质量服务实现。""" from __future__ import annotations from typing import Any from pathlib import Path import tempfile import logging import fitz from leaudit.converters import doc2pdf from sqlalchemy import text from fastapi_admin.config import LEAUDIT_PAGE_QUALITY_ENABLED from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession from fastapi_common.fastapi_common_web.domain.responses import StatusCodeEnum from fastapi_common.fastapi_common_web.exception.LeauditException import LeauditException from fastapi_modules.fastapi_leaudit.domian.vo.pageQualityVo import ( PageQualityDetailVO, PageQualityPageResultVO, PageQualityRecheckVO, PageQualitySummaryVO, ) from fastapi_modules.fastapi_leaudit.services.pageQualityService import IPageQualityService from fastapi_modules.fastapi_leaudit.services.impl.ossServiceImpl import OssServiceImpl logger = logging.getLogger(__name__) class PageQualityServiceImpl(IPageQualityService): """页级图片质量服务实现。""" def __init__(self) -> None: self.OssService = OssServiceImpl() self.DocumentService = None async def DispatchForDocument( self, DocumentId: int, TriggerUserId: int | None = None, Force: bool = False, Speed: str = "normal", ) -> PageQualityRecheckVO | None: """按文档触发页级模糊检测任务。""" await self._ensureTables() if not bool(LEAUDIT_PAGE_QUALITY_ENABLED): return None async with GetAsyncSession() as session: if not Force: active = ( await session.execute( text( """ SELECT id, status FROM leaudit_page_quality_runs WHERE document_id = :document_id AND status IN ('queued', 'running') AND deleted_at IS NULL ORDER BY id DESC LIMIT 1 """ ), {"document_id": DocumentId}, ) ).mappings().first() if active: return PageQualityRecheckVO( runId=int(active["id"]), documentId=DocumentId, status=str(active["status"] or "queued"), ) file_row = ( await session.execute( text( """ SELECT id FROM leaudit_document_files WHERE document_id = :document_id AND is_active = true AND file_role = 'primary' ORDER BY id DESC LIMIT 1 """ ), {"document_id": DocumentId}, ) ).mappings().first() document_file_id = int(file_row["id"]) if file_row and file_row.get("id") is not None else None run_row = ( await session.execute( text( """ INSERT INTO leaudit_page_quality_runs ( document_id, document_file_id, status, created_by, created_at, updated_at ) VALUES ( :document_id, :document_file_id, 'queued', :created_by, NOW(), NOW() ) RETURNING id """ ), { "document_id": DocumentId, "document_file_id": document_file_id, "created_by": TriggerUserId, }, ) ).mappings().first() await session.commit() run_id = int(run_row["id"]) from fastapi_modules.fastapi_leaudit.page_quality.tasks import dispatch_page_quality_task task_id = dispatch_page_quality_task(run_id, speed=Speed) async with GetAsyncSession() as session: await session.execute( text( """ UPDATE leaudit_page_quality_runs SET task_id = :task_id, updated_at = NOW() WHERE id = :run_id """ ), {"run_id": run_id, "task_id": task_id}, ) await session.commit() return PageQualityRecheckVO(runId=run_id, documentId=DocumentId, status="queued") async def GetDocumentSummary( self, CurrentUserId: int, DocumentId: int, ) -> PageQualitySummaryVO: """获取文档页级模糊检测摘要。""" await self._document_service().GetDocument(CurrentUserId=CurrentUserId, Id=DocumentId) await self._ensureTables() return await self._load_summary(DocumentId) async def GetDocumentDetail( self, CurrentUserId: int, DocumentId: int, ) -> PageQualityDetailVO: """获取文档页级模糊检测详情。""" await self._document_service().GetDocument(CurrentUserId=CurrentUserId, Id=DocumentId) await self._ensureTables() summary = await self._load_summary(DocumentId) results: list[PageQualityPageResultVO] = [] if summary.runId is not None: async with GetAsyncSession() as session: rows = ( await session.execute( text( """ SELECT page_num, quality_status, quality_score, reason_text FROM leaudit_page_quality_results WHERE run_id = :run_id AND quality_status IN ('review', 'reject') ORDER BY page_num ASC, id ASC """ ), {"run_id": summary.runId}, ) ).mappings().all() results = [ PageQualityPageResultVO( pageNum=int(row["page_num"]), qualityStatus=str(row["quality_status"] or "review"), qualityScore=float(row["quality_score"]) if row["quality_score"] is not None else None, reasonText=str(row["reason_text"] or "") or None, ) for row in rows ] return PageQualityDetailVO(summary=summary, results=results) async def RecheckDocument( self, CurrentUserId: int, DocumentId: int, Speed: str = "normal", ) -> PageQualityRecheckVO: """手工重跑页级模糊检测。""" await self._document_service().GetDocument(CurrentUserId=CurrentUserId, Id=DocumentId) payload = await self.DispatchForDocument( DocumentId=DocumentId, TriggerUserId=CurrentUserId, Force=True, Speed=Speed, ) if payload is None: raise LeauditException(StatusCodeEnum.HTTP_400_BAD_REQUEST, "页级模糊检测未启用") return payload async def ExecuteRun(self, RunId: int) -> dict[str, Any]: """执行一次页级模糊检测。""" await self._ensureTables() async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT r.id, r.document_id, r.document_file_id, f.file_name, f.file_ext, f.oss_url, f.local_path FROM leaudit_page_quality_runs r LEFT JOIN leaudit_document_files f ON f.id = r.document_file_id WHERE r.id = :run_id AND r.deleted_at IS NULL LIMIT 1 """ ), {"run_id": RunId}, ) ).mappings().first() if not row: raise LeauditException(StatusCodeEnum.HTTP_404_NOT_FOUND, "页级模糊检测运行不存在") await session.execute( text( """ UPDATE leaudit_page_quality_runs SET status = 'running', started_at = COALESCE(started_at, NOW()), updated_at = NOW() WHERE id = :run_id """ ), {"run_id": RunId}, ) await session.execute( text("DELETE FROM leaudit_page_quality_results WHERE run_id = :run_id"), {"run_id": RunId}, ) await session.commit() document_id = int(row["document_id"]) file_name = str(row["file_name"] or "") suffix = f".{str(row['file_ext']).lstrip('.')}" if row.get("file_ext") else Path(file_name).suffix temp_path: str | None = None converted_path: str | None = None try: local_path = str(row["local_path"] or "").strip() if local_path and Path(local_path).is_file(): source_path = Path(local_path) else: oss_url = str(row["oss_url"] or "").strip() if not oss_url: await self._mark_skipped(RunId, "no_source") return {"runId": RunId, "documentId": document_id, "status": "skipped", "reason": "no_source"} temp_path = await self.OssService.DownloadToTempFile( oss_url, Suffix=suffix or "", Prefix=f"page-quality-{document_id}-", ) source_path = Path(temp_path) page_images = self._build_page_images(source_path) if not page_images: await self._mark_skipped(RunId, "no_pages") return {"runId": RunId, "documentId": document_id, "status": "skipped", "reason": "no_pages"} total_pages = len(page_images) review_pages = 0 reject_pages = 0 async with GetAsyncSession() as session: for page_num, page_image in page_images: status, score, reason = self._classify_page_image(page_image) if status == "review": review_pages += 1 elif status == "reject": reject_pages += 1 await session.execute( text( """ INSERT INTO leaudit_page_quality_results ( run_id, document_id, page_num, quality_status, quality_score, reason_text, created_at, updated_at ) VALUES ( :run_id, :document_id, :page_num, :quality_status, :quality_score, :reason_text, NOW(), NOW() ) """ ), { "run_id": RunId, "document_id": document_id, "page_num": page_num, "quality_status": status, "quality_score": score, "reason_text": reason, }, ) summary_status = "reject" if reject_pages > 0 else ("review" if review_pages > 0 else "pass") await session.execute( text( """ UPDATE leaudit_page_quality_runs SET status = 'completed', summary_status = :summary_status, total_pages = :total_pages, review_page_count = :review_page_count, reject_page_count = :reject_page_count, finished_at = NOW(), updated_at = NOW() WHERE id = :run_id """ ), { "run_id": RunId, "summary_status": summary_status, "total_pages": total_pages, "review_page_count": review_pages, "reject_page_count": reject_pages, }, ) await session.commit() return { "runId": RunId, "documentId": document_id, "status": "completed", "summaryStatus": summary_status, "totalPages": total_pages, "reviewPageCount": review_pages, "rejectPageCount": reject_pages, } except Exception as exc: async with GetAsyncSession() as session: await session.execute( text( """ UPDATE leaudit_page_quality_runs SET status = 'failed', error_message = :error_message, finished_at = NOW(), updated_at = NOW() WHERE id = :run_id """ ), {"run_id": RunId, "error_message": str(exc)[:2000]}, ) await session.commit() raise finally: if temp_path: Path(temp_path).unlink(missing_ok=True) async def _load_summary(self, DocumentId: int) -> PageQualitySummaryVO: async with GetAsyncSession() as session: row = ( await session.execute( text( """ SELECT id, status, summary_status, total_pages, review_page_count, reject_page_count, finished_at FROM leaudit_page_quality_runs WHERE document_id = :document_id AND deleted_at IS NULL ORDER BY id DESC LIMIT 1 """ ), {"document_id": DocumentId}, ) ).mappings().first() if not row: return PageQualitySummaryVO() page_rows = ( await session.execute( text( """ SELECT DISTINCT page_num FROM leaudit_page_quality_results WHERE run_id = :run_id AND quality_status IN ('review', 'reject') ORDER BY page_num ASC """ ), {"run_id": int(row["id"])}, ) ).mappings().all() pages = [int(item["page_num"]) for item in page_rows] summary_status = str(row["summary_status"] or "") or None return PageQualitySummaryVO( runId=int(row["id"]), runStatus=str(row["status"] or "") or None, summaryStatus=summary_status, totalPages=int(row["total_pages"] or 0), reviewPageCount=int(row["review_page_count"] or 0), rejectPageCount=int(row["reject_page_count"] or 0), warningText=self._build_warning_text(pages, summary_status), pages=pages, finishedAt=row["finished_at"].isoformat() if row["finished_at"] else None, ) def _build_warning_text(self, pages: list[int], summary_status: str | None) -> str | None: if not pages or not summary_status: return None pages_text = "、".join(f"第 {page} 页" for page in pages[:10]) suffix = "建议重拍" if summary_status == "reject" else "疑似模糊" if len(pages) > 10: pages_text = f"{pages_text} 等" return f"{pages_text}{suffix}" def _build_page_images(self, source_path: Path) -> list[tuple[int, bytes]]: suffix = source_path.suffix.lower() if suffix in {".png", ".jpg", ".jpeg", ".bmp", ".tif", ".tiff", ".webp"}: return [(1, source_path.read_bytes())] if suffix == ".pdf": return self._render_pdf_pages(source_path) if suffix in {".doc", ".docx", ".wps"}: temp_pdf = tempfile.NamedTemporaryFile(prefix="page-quality-docx-", suffix=".pdf", delete=False) temp_pdf.close() doc2pdf.convert(str(source_path), temp_pdf.name, soffice="auto", pdfa=False, force=True, verify=False) try: return self._render_pdf_pages(Path(temp_pdf.name)) finally: Path(temp_pdf.name).unlink(missing_ok=True) return [] def _render_pdf_pages(self, path: Path) -> list[tuple[int, bytes]]: doc = fitz.open(path) try: pages: list[tuple[int, bytes]] = [] for index in range(len(doc)): page = doc[index] pix = page.get_pixmap(matrix=fitz.Matrix(1.5, 1.5), alpha=False) pages.append((index + 1, pix.tobytes("png"))) return pages finally: doc.close() def _classify_page_image(self, image_bytes: bytes) -> tuple[str, float, str | None]: size = len(image_bytes) if size < 25_000: return "reject", 0.2, "页面图像内容过少或清晰度较低,建议重拍" if size < 60_000: return "review", 0.45, "页面疑似存在模糊,建议人工确认" return "pass", 0.9, None def _document_service(self): if self.DocumentService is None: from fastapi_modules.fastapi_leaudit.services.impl.documentServiceImpl import DocumentServiceImpl self.DocumentService = DocumentServiceImpl() return self.DocumentService async def _mark_skipped(self, RunId: int, SkipReason: str) -> None: async with GetAsyncSession() as session: await session.execute( text( """ UPDATE leaudit_page_quality_runs SET status = 'skipped', skip_reason = :skip_reason, finished_at = NOW(), updated_at = NOW() WHERE id = :run_id """ ), {"run_id": RunId, "skip_reason": SkipReason}, ) await session.commit() async def _ensureTables(self) -> None: async with GetAsyncSession() as session: await session.execute( text( """ CREATE TABLE IF NOT EXISTS public.leaudit_page_quality_runs ( id BIGSERIAL PRIMARY KEY, document_id BIGINT NOT NULL, document_file_id BIGINT NULL, status VARCHAR(32) NOT NULL DEFAULT 'queued', summary_status VARCHAR(32) NULL, total_pages INTEGER NOT NULL DEFAULT 0, review_page_count INTEGER NOT NULL DEFAULT 0, reject_page_count INTEGER NOT NULL DEFAULT 0, skip_reason VARCHAR(64) NULL, task_id VARCHAR(128) NULL, error_message TEXT NULL, started_at TIMESTAMP NULL, finished_at TIMESTAMP NULL, created_by BIGINT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW(), deleted_at TIMESTAMP NULL ) """ ) ) await session.execute( text( """ CREATE TABLE IF NOT EXISTS public.leaudit_page_quality_results ( id BIGSERIAL PRIMARY KEY, run_id BIGINT NOT NULL, document_id BIGINT NOT NULL, page_num INTEGER NOT NULL, quality_status VARCHAR(32) NOT NULL, quality_score NUMERIC(10, 4) NULL, reason_text TEXT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW(), updated_at TIMESTAMP NOT NULL DEFAULT NOW() ) """ ) ) await session.execute( text( "CREATE INDEX IF NOT EXISTS idx_leaudit_page_quality_runs_document_id ON public.leaudit_page_quality_runs(document_id)" ) ) await session.execute( text( "CREATE INDEX IF NOT EXISTS idx_leaudit_page_quality_results_run_id ON public.leaudit_page_quality_results(run_id)" ) ) await session.commit()