6.4 多层防护体系设计
🎯 学习目标:构建完整的MCP安全防护体系
⏱️ 预计时间:40分钟
📊 难度等级:⭐⭐⭐⭐⭐
🏰 多层防护架构
🛡️ 防护层次图
🌐 网络层防护
🚫 DDoS攻击防护
python
# DDoS防护系统
import asyncio
import ipaddress
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional
class AttackType(Enum):
    """Categories of attack that a detection rule can be tagged with."""

    # Volumetric attack
    VOLUMETRIC = "volumetric"
    # Protocol attack
    PROTOCOL = "protocol"
    # Application-layer attack
    APPLICATION = "application"
@dataclass
class AttackPattern:
    """A single rate-based detection rule used by the DDoS protection layer."""

    # Category reported in the "reason" field when this rule blocks an IP.
    attack_type: AttackType
    # The rule fires once the request count inside the window exceeds this.
    threshold: int
    # Window length; compared against differences of time.time() values,
    # i.e. seconds.
    time_window: int
    # Response to take; the handler recognises "block", "challenge" and
    # "throttle".
    action: str
class DDoSProtection:
    """In-memory DDoS guard: allow/deny lists plus sliding-window rate checks.

    All state is kept in plain instance attributes; nothing is persisted.
    """

    # Seconds an IP stays blocked after tripping a rule whose action is "block".
    AUTO_BLOCK_SECONDS = 3600
    # Seconds recorded in ``rate_limits`` for a rule whose action is "throttle".
    THROTTLE_SECONDS = 300
    # Request timestamps older than this many seconds are discarded.
    HISTORY_SECONDS = 300

    def __init__(self):
        # Per-IP request timestamps (time.time()), oldest first.
        self.request_counts = defaultdict(deque)
        self.blocked_ips = set()
        # Per-IP epoch second written by handle_attack for "throttle" rules.
        # NOTE(review): nothing in this class reads it back.
        self.rate_limits = defaultdict(int)
        # Per-IP epoch time at which an automatic block lapses. An IP in
        # ``blocked_ips`` without an entry here stays blocked until removed.
        self.block_expiry = {}
        # Detection rules, evaluated in this order; the first match wins.
        self.attack_patterns = [
            AttackPattern(AttackType.VOLUMETRIC, 100, 60, "block"),
            AttackPattern(AttackType.PROTOCOL, 50, 30, "challenge"),
            AttackPattern(AttackType.APPLICATION, 20, 10, "throttle")
        ]
        # Networks that bypass every check (loopback and private ranges).
        self.whitelist = {
            ipaddress.ip_network("127.0.0.0/8"),
            ipaddress.ip_network("10.0.0.0/8"),
            ipaddress.ip_network("172.16.0.0/12"),
            ipaddress.ip_network("192.168.0.0/16")
        }
        # Country codes to reject outright (example values).
        self.geo_blocked_countries = {"CN", "RU", "KP"}

    def is_whitelisted(self, ip: str) -> bool:
        """Return True when *ip* parses and falls inside a whitelisted network."""
        try:
            ip_addr = ipaddress.ip_address(ip)
        except ValueError:
            # Unparseable addresses are never whitelisted.
            return False
        return any(ip_addr in network for network in self.whitelist)

    def is_geo_blocked(self, ip: str) -> bool:
        """Return True when the country code for *ip* is in the blocked set."""
        return self.get_country_code(ip) in self.geo_blocked_countries

    def get_country_code(self, ip: str) -> str:
        """Return the country code for *ip*.

        Placeholder: always returns "US". A real implementation should
        consult a GeoIP database.
        """
        return "US"

    def check_request(self, ip: str, user_agent: str, request_path: str) -> Dict[str, Any]:
        """Classify one incoming request.

        Returns a dict with an "action" ("allow", "block", "challenge" or
        "throttle") and a "reason" string.
        """
        current_time = time.time()

        # 1. Whitelisted networks bypass every other check.
        if self.is_whitelisted(ip):
            return {"action": "allow", "reason": "whitelisted"}

        # 2. Deny list. Timed blocks are lifted here, on the next request
        #    after they lapse, so no event loop or background task is needed.
        if ip in self.blocked_ips:
            expires_at = self.block_expiry.get(ip)
            if expires_at is None or current_time < expires_at:
                return {"action": "block", "reason": "blacklisted"}
            self._lift_block(ip)

        # 3. Geo blocking; these blocks carry no expiry.
        if self.is_geo_blocked(ip):
            self.blocked_ips.add(ip)
            return {"action": "block", "reason": "geo_blocked"}

        # 4. Record the request and drop timestamps outside the history window.
        history = self.request_counts[ip]
        history.append(current_time)
        while history and current_time - history[0] > self.HISTORY_SECONDS:
            history.popleft()

        # 5. Rate rules, first match wins.
        for pattern in self.attack_patterns:
            if self.detect_attack_pattern(ip, pattern, current_time):
                return self.handle_attack(ip, pattern)

        # 6. Signature checks on the user agent and the path.
        if self.detect_application_attack(user_agent, request_path):
            return {"action": "challenge", "reason": "suspicious_behavior"}

        return {"action": "allow", "reason": "clean"}

    def detect_attack_pattern(self, ip: str, pattern: AttackPattern,
                              current_time: float) -> bool:
        """Return True when *ip* exceeded ``pattern.threshold`` requests
        within the last ``pattern.time_window`` seconds."""
        recent = sum(
            1 for req_time in self.request_counts[ip]
            if current_time - req_time <= pattern.time_window
        )
        return recent > pattern.threshold

    def detect_application_attack(self, user_agent: str, request_path: str) -> bool:
        """Return True when the user agent or path contains a known-bad substring.

        Matching is a case-insensitive substring test. A missing (None)
        user agent or path is treated as an empty string.
        """
        agent = (user_agent or "").lower()
        path = (request_path or "").lower()

        # 1. Tool / crawler names in the User-Agent.
        suspicious_agents = [
            "bot", "crawler", "scanner", "sqlmap", "nikto",
            "nmap", "masscan", "zap", "burp"
        ]
        if any(marker in agent for marker in suspicious_agents):
            return True

        # 2. Paths commonly probed by scanners.
        suspicious_paths = [
            "/admin", "/.env", "/config", "/backup",
            "/wp-admin", "/phpmyadmin", "/shell"
        ]
        if any(marker in path for marker in suspicious_paths):
            return True

        # 3. Injection fragments in the path.
        sql_patterns = [
            "union select", "or 1=1", "drop table",
            "exec(", "script>", "<iframe"
        ]
        return any(marker in path for marker in sql_patterns)

    def handle_attack(self, ip: str, pattern: AttackPattern) -> Dict[str, Any]:
        """Apply ``pattern.action`` to *ip* and return the resulting verdict.

        An unrecognised action yields {"action": "allow", "reason": "unknown"}.
        """
        if pattern.action == "block":
            self.blocked_ips.add(ip)
            # Record when the block lapses instead of spawning an asyncio
            # task: this method runs on the synchronous check_request path,
            # where asyncio.create_task() raises RuntimeError if no event
            # loop is running.
            self.block_expiry[ip] = time.time() + self.AUTO_BLOCK_SECONDS
            return {"action": "block", "reason": f"{pattern.attack_type.value}_attack"}
        if pattern.action == "challenge":
            return {"action": "challenge", "reason": "rate_limit_exceeded"}
        if pattern.action == "throttle":
            self.rate_limits[ip] = int(time.time()) + self.THROTTLE_SECONDS
            return {"action": "throttle", "reason": "temporary_limit"}
        return {"action": "allow", "reason": "unknown"}

    def _lift_block(self, ip: str) -> None:
        """Remove *ip* from the deny list and log the automatic unblock."""
        import logging

        self.blocked_ips.discard(ip)
        self.block_expiry.pop(ip, None)
        logging.getLogger(__name__).info(f"IP {ip} 已自动解封")

    async def schedule_unblock(self, ip: str, delay: int):
        """Sleep *delay* seconds, then unblock *ip* if it is still blocked.

        Kept for callers that run inside an event loop and want an explicit
        timed unblock; check_request does not depend on it.
        """
        await asyncio.sleep(delay)
        if ip in self.blocked_ips:
            self._lift_block(ip)
