Files
leaudit-platform-frontend/docs/cross-checking-security-guide.md
2025-12-05 00:09:32 +08:00

16 KiB
Raw Permalink Blame History

交叉评查文档访问权限安全方案

📋 目录

  1. 安全风险分析
  2. 多层防护方案
  3. 实施步骤
  4. 测试验证
  5. 审计与监控

🔍 安全风险分析

当前风险

问题描述

  • 用户可以通过修改 URL 参数 id 访问任意文档
  • 缺少文档-任务-用户的关联验证
  • 可能导致未授权访问其他用户的文档

示例攻击场景

正常访问: /cross-checking/result?id=123&tId=456
攻击尝试: /cross-checking/result?id=999&tId=456  ← 修改文档 ID

风险等级

风险项 等级 影响
未授权访问文档 🔴 数据泄露、隐私侵犯
修改他人评查结果 🔴 数据完整性破坏
跨任务访问文档 🟡 业务逻辑绕过

🛡️ 多层防护方案

架构设计:纵深防御

┌─────────────┐
│   用户请求   │
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────────┐
│ 第 1 层:前端路由 Loader 验证        │  ✅ 已实施
│ - 验证 JWT token                    │
│ - 验证文档-任务关联                  │
│ - 验证用户身份(发起人/评审员)      │
└──────┬──────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────┐
│ 第 2 层:后端 API 权限验证           │  ⚠️ 需要后端实施
│ - JWT token 解析和验证              │
│ - 用户权限查询                      │
│ - 返回 403 Forbidden(无权限)      │
└──────┬──────────────────────────────┘
       │
       ▼
┌─────────────────────────────────────┐
│ 第 3 层:数据库 RLS 策略            │  ⚠️ 可选(推荐)
│ - PostgreSQL Row Level Security     │
│ - 自动过滤无权访问的记录             │
└──────┬──────────────────────────────┘
       │
       ▼
┌─────────────┐
│   返回数据   │
└─────────────┘

📝 实施步骤

Step 1: 前端 Loader 权限验证(已完成)

文件位置

  • app/api/cross-checking/verify-document-access.ts - 权限验证工具
  • app/routes/cross-checking.result.tsx - 路由 Loader

验证逻辑

  1. 检查文档是否属于指定任务(cross_task_document_mapping 表)
  2. 检查用户是否是任务的发起人(assigner_id
  3. 检查用户是否是任务的评审员(assignee_ids
  4. 任一条件满足则允许访问

示例代码

// Loader 中的权限验证
const { verifyDocumentAccess } = await import("~/api/cross-checking/verify-document-access");
const accessCheck = await verifyDocumentAccess({
  documentId: id,
  taskId: taskId,
  userId: userInfo.user_id,
  jwtToken: frontendJWT
});

if (!accessCheck.hasAccess) {
  return Response.json({
    result: false,
    message: '您没有权限访问该文档'
  }, { status: 403 });
}

返回状态码

  • 401 Unauthorized - 未登录或 JWT 过期
  • 403 Forbidden - 已登录但无权限访问
  • 404 Not Found - 文档或任务不存在

⚠️ Step 2: 后端 API 权限验证(需要 Python 后端实施)

后端 API 路径/api/v3/review-points/{fileId}

推荐实现FastAPI 示例):

from fastapi import APIRouter, Depends, HTTPException, status
from typing import List

router = APIRouter()

async def verify_document_access(
    document_id: int,
    task_id: int,
    current_user: dict = Depends(get_current_user)
) -> bool:
    """
    验证用户是否有权访问文档

    验证逻辑:
    1. 文档是否属于该任务
    2. 用户是否是任务的参与者(发起人或评审员)
    """
    # 1. 检查文档-任务映射
    mapping = await db.query(CrossTaskDocumentMapping).filter(
        CrossTaskDocumentMapping.task_id == task_id,
        CrossTaskDocumentMapping.document_id == document_id
    ).first()

    if not mapping:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="文档不属于该任务"
        )

    # 2. 检查用户权限
    task = await db.query(CrossExaminationTask).filter(
        CrossExaminationTask.id == task_id
    ).first()

    if not task:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="任务不存在"
        )

    user_id = current_user["user_id"]
    is_assigner = task.assigner_id == user_id
    is_assignee = user_id in task.assignee_ids

    if not (is_assigner or is_assignee):
        # 记录未授权访问尝试(审计日志)
        await log_unauthorized_access(
            user_id=user_id,
            document_id=document_id,
            task_id=task_id,
            ip_address=request.client.host
        )

        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="您没有权限访问该文档"
        )

    return True


