6.6 安全监控与日志
🎯 学习目标:构建企业级的安全监控体系和智能日志分析系统
⏱️ 预计时间:45分钟
📊 难度等级:⭐⭐⭐⭐⭐
📊 安全监控架构设计
🏗️ 实时监控体系架构
🔍 智能安全监控系统
📡 实时监控引擎
python
# 智能安全监控系统
import asyncio
import json
import time
import hashlib
from typing import Dict, List, Any, Optional, Callable, Tuple
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime, timedelta
import logging
import threading
from collections import defaultdict, deque
import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
import redis
import elasticsearch
from prometheus_client import Counter, Histogram, Gauge
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
logger = logging.getLogger(__name__)
class SecurityEventType(Enum):
AUTHENTICATION_FAILURE = "auth_failure"
AUTHORIZATION_VIOLATION = "auth_violation"
INJECTION_ATTACK = "injection_attack"
XSS_ATTACK = "xss_attack"
DDOS_ATTACK = "ddos_attack"
BRUTE_FORCE = "brute_force"
ANOMALOUS_BEHAVIOR = "anomalous_behavior"
DATA_BREACH = "data_breach"
SYSTEM_INTRUSION = "system_intrusion"
PRIVILEGE_ESCALATION = "privilege_escalation"
class SeverityLevel(Enum):
LOW = 1
MEDIUM = 2
HIGH = 3
CRITICAL = 4
EMERGENCY = 5
class AlertStatus(Enum):
NEW = "new"
INVESTIGATING = "investigating"
CONFIRMED = "confirmed"
RESOLVED = "resolved"
FALSE_POSITIVE = "false_positive"
@dataclass
class SecurityEvent:
"""安全事件"""
event_id: str
event_type: SecurityEventType
severity: SeverityLevel
timestamp: datetime
source_ip: str
user_id: Optional[str] = None
resource: Optional[str] = None
description: str = ""
raw_data: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
'event_id': self.event_id,
'event_type': self.event_type.value,
'severity': self.severity.value,
'timestamp': self.timestamp.isoformat(),
'source_ip': self.source_ip,
'user_id': self.user_id,
'resource': self.resource,
'description': self.description,
'raw_data': self.raw_data,
'metadata': self.metadata
}
@dataclass
class SecurityAlert:
"""安全告警"""
alert_id: str
events: List[SecurityEvent]
severity: SeverityLevel
status: AlertStatus
created_at: datetime
updated_at: datetime
assigned_to: Optional[str] = None
resolution_notes: str = ""
def escalate(self) -> None:
"""升级告警"""
if self.severity.value < SeverityLevel.EMERGENCY.value:
self.severity = SeverityLevel(self.severity.value + 1)
self.updated_at = datetime.utcnow()
class SecurityMonitor:
"""安全监控器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.is_running = False
# 初始化组件
self.event_collector = EventCollector(config.get('collector', {}))
self.threat_detector = ThreatDetector(config.get('detector', {}))
self.alert_manager = AlertManager(config.get('alert', {}))
self.log_analyzer = LogAnalyzer(config.get('log_analyzer', {}))
# 初始化存储
self.redis_client = redis.Redis(
host=config.get('redis', {}).get('host', 'localhost'),
port=config.get('redis', {}).get('port', 6379),
decode_responses=True
)
self.elasticsearch_client = elasticsearch.Elasticsearch(
hosts=config.get('elasticsearch', {}).get('hosts', ['localhost:9200'])
)
# 监控指标
self.metrics = {
'events_processed': Counter('security_events_processed_total',
'Total processed security events',
['event_type', 'severity']),
'alerts_generated': Counter('security_alerts_generated_total',
'Total generated security alerts',
['severity']),
'processing_time': Histogram('security_event_processing_seconds',
'Time spent processing security events'),
'active_threats': Gauge('security_active_threats',
'Number of active security threats')
}
# 事件队列
self.event_queue = asyncio.Queue(maxsize=10000)
self.processing_workers = []
async def start(self):
"""启动安全监控"""
self.is_running = True
logger.info("安全监控系统启动")
# 启动事件收集器
await self.event_collector.start()
# 启动处理工作者
num_workers = self.config.get('workers', 4)
for i in range(num_workers):
worker = asyncio.create_task(self._process_events())
self.processing_workers.append(worker)
# 启动威胁检测器
await self.threat_detector.start()
# 启动告警管理器
await self.alert_manager.start()
logger.info(f"安全监控系统已启动,{num_workers}个工作进程")
async def stop(self):
"""停止安全监控"""
self.is_running = False
logger.info("正在停止安全监控系统...")
# 停止组件
await self.event_collector.stop()
await self.threat_detector.stop()
await self.alert_manager.stop()
# 等待工作者完成
for worker in self.processing_workers:
worker.cancel()
await asyncio.gather(*self.processing_workers, return_exceptions=True)
logger.info("安全监控系统已停止")
async def report_event(self, event: SecurityEvent):
"""报告安全事件"""
try:
await self.event_queue.put(event)
logger.debug(f"安全事件已入队: {event.event_id}")
except asyncio.QueueFull:
logger.error("事件队列已满,丢弃事件")
self.metrics['events_processed'].labels(
event_type='queue_full',
severity='error'
).inc()
async def _process_events(self):
"""处理安全事件"""
while self.is_running:
try:
# 获取事件
event = await asyncio.wait_for(
self.event_queue.get(),
timeout=1.0
)
start_time = time.time()
# 处理事件
await self._handle_security_event(event)
# 记录指标
processing_time = time.time() - start_time
self.metrics['processing_time'].observe(processing_time)
self.metrics['events_processed'].labels(
event_type=event.event_type.value,
severity=event.severity.name.lower()
).inc()
except asyncio.TimeoutError:
continue
except Exception as e:
logger.error(f"处理安全事件时发生错误: {e}")
async def _handle_security_event(self, event: SecurityEvent):
"""处理单个安全事件"""
# 1. 存储事件
await self._store_event(event)
# 2. 威胁检测
threat_score = await self.threat_detector.analyze_event(event)
# 3. 关联分析
related_events = await self._find_related_events(event)
# 4. 生成告警(如果需要)
if self._should_generate_alert(event, threat_score, related_events):
alert = await self._create_alert(event, related_events, threat_score)
await self.alert_manager.process_alert(alert)
# 5. 实时响应(如果配置了自动响应)
if self.config.get('auto_response', {}).get('enabled', False):
await self._trigger_auto_response(event, threat_score)
async def _store_event(self, event: SecurityEvent):
"""存储安全事件"""
# 存储到Elasticsearch
doc = event.to_dict()
index_name = f"security-events-{datetime.utcnow().strftime('%Y-%m')}"
try:
await self.elasticsearch_client.index(
index=index_name,
id=event.event_id,
body=doc
)
except Exception as e:
logger.error(f"存储事件到Elasticsearch失败: {e}")
# 存储到Redis(用于快速查询)
try:
self.redis_client.hset(
f"event:{event.event_id}",
mapping=doc
)
self.redis_client.expire(f"event:{event.event_id}", 86400) # 24小时过期
except Exception as e:
logger.error(f"存储事件到Redis失败: {e}")
async def _find_related_events(self, event: SecurityEvent,
time_window: int = 300) -> List[SecurityEvent]:
"""查找相关事件"""
query = {
"query": {
"bool": {
"must": [
{
"range": {
"timestamp": {
"gte": (event.timestamp - timedelta(seconds=time_window)).isoformat(),
"lte": (event.timestamp + timedelta(seconds=time_window)).isoformat()
}
}
}
],
"should": [
{"term": {"source_ip.keyword": event.source_ip}},
{"term": {"user_id.keyword": event.user_id}} if event.user_id else {},
{"term": {"resource.keyword": event.resource}} if event.resource else {}
],
"minimum_should_match": 1
}
},
"size": 100
}
try:
response = await self.elasticsearch_client.search(
index="security-events-*",
body=query
)
related_events = []
for hit in response['hits']['hits']:
if hit['_id'] != event.event_id: # 排除自己
related_event = self._dict_to_event(hit['_source'])
related_events.append(related_event)
return related_events
except Exception as e:
logger.error(f"查找相关事件失败: {e}")
return []
def _should_generate_alert(self, event: SecurityEvent, threat_score: float,
related_events: List[SecurityEvent]) -> bool:
"""判断是否应该生成告警"""
# 基于威胁分数
if threat_score > 0.8:
return True
# 基于事件严重性
if event.severity in [SeverityLevel.CRITICAL, SeverityLevel.EMERGENCY]:
return True
# 基于相关事件数量
if len(related_events) > 5:
return True
# 基于事件类型
high_priority_types = [
SecurityEventType.DATA_BREACH,
SecurityEventType.SYSTEM_INTRUSION,
SecurityEventType.PRIVILEGE_ESCALATION
]
if event.event_type in high_priority_types:
return True
return False
async def _create_alert(self, primary_event: SecurityEvent,
related_events: List[SecurityEvent],
threat_score: float) -> SecurityAlert:
"""创建安全告警"""
all_events = [primary_event] + related_events
# 确定告警严重性
max_severity = max(event.severity for event in all_events)
alert = SecurityAlert(
alert_id=self._generate_alert_id(),
events=all_events,
severity=max_severity,
status=AlertStatus.NEW,
created_at=datetime.utcnow(),
updated_at=datetime.utcnow()
)
return alert
def _generate_alert_id(self) -> str:
"""生成告警ID"""
timestamp = int(time.time() * 1000) # 毫秒时间戳
random_str = hashlib.md5(str(timestamp).encode()).hexdigest()[:8]
return f"ALERT-{timestamp}-{random_str}"
async def _trigger_auto_response(self, event: SecurityEvent, threat_score: float):
"""触发自动响应"""
auto_response_config = self.config.get('auto_response', {})
# 基于威胁分数的响应
if threat_score > 0.9:
# 高威胁:立即阻断
await self._block_source_ip(event.source_ip)
elif threat_score > 0.7:
# 中等威胁:限制访问
await self._rate_limit_source_ip(event.source_ip)
# 基于事件类型的响应
if event.event_type == SecurityEventType.BRUTE_FORCE:
await self._handle_brute_force_attack(event)
elif event.event_type == SecurityEventType.DDOS_ATTACK:
await self._handle_ddos_attack(event)
async def _block_source_ip(self, ip: str):
"""阻断源IP"""
# 这里应该集成到防火墙或WAF
logger.warning(f"触发IP阻断: {ip}")
# 记录到Redis黑名单
self.redis_client.sadd("blocked_ips", ip)
self.redis_client.expire("blocked_ips", 3600) # 1小时后解除
async def _rate_limit_source_ip(self, ip: str):
"""限制源IP访问频率"""
logger.warning(f"触发IP限流: {ip}")
# 记录到Redis限流列表
self.redis_client.sadd("rate_limited_ips", ip)
self.redis_client.expire("rate_limited_ips", 1800) # 30分钟限流
def _dict_to_event(self, data: Dict[str, Any]) -> SecurityEvent:
"""字典转换为SecurityEvent对象"""
return SecurityEvent(
event_id=data['event_id'],
event_type=SecurityEventType(data['event_type']),
severity=SeverityLevel(data['severity']),
timestamp=datetime.fromisoformat(data['timestamp']),
source_ip=data['source_ip'],
user_id=data.get('user_id'),
resource=data.get('resource'),
description=data.get('description', ''),
raw_data=data.get('raw_data', {}),
metadata=data.get('metadata', {})
)
class ThreatDetector:
"""威胁检测器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# 机器学习模型
self.anomaly_detector = IsolationForest(
contamination=config.get('contamination', 0.1),
random_state=42
)
self.feature_scaler = StandardScaler()
# 规则引擎
self.rules = self._load_detection_rules()
# 行为基线
self.behavior_baselines = {}
# 威胁情报
self.threat_intelligence = ThreatIntelligence(
config.get('threat_intelligence', {})
)
async def start(self):
"""启动威胁检测器"""
logger.info("威胁检测器启动")
# 加载历史数据训练模型
await self._train_anomaly_detector()
# 启动威胁情报更新
await self.threat_intelligence.start()
async def stop(self):
"""停止威胁检测器"""
await self.threat_intelligence.stop()
logger.info("威胁检测器停止")
async def analyze_event(self, event: SecurityEvent) -> float:
"""分析事件威胁程度"""
threat_scores = []
# 1. 规则检测
rule_score = await self._rule_based_detection(event)
threat_scores.append(rule_score)
# 2. 异常检测
anomaly_score = await self._anomaly_detection(event)
threat_scores.append(anomaly_score)
# 3. 行为分析
behavior_score = await self._behavior_analysis(event)
threat_scores.append(behavior_score)
# 4. 威胁情报检查
intelligence_score = await self.threat_intelligence.check_threat(event)
threat_scores.append(intelligence_score)
# 计算综合威胁分数
weights = [0.3, 0.25, 0.25, 0.2]
final_score = sum(score * weight for score, weight in zip(threat_scores, weights))
return min(max(final_score, 0.0), 1.0)
async def _rule_based_detection(self, event: SecurityEvent) -> float:
"""基于规则的检测"""
max_score = 0.0
for rule in self.rules:
if rule.matches(event):
score = rule.get_threat_score(event)
max_score = max(max_score, score)
return max_score
async def _anomaly_detection(self, event: SecurityEvent) -> float:
"""异常检测"""
try:
# 提取特征
features = self._extract_features(event)
if features is None:
return 0.0
# 标准化特征
features_scaled = self.feature_scaler.transform([features])
# 异常检测
anomaly_score = self.anomaly_detector.decision_function(features_scaled)[0]
# 转换为0-1范围的威胁分数
normalized_score = (anomaly_score + 0.5) / 1.0 # 假设分数范围是-0.5到0.5
return max(0.0, min(1.0, 1.0 - normalized_score)) # 反转分数,异常越高威胁越大
except Exception as e:
logger.error(f"异常检测失败: {e}")
return 0.0
async def _behavior_analysis(self, event: SecurityEvent) -> float:
"""行为分析"""
if not event.user_id:
return 0.0
# 获取用户行为基线
baseline = await self._get_user_baseline(event.user_id)
if not baseline:
return 0.0
# 分析当前行为与基线的偏差
deviation_score = self._calculate_behavior_deviation(event, baseline)
return deviation_score
def _load_detection_rules(self) -> List['DetectionRule']:
"""加载检测规则"""
rules = []
# 暴力破解检测规则
rules.append(BruteForceDetectionRule())
# SQL注入检测规则
rules.append(SQLInjectionDetectionRule())
# XSS攻击检测规则
rules.append(XSSDetectionRule())
# DDoS攻击检测规则
rules.append(DDoSDetectionRule())
# 异常访问模式检测规则
rules.append(AnomalousAccessDetectionRule())
return rules
def _extract_features(self, event: SecurityEvent) -> Optional[List[float]]:
"""提取事件特征"""
try:
features = [
# 时间特征
event.timestamp.hour,
event.timestamp.weekday(),
# 事件类型特征
event.event_type.value.__hash__() % 100,
# 严重性特征
event.severity.value,
# IP特征(简化)
int(event.source_ip.split('.')[-1]) if '.' in event.source_ip else 0,
# 用户特征
hash(event.user_id) % 100 if event.user_id else 0,
# 资源特征
hash(event.resource) % 100 if event.resource else 0,
# 描述长度特征
len(event.description)
]
return features
except Exception as e:
logger.error(f"特征提取失败: {e}")
return None
async def _train_anomaly_detector(self):
"""训练异常检测模型"""
try:
# 这里应该从历史数据中加载训练数据
# 为了简化,我们使用模拟数据
training_data = []
for _ in range(1000):
# 生成模拟正常事件特征
features = [
np.random.randint(0, 24), # 小时
np.random.randint(0, 7), # 星期
np.random.randint(0, 100), # 事件类型
np.random.randint(1, 3), # 严重性
np.random.randint(0, 255), # IP
np.random.randint(0, 100), # 用户
np.random.randint(0, 100), # 资源
np.random.randint(10, 200) # 描述长度
]
training_data.append(features)
# 标准化特征
training_data_scaled = self.feature_scaler.fit_transform(training_data)
# 训练模型
self.anomaly_detector.fit(training_data_scaled)
logger.info("异常检测模型训练完成")
except Exception as e:
logger.error(f"训练异常检测模型失败: {e}")
async def _get_user_baseline(self, user_id: str) -> Optional[Dict[str, Any]]:
"""获取用户行为基线"""
# 这里应该从数据库或缓存中获取用户的历史行为基线
# 为了简化,返回模拟数据
return {
'avg_requests_per_hour': 10,
'typical_access_times': [9, 10, 11, 14, 15, 16],
'common_resources': ['/api/dashboard', '/api/profile'],
'usual_ip_ranges': ['192.168.1.0/24']
}
def _calculate_behavior_deviation(self, event: SecurityEvent,
baseline: Dict[str, Any]) -> float:
"""计算行为偏差"""
deviation_score = 0.0
# 访问时间偏差
current_hour = event.timestamp.hour
typical_hours = baseline.get('typical_access_times', [])
if typical_hours and current_hour not in typical_hours:
deviation_score += 0.3
# 资源访问偏差
if event.resource:
common_resources = baseline.get('common_resources', [])
if common_resources and event.resource not in common_resources:
deviation_score += 0.2
# IP地址偏差
usual_ranges = baseline.get('usual_ip_ranges', [])
if usual_ranges:
import ipaddress
ip_in_range = False
try:
client_ip = ipaddress.ip_address(event.source_ip)
for ip_range in usual_ranges:
if client_ip in ipaddress.ip_network(ip_range):
ip_in_range = True
break
if not ip_in_range:
deviation_score += 0.5
except ValueError:
deviation_score += 0.3 # 无效IP地址
return min(deviation_score, 1.0)
# 检测规则基类和具体实现
class DetectionRule:
"""检测规则基类"""
def __init__(self, name: str, threat_score: float):
self.name = name
self.threat_score = threat_score
def matches(self, event: SecurityEvent) -> bool:
"""检查事件是否匹配规则"""
raise NotImplementedError
def get_threat_score(self, event: SecurityEvent) -> float:
"""获取威胁分数"""
return self.threat_score
class BruteForceDetectionRule(DetectionRule):
"""暴力破解检测规则"""
def __init__(self):
super().__init__("暴力破解检测", 0.8)
self.failed_attempts = defaultdict(int)
self.attempt_windows = defaultdict(deque)
def matches(self, event: SecurityEvent) -> bool:
if event.event_type != SecurityEventType.AUTHENTICATION_FAILURE:
return False
# 检查失败次数
key = f"{event.source_ip}:{event.user_id or 'unknown'}"
# 清理过期的尝试记录
now = time.time()
window = self.attempt_windows[key]
while window and now - window[0] > 300: # 5分钟窗口
window.popleft()
self.failed_attempts[key] -= 1
# 添加新的失败尝试
window.append(now)
self.failed_attempts[key] += 1
# 检查是否超过阈值
return self.failed_attempts[key] >= 5
class SQLInjectionDetectionRule(DetectionRule):
"""SQL注入检测规则"""
def __init__(self):
super().__init__("SQL注入检测", 0.9)
self.sql_patterns = [
r"'\s*OR\s*'\d*'\s*=\s*'\d*",
r"UNION\s+SELECT",
r"DROP\s+TABLE",
r"INSERT\s+INTO",
r"DELETE\s+FROM",
r"--\s*$"
]
def matches(self, event: SecurityEvent) -> bool:
if event.event_type != SecurityEventType.INJECTION_ATTACK:
return False
# 检查事件数据中是否包含SQL注入模式
text_data = f"{event.description} {json.dumps(event.raw_data)}"
import re
for pattern in self.sql_patterns:
if re.search(pattern, text_data, re.IGNORECASE):
return True
return False
class ThreatIntelligence:
"""威胁情报系统"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.malicious_ips = set()
self.malicious_domains = set()
self.known_attack_signatures = {}
# 更新间隔
self.update_interval = config.get('update_interval', 3600) # 1小时
self.update_task = None
async def start(self):
"""启动威胁情报更新"""
logger.info("威胁情报系统启动")
# 立即更新一次
await self._update_threat_intelligence()
# 启动定期更新任务
self.update_task = asyncio.create_task(self._update_loop())
async def stop(self):
"""停止威胁情报更新"""
if self.update_task:
self.update_task.cancel()
logger.info("威胁情报系统停止")
async def check_threat(self, event: SecurityEvent) -> float:
"""检查事件威胁情报"""
threat_score = 0.0
# 检查恶意IP
if event.source_ip in self.malicious_ips:
threat_score = max(threat_score, 0.8)
# 检查攻击签名
for signature, score in self.known_attack_signatures.items():
if signature in event.description or signature in json.dumps(event.raw_data):
threat_score = max(threat_score, score)
return threat_score
async def _update_loop(self):
"""威胁情报更新循环"""
while True:
try:
await asyncio.sleep(self.update_interval)
await self._update_threat_intelligence()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"威胁情报更新失败: {e}")
async def _update_threat_intelligence(self):
"""更新威胁情报"""
logger.info("更新威胁情报...")
# 这里应该从真实的威胁情报源获取数据
# 为了演示,我们使用模拟数据
# 模拟恶意IP列表
new_malicious_ips = {
'192.168.100.1',
'192.168.100.2',
'10.0.0.100'
}
self.malicious_ips.update(new_malicious_ips)
# 模拟攻击签名
new_signatures = {
'eval(': 0.9,
'system(': 0.9,
'script>alert': 0.8,
'union select': 0.9
}
self.known_attack_signatures.update(new_signatures)
logger.info(f"威胁情报更新完成: {len(self.malicious_ips)}个恶意IP, "
f"{len(self.known_attack_signatures)}个攻击签名")
📊 智能日志分析系统
📝 高级日志分析器
python
# 智能日志分析系统
import re
import json
import gzip
from typing import Dict, List, Any, Optional, Iterator, Tuple
from datetime import datetime, timedelta
from dataclasses import dataclass
from collections import defaultdict, Counter
import asyncio
import logging
from pathlib import Path
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
logger = logging.getLogger(__name__)
@dataclass
class LogEntry:
"""日志条目"""
timestamp: datetime
level: str
source: str
message: str
metadata: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
return {
'timestamp': self.timestamp.isoformat(),
'level': self.level,
'source': self.source,
'message': self.message,
'metadata': self.metadata
}
@dataclass
class AnalysisResult:
"""分析结果"""
summary: Dict[str, Any]
anomalies: List[LogEntry]
patterns: List[Dict[str, Any]]
recommendations: List[str]
threat_indicators: List[Dict[str, Any]]
class LogAnalyzer:
"""日志分析器"""
def __init__(self, config: Dict[str, Any]):
self.config = config
# 日志解析器
self.log_parsers = {
'nginx': NginxLogParser(),
'apache': ApacheLogParser(),
'application': ApplicationLogParser(),
'system': SystemLogParser(),
'security': SecurityLogParser()
}
# 分析引擎
self.pattern_analyzer = PatternAnalyzer(config.get('pattern_analysis', {}))
self.anomaly_detector = LogAnomalyDetector(config.get('anomaly_detection', {}))
self.correlation_engine = CorrelationEngine(config.get('correlation', {}))
# 威胁检测规则
self.threat_rules = self._load_threat_rules()
async def analyze_logs(self, log_sources: List[str],
time_range: Tuple[datetime, datetime],
analysis_type: str = 'comprehensive') -> AnalysisResult:
"""分析日志"""
logger.info(f"开始分析日志: {len(log_sources)}个源, 时间范围: {time_range}")
# 1. 收集日志
log_entries = []
for source in log_sources:
entries = await self._collect_logs(source, time_range)
log_entries.extend(entries)
logger.info(f"收集到 {len(log_entries)} 条日志")
# 2. 预处理
processed_entries = await self._preprocess_logs(log_entries)
# 3. 执行分析
if analysis_type == 'comprehensive':
result = await self._comprehensive_analysis(processed_entries)
elif analysis_type == 'security':
result = await self._security_analysis(processed_entries)
elif analysis_type == 'performance':
result = await self._performance_analysis(processed_entries)
else:
result = await self._basic_analysis(processed_entries)
logger.info("日志分析完成")
return result
async def _collect_logs(self, source: str,
time_range: Tuple[datetime, datetime]) -> List[LogEntry]:
"""收集日志"""
log_entries = []
try:
# 确定日志类型
log_type = self._detect_log_type(source)
parser = self.log_parsers.get(log_type, self.log_parsers['application'])
# 读取日志文件
if source.startswith('file://'):
file_path = source[7:] # 去掉 'file://' 前缀
entries = await self._read_log_file(file_path, parser, time_range)
log_entries.extend(entries)
elif source.startswith('elastic://'):
# 从Elasticsearch读取
entries = await self._read_from_elasticsearch(source, time_range)
log_entries.extend(entries)
elif source.startswith('syslog://'):
# 从Syslog读取
entries = await self._read_from_syslog(source, time_range)
log_entries.extend(entries)
except Exception as e:
logger.error(f"收集日志失败 {source}: {e}")
return log_entries
async def _read_log_file(self, file_path: str, parser: 'LogParser',
time_range: Tuple[datetime, datetime]) -> List[LogEntry]:
"""读取日志文件"""
log_entries = []
try:
path = Path(file_path)
# 处理压缩文件
if path.suffix == '.gz':
open_func = gzip.open
mode = 'rt'
else:
open_func = open
mode = 'r'
with open_func(path, mode, encoding='utf-8', errors='ignore') as f:
for line_num, line in enumerate(f, 1):
try:
entry = parser.parse_line(line.strip())
if entry and self._is_in_time_range(entry.timestamp, time_range):
log_entries.append(entry)
except Exception as e:
logger.debug(f"解析日志行失败 {file_path}:{line_num}: {e}")
except Exception as e:
logger.error(f"读取日志文件失败 {file_path}: {e}")
return log_entries
def _detect_log_type(self, source: str) -> str:
"""检测日志类型"""
source_lower = source.lower()
if 'nginx' in source_lower:
return 'nginx'
elif 'apache' in source_lower:
return 'apache'
elif any(keyword in source_lower for keyword in ['security', 'auth', 'login']):
return 'security'
elif any(keyword in source_lower for keyword in ['system', 'syslog', 'kernel']):
return 'system'
else:
return 'application'
def _is_in_time_range(self, timestamp: datetime,
time_range: Tuple[datetime, datetime]) -> bool:
"""检查时间戳是否在范围内"""
start_time, end_time = time_range
return start_time <= timestamp <= end_time
async def _preprocess_logs(self, log_entries: List[LogEntry]) -> List[LogEntry]:
"""预处理日志"""
# 去重
seen_entries = set()
deduplicated_entries = []
for entry in log_entries:
# 基于时间戳和消息创建唯一标识
entry_hash = hash((entry.timestamp, entry.message, entry.source))
if entry_hash not in seen_entries:
seen_entries.add(entry_hash)
deduplicated_entries.append(entry)
# 排序
deduplicated_entries.sort(key=lambda x: x.timestamp)
logger.info(f"预处理完成: {len(log_entries)} -> {len(deduplicated_entries)} 条日志")
return deduplicated_entries
async def _comprehensive_analysis(self, log_entries: List[LogEntry]) -> AnalysisResult:
"""综合分析"""
# 基础统计
summary = self._generate_summary(log_entries)
# 模式分析
patterns = await self.pattern_analyzer.analyze_patterns(log_entries)
# 异常检测
anomalies = await self.anomaly_detector.detect_anomalies(log_entries)
# 威胁指标
threat_indicators = await self._detect_threat_indicators(log_entries)
# 关联分析
correlations = await self.correlation_engine.find_correlations(log_entries)
# 生成建议
recommendations = self._generate_recommendations(
summary, patterns, anomalies, threat_indicators
)
return AnalysisResult(
summary=summary,
anomalies=anomalies,
patterns=patterns,
recommendations=recommendations,
threat_indicators=threat_indicators
)
async def _security_analysis(self, log_entries: List[LogEntry]) -> AnalysisResult:
"""安全分析"""
# 过滤安全相关日志
security_entries = [
entry for entry in log_entries
if self._is_security_related(entry)
]
# 执行安全分析
summary = self._generate_security_summary(security_entries)
anomalies = await self.anomaly_detector.detect_security_anomalies(security_entries)
threat_indicators = await self._detect_threat_indicators(security_entries)
patterns = await self.pattern_analyzer.analyze_security_patterns(security_entries)
recommendations = self._generate_security_recommendations(
summary, patterns, anomalies, threat_indicators
)
return AnalysisResult(
summary=summary,
anomalies=anomalies,
patterns=patterns,
recommendations=recommendations,
threat_indicators=threat_indicators
)
def _generate_summary(self, log_entries: List[LogEntry]) -> Dict[str, Any]:
"""生成摘要统计"""
if not log_entries:
return {}
# 时间范围
timestamps = [entry.timestamp for entry in log_entries]
time_range = {
'start': min(timestamps).isoformat(),
'end': max(timestamps).isoformat(),
'duration': str(max(timestamps) - min(timestamps))
}
# 日志级别统计
level_counts = Counter(entry.level for entry in log_entries)
# 来源统计
source_counts = Counter(entry.source for entry in log_entries)
# 每小时日志数量
hourly_counts = defaultdict(int)
for entry in log_entries:
hour_key = entry.timestamp.strftime('%Y-%m-%d %H:00')
hourly_counts[hour_key] += 1
# 错误率计算
error_levels = {'ERROR', 'CRITICAL', 'FATAL'}
error_count = sum(level_counts[level] for level in error_levels if level in level_counts)
error_rate = error_count / len(log_entries) if log_entries else 0
return {
'total_entries': len(log_entries),
'time_range': time_range,
'level_distribution': dict(level_counts),
'source_distribution': dict(source_counts.most_common(10)),
'hourly_distribution': dict(hourly_counts),
'error_rate': error_rate,
'unique_sources': len(source_counts),
'avg_entries_per_hour': len(log_entries) / max(1, len(hourly_counts))
}
def _is_security_related(self, entry: LogEntry) -> bool:
"""判断是否为安全相关日志"""
security_keywords = [
'authentication', 'authorization', 'login', 'logout',
'failed', 'denied', 'blocked', 'attack', 'intrusion',
'unauthorized', 'forbidden', 'security', 'breach',
'malicious', 'suspicious', 'threat'
]
message_lower = entry.message.lower()
return any(keyword in message_lower for keyword in security_keywords)
async def _detect_threat_indicators(self, log_entries: List[LogEntry]) -> List[Dict[str, Any]]:
"""检测威胁指标"""
threat_indicators = []
for rule in self.threat_rules:
matches = rule.find_matches(log_entries)
for match in matches:
threat_indicators.append({
'rule_name': rule.name,
'severity': rule.severity,
'description': rule.description,
'matched_entries': [entry.to_dict() for entry in match['entries']],
'confidence': match['confidence'],
'timestamp': datetime.utcnow().isoformat()
})
return threat_indicators
def _generate_recommendations(self, summary: Dict[str, Any],
patterns: List[Dict[str, Any]],
anomalies: List[LogEntry],
threat_indicators: List[Dict[str, Any]]) -> List[str]:
"""生成建议"""
recommendations = []
# 基于错误率的建议
if summary.get('error_rate', 0) > 0.1:
recommendations.append("错误率较高(>10%),建议检查应用程序和系统配置")
# 基于异常的建议
if len(anomalies) > 10:
recommendations.append(f"检测到 {len(anomalies)} 个异常日志条目,建议详细调查")
# 基于威胁指标的建议
if threat_indicators:
high_severity_threats = [t for t in threat_indicators if t['severity'] >= 4]
if high_severity_threats:
recommendations.append(f"检测到 {len(high_severity_threats)} 个高严重性威胁,需要立即处理")
# 基于模式的建议
for pattern in patterns:
if pattern.get('anomaly_score', 0) > 0.8:
recommendations.append(f"发现异常模式: {pattern['description']}")
return recommendations
def _load_threat_rules(self) -> List['ThreatRule']:
"""加载威胁检测规则"""
rules = []
# 暴力破解检测
rules.append(BruteForceLogRule())
# SQL注入检测
rules.append(SQLInjectionLogRule())
# XSS攻击检测
rules.append(XSSLogRule())
# 异常访问模式检测
rules.append(AnomalousAccessLogRule())
# DDoS攻击检测
rules.append(DDoSLogRule())
return rules
# 日志解析器基类和具体实现
class LogParser:
"""日志解析器基类"""
def parse_line(self, line: str) -> Optional[LogEntry]:
"""解析日志行"""
raise NotImplementedError
class NginxLogParser(LogParser):
"""Nginx日志解析器"""
def __init__(self):
# Nginx默认日志格式的正则表达式
self.pattern = re.compile(
r'(?P<ip>\S+) - - \[(?P<timestamp>[^\]]+)\] '
r'"(?P<method>\S+) (?P<url>\S+) (?P<protocol>[^"]+)" '
r'(?P<status>\d+) (?P<size>\d+) '
r'"(?P<referer>[^"]*)" "(?P<user_agent>[^"]*)"'
)
def parse_line(self, line: str) -> Optional[LogEntry]:
match = self.pattern.match(line)
if not match:
return None
groups = match.groupdict()
try:
timestamp = datetime.strptime(
groups['timestamp'],
'%d/%b/%Y:%H:%M:%S %z'
)
except ValueError:
return None
# 确定日志级别
status_code = int(groups['status'])
if status_code >= 500:
level = 'ERROR'
elif status_code >= 400:
level = 'WARNING'
else:
level = 'INFO'
return LogEntry(
timestamp=timestamp,
level=level,
source='nginx',
message=f"{groups['method']} {groups['url']} {groups['status']}",
metadata={
'ip': groups['ip'],
'method': groups['method'],
'url': groups['url'],
'status': int(groups['status']),
'size': int(groups['size']),
'referer': groups['referer'],
'user_agent': groups['user_agent']
}
)
class ApplicationLogParser(LogParser):
"""应用程序日志解析器"""
def __init__(self):
# 通用应用日志格式
self.pattern = re.compile(
r'(?P<timestamp>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}[,\.]\d{3}) '
r'\[(?P<thread>[^\]]+)\] '
r'(?P<level>\w+) '
r'(?P<logger>[^\s]+) - '
r'(?P<message>.*)'
)
def parse_line(self, line: str) -> Optional[LogEntry]:
match = self.pattern.match(line)
if not match:
# 尝试简单格式
return self._parse_simple_format(line)
groups = match.groupdict()
try:
# 处理时间戳格式变化
timestamp_str = groups['timestamp']
if ',' in timestamp_str:
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S,%f')
else:
timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S.%f')
except ValueError:
return None
return LogEntry(
timestamp=timestamp,
level=groups['level'],
source='application',
message=groups['message'],
metadata={
'thread': groups['thread'],
'logger': groups['logger']
}
)
def _parse_simple_format(self, line: str) -> Optional[LogEntry]:
"""解析简单格式的日志"""
# 尝试提取时间戳
timestamp_match = re.search(r'\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}', line)
if timestamp_match:
try:
timestamp = datetime.strptime(timestamp_match.group(), '%Y-%m-%d %H:%M:%S')
return LogEntry(
timestamp=timestamp,
level='INFO', # 默认级别
source='application',
message=line,
metadata={}
)
except ValueError:
pass
return None
# 威胁检测规则
class ThreatRule:
"""威胁检测规则基类"""
def __init__(self, name: str, severity: int, description: str):
self.name = name
self.severity = severity # 1-5
self.description = description
def find_matches(self, log_entries: List[LogEntry]) -> List[Dict[str, Any]]:
"""查找匹配的日志条目"""
raise NotImplementedError
class BruteForceLogRule(ThreatRule):
"""暴力破解日志规则"""
def __init__(self):
super().__init__(
"暴力破解攻击检测",
4,
"检测短时间内来自同一IP的大量认证失败"
)
def find_matches(self, log_entries: List[LogEntry]) -> List[Dict[str, Any]]:
matches = []
# 按IP分组统计认证失败
ip_failures = defaultdict(list)
for entry in log_entries:
if self._is_auth_failure(entry):
ip = self._extract_ip(entry)
if ip:
ip_failures[ip].append(entry)
# 检查每个IP的失败次数
for ip, failures in ip_failures.items():
if len(failures) >= 10: # 10次失败阈值
# 检查时间窗口
if self._within_time_window(failures, 300): # 5分钟窗口
matches.append({
'entries': failures,
'confidence': min(len(failures) / 20.0, 1.0),
'metadata': {'ip': ip, 'failure_count': len(failures)}
})
return matches
def _is_auth_failure(self, entry: LogEntry) -> bool:
"""判断是否为认证失败"""
failure_keywords = ['failed', 'invalid', 'denied', 'unauthorized', 'authentication']
message_lower = entry.message.lower()
return any(keyword in message_lower for keyword in failure_keywords)
def _extract_ip(self, entry: LogEntry) -> Optional[str]:
"""提取IP地址"""
# 从metadata中获取
if 'ip' in entry.metadata:
return entry.metadata['ip']
# 从消息中提取
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
match = re.search(ip_pattern, entry.message)
return match.group() if match else None
def _within_time_window(self, entries: List[LogEntry], window_seconds: int) -> bool:
"""检查条目是否在时间窗口内"""
if len(entries) < 2:
return False
timestamps = [entry.timestamp for entry in entries]
time_span = max(timestamps) - min(timestamps)
return time_span.total_seconds() <= window_seconds
🎯 本节小结
通过本节学习,你已经掌握了:
✅ 实时监控架构:构建全方位的安全事件监控体系
✅ 智能威胁检测:实现基于规则、机器学习和行为分析的威胁检测
✅ 日志分析系统:建立智能化的日志收集、解析和分析pipeline
✅ 告警管理:设计高效的安全告警生成和处理机制
✅ 自动响应:实现基于威胁等级的自动化安全响应
🤔 思考题
- 监控覆盖:如何确保监控系统不留盲点且避免过度告警?
- 性能优化:在大规模日志场景下如何保证实时分析性能?
- 误报控制:如何持续优化检测算法以降低误报率?
监控是安全的眼睛,日志是系统的记忆! 👁️