# WAF防火墙
class WebApplicationFirewall:
    """Regex-based web application firewall: request scanning and sanitising."""

    def __init__(self):
        # Attack signatures, grouped by attack type. Each entry is a regular
        # expression applied case-insensitively.
        self.attack_rules = {
            "xss": [
                r"<script[^>]*>",
                r"javascript:",
                r"on\w+\s*=",
                r"<iframe[^>]*>",
                r"<object[^>]*>",
                r"<embed[^>]*>"
            ],
            "sqli": [
                r"union\s+select",
                r"or\s+1\s*=\s*1",
                r"drop\s+table",
                r"insert\s+into",
                r"delete\s+from",
                r"update\s+.*\s+set"
            ],
            "lfi": [
                r"\.\./",
                r"\.\.\\",
                r"/etc/passwd",
                r"/proc/self/environ",
                r"boot\.ini"
            ],
            "rce": [
                r"exec\s*\(",
                r"system\s*\(",
                r"shell_exec\s*\(",
                r"passthru\s*\(",
                r"eval\s*\(",
                r"`.*`"
            ]
        }
        # Size limits.
        self.max_request_size = 10 * 1024 * 1024  # 10MB
        self.max_header_size = 8 * 1024  # 8KB
        self.max_url_length = 2048

    @staticmethod
    def _to_text(value: Any) -> str:
        """Coerce a request field to ``str`` so it can be measured and scanned."""
        if value is None:
            return ""
        if isinstance(value, str):
            return value
        if isinstance(value, (bytes, bytearray)):
            return bytes(value).decode("utf-8", errors="replace")
        return str(value)

    @staticmethod
    def _add_unique(target: List[str], items: List[str]) -> None:
        """Append each item to *target* unless already present (keeps order)."""
        for item in items:
            if item not in target:
                target.append(item)

    def scan_request(self, request_data: Dict[str, Any]) -> Dict[str, Any]:
        """Scan a request for size violations and attack signatures.

        Reads the keys "content_length", "url", "headers", "body" and
        "query_params" from *request_data* (all optional).

        Returns a dict with:
          - "is_malicious": bool
          - "attack_types": distinct finding names, in discovery order
          - "confidence": 0.25 per distinct finding, capped at 1.0
          - "blocked_content": every regex match found
        """
        scan_result = {
            "is_malicious": False,
            "attack_types": [],
            "confidence": 0.0,
            "blocked_content": []
        }

        def flag(finding: str) -> None:
            scan_result["is_malicious"] = True
            self._add_unique(scan_result["attack_types"], [finding])

        # 1. Body size.
        content_length = request_data.get("content_length", 0) or 0
        if content_length > self.max_request_size:
            flag("oversized_request")

        # 2. URL length.
        url = self._to_text(request_data.get("url", ""))
        if len(url) > self.max_url_length:
            flag("long_url")

        # 3. Combined header size (names plus values).
        headers = request_data.get("headers", {}) or {}
        total_header_size = sum(
            len(self._to_text(name)) + len(self._to_text(value))
            for name, value in headers.items()
        )
        if total_header_size > self.max_header_size:
            flag("oversized_headers")

        # 4. Signature scan over every text-bearing part of the request.
        #    Fields are coerced to text first: a dict of query parameters or
        #    a bytes body would otherwise make re.findall raise TypeError.
        content_to_scan = [
            url,
            self._to_text(request_data.get("body", "")),
            str(headers),
            self._to_text(request_data.get("query_params", ""))
        ]
        for content in content_to_scan:
            if not content:
                continue
            attack_result = self.detect_attacks(content)
            if attack_result["is_malicious"]:
                scan_result["is_malicious"] = True
                # De-duplicate so that one attack type seen in several
                # fields does not inflate the confidence score.
                self._add_unique(scan_result["attack_types"], attack_result["attack_types"])
                scan_result["blocked_content"].extend(attack_result["matches"])

        # 5. Confidence grows with the number of distinct findings.
        if scan_result["is_malicious"]:
            scan_result["confidence"] = min(
                len(scan_result["attack_types"]) * 0.25, 1.0
            )
        return scan_result

    def detect_attacks(self, content: str) -> Dict[str, Any]:
        """Match *content* against every signature in ``attack_rules``.

        Returns a dict with "is_malicious", "attack_types" (each type listed
        once, however many of its patterns matched) and "matches" (all
        matched substrings).
        """
        import re

        content = self._to_text(content)
        result = {
            "is_malicious": False,
            "attack_types": [],
            "matches": []
        }
        for attack_type, patterns in self.attack_rules.items():
            for pattern in patterns:
                matches = re.findall(pattern, content, re.IGNORECASE)
                if matches:
                    result["is_malicious"] = True
                    self._add_unique(result["attack_types"], [attack_type])
                    result["matches"].extend(matches)
        return result

    def sanitize_content(self, content: str) -> str:
        """Strip dangerous markup from *content*, then HTML-escape the rest.

        The stripping must run on the raw text: once ``html.escape`` has
        turned ``<``, ``>`` and quotes into entities, none of the tag or
        attribute patterns can match any more. Escaping last also
        neutralises any markup the patterns did not cover.
        """
        import re
        import html

        sanitized = self._to_text(content)

        # 1. Remove dangerous elements together with their content.
        dangerous_tags = [
            r"<script[^>]*>.*?</script>",
            r"<iframe[^>]*>.*?</iframe>",
            r"<object[^>]*>.*?</object>",
            r"<embed[^>]*>.*?</embed>"
        ]
        for pattern in dangerous_tags:
            sanitized = re.sub(pattern, "", sanitized, flags=re.IGNORECASE | re.DOTALL)

        # 2. Remove inline event handlers and script-scheme URLs.
        dangerous_attrs = [
            r"on\w+\s*=\s*[\"'][^\"']*[\"']",
            r"javascript:\s*[^\"']*",
            r"vbscript:\s*[^\"']*"
        ]
        for pattern in dangerous_attrs:
            sanitized = re.sub(pattern, "", sanitized, flags=re.IGNORECASE)

        # 3. Escape whatever markup is left.
        return html.escape(sanitized).strip()
