Files
leaudit-platform-frontend/docs/cross-checking-security-guide.md
T
2025-12-05 00:09:32 +08:00

600 lines
16 KiB
Markdown
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 交叉评查文档访问权限安全方案
## 📋 目录
1. [安全风险分析](#安全风险分析)
2. [多层防护方案](#多层防护方案)
3. [实施步骤](#实施步骤)
4. [测试验证](#测试验证)
5. [审计与监控](#审计与监控)
---
## 🔍 安全风险分析
### 当前风险
**问题描述**
- 用户可以通过修改 URL 参数 `id` 访问任意文档
- 缺少文档-任务-用户的关联验证
- 可能导致未授权访问其他用户的文档
**示例攻击场景**
```
正常访问: /cross-checking/result?id=123&tId=456
攻击尝试: /cross-checking/result?id=999&tId=456 ← 修改文档 ID
```
### 风险等级
| 风险项 | 等级 | 影响 |
|--------|------|------|
| 未授权访问文档 | 🔴 高 | 数据泄露、隐私侵犯 |
| 修改他人评查结果 | 🔴 高 | 数据完整性破坏 |
| 跨任务访问文档 | 🟡 中 | 业务逻辑绕过 |
---
## 🛡️ 多层防护方案
### 架构设计:纵深防御
```
┌─────────────┐
│ 用户请求 │
└──────┬──────┘
┌─────────────────────────────────────┐
│ 第 1 层:前端路由 Loader 验证 │ ✅ 已实施
│ - 验证 JWT token │
│ - 验证文档-任务关联 │
│ - 验证用户身份(发起人/评审员) │
└──────┬──────────────────────────────┘
┌─────────────────────────────────────┐
│ 第 2 层:后端 API 权限验证 │ ⚠️ 需要后端实施
│ - JWT token 解析和验证 │
│ - 用户权限查询 │
│ - 返回 403 Forbidden(无权限) │
└──────┬──────────────────────────────┘
┌─────────────────────────────────────┐
│ 第 3 层:数据库 RLS 策略 │ ⚠️ 可选(推荐)
│ - PostgreSQL Row Level Security │
│ - 自动过滤无权访问的记录 │
└──────┬──────────────────────────────┘
┌─────────────┐
│ 返回数据 │
└─────────────┘
```
---
## 📝 实施步骤
### ✅ Step 1: 前端 Loader 权限验证(已完成)
**文件位置**
- `app/api/cross-checking/verify-document-access.ts` - 权限验证工具
- `app/routes/cross-checking.result.tsx` - 路由 Loader
**验证逻辑**
1. 检查文档是否属于指定任务(`cross_task_document_mapping` 表)
2. 检查用户是否是任务的发起人(`assigner_id`
3. 检查用户是否是任务的评审员(`assignee_ids`
4. 任一条件满足则允许访问
**示例代码**
```typescript
// Loader 中的权限验证
const { verifyDocumentAccess } = await import("~/api/cross-checking/verify-document-access");
const accessCheck = await verifyDocumentAccess({
documentId: id,
taskId: taskId,
userId: userInfo.user_id,
jwtToken: frontendJWT
});
if (!accessCheck.hasAccess) {
return Response.json({
result: false,
message: '您没有权限访问该文档'
}, { status: 403 });
}
```
**返回状态码**
- `401 Unauthorized` - 未登录或 JWT 过期
- `403 Forbidden` - 已登录但无权限访问
- `404 Not Found` - 文档或任务不存在
---
### ⚠️ Step 2: 后端 API 权限验证(需要 Python 后端实施)
**后端 API 路径**`/api/v3/review-points/{fileId}`
**推荐实现**FastAPI 示例):
```python
from fastapi import APIRouter, Depends, HTTPException, status
from typing import List
router = APIRouter()
async def verify_document_access(
document_id: int,
task_id: int,
current_user: dict = Depends(get_current_user)
) -> bool:
"""
验证用户是否有权访问文档
验证逻辑:
1. 文档是否属于该任务
2. 用户是否是任务的参与者(发起人或评审员)
"""
# 1. 检查文档-任务映射
mapping = await db.query(CrossTaskDocumentMapping).filter(
CrossTaskDocumentMapping.task_id == task_id,
CrossTaskDocumentMapping.document_id == document_id
).first()
if not mapping:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="文档不属于该任务"
)
# 2. 检查用户权限
task = await db.query(CrossExaminationTask).filter(
CrossExaminationTask.id == task_id
).first()
if not task:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="任务不存在"
)
user_id = current_user["user_id"]
is_assigner = task.assigner_id == user_id
is_assignee = user_id in task.assignee_ids
if not (is_assigner or is_assignee):
# 记录未授权访问尝试(审计日志)
await log_unauthorized_access(
user_id=user_id,
document_id=document_id,
task_id=task_id,
ip_address=request.client.host
)
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="您没有权限访问该文档"
)
return True
@router.get("/api/v3/review-points/{file_id}")
async def get_review_points(
file_id: int,
task_id: int = Query(..., description="任务ID"),
current_user: dict = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
获取文档评查点(带权限验证)
"""
# 🔒 权限验证
await verify_document_access(file_id, task_id, current_user)
# 获取数据
review_points = await get_document_review_points(file_id, db)
return {
"data": review_points,
"stats": calculate_stats(review_points),
# ...
}
```
**关键点**
- ✅ 解析和验证 JWT token
- ✅ 从 JWT 中提取 `user_id`
- ✅ 查询数据库验证权限
- ✅ 记录未授权访问尝试(审计日志)
- ✅ 返回明确的错误状态码
---
### 🔧 Step 3: 数据库层 RLS 策略(可选,推荐)
PostgreSQL Row Level Security (RLS) 可以在数据库层面强制执行权限控制。
**创建 RLS 策略**
```sql
-- 1. 启用 RLS
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
ALTER TABLE cross_task_document_mapping ENABLE ROW LEVEL SECURITY;
ALTER TABLE cross_examination_tasks ENABLE ROW LEVEL SECURITY;
-- 2. 创建策略:用户只能访问自己参与的任务中的文档
CREATE POLICY cross_checking_document_access ON documents
FOR SELECT
USING (
id IN (
-- 文档属于用户参与的任务
SELECT ctdm.document_id
FROM cross_task_document_mapping ctdm
JOIN cross_examination_tasks cet ON ctdm.task_id = cet.id
WHERE
cet.assigner_id = current_setting('app.current_user_id')::int
OR current_setting('app.current_user_id')::int = ANY(cet.assignee_ids)
)
);
-- 3. 创建策略:用户只能查看自己参与的任务
CREATE POLICY cross_checking_task_access ON cross_examination_tasks
FOR SELECT
USING (
assigner_id = current_setting('app.current_user_id')::int
OR current_setting('app.current_user_id')::int = ANY(assignee_ids)
);
```
**在应用中设置当前用户**
```typescript
// PostgREST 客户端请求前设置
await db.query(`SET app.current_user_id = ${userId}`);
```
**优点**
- ✅ 数据库层面自动过滤
- ✅ 即使前端/后端被绕过,数据库也会拒绝访问
- ✅ 审计和合规性更强
**缺点**
- ⚠️ 需要修改数据库配置
- ⚠️ 复杂查询可能影响性能
---
## 🧪 测试验证
### 测试用例
#### 1. 正常访问测试
```typescript
// 测试场景:用户访问自己参与的任务中的文档
// 预期结果:✅ 200 OK,返回文档数据
const response = await fetch('/cross-checking/result?id=123&tId=456', {
headers: {
'Authorization': `Bearer ${validJWT}`
}
});
expect(response.status).toBe(200);
expect(response.data.document.id).toBe(123);
```
#### 2. 未授权访问测试
```typescript
// 测试场景:用户尝试访问他人的文档
// 预期结果:❌ 403 Forbidden
const response = await fetch('/cross-checking/result?id=999&tId=456', {
headers: {
'Authorization': `Bearer ${validJWT}`
}
});
expect(response.status).toBe(403);
expect(response.data.message).toContain('没有权限');
```
#### 3. 跨任务访问测试
```typescript
// 测试场景:用户尝试将文档 A 的 ID 用于任务 B
// 预期结果:❌ 403 Forbidden
const response = await fetch('/cross-checking/result?id=123&tId=789', {
headers: {
'Authorization': `Bearer ${validJWT}`
}
});
expect(response.status).toBe(403);
expect(response.data.message).toContain('文档不属于该任务');
```
#### 4. JWT 过期测试
```typescript
// 测试场景:使用过期的 JWT token
// 预期结果:❌ 401 Unauthorized
const response = await fetch('/cross-checking/result?id=123&tId=456', {
headers: {
'Authorization': `Bearer ${expiredJWT}`
}
});
expect(response.status).toBe(401);
expect(response.data.message).toContain('身份验证失败');
```
#### 5. 缺少参数测试
```typescript
// 测试场景:缺少 taskId 参数
// 预期结果:❌ 400 Bad Request
const response = await fetch('/cross-checking/result?id=123');
expect(response.status).toBe(400);
expect(response.data.message).toContain('任务ID不能为空');
```
---
### 自动化测试脚本
创建 `app/api/cross-checking/__tests__/verify-document-access.test.ts`:
```typescript
import { describe, it, expect } from 'vitest';
import { verifyDocumentAccess } from '../verify-document-access';
describe('verifyDocumentAccess', () => {
it('应该允许任务发起人访问文档', async () => {
const result = await verifyDocumentAccess({
documentId: 123,
taskId: 456,
userId: 1, // 发起人 ID
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(true);
expect(result.userRole).toBe('assigner');
});
it('应该允许任务评审员访问文档', async () => {
const result = await verifyDocumentAccess({
documentId: 123,
taskId: 456,
userId: 2, // 评审员 ID
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(true);
expect(result.userRole).toBe('assignee');
});
it('应该拒绝未参与任务的用户访问', async () => {
const result = await verifyDocumentAccess({
documentId: 123,
taskId: 456,
userId: 999, // 未参与的用户
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(false);
expect(result.reason).toContain('没有权限');
});
it('应该拒绝访问不属于该任务的文档', async () => {
const result = await verifyDocumentAccess({
documentId: 999, // 不属于任务的文档
taskId: 456,
userId: 1,
jwtToken: 'valid-jwt'
});
expect(result.hasAccess).toBe(false);
expect(result.reason).toContain('文档不属于该任务');
});
});
```
**运行测试**
```bash
npm run test -- verify-document-access.test.ts
```
---
## 📊 审计与监控
### 记录未授权访问尝试
创建审计日志表:
```sql
CREATE TABLE security_audit_logs (
id SERIAL PRIMARY KEY,
user_id INTEGER,
action VARCHAR(50) NOT NULL,
resource_type VARCHAR(50),
resource_id INTEGER,
task_id INTEGER,
ip_address INET,
user_agent TEXT,
status VARCHAR(20), -- 'denied', 'allowed'
reason TEXT,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE INDEX idx_audit_logs_user_id ON security_audit_logs(user_id);
CREATE INDEX idx_audit_logs_created_at ON security_audit_logs(created_at);
CREATE INDEX idx_audit_logs_status ON security_audit_logs(status);
```
### 记录函数
```typescript
// app/api/security/audit-log.ts
import { postgrestPost } from "../postgrest-client";
interface AuditLogParams {
userId: number;
action: string;
resourceType: string;
resourceId: number | string;
taskId?: number | string;
status: 'allowed' | 'denied';
reason?: string;
ipAddress?: string;
userAgent?: string;
}
export async function logSecurityEvent(params: AuditLogParams) {
try {
await postgrestPost('security_audit_logs', {
user_id: params.userId,
action: params.action,
resource_type: params.resourceType,
resource_id: params.resourceId,
task_id: params.taskId,
status: params.status,
reason: params.reason,
ip_address: params.ipAddress,
user_agent: params.userAgent
});
} catch (error) {
console.error('❌ [Security] 记录审计日志失败:', error);
// 审计日志失败不应影响主流程
}
}
```
### 在权限验证中记录
修改 `verify-document-access.ts`,添加审计日志:
```typescript
import { logSecurityEvent } from "../security/audit-log";
export async function verifyDocumentAccess(
params: DocumentAccessCheckParams
): Promise<DocumentAccessCheckResult> {
// ... 现有验证逻辑 ...
if (!accessCheck.hasAccess) {
// 🔍 记录被拒绝的访问尝试
await logSecurityEvent({
userId: params.userId,
action: 'access_document',
resourceType: 'cross_checking_document',
resourceId: params.documentId,
taskId: params.taskId,
status: 'denied',
reason: accessCheck.reason
});
return accessCheck;
}
// ✅ 记录成功的访问
await logSecurityEvent({
userId: params.userId,
action: 'access_document',
resourceType: 'cross_checking_document',
resourceId: params.documentId,
taskId: params.taskId,
status: 'allowed'
});
return accessCheck;
}
```
### 监控仪表板查询
```sql
-- 查看最近被拒绝的访问尝试
SELECT
user_id,
resource_id,
task_id,
reason,
ip_address,
created_at
FROM security_audit_logs
WHERE status = 'denied'
AND action = 'access_document'
AND created_at > NOW() - INTERVAL '24 hours'
ORDER BY created_at DESC;
-- 查看频繁尝试未授权访问的用户
SELECT
user_id,
COUNT(*) as attempt_count,
MAX(created_at) as last_attempt
FROM security_audit_logs
WHERE status = 'denied'
AND created_at > NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING COUNT(*) > 5
ORDER BY attempt_count DESC;
```
---
## 📋 检查清单
实施前检查:
- [x] ✅ 前端 Loader 权限验证已实施
- [ ] ⚠️ 后端 API 权限验证(需要 Python 后端配合)
- [ ] ⚠️ 数据库 RLS 策略(可选)
- [ ] ⚠️ 审计日志系统(推荐)
- [ ] ⚠️ 单元测试和集成测试
- [ ] ⚠️ 安全测试(渗透测试)
部署后验证:
- [ ] 正常访问流程测试
- [ ] 未授权访问拦截测试
- [ ] JWT 过期处理测试
- [ ] 审计日志记录验证
- [ ] 性能影响评估
---
## 🎯 总结
### 当前实施状态
| 防护层 | 状态 | 描述 |
|--------|------|------|
| 前端 Loader 验证 | ✅ 已完成 | 验证文档-任务-用户关联 |
| 后端 API 验证 | ⚠️ 待实施 | 需要 Python 后端配合 |
| 数据库 RLS | ⚠️ 可选 | 最高安全级别,可选实施 |
| 审计日志 | ⚠️ 推荐 | 用于安全监控和合规 |
### 后续行动
1. **立即部署**:前端 Loader 验证(已完成)
2. **协调后端**:实施后端 API 权限验证
3. **可选增强**:配置数据库 RLS 策略
4. **监控运营**:建立审计日志和监控体系
### 联系方式
如有问题,请联系:
- **安全团队**security@example.com
- **开发团队**dev@example.com