6.5 自动化安全响应
🎯 学习目标:构建智能化的自动安全响应和事件处置体系
⏱️ 预计时间:40分钟
📊 难度等级:⭐⭐⭐⭐⭐
🤖 自动化响应架构设计
🏗️ 智能响应系统架构
🔧 智能响应决策引擎
🧠 核心响应系统
python
# 智能自动化安全响应系统
import asyncio
import json
import time
import uuid
from typing import Dict, List, Any, Optional, Callable, Tuple, Union
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import logging
from abc import ABC, abstractmethod
import numpy as np
from collections import defaultdict, deque
import threading
import yaml
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import requests
import subprocess
logger = logging.getLogger(__name__)
class ResponseAction(Enum):
"""响应动作类型"""
BLOCK_IP = "block_ip"
RATE_LIMIT = "rate_limit"
ISOLATE_USER = "isolate_user"
QUARANTINE_SYSTEM = "quarantine_system"
COLLECT_EVIDENCE = "collect_evidence"
NOTIFY_ADMIN = "notify_admin"
ESCALATE_ALERT = "escalate_alert"
RESET_PASSWORD = "reset_password"
DISABLE_ACCOUNT = "disable_account"
BACKUP_DATA = "backup_data"
ROLLBACK_CHANGES = "rollback_changes"
PATCH_VULNERABILITY = "patch_vulnerability"
class ResponsePriority(Enum):
"""响应优先级"""
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
EMERGENCY = 5
class ResponseStatus(Enum):
"""响应状态"""
PENDING = "pending"
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
CANCELLED = "cancelled"
TIMEOUT = "timeout"
@dataclass
class SecurityIncident:
"""安全事件"""
incident_id: str
event_type: str
severity: int
source_ip: Optional[str] = None
target_resource: Optional[str] = None
user_id: Optional[str] = None
timestamp: datetime = field(default_factory=datetime.utcnow)
evidence: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
'incident_id': self.incident_id,
'event_type': self.event_type,
'severity': self.severity,
'source_ip': self.source_ip,
'target_resource': self.target_resource,
'user_id': self.user_id,
'timestamp': self.timestamp.isoformat(),
'evidence': self.evidence,
'metadata': self.metadata
}
@dataclass
class ResponsePlan:
"""响应计划"""
plan_id: str
incident_id: str
actions: List['ResponseAction']
priority: ResponsePriority
estimated_duration: int # 秒
dependencies: List[str] = field(default_factory=list)
conditions: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
if not self.actions:
self.actions = []
@dataclass
class ResponseExecution:
"""响应执行"""
execution_id: str
plan_id: str
status: ResponseStatus
start_time: datetime
end_time: Optional[datetime] = None
results: Dict[str, Any] = field(default_factory=dict)
error_message: Optional[str] = None
@property
def duration(self) -> Optional[float]:
"""执行时长(秒)"""
if self.end_time:
return (self.end_time - self.start_time).total_seconds()
return None
class AutomatedSecurityResponse:
"""自动化安全响应系统"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.is_running = False
# 初始化组件
self.decision_engine = ResponseDecisionEngine(config.get('decision', {}))
self.orchestrator = ResponseOrchestrator(config.get('orchestrator', {}))
self.executor_pool = ExecutorPool(config.get('executors', {}))
self.learning_engine = ResponseLearningEngine(config.get('learning', {}))
# 事件队列
self.incident_queue = asyncio.Queue(maxsize=1000)
self.response_workers = []
# 执行历史
self.execution_history = deque(maxlen=10000)
# 响应策略
self.response_policies = self._load_response_policies()
# 统计指标
self.metrics = {
'incidents_processed': 0,
'responses_executed': 0,
'success_rate': 0.0,
'avg_response_time': 0.0
}
async def start(self):
"""启动自动化响应系统"""
self.is_running = True
logger.info("自动化安全响应系统启动")
# 启动组件
await self.decision_engine.start()
await self.orchestrator.start()
await self.executor_pool.start()
await self.learning_engine.start()
# 启动工作进程
num_workers = self.config.get('workers', 3)
for i in range(num_workers):
worker = asyncio.create_task(self._process_incidents())
self.response_workers.append(worker)
logger.info(f"自动化响应系统已启动,{num_workers}个工作进程")
async def stop(self):
"""停止自动化响应系统"""
self.is_running = False
logger.info("正在停止自动化响应系统...")
# 停止组件
await self.decision_engine.stop()
await self.orchestrator.stop()
await self.executor_pool.stop()
await self.learning_engine.stop()
# 停止工作进程
for worker in self.response_workers:
worker.cancel()
await asyncio.gather(*self.response_workers, return_exceptions=True)
logger.info("自动化响应系统已停止")
async def handle_incident(self, incident: SecurityIncident):
"""处理安全事件"""
try:
await self.incident_queue.put(incident)
logger.info(f"安全事件已入队: {incident.incident_id}")
except asyncio.QueueFull:
logger.error("事件队列已满,丢弃事件")
async def _process_incidents(self):
"""处理安全事件"""
while self.is_running:
try:
# 获取事件
incident = await asyncio.wait_for(
self.incident_queue.get(),
timeout=1.0
)
start_time = time.time()
# 处理事件
await self._handle_security_incident(incident)
# 更新指标
self.metrics['incidents_processed'] += 1
processing_time = time.time() - start_time
self._update_avg_response_time(processing_time)
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"处理安全事件时发生错误: {e}")
async def _handle_security_incident(self, incident: SecurityIncident):
"""处理单个安全事件"""
logger.info(f"处理安全事件: {incident.incident_id}")
try:
# 1. 决策阶段:生成响应计划
response_plan = await self.decision_engine.generate_response_plan(incident)
if not response_plan:
logger.info(f"事件 {incident.incident_id} 无需自动响应")
return
# 2. 编排阶段:优化执行顺序
optimized_plan = await self.orchestrator.optimize_plan(response_plan)
# 3. 执行阶段:执行响应动作
execution = await self.executor_pool.execute_plan(optimized_plan)
# 4. 记录执行结果
self.execution_history.append(execution)
# 5. 学习阶段:更新策略
await self.learning_engine.learn_from_execution(incident, execution)
# 6. 更新指标
self._update_success_rate(execution.status == ResponseStatus.SUCCESS)
logger.info(f"事件 {incident.incident_id} 响应完成: {execution.status.value}")
except Exception as e:
logger.error(f"处理安全事件失败 {incident.incident_id}: {e}")
def _load_response_policies(self) -> Dict[str, Any]:
"""加载响应策略"""
default_policies = {
'brute_force': {
'actions': [ResponseAction.BLOCK_IP, ResponseAction.NOTIFY_ADMIN],
'conditions': {'failure_count': {'min': 5}},
'priority': ResponsePriority.HIGH,
'auto_execute': True
},
'sql_injection': {
'actions': [ResponseAction.BLOCK_IP, ResponseAction.COLLECT_EVIDENCE,
ResponseAction.NOTIFY_ADMIN],
'conditions': {'confidence': {'min': 0.8}},
'priority': ResponsePriority.CRITICAL,
'auto_execute': True
},
'data_breach': {
'actions': [ResponseAction.QUARANTINE_SYSTEM, ResponseAction.BACKUP_DATA,
ResponseAction.ESCALATE_ALERT, ResponseAction.COLLECT_EVIDENCE],
'conditions': {},
'priority': ResponsePriority.EMERGENCY,
'auto_execute': False # 需要人工确认
},
'malware_detected': {
'actions': [ResponseAction.ISOLATE_USER, ResponseAction.COLLECT_EVIDENCE,
ResponseAction.NOTIFY_ADMIN],
'conditions': {'threat_score': {'min': 0.7}},
'priority': ResponsePriority.HIGH,
'auto_execute': True
}
}
# 从配置文件加载(如果存在)
policy_file = self.config.get('policy_file')
if policy_file:
try:
with open(policy_file, 'r', encoding='utf-8') as f:
custom_policies = yaml.safe_load(f)
default_policies.update(custom_policies)
except Exception as e:
logger.error(f"加载策略文件失败: {e}")
return default_policies
def _update_avg_response_time(self, processing_time: float):
"""更新平均响应时间"""
current_avg = self.metrics['avg_response_time']
count = self.metrics['incidents_processed']
if count == 1:
self.metrics['avg_response_time'] = processing_time
else:
# 指数移动平均
alpha = 0.1
self.metrics['avg_response_time'] = (1 - alpha) * current_avg + alpha * processing_time
def _update_success_rate(self, success: bool):
"""更新成功率"""
self.metrics['responses_executed'] += 1
total_responses = self.metrics['responses_executed']
if success:
current_successes = self.metrics['success_rate'] * (total_responses - 1)
self.metrics['success_rate'] = (current_successes + 1) / total_responses
else:
current_successes = self.metrics['success_rate'] * (total_responses - 1)
self.metrics['success_rate'] = current_successes / total_responses
class ResponseDecisionEngine:
"""响应决策引擎"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# 决策模型
self.decision_models = {
'rule_based': RuleBasedDecisionModel(),
'ml_based': MLBasedDecisionModel(config.get('ml_model', {})),
'hybrid': HybridDecisionModel()
}
self.current_model = self.decision_models[
config.get('model_type', 'hybrid')
]
# 风险评估器
self.risk_assessor = RiskAssessment()
async def start(self):
"""启动决策引擎"""
logger.info("响应决策引擎启动")
await self.current_model.initialize()
async def stop(self):
"""停止决策引擎"""
logger.info("响应决策引擎停止")
async def generate_response_plan(self, incident: SecurityIncident) -> Optional[ResponsePlan]:
"""生成响应计划"""
# 1. 风险评估
risk_score = await self.risk_assessor.assess_incident_risk(incident)
# 2. 决策模型判断
decision = await self.current_model.make_decision(incident, risk_score)
if not decision.should_respond:
return None
# 3. 生成响应计划
plan_id = f"PLAN-{uuid.uuid4().hex[:8]}"
response_plan = ResponsePlan(
plan_id=plan_id,
incident_id=incident.incident_id,
actions=decision.recommended_actions,
priority=decision.priority,
estimated_duration=self._estimate_duration(decision.recommended_actions),
conditions=decision.conditions
)
logger.info(f"生成响应计划: {plan_id}, 动作数量: {len(decision.recommended_actions)}")
return response_plan
def _estimate_duration(self, actions: List[ResponseAction]) -> int:
"""估算执行时长"""
# 基于历史数据的预估时长(秒)
duration_map = {
ResponseAction.BLOCK_IP: 5,
ResponseAction.RATE_LIMIT: 3,
ResponseAction.ISOLATE_USER: 10,
ResponseAction.QUARANTINE_SYSTEM: 30,
ResponseAction.COLLECT_EVIDENCE: 60,
ResponseAction.NOTIFY_ADMIN: 2,
ResponseAction.ESCALATE_ALERT: 5,
ResponseAction.RESET_PASSWORD: 8,
ResponseAction.DISABLE_ACCOUNT: 6,
ResponseAction.BACKUP_DATA: 120,
ResponseAction.ROLLBACK_CHANGES: 45,
ResponseAction.PATCH_VULNERABILITY: 300
}
return sum(duration_map.get(action, 10) for action in actions)
@dataclass
class Decision:
"""决策结果"""
should_respond: bool
recommended_actions: List[ResponseAction] = field(default_factory=list)
priority: ResponsePriority = ResponsePriority.MEDIUM
confidence: float = 0.0
conditions: Dict[str, Any] = field(default_factory=dict)
reasoning: str = ""
class DecisionModel(ABC):
"""决策模型抽象基类"""
@abstractmethod
async def initialize(self):
"""初始化模型"""
pass
@abstractmethod
async def make_decision(self, incident: SecurityIncident,
risk_score: float) -> Decision:
"""做出决策"""
pass
class RuleBasedDecisionModel(DecisionModel):
"""基于规则的决策模型"""
def __init__(self):
self.rules = self._load_decision_rules()
async def initialize(self):
"""初始化模型"""
logger.info("规则决策模型初始化完成")
async def make_decision(self, incident: SecurityIncident,
risk_score: float) -> Decision:
"""基于规则做出决策"""
for rule in self.rules:
if rule.matches(incident, risk_score):
decision = rule.get_decision(incident, risk_score)
logger.debug(f"匹配规则: {rule.name}, 决策: {decision.should_respond}")
return decision
# 默认决策
return Decision(
should_respond=False,
reasoning="未匹配到任何响应规则"
)
def _load_decision_rules(self) -> List['DecisionRule']:
"""加载决策规则"""
return [
BruteForceDecisionRule(),
SQLInjectionDecisionRule(),
DataBreachDecisionRule(),
MalwareDecisionRule(),
DDoSDecisionRule(),
AnomalousAccessDecisionRule()
]
class MLBasedDecisionModel(DecisionModel):
"""基于机器学习的决策模型"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.model = None
self.feature_scaler = None
async def initialize(self):
"""初始化模型"""
# 这里应该加载预训练的ML模型
# 为了简化,我们使用模拟模型
logger.info("ML决策模型初始化完成")
async def make_decision(self, incident: SecurityIncident,
risk_score: float) -> Decision:
"""基于ML模型做出决策"""
# 提取特征
features = self._extract_features(incident, risk_score)
# 模型预测(模拟)
should_respond_prob = self._predict_response_probability(features)
action_probs = self._predict_action_probabilities(features)
if should_respond_prob > 0.7:
# 选择概率最高的动作
recommended_actions = self._select_actions(action_probs)
priority = self._determine_priority(risk_score, should_respond_prob)
return Decision(
should_respond=True,
recommended_actions=recommended_actions,
priority=priority,
confidence=should_respond_prob,
reasoning=f"ML模型预测 (置信度: {should_respond_prob:.2f})"
)
return Decision(
should_respond=False,
confidence=1.0 - should_respond_prob,
reasoning=f"ML模型预测无需响应 (置信度: {1.0 - should_respond_prob:.2f})"
)
def _extract_features(self, incident: SecurityIncident, risk_score: float) -> List[float]:
"""提取特征"""
return [
incident.severity,
risk_score,
hash(incident.event_type) % 100,
hash(incident.source_ip or '') % 100,
len(incident.evidence),
incident.timestamp.hour,
incident.timestamp.weekday()
]
def _predict_response_probability(self, features: List[float]) -> float:
"""预测响应概率(模拟)"""
# 简单的启发式预测
severity_weight = features[0] / 5.0
risk_weight = features[1]
prob = (severity_weight * 0.6 + risk_weight * 0.4)
return min(max(prob, 0.0), 1.0)
def _predict_action_probabilities(self, features: List[float]) -> Dict[ResponseAction, float]:
"""预测动作概率(模拟)"""
# 基于特征的简单概率分配
base_prob = features[1] # 风险分数
return {
ResponseAction.BLOCK_IP: base_prob * 0.8,
ResponseAction.NOTIFY_ADMIN: base_prob * 0.9,
ResponseAction.COLLECT_EVIDENCE: base_prob * 0.7,
ResponseAction.ISOLATE_USER: base_prob * 0.6,
ResponseAction.RATE_LIMIT: base_prob * 0.5
}
def _select_actions(self, action_probs: Dict[ResponseAction, float]) -> List[ResponseAction]:
"""选择动作"""
threshold = 0.5
return [action for action, prob in action_probs.items() if prob > threshold]
def _determine_priority(self, risk_score: float, confidence: float) -> ResponsePriority:
"""确定优先级"""
score = risk_score * confidence
if score > 0.9:
return ResponsePriority.EMERGENCY
elif score > 0.7:
return ResponsePriority.CRITICAL
elif score > 0.5:
return ResponsePriority.HIGH
elif score > 0.3:
return ResponsePriority.MEDIUM
else:
return ResponsePriority.LOW
class ResponseOrchestrator:
"""响应编排器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
async def start(self):
"""启动编排器"""
logger.info("响应编排器启动")
async def stop(self):
"""停止编排器"""
logger.info("响应编排器停止")
async def optimize_plan(self, plan: ResponsePlan) -> ResponsePlan:
"""优化执行计划"""
# 1. 依赖关系分析
optimized_actions = self._resolve_dependencies(plan.actions)
# 2. 并发执行优化
parallel_groups = self._group_for_parallel_execution(optimized_actions)
# 3. 资源分配优化
optimized_plan = self._optimize_resource_allocation(plan, parallel_groups)
logger.debug(f"计划优化完成: {plan.plan_id}")
return optimized_plan
def _resolve_dependencies(self, actions: List[ResponseAction]) -> List[ResponseAction]:
"""解决动作依赖关系"""
# 定义动作依赖关系
dependencies = {
ResponseAction.COLLECT_EVIDENCE: [], # 优先执行
ResponseAction.BACKUP_DATA: [],
ResponseAction.BLOCK_IP: [ResponseAction.COLLECT_EVIDENCE],
ResponseAction.ISOLATE_USER: [ResponseAction.COLLECT_EVIDENCE],
ResponseAction.QUARANTINE_SYSTEM: [ResponseAction.BACKUP_DATA, ResponseAction.COLLECT_EVIDENCE],
ResponseAction.NOTIFY_ADMIN: [], # 可以随时执行
ResponseAction.ROLLBACK_CHANGES: [ResponseAction.BACKUP_DATA]
}
# 拓扑排序
sorted_actions = []
remaining_actions = set(actions)
while remaining_actions:
# 找到没有未满足依赖的动作
ready_actions = []
for action in remaining_actions:
deps = dependencies.get(action, [])
if all(dep in sorted_actions or dep not in actions for dep in deps):
ready_actions.append(action)
if not ready_actions:
# 如果有循环依赖,按原顺序处理剩余动作
ready_actions = list(remaining_actions)
# 添加就绪的动作
for action in ready_actions:
sorted_actions.append(action)
remaining_actions.remove(action)
return sorted_actions
def _group_for_parallel_execution(self, actions: List[ResponseAction]) -> List[List[ResponseAction]]:
"""分组以支持并发执行"""
# 可以并发执行的動作組
parallel_compatible = {
ResponseAction.NOTIFY_ADMIN,
ResponseAction.COLLECT_EVIDENCE,
ResponseAction.BACKUP_DATA
}
groups = []
current_group = []
for action in actions:
if action in parallel_compatible and current_group and \
all(a in parallel_compatible for a in current_group):
# 可以加入当前并发组
current_group.append(action)
else:
# 开始新的组
if current_group:
groups.append(current_group)
current_group = [action]
if current_group:
groups.append(current_group)
return groups
def _optimize_resource_allocation(self, plan: ResponsePlan,
parallel_groups: List[List[ResponseAction]]) -> ResponsePlan:
"""优化资源分配"""
# 更新计划中的动作顺序
optimized_actions = []
for group in parallel_groups:
optimized_actions.extend(group)
# 重新计算估计时长
total_duration = 0
for group in parallel_groups:
# 并发组中的最长时间
group_duration = max(self._get_action_duration(action) for action in group)
total_duration += group_duration
plan.actions = optimized_actions
plan.estimated_duration = total_duration
return plan
def _get_action_duration(self, action: ResponseAction) -> int:
"""获取动作执行时长"""
duration_map = {
ResponseAction.BLOCK_IP: 5,
ResponseAction.RATE_LIMIT: 3,
ResponseAction.ISOLATE_USER: 10,
ResponseAction.QUARANTINE_SYSTEM: 30,
ResponseAction.COLLECT_EVIDENCE: 60,
ResponseAction.NOTIFY_ADMIN: 2,
ResponseAction.ESCALATE_ALERT: 5,
ResponseAction.RESET_PASSWORD: 8,
ResponseAction.DISABLE_ACCOUNT: 6,
ResponseAction.BACKUP_DATA: 120,
ResponseAction.ROLLBACK_CHANGES: 45,
ResponseAction.PATCH_VULNERABILITY: 300
}
return duration_map.get(action, 10)
class ExecutorPool:
"""执行器池"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.executors = {}
self.executor_queue = asyncio.Queue()
self.is_running = False
async def start(self):
"""启动执行器池"""
self.is_running = True
logger.info("执行器池启动")
# 初始化执行器
self.executors = {
ResponseAction.BLOCK_IP: IPBlockExecutor(self.config.get('ip_block', {})),
ResponseAction.RATE_LIMIT: RateLimitExecutor(self.config.get('rate_limit', {})),
ResponseAction.ISOLATE_USER: UserIsolationExecutor(self.config.get('user_isolation', {})),
ResponseAction.QUARANTINE_SYSTEM: SystemQuarantineExecutor(self.config.get('quarantine', {})),
ResponseAction.COLLECT_EVIDENCE: EvidenceCollectionExecutor(self.config.get('evidence', {})),
ResponseAction.NOTIFY_ADMIN: AdminNotificationExecutor(self.config.get('notification', {})),
ResponseAction.ESCALATE_ALERT: AlertEscalationExecutor(self.config.get('escalation', {})),
ResponseAction.RESET_PASSWORD: PasswordResetExecutor(self.config.get('password_reset', {})),
ResponseAction.DISABLE_ACCOUNT: AccountDisableExecutor(self.config.get('account_disable', {})),
ResponseAction.BACKUP_DATA: DataBackupExecutor(self.config.get('backup', {}))
}
# 启动执行器
for executor in self.executors.values():
await executor.initialize()
async def stop(self):
"""停止执行器池"""
self.is_running = False
# 停止执行器
for executor in self.executors.values():
await executor.cleanup()
logger.info("执行器池停止")
async def execute_plan(self, plan: ResponsePlan) -> ResponseExecution:
"""执行响应计划"""
execution = ResponseExecution(
execution_id=f"EXEC-{uuid.uuid4().hex[:8]}",
plan_id=plan.plan_id,
status=ResponseStatus.RUNNING,
start_time=datetime.utcnow()
)
logger.info(f"开始执行响应计划: {plan.plan_id}")
try:
results = {}
for action in plan.actions:
executor = self.executors.get(action)
if not executor:
logger.error(f"未找到执行器: {action}")
continue
# 执行动作
action_result = await executor.execute(plan, execution)
results[action.value] = action_result
# 检查是否失败
if not action_result.get('success', False):
logger.error(f"动作执行失败: {action}")
if action_result.get('critical', False):
# 关键动作失败,终止执行
execution.status = ResponseStatus.FAILED
execution.error_message = action_result.get('error', '关键动作执行失败')
break
# 设置最终状态
if execution.status == ResponseStatus.RUNNING:
execution.status = ResponseStatus.SUCCESS
execution.results = results
except Exception as e:
logger.error(f"执行响应计划失败: {e}")
execution.status = ResponseStatus.FAILED
execution.error_message = str(e)
finally:
execution.end_time = datetime.utcnow()
logger.info(f"响应计划执行完成: {plan.plan_id}, 状态: {execution.status.value}")
return execution
# 具体执行器实现
class ResponseExecutor(ABC):
"""响应执行器抽象基类"""
def __init__(self, config: Dict[str, Any]):
self.config = config
@abstractmethod
async def initialize(self):
"""初始化执行器"""
pass
@abstractmethod
async def execute(self, plan: ResponsePlan, execution: ResponseExecution) -> Dict[str, Any]:
"""执行响应动作"""
pass
@abstractmethod
async def cleanup(self):
"""清理资源"""
pass
class IPBlockExecutor(ResponseExecutor):
"""IP阻断执行器"""
async def initialize(self):
"""初始化执行器"""
logger.info("IP阻断执行器初始化完成")
async def execute(self, plan: ResponsePlan, execution: ResponseExecution) -> Dict[str, Any]:
"""执行IP阻断"""
try:
# 从事件中获取源IP
# 这里应该集成实际的防火墙或WAF API
# 模拟执行
await asyncio.sleep(1) # 模拟网络延迟
logger.info(f"IP阻断执行成功: 计划 {plan.plan_id}")
return {
'success': True,
'action': 'IP阻断',
'details': '已成功阻断恶意IP地址',
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"IP阻断执行失败: {e}")
return {
'success': False,
'error': str(e),
'critical': True # 这是关键动作
}
async def cleanup(self):
"""清理资源"""
pass
class AdminNotificationExecutor(ResponseExecutor):
"""管理员通知执行器"""
async def initialize(self):
"""初始化执行器"""
logger.info("管理员通知执行器初始化完成")
async def execute(self, plan: ResponsePlan, execution: ResponseExecution) -> Dict[str, Any]:
"""发送管理员通知"""
try:
# 构建通知消息
message = self._build_notification_message(plan, execution)
# 发送通知(邮件、短信、即时消息等)
await self._send_email_notification(message)
await self._send_slack_notification(message)
logger.info(f"管理员通知发送成功: 计划 {plan.plan_id}")
return {
'success': True,
'action': '管理员通知',
'details': '已成功发送安全事件通知',
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"管理员通知发送失败: {e}")
return {
'success': False,
'error': str(e),
'critical': False # 非关键动作
}
def _build_notification_message(self, plan: ResponsePlan,
execution: ResponseExecution) -> str:
"""构建通知消息"""
return f"""
🚨 安全事件自动响应通知
事件ID: {plan.incident_id}
响应计划: {plan.plan_id}
执行ID: {execution.execution_id}
优先级: {plan.priority.name}
开始时间: {execution.start_time.strftime('%Y-%m-%d %H:%M:%S')}
已执行的响应动作:
{', '.join([action.value for action in plan.actions])}
请及时查看系统状态并采取必要的后续措施。
"""
async def _send_email_notification(self, message: str):
"""发送邮件通知"""
# 这里应该集成实际的邮件服务
pass
async def _send_slack_notification(self, message: str):
"""发送Slack通知"""
# 这里应该集成Slack API
pass
async def cleanup(self):
"""清理资源"""
pass
class EvidenceCollectionExecutor(ResponseExecutor):
"""证据收集执行器"""
async def initialize(self):
"""初始化执行器"""
logger.info("证据收集执行器初始化完成")
async def execute(self, plan: ResponsePlan, execution: ResponseExecution) -> Dict[str, Any]:
"""收集证据"""
try:
evidence_data = {}
# 收集系统日志
evidence_data['system_logs'] = await self._collect_system_logs()
# 收集网络数据
evidence_data['network_data'] = await self._collect_network_data()
# 收集进程信息
evidence_data['process_info'] = await self._collect_process_info()
# 保存证据
evidence_file = await self._save_evidence(evidence_data, execution.execution_id)
logger.info(f"证据收集完成: {evidence_file}")
return {
'success': True,
'action': '证据收集',
'details': f'已收集并保存证据文件: {evidence_file}',
'evidence_file': evidence_file,
'timestamp': datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"证据收集失败: {e}")
return {
'success': False,
'error': str(e),
'critical': False
}
async def _collect_system_logs(self) -> Dict[str, Any]:
"""收集系统日志"""
# 模拟收集
return {
'log_entries': ['模拟日志条目1', '模拟日志条目2'],
'collection_time': datetime.utcnow().isoformat()
}
async def _collect_network_data(self) -> Dict[str, Any]:
"""收集网络数据"""
# 模拟收集
return {
'connections': ['连接信息1', '连接信息2'],
'traffic_stats': {'bytes_in': 1024, 'bytes_out': 512}
}
async def _collect_process_info(self) -> Dict[str, Any]:
"""收集进程信息"""
# 模拟收集
return {
'running_processes': ['进程1', '进程2'],
'resource_usage': {'cpu': 25.5, 'memory': 60.2}
}
async def _save_evidence(self, evidence_data: Dict[str, Any],
execution_id: str) -> str:
"""保存证据"""
filename = f"evidence_{execution_id}_{int(time.time())}.json"
# 这里应该保存到安全的存储位置
# 模拟保存
return filename
async def cleanup(self):
"""清理资源"""
pass
🎯 本节小结
通过本节学习,你已经掌握了:
✅ 智能响应架构:构建全自动化的安全事件响应体系
✅ 决策引擎设计:实现基于规则和机器学习的智能决策系统
✅ 响应编排优化:优化响应动作的执行顺序和资源分配
✅ 执行器池管理:建立可扩展的响应动作执行框架
✅ 学习反馈机制:构建持续改进的响应策略优化系统
🔄 第6章总结
整个第6章我们构建了完整的安全防护体系:
🛡️ 安全防护全景图
📈 核心能力矩阵
安全能力 | 覆盖范围 | 自动化程度 | 智能化水平 |
---|---|---|---|
威胁检测 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
访问控制 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
数据保护 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐ |
事件响应 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
合规审计 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐⭐ |
🤔 综合思考题
安全平衡:如何在用户体验、系统性能和安全防护之间找到最佳平衡点?
零信任架构:如何将现有的安全体系升级为零信任安全模型?
威胁演进:面对不断演进的AI驱动攻击,如何保持防护系统的前瞻性?
安全是一个持续的过程,而非一次性的配置!在下一章,我们将学习如何将MCP服务部署到生产环境。 🚀