🔐 应用层防护
🛡️ 统一认证授权系统
python
# 统一认证授权系统
from abc import ABC, abstractmethod
from typing import Dict, Any, List, Optional, Set
from enum import Enum
import bcrypt
import pyotp
import qrcode
from io import BytesIO
import base64
class AuthenticationMethod(Enum):
    """Identifiers for the authentication factors a provider can implement."""

    PASSWORD = "password"
    TOTP = "totp"
    SMS = "sms"
    EMAIL = "email"
    BIOMETRIC = "biometric"
class Permission(Enum):
    """Individual permissions that roles grant and resources require."""

    READ = "read"
    WRITE = "write"
    EXECUTE = "execute"
    ADMIN = "admin"
    DELETE = "delete"
class Role(Enum):
    """User roles, listed from least to most privileged."""

    GUEST = "guest"
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"
class AuthenticationProvider(ABC):
    """Abstract base class that every authentication provider implements."""

    @abstractmethod
    async def authenticate(self, credentials: Dict[str, Any]) -> bool:
        """Return True when *credentials* are accepted by this provider."""

    @abstractmethod
    def get_method_type(self) -> AuthenticationMethod:
        """Return the AuthenticationMethod this provider handles."""
class PasswordAuthProvider(AuthenticationProvider):
    """Password authentication backed by bcrypt hashes.

    ``user_store`` must provide an awaitable ``get_user(username)`` whose
    result exposes ``password_hash`` as a ``str``.
    """

    def __init__(self, user_store):
        self.user_store = user_store
        self.password_policy = {
            "min_length": 12,
            "require_uppercase": True,
            "require_lowercase": True,
            "require_digits": True,
            "require_special": True,
            "max_age_days": 90,
            "history_count": 5
        }

    async def authenticate(self, credentials: Dict[str, Any]) -> bool:
        """Check ``credentials["username"]`` / ``credentials["password"]``.

        Returns False when either field is missing, the user is unknown,
        the password does not match, or bcrypt rejects its input.
        """
        username = credentials.get("username")
        password = credentials.get("password")
        if not username or not password:
            return False

        user = await self.user_store.get_user(username)
        if not user:
            return False

        stored_hash = user.password_hash.encode('utf-8')
        password_bytes = password.encode('utf-8')
        try:
            password_ok = bcrypt.checkpw(password_bytes, stored_hash)
        except ValueError:
            # bcrypt raises ValueError when the stored hash is malformed;
            # treat that as a failed login rather than letting the
            # exception escape from the authentication path.
            import logging

            logging.getLogger(__name__).warning(
                "bcrypt rejected the stored hash or password for user %s", username
            )
            return False
        if not password_ok:
            return False

        # NOTE(review): should_update_password / schedule_password_update are
        # not defined in this class as shown; they must be supplied by a
        # subclass or mixin, otherwise a successful login raises
        # AttributeError here.
        if self.should_update_password(user):
            await self.schedule_password_update(user)
        return True

    def get_method_type(self) -> AuthenticationMethod:
        """Identify this provider as the PASSWORD factor."""
        return AuthenticationMethod.PASSWORD

    def validate_password_policy(self, password: str) -> Dict[str, Any]:
        """Check *password* against ``self.password_policy``.

        Returns {"is_valid": bool, "violations": [message, ...]}; the
        messages are listed in the order the checks run.
        """
        import re

        policy = self.password_policy
        violations = []

        if len(password) < policy["min_length"]:
            violations.append(f"密码长度至少{policy['min_length']}位")

        # (policy flag, character class that must occur, violation message)
        character_checks = [
            ("require_uppercase", r"[A-Z]", "必须包含大写字母"),
            ("require_lowercase", r"[a-z]", "必须包含小写字母"),
            ("require_digits", r"\d", "必须包含数字"),
            ("require_special", r"[!@#$%^&*(),.?\":{}|<>]", "必须包含特殊字符"),
        ]
        for flag, pattern, message in character_checks:
            if policy[flag] and not re.search(pattern, password):
                violations.append(message)

        return {"is_valid": not violations, "violations": violations}

    def hash_password(self, password: str) -> str:
        """Return a bcrypt hash (cost factor 12) of *password* as text."""
        password_bytes = password.encode('utf-8')
        salt = bcrypt.gensalt(rounds=12)
        hashed = bcrypt.hashpw(password_bytes, salt)
        return hashed.decode('utf-8')