@router.get("/api/v3/review-points/{file_id}")
async def get_review_points(
    file_id: int,
    task_id: int = Query(..., description="任务ID"),
    current_user: dict = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """
    获取文档评查点(带权限验证)
    """
    # 🔒 权限验证
    await verify_document_access(file_id, task_id, current_user)

    # 获取数据
    review_points = await get_document_review_points(file_id, db)

    return {
        "data": review_points,
        "stats": calculate_stats(review_points),
        # ...
    }

关键点

  • 解析和验证 JWT token
  • 从 JWT 中提取 user_id
  • 查询数据库验证权限
  • 记录未授权访问尝试(审计日志)
  • 返回明确的错误状态码

🔧 Step 3: 数据库层 RLS 策略(可选,推荐)

PostgreSQL Row Level Security (RLS) 可以在数据库层面强制执行权限控制。

创建 RLS 策略

-- 1. 启用 RLS
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
ALTER TABLE cross_task_document_mapping ENABLE ROW LEVEL SECURITY;
ALTER TABLE cross_examination_tasks ENABLE ROW LEVEL SECURITY;

-- 2. 创建策略:用户只能访问自己参与的任务中的文档
CREATE POLICY cross_checking_document_access ON documents
FOR SELECT
USING (
  id IN (
    -- 文档属于用户参与的任务
    SELECT ctdm.document_id
    FROM cross_task_document_mapping ctdm
    JOIN cross_examination_tasks cet ON ctdm.task_id = cet.id
    WHERE
      cet.assigner_id = current_setting('app.current_user_id')::int
      OR current_setting('app.current_user_id')::int = ANY(cet.assignee_ids)
  )
);

-- 3. 创建策略:用户只能查看自己参与的任务
CREATE POLICY cross_checking_task_access ON cross_examination_tasks
FOR SELECT
USING (
  assigner_id = current_setting('app.current_user_id')::int
  OR current_setting('app.current_user_id')::int = ANY(assignee_ids)
);

在应用中设置当前用户

// PostgREST 客户端请求前设置
await db.query(`SET app.current_user_id = ${userId}`);

优点

  • 数据库层面自动过滤
  • 即使前端/后端被绕过,数据库也会拒绝访问
  • 审计和合规性更强

缺点

  • ⚠️ 需要修改数据库配置
  • ⚠️ 复杂查询可能影响性能

🧪 测试验证

测试用例

1. 正常访问测试

// 测试场景:用户访问自己参与的任务中的文档
// 预期结果:✅ 200 OK,返回文档数据

const response = await fetch('/cross-checking/result?id=123&tId=456', {
  headers: {
    'Authorization': `Bearer ${validJWT}`
  }
});

expect(response.status).toBe(200);
expect(response.data.document.id).toBe(123);

2. 未授权访问测试

// 测试场景:用户尝试访问他人的文档
// 预期结果:❌ 403 Forbidden

const response = await fetch('/cross-checking/result?id=999&tId=456', {
  headers: {
    'Authorization': `Bearer ${validJWT}`
  }
});

expect(response.status).toBe(403);
expect(response.data.message).toContain('没有权限');

3. 跨任务访问测试

// 测试场景:用户尝试将文档 A 的 ID 用于任务 B
// 预期结果:❌ 403 Forbidden

const response = await fetch('/cross-checking/result?id=123&tId=789', {
  headers: {
    'Authorization': `Bearer ${validJWT}`
  }
});

expect(response.status).toBe(403);
expect(response.data.message).toContain('文档不属于该任务');

4. JWT 过期测试

// 测试场景:使用过期的 JWT token
// 预期结果:❌ 401 Unauthorized

const response = await fetch('/cross-checking/result?id=123&tId=456', {
  headers: {
    'Authorization': `Bearer ${expiredJWT}`
  }
});

expect(response.status).toBe(401);
expect(response.data.message).toContain('身份验证失败');

5. 缺少参数测试

// 测试场景:缺少 taskId 参数
// 预期结果:❌ 400 Bad Request

const response = await fetch('/cross-checking/result?id=123');
expect(response.status).toBe(400);
expect(response.data.message).toContain('任务ID不能为空');

自动化测试脚本

创建 app/api/cross-checking/__tests__/verify-document-access.test.ts:

import { describe, it, expect } from 'vitest';
import { verifyDocumentAccess } from '../verify-document-access';

describe('verifyDocumentAccess', () => {
  it('应该允许任务发起人访问文档', async () => {
    const result = await verifyDocumentAccess({
      documentId: 123,
      taskId: 456,
      userId: 1, // 发起人 ID
      jwtToken: 'valid-jwt'
    });

    expect(result.hasAccess).toBe(true);
    expect(result.userRole).toBe('assigner');
  });

  it('应该允许任务评审员访问文档', async () => {
    const result = await verifyDocumentAccess({
      documentId: 123,
      taskId: 456,
      userId: 2, // 评审员 ID
      jwtToken: 'valid-jwt'
    });

    expect(result.hasAccess).toBe(true);
    expect(result.userRole).toBe('assignee');
  });

  it('应该拒绝未参与任务的用户访问', async () => {
    const result = await verifyDocumentAccess({
      documentId: 123,
      taskId: 456,
      userId: 999, // 未参与的用户
      jwtToken: 'valid-jwt'
    });

    expect(result.hasAccess).toBe(false);
    expect(result.reason).toContain('没有权限');
  });

  it('应该拒绝访问不属于该任务的文档', async () => {
    const result = await verifyDocumentAccess({
      documentId: 999, // 不属于任务的文档
      taskId: 456,
      userId: 1,
      jwtToken: 'valid-jwt'
    });

    expect(result.hasAccess).toBe(false);
    expect(result.reason).toContain('文档不属于该任务');
  });
});

运行测试

npm run test -- verify-document-access.test.ts

📊 审计与监控

记录未授权访问尝试

创建审计日志表:

CREATE TABLE security_audit_logs (
  id SERIAL PRIMARY KEY,
  user_id INTEGER,
  action VARCHAR(50) NOT NULL,
  resource_type VARCHAR(50),
  resource_id INTEGER,
  task_id INTEGER,
  ip_address INET,
  user_agent TEXT,
  status VARCHAR(20), -- 'denied', 'allowed'
  reason TEXT,
  created_at TIMESTAMP DEFAULT NOW()
);

CREATE INDEX idx_audit_logs_user_id ON security_audit_logs(user_id);
CREATE INDEX idx_audit_logs_created_at ON security_audit_logs(created_at);
CREATE INDEX idx_audit_logs_status ON security_audit_logs(status);

记录函数

// app/api/security/audit-log.ts
import { postgrestPost } from "../postgrest-client";

interface AuditLogParams {
  userId: number;
  action: string;
  resourceType: string;
  resourceId: number | string;
  taskId?: number | string;
  status: 'allowed' | 'denied';
  reason?: string;
  ipAddress?: string;
  userAgent?: string;
}

export async function logSecurityEvent(params: AuditLogParams) {
  try {
    await postgrestPost('security_audit_logs', {
      user_id: params.userId,
      action: params.action,
      resource_type: params.resourceType,
      resource_id: params.resourceId,
      task_id: params.taskId,
      status: params.status,
      reason: params.reason,
      ip_address: params.ipAddress,
      user_agent: params.userAgent
    });
  } catch (error) {
    console.error('❌ [Security] 记录审计日志失败:', error);
    // 审计日志失败不应影响主流程
  }
}

在权限验证中记录

修改 verify-document-access.ts,添加审计日志:

import { logSecurityEvent } from "../security/audit-log";

export async function verifyDocumentAccess(
  params: DocumentAccessCheckParams
): Promise<DocumentAccessCheckResult> {
  // ... 现有验证逻辑 ...

  if (!accessCheck.hasAccess) {
    // 🔍 记录被拒绝的访问尝试
    await logSecurityEvent({
      userId: params.userId,
      action: 'access_document',
      resourceType: 'cross_checking_document',
      resourceId: params.documentId,
      taskId: params.taskId,
      status: 'denied',
      reason: accessCheck.reason
    });

    return accessCheck;
  }

  // ✅ 记录成功的访问
  await logSecurityEvent({
    userId: params.userId,
    action: 'access_document',
    resourceType: 'cross_checking_document',
    resourceId: params.documentId,
    taskId: params.taskId,
    status: 'allowed'
  });

  return accessCheck;
}

监控仪表板查询

-- 查看最近被拒绝的访问尝试
SELECT
  user_id,
  resource_id,
  task_id,
  reason,
  ip_address,
  created_at
FROM security_audit_logs
WHERE status = 'denied'
  AND action = 'access_document'
  AND created_at > NOW() - INTERVAL '24 hours'
ORDER BY created_at DESC;

-- 查看频繁尝试未授权访问的用户
SELECT
  user_id,
  COUNT(*) as attempt_count,
  MAX(created_at) as last_attempt
FROM security_audit_logs
WHERE status = 'denied'
  AND created_at > NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING COUNT(*) > 5
ORDER BY attempt_count DESC;

📋 检查清单

实施前检查:

  • 前端 Loader 权限验证已实施
  • ⚠️ 后端 API 权限验证(需要 Python 后端配合)
  • ⚠️ 数据库 RLS 策略(可选)
  • ⚠️ 审计日志系统(推荐)
  • ⚠️ 单元测试和集成测试
  • ⚠️ 安全测试(渗透测试)

部署后验证:

  • 正常访问流程测试
  • 未授权访问拦截测试
  • JWT 过期处理测试
  • 审计日志记录验证
  • 性能影响评估

🎯 总结

当前实施状态

防护层 状态 描述
前端 Loader 验证 已完成 验证文档-任务-用户关联
后端 API 验证 ⚠️ 待实施 需要 Python 后端配合
数据库 RLS ⚠️ 可选 最高安全级别,可选实施
审计日志 ⚠️ 推荐 用于安全监控和合规

后续行动

  1. 立即部署:前端 Loader 验证(已完成)
  2. 协调后端:实施后端 API 权限验证
  3. 可选增强:配置数据库 RLS 策略
  4. 监控运营:建立审计日志和监控体系

联系方式

如有问题,请联系: