# 交叉评查文档访问权限安全方案 ## 📋 目录 1. [安全风险分析](#安全风险分析) 2. [多层防护方案](#多层防护方案) 3. [实施步骤](#实施步骤) 4. [测试验证](#测试验证) 5. [审计与监控](#审计与监控) --- ## 🔍 安全风险分析 ### 当前风险 **问题描述**: - 用户可以通过修改 URL 参数 `id` 访问任意文档 - 缺少文档-任务-用户的关联验证 - 可能导致未授权访问其他用户的文档 **示例攻击场景**: ``` 正常访问: /cross-checking/result?id=123&tId=456 攻击尝试: /cross-checking/result?id=999&tId=456 ← 修改文档 ID ``` ### 风险等级 | 风险项 | 等级 | 影响 | |--------|------|------| | 未授权访问文档 | 🔴 高 | 数据泄露、隐私侵犯 | | 修改他人评查结果 | 🔴 高 | 数据完整性破坏 | | 跨任务访问文档 | 🟡 中 | 业务逻辑绕过 | --- ## 🛡️ 多层防护方案 ### 架构设计:纵深防御 ``` ┌─────────────┐ │ 用户请求 │ └──────┬──────┘ │ ▼ ┌─────────────────────────────────────┐ │ 第 1 层:前端路由 Loader 验证 │ ✅ 已实施 │ - 验证 JWT token │ │ - 验证文档-任务关联 │ │ - 验证用户身份(发起人/评审员) │ └──────┬──────────────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ 第 2 层:后端 API 权限验证 │ ⚠️ 需要后端实施 │ - JWT token 解析和验证 │ │ - 用户权限查询 │ │ - 返回 403 Forbidden(无权限) │ └──────┬──────────────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ 第 3 层:数据库 RLS 策略 │ ⚠️ 可选(推荐) │ - PostgreSQL Row Level Security │ │ - 自动过滤无权访问的记录 │ └──────┬──────────────────────────────┘ │ ▼ ┌─────────────┐ │ 返回数据 │ └─────────────┘ ``` --- ## 📝 实施步骤 ### ✅ Step 1: 前端 Loader 权限验证(已完成) **文件位置**: - `app/api/cross-checking/verify-document-access.ts` - 权限验证工具 - `app/routes/cross-checking.result.tsx` - 路由 Loader **验证逻辑**: 1. 检查文档是否属于指定任务(`cross_task_document_mapping` 表) 2. 检查用户是否是任务的发起人(`assigner_id`) 3. 检查用户是否是任务的评审员(`assignee_ids`) 4. 任一条件满足则允许访问 **示例代码**: ```typescript // Loader 中的权限验证 const { verifyDocumentAccess } = await import("~/api/cross-checking/verify-document-access"); const accessCheck = await verifyDocumentAccess({ documentId: id, taskId: taskId, userId: userInfo.user_id, jwtToken: frontendJWT }); if (!accessCheck.hasAccess) { return Response.json({ result: false, message: '您没有权限访问该文档' }, { status: 403 }); } ``` **返回状态码**: - `401 Unauthorized` - 未登录或 JWT 过期 - `403 Forbidden` - 已登录但无权限访问 - `404 Not Found` - 文档或任务不存在 --- ### ⚠️ Step 2: 后端 API 权限验证(需要 Python 后端实施) **后端 API 路径**:`/api/v3/review-points/{fileId}` **推荐实现**(FastAPI 示例): ```python from fastapi import APIRouter, Depends, HTTPException, status from typing import List router = APIRouter() async def verify_document_access( document_id: int, task_id: int, current_user: dict = Depends(get_current_user) ) -> bool: """ 验证用户是否有权访问文档 验证逻辑: 1. 文档是否属于该任务 2. 用户是否是任务的参与者(发起人或评审员) """ # 1. 检查文档-任务映射 mapping = await db.query(CrossTaskDocumentMapping).filter( CrossTaskDocumentMapping.task_id == task_id, CrossTaskDocumentMapping.document_id == document_id ).first() if not mapping: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="文档不属于该任务" ) # 2. 检查用户权限 task = await db.query(CrossExaminationTask).filter( CrossExaminationTask.id == task_id ).first() if not task: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="任务不存在" ) user_id = current_user["user_id"] is_assigner = task.assigner_id == user_id is_assignee = user_id in task.assignee_ids if not (is_assigner or is_assignee): # 记录未授权访问尝试(审计日志) await log_unauthorized_access( user_id=user_id, document_id=document_id, task_id=task_id, ip_address=request.client.host ) raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="您没有权限访问该文档" ) return True @router.get("/api/v3/review-points/{file_id}") async def get_review_points( file_id: int, task_id: int = Query(..., description="任务ID"), current_user: dict = Depends(get_current_user), db: Session = Depends(get_db) ): """ 获取文档评查点(带权限验证) """ # 🔒 权限验证 await verify_document_access(file_id, task_id, current_user) # 获取数据 review_points = await get_document_review_points(file_id, db) return { "data": review_points, "stats": calculate_stats(review_points), # ... } ``` **关键点**: - ✅ 解析和验证 JWT token - ✅ 从 JWT 中提取 `user_id` - ✅ 查询数据库验证权限 - ✅ 记录未授权访问尝试(审计日志) - ✅ 返回明确的错误状态码 --- ### 🔧 Step 3: 数据库层 RLS 策略(可选,推荐) PostgreSQL Row Level Security (RLS) 可以在数据库层面强制执行权限控制。 **创建 RLS 策略**: ```sql -- 1. 启用 RLS ALTER TABLE documents ENABLE ROW LEVEL SECURITY; ALTER TABLE cross_task_document_mapping ENABLE ROW LEVEL SECURITY; ALTER TABLE cross_examination_tasks ENABLE ROW LEVEL SECURITY; -- 2. 创建策略:用户只能访问自己参与的任务中的文档 CREATE POLICY cross_checking_document_access ON documents FOR SELECT USING ( id IN ( -- 文档属于用户参与的任务 SELECT ctdm.document_id FROM cross_task_document_mapping ctdm JOIN cross_examination_tasks cet ON ctdm.task_id = cet.id WHERE cet.assigner_id = current_setting('app.current_user_id')::int OR current_setting('app.current_user_id')::int = ANY(cet.assignee_ids) ) ); -- 3. 创建策略:用户只能查看自己参与的任务 CREATE POLICY cross_checking_task_access ON cross_examination_tasks FOR SELECT USING ( assigner_id = current_setting('app.current_user_id')::int OR current_setting('app.current_user_id')::int = ANY(assignee_ids) ); ``` **在应用中设置当前用户**: ```typescript // PostgREST 客户端请求前设置 await db.query(`SET app.current_user_id = ${userId}`); ``` **优点**: - ✅ 数据库层面自动过滤 - ✅ 即使前端/后端被绕过,数据库也会拒绝访问 - ✅ 审计和合规性更强 **缺点**: - ⚠️ 需要修改数据库配置 - ⚠️ 复杂查询可能影响性能 --- ## 🧪 测试验证 ### 测试用例 #### 1. 正常访问测试 ```typescript // 测试场景:用户访问自己参与的任务中的文档 // 预期结果:✅ 200 OK,返回文档数据 const response = await fetch('/cross-checking/result?id=123&tId=456', { headers: { 'Authorization': `Bearer ${validJWT}` } }); expect(response.status).toBe(200); expect(response.data.document.id).toBe(123); ``` #### 2. 未授权访问测试 ```typescript // 测试场景:用户尝试访问他人的文档 // 预期结果:❌ 403 Forbidden const response = await fetch('/cross-checking/result?id=999&tId=456', { headers: { 'Authorization': `Bearer ${validJWT}` } }); expect(response.status).toBe(403); expect(response.data.message).toContain('没有权限'); ``` #### 3. 跨任务访问测试 ```typescript // 测试场景:用户尝试将文档 A 的 ID 用于任务 B // 预期结果:❌ 403 Forbidden const response = await fetch('/cross-checking/result?id=123&tId=789', { headers: { 'Authorization': `Bearer ${validJWT}` } }); expect(response.status).toBe(403); expect(response.data.message).toContain('文档不属于该任务'); ``` #### 4. JWT 过期测试 ```typescript // 测试场景:使用过期的 JWT token // 预期结果:❌ 401 Unauthorized const response = await fetch('/cross-checking/result?id=123&tId=456', { headers: { 'Authorization': `Bearer ${expiredJWT}` } }); expect(response.status).toBe(401); expect(response.data.message).toContain('身份验证失败'); ``` #### 5. 缺少参数测试 ```typescript // 测试场景:缺少 taskId 参数 // 预期结果:❌ 400 Bad Request const response = await fetch('/cross-checking/result?id=123'); expect(response.status).toBe(400); expect(response.data.message).toContain('任务ID不能为空'); ``` --- ### 自动化测试脚本 创建 `app/api/cross-checking/__tests__/verify-document-access.test.ts`: ```typescript import { describe, it, expect } from 'vitest'; import { verifyDocumentAccess } from '../verify-document-access'; describe('verifyDocumentAccess', () => { it('应该允许任务发起人访问文档', async () => { const result = await verifyDocumentAccess({ documentId: 123, taskId: 456, userId: 1, // 发起人 ID jwtToken: 'valid-jwt' }); expect(result.hasAccess).toBe(true); expect(result.userRole).toBe('assigner'); }); it('应该允许任务评审员访问文档', async () => { const result = await verifyDocumentAccess({ documentId: 123, taskId: 456, userId: 2, // 评审员 ID jwtToken: 'valid-jwt' }); expect(result.hasAccess).toBe(true); expect(result.userRole).toBe('assignee'); }); it('应该拒绝未参与任务的用户访问', async () => { const result = await verifyDocumentAccess({ documentId: 123, taskId: 456, userId: 999, // 未参与的用户 jwtToken: 'valid-jwt' }); expect(result.hasAccess).toBe(false); expect(result.reason).toContain('没有权限'); }); it('应该拒绝访问不属于该任务的文档', async () => { const result = await verifyDocumentAccess({ documentId: 999, // 不属于任务的文档 taskId: 456, userId: 1, jwtToken: 'valid-jwt' }); expect(result.hasAccess).toBe(false); expect(result.reason).toContain('文档不属于该任务'); }); }); ``` **运行测试**: ```bash npm run test -- verify-document-access.test.ts ``` --- ## 📊 审计与监控 ### 记录未授权访问尝试 创建审计日志表: ```sql CREATE TABLE security_audit_logs ( id SERIAL PRIMARY KEY, user_id INTEGER, action VARCHAR(50) NOT NULL, resource_type VARCHAR(50), resource_id INTEGER, task_id INTEGER, ip_address INET, user_agent TEXT, status VARCHAR(20), -- 'denied', 'allowed' reason TEXT, created_at TIMESTAMP DEFAULT NOW() ); CREATE INDEX idx_audit_logs_user_id ON security_audit_logs(user_id); CREATE INDEX idx_audit_logs_created_at ON security_audit_logs(created_at); CREATE INDEX idx_audit_logs_status ON security_audit_logs(status); ``` ### 记录函数 ```typescript // app/api/security/audit-log.ts import { postgrestPost } from "../postgrest-client"; interface AuditLogParams { userId: number; action: string; resourceType: string; resourceId: number | string; taskId?: number | string; status: 'allowed' | 'denied'; reason?: string; ipAddress?: string; userAgent?: string; } export async function logSecurityEvent(params: AuditLogParams) { try { await postgrestPost('security_audit_logs', { user_id: params.userId, action: params.action, resource_type: params.resourceType, resource_id: params.resourceId, task_id: params.taskId, status: params.status, reason: params.reason, ip_address: params.ipAddress, user_agent: params.userAgent }); } catch (error) { console.error('❌ [Security] 记录审计日志失败:', error); // 审计日志失败不应影响主流程 } } ``` ### 在权限验证中记录 修改 `verify-document-access.ts`,添加审计日志: ```typescript import { logSecurityEvent } from "../security/audit-log"; export async function verifyDocumentAccess( params: DocumentAccessCheckParams ): Promise { // ... 现有验证逻辑 ... if (!accessCheck.hasAccess) { // 🔍 记录被拒绝的访问尝试 await logSecurityEvent({ userId: params.userId, action: 'access_document', resourceType: 'cross_checking_document', resourceId: params.documentId, taskId: params.taskId, status: 'denied', reason: accessCheck.reason }); return accessCheck; } // ✅ 记录成功的访问 await logSecurityEvent({ userId: params.userId, action: 'access_document', resourceType: 'cross_checking_document', resourceId: params.documentId, taskId: params.taskId, status: 'allowed' }); return accessCheck; } ``` ### 监控仪表板查询 ```sql -- 查看最近被拒绝的访问尝试 SELECT user_id, resource_id, task_id, reason, ip_address, created_at FROM security_audit_logs WHERE status = 'denied' AND action = 'access_document' AND created_at > NOW() - INTERVAL '24 hours' ORDER BY created_at DESC; -- 查看频繁尝试未授权访问的用户 SELECT user_id, COUNT(*) as attempt_count, MAX(created_at) as last_attempt FROM security_audit_logs WHERE status = 'denied' AND created_at > NOW() - INTERVAL '1 hour' GROUP BY user_id HAVING COUNT(*) > 5 ORDER BY attempt_count DESC; ``` --- ## 📋 检查清单 实施前检查: - [x] ✅ 前端 Loader 权限验证已实施 - [ ] ⚠️ 后端 API 权限验证(需要 Python 后端配合) - [ ] ⚠️ 数据库 RLS 策略(可选) - [ ] ⚠️ 审计日志系统(推荐) - [ ] ⚠️ 单元测试和集成测试 - [ ] ⚠️ 安全测试(渗透测试) 部署后验证: - [ ] 正常访问流程测试 - [ ] 未授权访问拦截测试 - [ ] JWT 过期处理测试 - [ ] 审计日志记录验证 - [ ] 性能影响评估 --- ## 🎯 总结 ### 当前实施状态 | 防护层 | 状态 | 描述 | |--------|------|------| | 前端 Loader 验证 | ✅ 已完成 | 验证文档-任务-用户关联 | | 后端 API 验证 | ⚠️ 待实施 | 需要 Python 后端配合 | | 数据库 RLS | ⚠️ 可选 | 最高安全级别,可选实施 | | 审计日志 | ⚠️ 推荐 | 用于安全监控和合规 | ### 后续行动 1. **立即部署**:前端 Loader 验证(已完成) 2. **协调后端**:实施后端 API 权限验证 3. **可选增强**:配置数据库 RLS 策略 4. **监控运营**:建立审计日志和监控体系 ### 联系方式 如有问题,请联系: - **安全团队**:security@example.com - **开发团队**:dev@example.com