class TOTPAuthProvider(AuthenticationProvider):
    """Second-factor provider that verifies time-based one-time passwords."""

    def __init__(self, user_store):
        self.user_store = user_store
        self.issuer_name = "MCP Server"

    async def authenticate(self, credentials: Dict[str, Any]) -> bool:
        """Verify ``credentials["totp_code"]`` for ``credentials["username"]``.

        Returns False when either field is missing, the user is unknown, or
        the user has no TOTP secret.
        """
        username = credentials.get("username")
        totp_code = credentials.get("totp_code")
        if not (username and totp_code):
            return False

        user = await self.user_store.get_user(username)
        if not user or not user.totp_secret:
            return False

        # valid_window=1: per the original note, tolerates 30 s of clock drift.
        verifier = pyotp.TOTP(user.totp_secret)
        return verifier.verify(totp_code, valid_window=1)

    def get_method_type(self) -> AuthenticationMethod:
        """Identify this provider as the TOTP factor."""
        return AuthenticationMethod.TOTP

    def generate_totp_secret(self, username: str) -> Dict[str, str]:
        """Create a new TOTP secret and an enrolment QR code for *username*.

        Returns a dict with "secret", "qr_code" (a PNG data URI) and
        "manual_entry_key" (the same secret, for typing in by hand).
        """
        secret = pyotp.random_base32()

        # Provisioning URI that the QR code encodes.
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=username,
            issuer_name=self.issuer_name
        )

        # Render the URI as a QR image.
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(totp_uri)
        qr.make(fit=True)
        image = qr.make_image(fill_color="black", back_color="white")

        # Serialise the PNG to base64 so it can be embedded in a data URI.
        png_buffer = BytesIO()
        image.save(png_buffer, format='PNG')
        qr_code_data = base64.b64encode(png_buffer.getvalue()).decode()

        return {
            "secret": secret,
            "qr_code": f"data:image/png;base64,{qr_code_data}",
            "manual_entry_key": secret
        }
