From 3823c9a2e46297e904d7180a3543f73548b0c065 Mon Sep 17 00:00:00 2001 From: wren <“porlong@qq.com”> Date: Tue, 12 May 2026 11:30:56 +0800 Subject: [PATCH] fix: harden cross-review task state handling --- fastapi_admin/celery_app.py | 12 +- fastapi_admin/config/__init__.pyi | 2 + fastapi_admin/config/_settings.py | 2 + .../domian/vo/crossReviewVo.py | 2 + .../leaudit_bridge/storage_adapter.py | 4 +- .../fastapi_leaudit/leaudit_bridge/tasks.py | 160 ++++++++++++++++++ .../services/impl/crossReviewServiceImpl.py | 27 ++- leaudit.sh | 154 +++++++++++++++-- package.json | 15 ++ scripts/start_beat.sh | 15 ++ 10 files changed, 373 insertions(+), 20 deletions(-) create mode 100644 package.json create mode 100755 scripts/start_beat.sh diff --git a/fastapi_admin/celery_app.py b/fastapi_admin/celery_app.py index 51b865c..5c0fb97 100644 --- a/fastapi_admin/celery_app.py +++ b/fastapi_admin/celery_app.py @@ -3,11 +3,13 @@ from __future__ import annotations from celery import Celery +from celery.schedules import crontab from kombu import Queue from fastapi_admin.config import ( LEAUDIT_TASK_SOFT_TIME_LIMIT, LEAUDIT_TASK_TIME_LIMIT, + LEAUDIT_STUCK_SCAN_CRON_MINUTES, LEAUDIT_WORKER_QUEUE_NORMAL, LEAUDIT_WORKER_QUEUE_URGENT, REDIS_DB, @@ -41,10 +43,18 @@ celery_app.conf.update( broker_connection_retry_on_startup=True, task_soft_time_limit=LEAUDIT_TASK_SOFT_TIME_LIMIT, task_time_limit=LEAUDIT_TASK_TIME_LIMIT, + beat_schedule={ + "leaudit-scan-stuck-documents": { + "task": "leaudit.scan_stuck_documents", + "schedule": crontab(minute=f"*/{max(1, int(LEAUDIT_STUCK_SCAN_CRON_MINUTES))}"), + "options": {"queue": LEAUDIT_WORKER_QUEUE_NORMAL}, + } + }, ) celery_app.autodiscover_tasks( [ "fastapi_modules.fastapi_leaudit.leaudit_bridge", - ] + ], + force=True, ) diff --git a/fastapi_admin/config/__init__.pyi b/fastapi_admin/config/__init__.pyi index 39ba803..472990e 100644 --- a/fastapi_admin/config/__init__.pyi +++ b/fastapi_admin/config/__init__.pyi @@ -71,6 +71,8 @@ LEAUDIT_WORKER_QUEUE_URGENT: str LEAUDIT_WORKER_QUEUE_NORMAL: str LEAUDIT_WORKER_CONCURRENCY: int LEAUDIT_RUN_LOCK_SECONDS: int +LEAUDIT_STUCK_SCAN_CRON_MINUTES: int +LEAUDIT_STUCK_TIMEOUT_MINUTES: int LEAUDIT_TASK_SOFT_TIME_LIMIT: int LEAUDIT_TASK_TIME_LIMIT: int diff --git a/fastapi_admin/config/_settings.py b/fastapi_admin/config/_settings.py index eb08283..5d599f4 100644 --- a/fastapi_admin/config/_settings.py +++ b/fastapi_admin/config/_settings.py @@ -111,6 +111,8 @@ class LeauditSettings(_Base): LEAUDIT_WORKER_QUEUE_NORMAL: str = "leaudit.normal" LEAUDIT_WORKER_CONCURRENCY: int = 2 LEAUDIT_RUN_LOCK_SECONDS: int = 1800 + LEAUDIT_STUCK_SCAN_CRON_MINUTES: int = 5 + LEAUDIT_STUCK_TIMEOUT_MINUTES: int = 20 LEAUDIT_TASK_SOFT_TIME_LIMIT: int = 3300 LEAUDIT_TASK_TIME_LIMIT: int = 3600 diff --git a/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py b/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py index bba7997..31d4768 100644 --- a/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py +++ b/fastapi_modules/fastapi_leaudit/domian/vo/crossReviewVo.py @@ -47,11 +47,13 @@ class CrossReviewTaskDocumentVO(BaseModel): name: str = Field("", description="文档名称") documentNumber: str | None = Field(None, description="文号") typeId: int | None = Field(None, description="文档类型ID") + typeName: str | None = Field(None, description="文档类型名称") processingStatus: str | None = Field(None, description="处理状态") versionNo: int = Field(1, description="版本号") isLatestVersion: bool = Field(True, description="是否最新版本") auditStatus: int = Field(0, description="任务内完成状态") createdAt: datetime | None = Field(None, description="创建时间") + fileSize: int = Field(0, description="文件大小(字节)") class CrossReviewTaskDocumentPageVO(BaseModel): diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py index 87c6fd4..35e17d7 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/storage_adapter.py @@ -359,7 +359,9 @@ class StorageAdapter: "level": level, "error_code": error_code, "message": message, - "detail_json": detail_json, + "detail_json": json.dumps(detail_json, ensure_ascii=False) + if detail_json is not None + else None, }, ) await session.commit() diff --git a/fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py b/fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py index f5a16d6..8995ba4 100644 --- a/fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py +++ b/fastapi_modules/fastapi_leaudit/leaudit_bridge/tasks.py @@ -17,6 +17,7 @@ from sqlalchemy import select from fastapi_admin.celery_app import celery_app from fastapi_admin.config import ( LEAUDIT_RULES_DIR, + LEAUDIT_STUCK_TIMEOUT_MINUTES, LEAUDIT_WORKER_QUEUE_NORMAL, LEAUDIT_WORKER_QUEUE_URGENT, ) @@ -256,6 +257,22 @@ def leaudit_process_document_task(self, run_id: int, rules_path: str | None = No ) +@celery_app.task( + bind=True, + name="leaudit.scan_stuck_documents", +) +def leaudit_scan_stuck_documents_task(self) -> dict[str, Any]: + """周期扫描长时间无进展的评查文档,并自动标记失败。""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + return loop.run_until_complete( + _scan_and_fail_stuck_documents(timeout_minutes=max(1, int(LEAUDIT_STUCK_TIMEOUT_MINUTES))) + ) + finally: + loop.close() + + # type_id → rules directory mapping (only fixed-mapping types) # 行政许可 (type_id=2) has 9 sub-types, NOT mapped here — # must come from document metadata (rules_file_path) or content classification. @@ -633,3 +650,146 @@ def _queue_label(queue_name: str | None) -> str: if queue_name == LEAUDIT_WORKER_QUEUE_URGENT: return "urgent" return "normal" + + +async def _scan_and_fail_stuck_documents(*, timeout_minutes: int) -> dict[str, Any]: + """扫描当前 run 长时间无更新时间的文档,并将其标记为失败。""" + from fastapi_common.fastapi_common_sqlalchemy.database import GetAsyncSession + from sqlalchemy import text as sa_text + + lock_key = 20260512 + timed_out_rows: list[dict[str, Any]] = [] + + async with GetAsyncSession() as session: + lock_row = ( + await session.execute( + sa_text("SELECT pg_try_advisory_lock(:lock_key)"), + {"lock_key": lock_key}, + ) + ).fetchone() + has_lock = bool(lock_row[0]) if lock_row else False + if not has_lock: + log.info("stuck-scan skipped: advisory lock already held") + return { + "status": "skipped", + "reason": "lock_not_acquired", + "timeout_minutes": timeout_minutes, + "matched": 0, + "failed": 0, + } + + try: + rows = ( + await session.execute( + sa_text( + """ + SELECT + d.id AS document_id, + d.processing_status, + d.current_run_id, + d.updated_at AS document_updated_at, + d.region, + d.normalized_name, + ar.id AS run_id, + ar.status AS run_status, + ar.phase, + ar.task_id, + ar.updated_at AS run_updated_at, + EXTRACT(EPOCH FROM ( + NOW() - CASE + WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') + THEN COALESCE(ar.updated_at, d.updated_at) + ELSE COALESCE(d.updated_at, ar.updated_at) + END + ))::bigint AS idle_seconds + FROM leaudit_documents d + JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + WHERE d.deleted_at IS NULL + AND d.is_latest_version = true + AND ( + LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') + OR LOWER(COALESCE(d.processing_status, '')) IN ('waiting', 'queued', 'running', 'processing') + ) + AND CASE + WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') + THEN COALESCE(ar.updated_at, d.updated_at) + ELSE COALESCE(d.updated_at, ar.updated_at) + END < NOW() - make_interval(mins => :timeout_minutes) + ORDER BY CASE + WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') + THEN COALESCE(ar.updated_at, d.updated_at) + ELSE COALESCE(d.updated_at, ar.updated_at) + END ASC, + d.id ASC + """ + ), + {"timeout_minutes": timeout_minutes}, + ) + ).mappings().all() + + timed_out_rows = [dict(row) for row in rows] + finally: + await session.execute( + sa_text("SELECT pg_advisory_unlock(:lock_key)"), + {"lock_key": lock_key}, + ) + await session.commit() + + storage = StorageAdapter() + failed_items: list[dict[str, Any]] = [] + + for row in timed_out_rows: + document_id = int(row["document_id"]) + run_id = int(row["run_id"]) + idle_seconds = int(row.get("idle_seconds") or 0) + run_phase = row.get("phase") + run_status = row.get("run_status") + processing_status = row.get("processing_status") + file_name = row.get("normalized_name") or f"document-{document_id}" + + message = ( + f"文档处理长时间无进展,已自动终止:" + f"document_id={document_id}, run_id={run_id}, " + f"processing_status={processing_status}, run_status={run_status}, " + f"phase={run_phase}, idle_seconds={idle_seconds}" + ) + + try: + await _update_status_safe(document_id, "failed") + await storage.fail_run( + document_id, + run_id=run_id, + phase=run_phase or "dispatch", + message=message, + detail_json={ + "reason": "stuck_timeout", + "timeoutMinutes": timeout_minutes, + "idleSeconds": idle_seconds, + "taskId": row.get("task_id"), + "runStatus": run_status, + "processingStatus": processing_status, + "phase": run_phase, + "fileName": file_name, + "region": row.get("region"), + }, + ) + failed_items.append( + { + "document_id": document_id, + "run_id": run_id, + "phase": run_phase, + "idle_seconds": idle_seconds, + } + ) + log.warning("stuck document auto-failed: %s", message) + except Exception: + log.exception("stuck document auto-fail failed: document_id=%s run_id=%s", document_id, run_id) + + return { + "status": "success", + "timeout_minutes": timeout_minutes, + "matched": len(timed_out_rows), + "failed": len(failed_items), + "items": failed_items, + } diff --git a/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py b/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py index d45a663..2e75229 100644 --- a/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py +++ b/fastapi_modules/fastapi_leaudit/services/impl/crossReviewServiceImpl.py @@ -414,14 +414,35 @@ class CrossReviewServiceImpl(ICrossReviewService): COALESCE(d.normalized_name, '') AS name, CAST(d.biz_document_id AS TEXT) AS document_number, d.type_id, - d.processing_status, + COALESCE( + CASE + WHEN LOWER(COALESCE(ar.status, '')) IN ('pending', 'queued', 'running', 'retrying') + THEN ar.status + ELSE d.processing_status + END, + d.processing_status, + 'waiting' + ) AS processing_status, d.version_no, d.is_latest_version, d.created_at, - td.audit_status + td.audit_status, + COALESCE(dt.name, '') AS type_name, + COALESCE(df.file_size, 0) AS file_size FROM leaudit_cross_review_task_documents td JOIN leaudit_documents d ON d.id = td.document_id + LEFT JOIN leaudit_audit_runs ar + ON ar.id = d.current_run_id + LEFT JOIN leaudit_document_types dt + ON dt.id = d.type_id + LEFT JOIN LATERAL ( + SELECT file_size + FROM leaudit_document_files + WHERE document_id = d.id + ORDER BY id ASC + LIMIT 1 + ) df ON TRUE WHERE {whereSql} ORDER BY d.created_at DESC, d.id DESC LIMIT :limit OFFSET :offset @@ -437,11 +458,13 @@ class CrossReviewServiceImpl(ICrossReviewService): name=str(row["name"] or ""), documentNumber=row.get("document_number"), typeId=self._to_int(row.get("type_id")), + typeName=row.get("type_name"), processingStatus=row.get("processing_status"), versionNo=int(row.get("version_no") or 1), isLatestVersion=bool(row.get("is_latest_version")), auditStatus=int(row.get("audit_status") or 0), createdAt=row.get("created_at"), + fileSize=int(row.get("file_size") or 0), ) for row in rows ] diff --git a/leaudit.sh b/leaudit.sh index 44c3bdf..32dbc2d 100755 --- a/leaudit.sh +++ b/leaudit.sh @@ -4,13 +4,16 @@ PROJECT_DIR="$(cd "$(dirname "$0")" && pwd)" FRONTEND_DIR="$PROJECT_DIR/legal-platform-frontend" BACKEND_DIR="$PROJECT_DIR" WORKER_SCRIPT="$PROJECT_DIR/scripts/start_worker.sh" +BEAT_SCRIPT="$PROJECT_DIR/scripts/start_beat.sh" LOG_DIR="$PROJECT_DIR/.codex-run" BACKEND_PID_FILE="$LOG_DIR/backend.pid" FRONTEND_PID_FILE="$LOG_DIR/frontend.pid" WORKER_PID_FILE="$LOG_DIR/worker.pid" +BEAT_PID_FILE="$LOG_DIR/beat.pid" BACKEND_LOG="$LOG_DIR/backend.log" FRONTEND_LOG="$LOG_DIR/frontend.log" WORKER_LOG="$LOG_DIR/worker.log" +BEAT_LOG="$LOG_DIR/beat.log" RED='\033[0;31m' GREEN='\033[0;32m' @@ -113,6 +116,22 @@ pid_command() { ps -p "$pid" -o args= 2>/dev/null | xargs } +kill_process_tree() { + local pid=$1 + [ -n "$pid" ] || return 0 + + local children + children=$(pgrep -P "$pid" 2>/dev/null || true) + if [ -n "$children" ]; then + local child + for child in $children; do + kill_process_tree "$child" + done + fi + + kill "$pid" 2>/dev/null || true +} + ensure_port_free() { local name=$1 local port=$2 @@ -153,10 +172,12 @@ start_backend() { exec "$BACKEND_PYTHON" run.py ) >> "$BACKEND_LOG" 2>&1 & pid=$! - echo "$pid" > "$BACKEND_PID_FILE" sleep 2 - if pid_alive "$pid"; then + local listening_pid + listening_pid=$(port_pid "$BACKEND_PORT") + if pid_alive "$pid" || [ -n "$listening_pid" ]; then + echo "${listening_pid:-$pid}" > "$BACKEND_PID_FILE" log_success "后端启动成功 (PID: $pid, 端口: $BACKEND_PORT)" return 0 fi @@ -185,10 +206,12 @@ start_frontend() { exec npm run dev:dev ) >> "$FRONTEND_LOG" 2>&1 & pid=$! - echo "$pid" > "$FRONTEND_PID_FILE" sleep 4 - if pid_alive "$pid"; then + local listening_pid + listening_pid=$(port_pid "$FRONTEND_DEV_PORT") + if pid_alive "$pid" || [ -n "$listening_pid" ]; then + echo "${listening_pid:-$pid}" > "$FRONTEND_PID_FILE" log_success "前端启动成功 (PID: $pid, 开发端口: $FRONTEND_DEV_PORT, 访问端口: $FRONTEND_PUBLIC_PORT)" return 0 fi @@ -234,29 +257,89 @@ start_worker() { return 1 } +start_beat() { + cleanup_pid_file "$BEAT_PID_FILE" + local pid + pid=$(service_pid "$BEAT_PID_FILE") + if pid_alive "$pid"; then + log_warn "Beat 已在运行 (PID: $pid)" + return 0 + fi + + if [ ! -x "$BEAT_SCRIPT" ]; then + log_error "Beat 启动脚本不存在或不可执行: $BEAT_SCRIPT" + return 1 + fi + + log_info "启动 Beat 调度服务..." + : > "$BEAT_LOG" + ( + cd "$PROJECT_DIR" + exec "$BEAT_SCRIPT" + ) >> "$BEAT_LOG" 2>&1 & + pid=$! + echo "$pid" > "$BEAT_PID_FILE" + sleep 2 + + if pid_alive "$pid"; then + log_success "Beat 启动成功 (PID: $pid)" + return 0 + fi + + log_error "Beat 启动失败,查看日志: $BEAT_LOG" + tail -20 "$BEAT_LOG" 2>/dev/null || true + rm -f "$BEAT_PID_FILE" + return 1 +} + stop_service() { local name=$1 local pid_file=$2 + local port=${3:-} local pid pid=$(service_pid "$pid_file") if ! pid_alive "$pid"; then + if [ -n "$port" ]; then + local stray_pid + stray_pid=$(port_pid "$port") + if [ -n "$stray_pid" ]; then + log_warn "$name PID 文件已失效,正在清理端口 $port 上的残留进程 (PID: $stray_pid)..." + kill_process_tree "$stray_pid" + sleep 1 + if pid_alive "$stray_pid"; then + kill -9 "$stray_pid" 2>/dev/null || true + fi + rm -f "$pid_file" + log_success "$name 残留进程已停止" + return 0 + fi + fi rm -f "$pid_file" log_warn "$name 未运行" return 0 fi log_info "停止 $name (PID: $pid)..." - kill "$pid" 2>/dev/null || true + kill_process_tree "$pid" for _ in $(seq 1 10); do - if ! pid_alive "$pid"; then + local current_port_pid="" + if [ -n "$port" ]; then + current_port_pid=$(port_pid "$port") + fi + if ! pid_alive "$pid" && [ -z "$current_port_pid" ]; then break fi sleep 0.5 done - if pid_alive "$pid"; then + local stubborn_pid="" + if [ -n "$port" ]; then + stubborn_pid=$(port_pid "$port") + fi + if pid_alive "$pid" || [ -n "$stubborn_pid" ]; then log_warn "$name 未响应,强制终止..." kill -9 "$pid" 2>/dev/null || true + [ -n "$stubborn_pid" ] && kill -9 "$stubborn_pid" 2>/dev/null || true fi rm -f "$pid_file" log_success "$name 已停止" @@ -272,12 +355,14 @@ do_start() { start_backend || return 1 start_frontend || return 1 start_worker || return 1 + start_beat || return 1 echo "" echo -e "${GREEN}============================================${NC}" echo -e "${GREEN} 前端: http://localhost:$FRONTEND_PUBLIC_PORT (开发服务: $FRONTEND_DEV_PORT)${NC}" echo -e "${GREEN} 后端: http://localhost:$BACKEND_PORT${NC}" echo -e "${GREEN} Worker: $WORKER_SCRIPT${NC}" + echo -e "${GREEN} Beat: $BEAT_SCRIPT${NC}" echo -e "${GREEN} 日志目录: $LOG_DIR${NC}" echo -e "${GREEN}============================================${NC}" echo "" @@ -289,9 +374,10 @@ do_stop() { echo -e "${YELLOW} 停止 LeAudit 前后端${NC}" echo -e "${YELLOW}============================================${NC}" echo "" + stop_service "Beat" "$BEAT_PID_FILE" stop_service "Worker" "$WORKER_PID_FILE" - stop_service "前端" "$FRONTEND_PID_FILE" - stop_service "后端" "$BACKEND_PID_FILE" + stop_service "前端" "$FRONTEND_PID_FILE" "$FRONTEND_DEV_PORT" + stop_service "后端" "$BACKEND_PID_FILE" "$BACKEND_PORT" echo "" } @@ -305,6 +391,7 @@ do_status() { cleanup_pid_file "$BACKEND_PID_FILE" cleanup_pid_file "$FRONTEND_PID_FILE" cleanup_pid_file "$WORKER_PID_FILE" + cleanup_pid_file "$BEAT_PID_FILE" echo "" echo -e "${CYAN}============================================${NC}" @@ -334,10 +421,18 @@ do_status() { echo -e " Worker: ${RED}○ 已停止${NC}" fi + pid=$(service_pid "$BEAT_PID_FILE") + if pid_alive "$pid"; then + echo -e " Beat: ${GREEN}● 运行中${NC} PID=$pid" + else + echo -e " Beat: ${RED}○ 已停止${NC}" + fi + echo "" echo " 后端日志: $BACKEND_LOG" echo " 前端日志: $FRONTEND_LOG" echo " Worker日志: $WORKER_LOG" + echo " Beat日志: $BEAT_LOG" echo "" } @@ -367,10 +462,18 @@ do_logs() { echo -e "${CYAN}--- Ctrl+C 退出 ---${NC}" tail -f "$WORKER_LOG" ;; + beat) + [ -f "$BEAT_LOG" ] || touch "$BEAT_LOG" + echo -e "${CYAN}--- Beat 日志 (最近 $lines 行) ---${NC}" + tail -n "$lines" "$BEAT_LOG" + echo -e "${CYAN}--- Ctrl+C 退出 ---${NC}" + tail -f "$BEAT_LOG" + ;; all) [ -f "$BACKEND_LOG" ] || touch "$BACKEND_LOG" [ -f "$FRONTEND_LOG" ] || touch "$FRONTEND_LOG" [ -f "$WORKER_LOG" ] || touch "$WORKER_LOG" + [ -f "$BEAT_LOG" ] || touch "$BEAT_LOG" echo -e "${CYAN}--- 后端日志 (最近 $lines 行) ---${NC}" tail -n "$lines" "$BACKEND_LOG" echo "" @@ -380,16 +483,20 @@ do_logs() { echo -e "${CYAN}--- Worker 日志 (最近 $lines 行) ---${NC}" tail -n "$lines" "$WORKER_LOG" echo "" + echo -e "${CYAN}--- Beat 日志 (最近 $lines 行) ---${NC}" + tail -n "$lines" "$BEAT_LOG" + echo "" echo -e "${CYAN}--- Ctrl+C 退出 ---${NC}" - tail -n 0 -f "$BACKEND_LOG" "$FRONTEND_LOG" "$WORKER_LOG" 2>/dev/null | awk ' + tail -n 0 -f "$BACKEND_LOG" "$FRONTEND_LOG" "$WORKER_LOG" "$BEAT_LOG" 2>/dev/null | awk ' /^==> .*backend\.log <==$/ { current="[backend]"; next } /^==> .*frontend\.log <==$/ { current="[frontend]"; next } /^==> .*worker\.log <==$/ { current="[worker]"; next } + /^==> .*beat\.log <==$/ { current="[beat]"; next } { print current " " $0; fflush() } ' ;; *) - echo "用法: ./leaudit.sh logs [backend|frontend|worker|all] [行数]" + echo "用法: ./leaudit.sh logs [backend|frontend|worker|beat|all] [行数]" exit 1 ;; esac @@ -441,6 +548,16 @@ do_doctor() { echo -e " Worker 进程: ${RED}○ 未运行${NC}" fi + local beat_pid + beat_pid=$(service_pid "$BEAT_PID_FILE") + if pid_alive "$beat_pid"; then + cmd="$(pid_command "$beat_pid")" + echo -e " Beat 进程: ${GREEN}● 运行中${NC} PID=$beat_pid" + [ -n "$cmd" ] && echo " 命令: $cmd" + else + echo -e " Beat 进程: ${RED}○ 未运行${NC}" + fi + echo "" } @@ -454,6 +571,7 @@ do_open() { echo " 前端开发: http://127.0.0.1:$FRONTEND_DEV_PORT" echo " 后端访问: http://localhost:$BACKEND_PORT" echo " Worker脚本: $WORKER_SCRIPT" + echo " Beat脚本: $BEAT_SCRIPT" echo "" echo -e "${CYAN}--- 后端最近 5 行 ---${NC}" tail -n 5 "$BACKEND_LOG" 2>/dev/null || echo "(无后端日志)" @@ -464,6 +582,9 @@ do_open() { echo -e "${CYAN}--- Worker最近 5 行 ---${NC}" tail -n 5 "$WORKER_LOG" 2>/dev/null || echo "(无Worker日志)" echo "" + echo -e "${CYAN}--- Beat最近 5 行 ---${NC}" + tail -n 5 "$BEAT_LOG" 2>/dev/null || echo "(无Beat日志)" + echo "" } case "${1:-help}" in @@ -492,14 +613,15 @@ case "${1:-help}" in echo "用法: ./leaudit.sh <命令>" echo "" echo "命令说明:" - echo " start 启动前后端" - echo " stop 停止前后端" - echo " restart 重启前后端" - echo " status 查看前后端运行状态" - echo " logs 查看前后端日志并持续跟踪" + echo " start 启动前后端、Worker、Beat" + echo " stop 停止前后端、Worker、Beat" + echo " restart 重启前后端、Worker、Beat" + echo " status 查看前后端、Worker、Beat 运行状态" + echo " logs 查看前后端、Worker、Beat 日志并持续跟踪" echo " logs backend 只看后端日志" echo " logs frontend 50 看前端最近 50 行日志并持续跟踪" echo " logs worker 50 看 Worker 最近 50 行日志并持续跟踪" + echo " logs beat 50 看 Beat 最近 50 行日志并持续跟踪" echo " doctor 检查 5173 / 5193 / 8096 端口占用情况" echo " open 打印访问地址和最近日志摘要" echo "" diff --git a/package.json b/package.json new file mode 100644 index 0000000..824babe --- /dev/null +++ b/package.json @@ -0,0 +1,15 @@ +{ + "name": "leaudit-platform-dev-runner", + "private": true, + "scripts": { + "dev": "bash ./leaudit.sh start", + "dev:stop": "bash ./leaudit.sh stop", + "dev:restart": "bash ./leaudit.sh restart", + "dev:status": "bash ./leaudit.sh status", + "dev:logs": "bash ./leaudit.sh logs", + "dev:logs:frontend": "bash ./leaudit.sh logs frontend", + "dev:logs:backend": "bash ./leaudit.sh logs backend", + "dev:doctor": "bash ./leaudit.sh doctor", + "dev:open": "bash ./leaudit.sh open" + } +} diff --git a/scripts/start_beat.sh b/scripts/start_beat.sh new file mode 100755 index 0000000..2aa3d3e --- /dev/null +++ b/scripts/start_beat.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "$0")/.." && pwd)" +cd "$ROOT_DIR" + +source .venv/bin/activate + +SCHEDULE_DIR="$ROOT_DIR/.codex-run" +mkdir -p "$SCHEDULE_DIR" + +celery -A fastapi_admin.celery_app:celery_app beat \ + --loglevel=INFO \ + --pidfile= \ + --schedule "$SCHEDULE_DIR/celerybeat-schedule"