16 KiB
16 KiB
交叉评查文档访问权限安全方案
📋 目录
🔍 安全风险分析
当前风险
问题描述:
- 用户可以通过修改 URL 参数
id访问任意文档 - 缺少文档-任务-用户的关联验证
- 可能导致未授权访问其他用户的文档
示例攻击场景:
正常访问: /cross-checking/result?id=123&tId=456
攻击尝试: /cross-checking/result?id=999&tId=456 ← 修改文档 ID
风险等级
| 风险项 | 等级 | 影响 |
|---|---|---|
| 未授权访问文档 | 🔴 高 | 数据泄露、隐私侵犯 |
| 修改他人评查结果 | 🔴 高 | 数据完整性破坏 |
| 跨任务访问文档 | 🟡 中 | 业务逻辑绕过 |
🛡️ 多层防护方案
架构设计:纵深防御
┌─────────────┐
│ 用户请求 │
└──────┬──────┘
│
▼
┌─────────────────────────────────────┐
│ 第 1 层:前端路由 Loader 验证 │ ✅ 已实施
│ - 验证 JWT token │
│ - 验证文档-任务关联 │
│ - 验证用户身份(发起人/评审员) │
└──────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 第 2 层:后端 API 权限验证 │ ⚠️ 需要后端实施
│ - JWT token 解析和验证 │
│ - 用户权限查询 │
│ - 返回 403 Forbidden(无权限) │
└──────┬──────────────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ 第 3 层:数据库 RLS 策略 │ ⚠️ 可选(推荐)
│ - PostgreSQL Row Level Security │
│ - 自动过滤无权访问的记录 │
└──────┬──────────────────────────────┘
│
▼
┌─────────────┐
│ 返回数据 │
└─────────────┘
📝 实施步骤
✅ Step 1: 前端 Loader 权限验证(已完成)
文件位置:
app/api/cross-checking/verify-document-access.ts- 权限验证工具app/routes/cross-checking.result.tsx- 路由 Loader
验证逻辑:
- 检查文档是否属于指定任务(
cross_task_document_mapping表) - 检查用户是否是任务的发起人(
assigner_id) - 检查用户是否是任务的评审员(
assignee_ids) - 任一条件满足则允许访问
示例代码:
// Loader 中的权限验证
const { verifyDocumentAccess } = await import("~/api/cross-checking/verify-document-access");
const accessCheck = await verifyDocumentAccess({
documentId: id,
taskId: taskId,
userId: userInfo.user_id,
jwtToken: frontendJWT
});
if (!accessCheck.hasAccess) {
return Response.json({
result: false,
message: '您没有权限访问该文档'
}, { status: 403 });
}
返回状态码:
401 Unauthorized- 未登录或 JWT 过期403 Forbidden- 已登录但无权限访问404 Not Found- 文档或任务不存在
⚠️ Step 2: 后端 API 权限验证(需要 Python 后端实施)
后端 API 路径:/api/v3/review-points/{fileId}
推荐实现(FastAPI 示例):
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
router = APIRouter()
async def verify_document_access(
document_id: int,
task_id: int,
current_user: dict = Depends(get_current_user)
) -> bool:
"""
验证用户是否有权访问文档
验证逻辑:
1. 文档是否属于该任务
2. 用户是否是任务的参与者(发起人或评审员)
"""
# 1. 检查文档-任务映射
mapping = await db.query(CrossTaskDocumentMapping).filter(
CrossTaskDocumentMapping.task_id == task_id,
CrossTaskDocumentMapping.document_id == document_id
).first()
if not mapping:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="文档不属于该任务"
)
# 2. 检查用户权限
task = await db.query(CrossExaminationTask).filter(
CrossExaminationTask.id == task_id
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="任务不存在"
)
user_id = current_user["user_id"]
is_assigner = task.assigner_id == user_id
is_assignee = user_id in task.assignee_ids
if not (is_assigner or is_assignee):
# 记录未授权访问尝试(审计日志)
await log_unauthorized_access(
user_id=user_id,
document_id=document_id,
task_id=task_id,
ip_address=request.client.host
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="您没有权限访问该文档"
)
return True
@router.get("/api/v3/review-points/{file_id}")
async def get_review_points(
file_id: int,
task_id: int = Query(..., description="任务ID"),
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
获取文档评查点(带权限验证)
"""
# 🔒 权限验证
await verify_document_access(file_id, task_id, current_user)
# 获取数据
review_points = await get_document_review_points(file_id, db)
return {
"data": review_points,
"stats": calculate_stats(review_points),
# ...
}
关键点:
- ✅ 解析和验证 JWT token
- ✅ 从 JWT 中提取
user_id - ✅ 查询数据库验证权限
- ✅ 记录未授权访问尝试(审计日志)
- ✅ 返回明确的错误状态码
🔧 Step 3: 数据库层 RLS 策略(可选,推荐)
PostgreSQL Row Level Security (RLS) 可以在数据库层面强制执行权限控制。
创建 RLS 策略:
-- 1. 启用 RLS
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
ALTER TABLE cross_task_document_mapping ENABLE ROW LEVEL SECURITY;
ALTER TABLE cross_examination_tasks ENABLE ROW LEVEL SECURITY;
-- 2. 创建策略:用户只能访问自己参与的任务中的文档
CREATE POLICY cross_checking_document_access ON documents
FOR SELECT
USING (
id IN (
-- 文档属于用户参与的任务
SELECT ctdm.document_id
FROM cross_task_document_mapping ctdm
JOIN cross_examination_tasks cet ON ctdm.task_id = cet.id
WHERE
cet.assigner_id = current_setting('app.current_user_id')::int
OR current_setting('app.current_user_id')::int = ANY(cet.assignee_ids)
)
);
-- 3. 创建策略:用户只能查看自己参与的任务
CREATE POLICY cross_checking_task_access ON cross_examination_tasks
FOR SELECT
USING (
assigner_id = current_setting('app.current_user_id')::int
OR current_setting('app.current_user_id')::int = ANY(assignee_ids)
);
在应用中设置当前用户:
// PostgREST 客户端请求前设置
await db.query(`SET app.current_user_id = ${userId}`);
优点:
- ✅ 数据库层面自动过滤
- ✅ 即使前端/后端被绕过,数据库也会拒绝访问
- ✅ 审计和合规性更强
缺点:
- ⚠️ 需要修改数据库配置
- ⚠️ 复杂查询可能影响性能
🧪 测试验证
测试用例
1. 正常访问测试
// 测试场景:用户访问自己参与的任务中的文档
// 预期结果:✅ 200 OK,返回文档数据
const response = await fetch('/cross-checking/result?id=123&tId=456', {
headers: {
'Authorization': `Bearer ${validJWT}`
}
});
expect(response.status).toBe(200);
expect(response.data.document.id).toBe(123);
2. 未授权访问测试
// 测试场景:用户尝试访问他人的文档
// 预期结果:❌ 403 Forbidden
const response = await fetch('/cross-checking/result?id=999&tId=456', {
headers: {
'Authorization': `Bearer ${validJWT}`
}
});
expect(response.status).toBe(403);
expect(response.data.message).toContain('没有权限');
3. 跨任务访问测试
// 测试场景:用户尝试将文档 A 的 ID 用于任务 B
// 预期结果:❌ 403 Forbidden
const response = await fetch('/cross-checking/result?id=123&tId=789', {
headers: {
'Authorization': `Bearer ${validJWT}`
}
});
expect(response.status).toBe(403);
expect(response.data.message).toContain('文档不属于该任务');
4. JWT 过期测试
// 测试场景:使用过期的 JWT token
// 预期结果:❌ 401 Unauthorized
const response = await fetch('/cross-checking/result?id=123&tId=456', {
headers: {
'Authorization': `Bearer ${expiredJWT}`
}
});
expect(response.status).toBe(401);
expect(response.data.message).toContain('身份验证失败');
5. 缺少参数测试
// 测试场景:缺少 taskId 参数
// 预期结果:❌ 400 Bad Request
const response = await fetch('/cross-checking/result?id=123');
expect(response.status).toBe(400);
expect(response.data.message).toContain('任务ID不能为空');
自动化测试脚本
创建 app/api/cross-checking/__tests__/verify-document-access.test.ts:
import { describe, it, expect } from 'vitest';
import { verifyDocumentAccess } from '../verify-document-access';
describe('verifyDocumentAccess', () => {
it('应该允许任务发起人访问文档', async () => {
const result = await verifyDocumentAccess({
documentId: 123,
taskId: 456,
userId: 1, // 发起人 ID
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(true);
expect(result.userRole).toBe('assigner');
});
it('应该允许任务评审员访问文档', async () => {
const result = await verifyDocumentAccess({
documentId: 123,
taskId: 456,
userId: 2, // 评审员 ID
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(true);
expect(result.userRole).toBe('assignee');
});
it('应该拒绝未参与任务的用户访问', async () => {
const result = await verifyDocumentAccess({
documentId: 123,
taskId: 456,
userId: 999, // 未参与的用户
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(false);
expect(result.reason).toContain('没有权限');
});
it('应该拒绝访问不属于该任务的文档', async () => {
const result = await verifyDocumentAccess({
documentId: 999, // 不属于任务的文档
taskId: 456,
userId: 1,
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(false);
expect(result.reason).toContain('文档不属于该任务');
});
});
运行测试:
npm run test -- verify-document-access.test.ts
📊 审计与监控
记录未授权访问尝试
创建审计日志表:
CREATE TABLE security_audit_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER,
action VARCHAR(50) NOT NULL,
resource_type VARCHAR(50),
resource_id INTEGER,
task_id INTEGER,
ip_address INET,
user_agent TEXT,
status VARCHAR(20), -- 'denied', 'allowed'
reason TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_audit_logs_user_id ON security_audit_logs(user_id);
CREATE INDEX idx_audit_logs_created_at ON security_audit_logs(created_at);
CREATE INDEX idx_audit_logs_status ON security_audit_logs(status);
记录函数
// app/api/security/audit-log.ts
import { postgrestPost } from "../postgrest-client";
interface AuditLogParams {
userId: number;
action: string;
resourceType: string;
resourceId: number | string;
taskId?: number | string;
status: 'allowed' | 'denied';
reason?: string;
ipAddress?: string;
userAgent?: string;
}
export async function logSecurityEvent(params: AuditLogParams) {
try {
await postgrestPost('security_audit_logs', {
user_id: params.userId,
action: params.action,
resource_type: params.resourceType,
resource_id: params.resourceId,
task_id: params.taskId,
status: params.status,
reason: params.reason,
ip_address: params.ipAddress,
user_agent: params.userAgent
});
} catch (error) {
console.error('❌ [Security] 记录审计日志失败:', error);
// 审计日志失败不应影响主流程
}
}
在权限验证中记录
修改 verify-document-access.ts,添加审计日志:
import { logSecurityEvent } from "../security/audit-log";
export async function verifyDocumentAccess(
params: DocumentAccessCheckParams
): Promise<DocumentAccessCheckResult> {
// ... 现有验证逻辑 ...
if (!accessCheck.hasAccess) {
// 🔍 记录被拒绝的访问尝试
await logSecurityEvent({
userId: params.userId,
action: 'access_document',
resourceType: 'cross_checking_document',
resourceId: params.documentId,
taskId: params.taskId,
status: 'denied',
reason: accessCheck.reason
});
return accessCheck;
}
// ✅ 记录成功的访问
await logSecurityEvent({
userId: params.userId,
action: 'access_document',
resourceType: 'cross_checking_document',
resourceId: params.documentId,
taskId: params.taskId,
status: 'allowed'
});
return accessCheck;
}
监控仪表板查询
-- 查看最近被拒绝的访问尝试
SELECT
user_id,
resource_id,
task_id,
reason,
ip_address,
created_at
FROM security_audit_logs
WHERE status = 'denied'
AND action = 'access_document'
AND created_at > NOW() - INTERVAL '24 hours'
ORDER BY created_at DESC;
-- 查看频繁尝试未授权访问的用户
SELECT
user_id,
COUNT(*) as attempt_count,
MAX(created_at) as last_attempt
FROM security_audit_logs
WHERE status = 'denied'
AND created_at > NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING COUNT(*) > 5
ORDER BY attempt_count DESC;
📋 检查清单
实施前检查:
- ✅ 前端 Loader 权限验证已实施
- ⚠️ 后端 API 权限验证(需要 Python 后端配合)
- ⚠️ 数据库 RLS 策略(可选)
- ⚠️ 审计日志系统(推荐)
- ⚠️ 单元测试和集成测试
- ⚠️ 安全测试(渗透测试)
部署后验证:
- 正常访问流程测试
- 未授权访问拦截测试
- JWT 过期处理测试
- 审计日志记录验证
- 性能影响评估
🎯 总结
当前实施状态
| 防护层 | 状态 | 描述 |
|---|---|---|
| 前端 Loader 验证 | ✅ 已完成 | 验证文档-任务-用户关联 |
| 后端 API 验证 | ⚠️ 待实施 | 需要 Python 后端配合 |
| 数据库 RLS | ⚠️ 可选 | 最高安全级别,可选实施 |
| 审计日志 | ⚠️ 推荐 | 用于安全监控和合规 |
后续行动
- 立即部署:前端 Loader 验证(已完成)
- 协调后端:实施后端 API 权限验证
- 可选增强:配置数据库 RLS 策略
- 监控运营:建立审计日志和监控体系
联系方式
如有问题,请联系:
- 安全团队:security@example.com
- 开发团队:dev@example.com