class MultiFactorAuthenticator:
    """Runs the authentication factors required for a role, in order.

    NOTE(review): ``get_user``, ``check_user_role`` and
    ``create_session_token`` are called below but are not defined in this
    class as shown; they must be supplied by a subclass or mixin.
    """

    def __init__(self):
        # Registered providers, keyed by the factor they implement.
        self.providers: Dict[AuthenticationMethod, AuthenticationProvider] = {}
        # Factors that must all succeed for each role.
        self.required_methods = {
            Role.USER: [AuthenticationMethod.PASSWORD],
            Role.MODERATOR: [AuthenticationMethod.PASSWORD, AuthenticationMethod.TOTP],
            Role.ADMIN: [AuthenticationMethod.PASSWORD, AuthenticationMethod.TOTP],
            Role.SUPER_ADMIN: [AuthenticationMethod.PASSWORD, AuthenticationMethod.TOTP]
        }

    def register_provider(self, provider: AuthenticationProvider):
        """Register *provider* under the factor it reports via get_method_type()."""
        self.providers[provider.get_method_type()] = provider

    async def authenticate_user(self, username: str, credentials: Dict[str, Any],
                                required_role: Role = Role.USER) -> Dict[str, Any]:
        """Authenticate *username* with every factor required for *required_role*.

        Returns a dict with "success", "user_id", "completed_methods",
        "required_methods", "next_method" and "session_token". On failure
        an "error" message is added, except for an unknown user (left
        without a message, as before). When a factor fails, "next_method"
        names it.
        """
        required = self.required_methods.get(required_role)
        result = {
            "success": False,
            "user_id": None,
            "completed_methods": [],
            # Copy so callers cannot mutate the shared configuration.
            "required_methods": list(required or []),
            "next_method": None,
            "session_token": None
        }

        # Fail closed for roles without configured factors: an empty factor
        # list must never count as "all factors passed".
        if not required:
            result["error"] = f"角色 {required_role.value} 未配置认证方法"
            return result

        user = await self.get_user(username)
        if not user:
            return result

        if not self.check_user_role(user, required_role):
            result["error"] = "用户权限不足"
            return result

        for method in result["required_methods"]:
            provider = self.providers.get(method)
            if provider is None:
                result["error"] = f"认证方法 {method.value} 不可用"
                return result

            method_credentials = self.extract_method_credentials(credentials, method)
            # Bind every factor to the account being logged in. Without
            # this, the providers verify credentials["username"] while the
            # session is issued for *username*, so one account's credentials
            # could open a session for a different account.
            method_credentials["username"] = username

            if not await provider.authenticate(method_credentials):
                result["error"] = f"{method.value} 认证失败"
                result["next_method"] = method
                return result
            result["completed_methods"].append(method)

        # Reaching this point means every required factor succeeded.
        result["success"] = True
        result["user_id"] = user.id
        result["session_token"] = await self.create_session_token(user)
        return result

    def extract_method_credentials(self, credentials: Dict[str, Any],
                                   method: AuthenticationMethod) -> Dict[str, Any]:
        """Return only the credential fields the given factor needs.

        The result always contains "username"; PASSWORD, TOTP and SMS each
        add their own field, other methods add nothing.
        """
        field_by_method = {
            AuthenticationMethod.PASSWORD: "password",
            AuthenticationMethod.TOTP: "totp_code",
            AuthenticationMethod.SMS: "sms_code",
        }
        selected = {"username": credentials.get("username")}
        field = field_by_method.get(method)
        if field is not None:
            selected[field] = credentials.get(field)
        return selected
# RBAC权限控制系统
class RBACManager:
    """Role-based access control: static role/resource tables plus dynamic rules."""

    def __init__(self):
        # Permissions granted to each role.
        self.role_permissions = {
            Role.GUEST: {Permission.READ},
            Role.USER: {Permission.READ, Permission.WRITE},
            Role.MODERATOR: {Permission.READ, Permission.WRITE, Permission.EXECUTE},
            Role.ADMIN: {Permission.READ, Permission.WRITE, Permission.EXECUTE, Permission.DELETE},
            Role.SUPER_ADMIN: {Permission.READ, Permission.WRITE, Permission.EXECUTE, Permission.DELETE, Permission.ADMIN}
        }
        # Permissions associated with each resource; a role needs at least
        # one of them to reach the resource.
        self.resource_permissions = {
            "/api/users": {Permission.READ, Permission.ADMIN},
            "/api/tools": {Permission.EXECUTE},
            "/api/config": {Permission.ADMIN},
            "/api/logs": {Permission.READ, Permission.ADMIN}
        }
        # Extra rules; every one must approve a request.
        self.dynamic_rules = []

    def check_permission(self, user_role: Role, resource: str,
                         action: Permission, context: Dict[str, Any] = None) -> bool:
        """Return True when *user_role* may perform *action* on *resource*.

        Three checks must all pass: the role grants *action*; the role
        shares at least one permission with the resource (resources without
        an entry are unrestricted); and every dynamic rule approves.
        """
        granted = self.role_permissions.get(user_role, set())
        if action not in granted:
            return False

        required = self.resource_permissions.get(resource, set())
        if required and granted.isdisjoint(required):
            return False

        return all(
            rule.evaluate(user_role, resource, action, context)
            for rule in self.dynamic_rules
        )

    def add_dynamic_rule(self, rule):
        """Append *rule* (an object with an ``evaluate`` method) to the rule list."""
        self.dynamic_rules.append(rule)

    def get_user_permissions(self, user_role: Role) -> Set[Permission]:
        """Return the permission set for *user_role* (empty for unknown roles)."""
        return self.role_permissions.get(user_role, set())

    def can_access_resource(self, user_role: Role, resource: str) -> bool:
        """Return True when *user_role* may reach *resource* at all.

        Resources without an entry are open to everyone; otherwise the role
        must hold at least one of the resource's permissions.
        """
        required = self.resource_permissions.get(resource, set())
        if not required:
            return True
        granted = self.role_permissions.get(user_role, set())
        return not required.isdisjoint(granted)
# 动态权限规则示例
class TimeBasedAccessRule:
    """Dynamic rule that approves requests only during the allowed hours."""

    def __init__(self, allowed_hours: List[int]):
        self.allowed_hours = allowed_hours

    def evaluate(self, user_role: Role, resource: str,
                 action: Permission, context: Dict[str, Any]) -> bool:
        """Return True when the current local hour is in ``allowed_hours``.

        The role, resource, action and context arguments are not consulted.
        """
        from datetime import datetime

        return datetime.now().hour in self.allowed_hours
class IPBasedAccessRule:
    """Dynamic rule that approves requests only from the allowed networks."""

    def __init__(self, allowed_networks: List[str]):
        import ipaddress

        # Parse once up front so evaluate() only does membership tests.
        self.allowed_networks = [ipaddress.ip_network(net) for net in allowed_networks]

    def evaluate(self, user_role: Role, resource: str,
                 action: Permission, context: Dict[str, Any]) -> bool:
        """Return True when ``context["client_ip"]`` lies in an allowed network.

        A missing context, a missing "client_ip" key, or an unparseable
        address all yield False.
        """
        if not context or "client_ip" not in context:
            return False

        import ipaddress

        try:
            address = ipaddress.ip_address(context["client_ip"])
            for network in self.allowed_networks:
                if address in network:
                    return True
            return False
        except ValueError:
            return False
🔍 数据层防护
🔐 数据加密与脱敏
python
# 数据加密与脱敏系统
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import os
import base64
import re
from typing import Dict, Any, List, Optional
class DataEncryption:
    """Fernet-based encryption of strings and of sensitive fields in dicts."""

    def __init__(self, master_key: Optional[str] = None):
        """Build the cipher from *master_key*, or from a derived key if omitted.

        Raises:
            ValueError: if *master_key* encodes to fewer than 32 bytes.
                Fernet needs exactly 32 key bytes; only the first 32 bytes
                of a longer key are used.
        """
        if master_key:
            self.key = master_key.encode()
            if len(self.key) < 32:
                raise ValueError("master_key must encode to at least 32 bytes")
        else:
            self.key = self.generate_key()
        self.fernet = Fernet(base64.urlsafe_b64encode(self.key[:32]))
        # A field is sensitive when its name contains any of these
        # substrings (case-insensitive).
        self.sensitive_fields = {
            "password", "secret", "token", "key", "credential",
            "ssn", "credit_card", "phone", "email", "address"
        }

    def generate_key(self) -> bytes:
        """Derive a 32-byte key from the ENCRYPTION_PASSWORD environment variable.

        NOTE(review): the salt is random on every call and is not stored, so
        a key produced here cannot be re-derived later; data encrypted with
        it is unreadable once this instance is gone.
        NOTE(review): when ENCRYPTION_PASSWORD is unset, the hard-coded
        "default_password" is used; production deployments must set it.
        """
        salt = os.urandom(16)
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        password = os.environ.get("ENCRYPTION_PASSWORD", "default_password").encode()
        return kdf.derive(password)

    def encrypt_data(self, data: str) -> str:
        """Encrypt *data* and return it base64-encoded; empty input is returned as is."""
        if not data:
            return data
        encrypted_data = self.fernet.encrypt(data.encode())
        return base64.b64encode(encrypted_data).decode()

    def decrypt_data(self, encrypted_data: str) -> Optional[str]:
        """Reverse :meth:`encrypt_data`.

        Returns the plaintext, the input itself when it is empty, or None
        when decoding or decryption fails (the failure is logged).
        """
        if not encrypted_data:
            return encrypted_data
        try:
            decoded_data = base64.b64decode(encrypted_data.encode())
            decrypted_data = self.fernet.decrypt(decoded_data)
            return decrypted_data.decode()
        except Exception as e:
            # Deliberately best-effort: callers receive None, not an exception.
            import logging

            logging.getLogger(__name__).error(f"数据解密失败: {e}")
            return None

    def _transform_dict(self, data: Dict[str, Any], transform) -> Dict[str, Any]:
        """Apply *transform* to every sensitive string value, recursing into
        nested dicts and into dicts inside lists.

        Encryption and decryption share this traversal so that they always
        visit exactly the same values.
        """
        converted = {}
        for key, value in data.items():
            if self.is_sensitive_field(key) and isinstance(value, str):
                converted[key] = transform(value)
            elif isinstance(value, dict):
                converted[key] = self._transform_dict(value, transform)
            elif isinstance(value, list):
                converted[key] = [
                    self._transform_dict(item, transform) if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                converted[key] = value
        return converted

    def encrypt_dict(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *data* with every sensitive string value encrypted.

        A dict stored under a sensitive key is recursed into as well;
        previously it was copied unencrypted while :meth:`decrypt_dict`
        still tried to decrypt its fields, which corrupted them to None.
        """
        return self._transform_dict(data, self.encrypt_data)

    def decrypt_dict(self, encrypted_data: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *encrypted_data* with every sensitive string value decrypted."""
        return self._transform_dict(encrypted_data, self.decrypt_data)

    def is_sensitive_field(self, field_name: str) -> bool:
        """Return True when *field_name* contains a sensitive keyword (case-insensitive)."""
        field_lower = field_name.lower()
        return any(sensitive in field_lower for sensitive in self.sensitive_fields)
class DataMasking:
    """Masks personal data (e-mail, phone, card, SSN, IP, name, address) in text and dicts."""

    def __init__(self):
        # Masking function for each supported data type.
        self.masking_rules = {
            "email": self.mask_email,
            "phone": self.mask_phone,
            "credit_card": self.mask_credit_card,
            "ssn": self.mask_ssn,
            "name": self.mask_name,
            "address": self.mask_address,
            "ip_address": self.mask_ip
        }
        # Detection patterns used by the "auto" mode and by the maskers.
        self.patterns = {
            # The TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, not an alternation.
            "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            # "(" is not a word character, so a leading \b would require a
            # word character directly before it; (?<!\w) lets the
            # "(555) 123-4567" form match at the start of text or after a space.
            "phone": r"\b\d{3}-\d{3}-\d{4}\b|(?<!\w)\(\d{3}\)\s*\d{3}-\d{4}\b",
            "credit_card": r"\b\d{4}[\s-]?\d{4}[\s-]?\d{4}[\s-]?\d{4}\b",
            "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
            "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"
        }

    def mask_data(self, data: str, data_type: str = "auto") -> str:
        """Mask *data*.

        With ``data_type="auto"`` every pattern in ``self.patterns`` that
        occurs in the text is masked; otherwise only the named rule is
        applied. Unknown rule names and empty input leave *data* unchanged.
        """
        if not data:
            return data
        if data_type == "auto":
            for pattern_type, pattern in self.patterns.items():
                if re.search(pattern, data):
                    masking_func = self.masking_rules.get(pattern_type)
                    if masking_func:
                        data = masking_func(data)
        else:
            masking_func = self.masking_rules.get(data_type)
            if masking_func:
                data = masking_func(data)
        return data

    def mask_email(self, text: str) -> str:
        """Mask every e-mail address in *text*, keeping the first and last
        character of the local part and the whole domain."""
        def replace_email(match):
            username, domain = match.group().split('@')
            if len(username) <= 2:
                masked_username = '*' * len(username)
            else:
                masked_username = username[0] + '*' * (len(username) - 2) + username[-1]
            return f"{masked_username}@{domain}"
        return re.sub(self.patterns["email"], replace_email, text)

    def mask_phone(self, text: str) -> str:
        """Mask every phone number in *text*, keeping the first 3 and last 4 digits."""
        def replace_phone(match):
            digits = re.sub(r'\D', '', match.group())
            if len(digits) >= 7:
                return f"{digits[:3]}***{digits[-4:]}"
            return '*' * len(digits)
        return re.sub(self.patterns["phone"], replace_phone, text)

    def mask_credit_card(self, text: str) -> str:
        """Mask every card number in *text*, keeping only the last 4 digits."""
        def replace_card(match):
            card = match.group()
            digits = re.sub(r'\D', '', card)
            if len(digits) >= 8:
                return f"****-****-****-{digits[-4:]}"
            return '*' * len(card)
        return re.sub(self.patterns["credit_card"], replace_card, text)

    def mask_ssn(self, text: str) -> str:
        """Mask every SSN in *text*, keeping only the last 4 digits."""
        def replace_ssn(match):
            return "***-**-" + match.group()[-4:]
        return re.sub(self.patterns["ssn"], replace_ssn, text)

    def mask_name(self, name: str) -> str:
        """Mask a name, keeping its first character (and its last if longer than 2)."""
        if len(name) <= 1:
            return '*'
        if len(name) == 2:
            return name[0] + '*'
        return name[0] + '*' * (len(name) - 2) + name[-1]

    def mask_address(self, address: str) -> str:
        """Mask an address, keeping only its first and last whitespace-separated
        word; addresses of two words or fewer are masked entirely."""
        words = address.split()
        if len(words) > 2:
            return f"{words[0]} *** {words[-1]}"
        return "***"

    def mask_ip(self, text: str) -> str:
        """Mask every dotted-quad address in *text*, keeping the first two octets."""
        def replace_ip(match):
            parts = match.group().split('.')
            return f"{parts[0]}.{parts[1]}.***.**"
        return re.sub(self.patterns["ip_address"], replace_ip, text)

    def _mask_field(self, key: str, value: Any) -> Any:
        """Mask the value of a field that the caller asked to have masked."""
        if not isinstance(value, str):
            return "***"
        # Prefer the rule named after the field (e.g. "name", "address");
        # otherwise fall back to pattern auto-detection.
        rule = key.lower() if key.lower() in self.masking_rules else "auto"
        masked = self.mask_data(value, rule)
        if value and masked == value:
            # No rule recognised the value. The caller still asked for this
            # field to be hidden, so never return it in clear.
            return "***"
        return masked

    def mask_dict(self, data: Dict[str, Any], fields_to_mask: List[str] = None) -> Dict[str, Any]:
        """Return a copy of *data* with the named fields masked.

        Field names are compared case-insensitively. Nested dicts, and dicts
        inside lists, are processed recursively; other values are copied as
        they are. A requested string field that no masking rule recognises
        is replaced by "***", as is any requested non-string value.
        """
        # Build the lookup set once instead of once per key.
        targets = {field.lower() for field in (fields_to_mask or [])}
        masked_data = {}
        for key, value in data.items():
            if key.lower() in targets:
                masked_data[key] = self._mask_field(key, value)
            elif isinstance(value, dict):
                masked_data[key] = self.mask_dict(value, fields_to_mask)
            elif isinstance(value, list):
                masked_data[key] = [
                    self.mask_dict(item, fields_to_mask) if isinstance(item, dict) else item
                    for item in value
                ]
            else:
                masked_data[key] = value
        return masked_data
# 数据访问审计
class DataAccessAuditor:
    """Writes an audit log entry for every data access."""

    def __init__(self, audit_logger):
        # Logger-like object; its ``warning`` and ``info`` methods are used.
        self.audit_logger = audit_logger
        # Tables whose accesses are logged at warning level.
        self.sensitive_tables = {
            "users", "passwords", "tokens", "credentials",
            "personal_info", "financial_data", "medical_records"
        }

    def audit_data_access(self, user_id: str, operation: str,
                          table_name: str, record_id: str = None,
                          data_sample: Dict[str, Any] = None):
        """Log one data access as a JSON document.

        Accesses to a table listed in ``sensitive_tables`` (compared
        case-insensitively) are logged with ``warning``, all others with
        ``info``. Only a value-free summary of *data_sample* is logged.
        """
        # Imported locally: this snippet's import block provides neither.
        import json
        from datetime import datetime

        audit_entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "operation": operation,
            "table_name": table_name,
            "record_id": record_id,
            "is_sensitive": table_name.lower() in self.sensitive_tables,
            "data_sample": self.sanitize_data_sample(data_sample) if data_sample else None,
            "client_ip": self.get_client_ip(),
            "session_id": self.get_session_id()
        }

        if audit_entry["is_sensitive"]:
            self.audit_logger.warning(f"敏感数据访问: {json.dumps(audit_entry)}")
        else:
            self.audit_logger.info(f"数据访问: {json.dumps(audit_entry)}")

        # Hook for anomaly detection.
        self.detect_anomalous_access(audit_entry)

    def sanitize_data_sample(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Replace every value by a placeholder describing its type and size.

        Field names are kept; actual values never reach the audit log.
        """
        sanitized = {}
        for key, value in data.items():
            if isinstance(value, str):
                sanitized[key] = f"<string:{len(value)}>"
            elif isinstance(value, (int, float)):
                sanitized[key] = f"<number:{type(value).__name__}>"
            elif isinstance(value, (list, tuple)):
                sanitized[key] = f"<array:{len(value)}>"
            elif isinstance(value, dict):
                sanitized[key] = f"<object:{len(value)}>"
            else:
                sanitized[key] = f"<{type(value).__name__}>"
        return sanitized

    def detect_anomalous_access(self, audit_entry: Dict[str, Any]):
        """Placeholder for anomaly detection; currently does nothing.

        Intended for checks such as frequent access, bulk downloads or
        access at unusual times.
        """
        pass

    def get_client_ip(self) -> str:
        """Return the client IP. Placeholder: always "127.0.0.1"; a real
        implementation should read it from the request context."""
        return "127.0.0.1"

    def get_session_id(self) -> str:
        """Return the session ID. Placeholder: always "session_123"; a real
        implementation should read it from the request context."""
        return "session_123"
🎯 本节小结
通过本节学习,你已经掌握了:
✅ 多层防护架构:网络层、应用层、数据层、AI层的综合防护
✅ DDoS攻击防护:智能攻击检测和自动化响应机制
✅ WAF防火墙:全面的Web应用攻击防护
✅ 统一认证授权:多因素认证和RBAC权限控制
✅ 数据加密脱敏:敏感数据的全方位保护
✅ 访问审计:完整的数据访问监控和记录
🤔 思考题
- 防护平衡:如何在安全防护和用户体验之间找到最佳平衡点?
- 性能优化:多层防护可能影响性能,如何进行优化?
- 适应性防护:如何让防护系统能够适应新的攻击模式?
多层防护,层层守护! 🏰 安全不是一道墙,而是一套完整的防御体系。