7.5 故障排除与维护
🔧 核心目标:建立完善的故障排除体系和预防性维护机制,确保系统稳定运行
⏱️ 预计时长:50分钟
📊 难度级别:⭐⭐⭐⭐⭐
🎯 学习目标
通过本节学习,你将掌握:
- 系统故障诊断和根因分析
- 应急响应流程和回滚策略
- 预防性维护和健康检查
- 故障预警和自动恢复机制
- 运维文档和团队培训
🔍 故障诊断体系
故障诊断流程图
故障分级标准
python
# src/troubleshooting/incident_classifier.py
import asyncio
import logging
import time

from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any
class IncidentSeverity(Enum):
    """Incident severity levels, ordered from most to least critical."""
    P0_CRITICAL = "P0"  # System completely unavailable
    P1_HIGH = "P1"      # Core functionality affected
    P2_MEDIUM = "P2"    # Partial functionality affected
    P3_LOW = "P3"       # Performance degradation or non-core issues


@dataclass
class IncidentMetrics:
    """Snapshot of the system metrics used to classify an incident."""
    error_rate: float      # percentage of failing requests (0-100)
    response_time: float   # latency in milliseconds
    availability: float    # availability percentage (0-100)
    affected_users: int    # number of users impacted
    business_impact: str   # free-form description of the business impact


class IncidentClassifier:
    """Classify incidents into severity levels from metrics and keywords.

    Each severity defines threshold predicates for the four numeric metrics
    plus a keyword list matched against the incident description. A level is
    chosen when at least half of its rules are satisfied, checking the most
    severe level (P0) first.
    """

    def __init__(self):
        # Ordered most-severe first; classify_incident() relies on dict
        # insertion order when iterating.
        self.classification_rules = {
            IncidentSeverity.P0_CRITICAL: {
                'error_rate': lambda x: x > 50,
                'response_time': lambda x: x > 30000,  # 30 seconds
                'availability': lambda x: x < 90,
                'affected_users': lambda x: x > 1000,
                'keywords': ['down', 'outage', 'critical', '5xx']
            },
            IncidentSeverity.P1_HIGH: {
                'error_rate': lambda x: x > 20,
                'response_time': lambda x: x > 10000,  # 10 seconds
                'availability': lambda x: x < 95,
                'affected_users': lambda x: x > 100,
                'keywords': ['error', 'timeout', 'failure']
            },
            IncidentSeverity.P2_MEDIUM: {
                'error_rate': lambda x: x > 5,
                'response_time': lambda x: x > 5000,  # 5 seconds
                'availability': lambda x: x < 98,
                'affected_users': lambda x: x > 10,
                'keywords': ['slow', 'degraded', 'warning']
            },
            IncidentSeverity.P3_LOW: {
                'error_rate': lambda x: x > 1,
                'response_time': lambda x: x > 2000,  # 2 seconds
                'availability': lambda x: x < 99.5,
                'affected_users': lambda x: x > 0,
                'keywords': ['minor', 'performance', 'optimization']
            }
        }

    def classify_incident(self, metrics: IncidentMetrics,
                          description: str = "") -> IncidentSeverity:
        """Return the severity level matching *metrics* and *description*.

        A level matches when >= 50% of its rules (four metric thresholds
        plus the keyword rule) are satisfied. Falls back to P3_LOW when no
        level matches.
        """
        description_lower = description.lower()
        # Check from P0 (most severe) downwards.
        for severity, rules in self.classification_rules.items():
            score = 0
            total_rules = 0
            # Numeric threshold rules.
            for metric_name, rule_func in rules.items():
                if metric_name == 'keywords':
                    continue
                total_rules += 1
                if rule_func(getattr(metrics, metric_name)):
                    score += 1
            # Keyword rule. Bug fix: the rule is now counted in total_rules
            # whether or not it matches; previously it was only counted on a
            # match, which inflated the score ratio for levels whose
            # keywords were absent from the description.
            keywords = rules.get('keywords', [])
            if keywords:
                total_rules += 1
                if any(keyword in description_lower for keyword in keywords):
                    score += 1
            # Classify at this level when at least half the rules match.
            if total_rules and score / total_rules >= 0.5:
                return severity
        return IncidentSeverity.P3_LOW
# 故障响应协调器
class IncidentCoordinator:
    """Coordinates alerting and automated recovery for active incidents.

    Bug fix: the recovery helper methods (e.g. _restart_unhealthy_pods) call
    asyncio.sleep but asyncio was only imported locally inside
    _initiate_auto_recovery, so every other helper raised NameError. asyncio
    is now imported at module level.
    """

    def __init__(self):
        # incident_id -> incident record; see handle_incident for the shape.
        self.active_incidents = {}
        # Who gets notified for each severity level.
        self.response_teams = {
            IncidentSeverity.P0_CRITICAL: ["on-call-engineer", "engineering-lead", "product-manager"],
            IncidentSeverity.P1_HIGH: ["on-call-engineer", "team-lead"],
            IncidentSeverity.P2_MEDIUM: ["on-call-engineer"],
            IncidentSeverity.P3_LOW: ["engineer"]
        }
        # Maximum time to first response per severity, in minutes.
        self.response_sla = {
            IncidentSeverity.P0_CRITICAL: 5,    # 5 minutes
            IncidentSeverity.P1_HIGH: 15,       # 15 minutes
            IncidentSeverity.P2_MEDIUM: 60,     # 1 hour
            IncidentSeverity.P3_LOW: 240        # 4 hours
        }

    async def handle_incident(self, incident_id: str, metrics: IncidentMetrics,
                              description: str) -> Dict[str, Any]:
        """Classify, record, alert and (optionally) auto-recover an incident.

        Returns the incident record stored in active_incidents.
        """
        classifier = IncidentClassifier()
        severity = classifier.classify_incident(metrics, description)
        incident = {
            'id': incident_id,
            'severity': severity,
            'metrics': metrics,
            'description': description,
            'start_time': time.time(),
            'status': 'investigating',
            'assigned_team': self.response_teams[severity],
            # SLA deadline as an absolute unix timestamp.
            'sla_deadline': time.time() + self.response_sla[severity] * 60
        }
        self.active_incidents[incident_id] = incident
        # Notify the response team.
        await self._send_alert(incident)
        # Kick off automated recovery where the severity warrants it.
        await self._initiate_auto_recovery(incident)
        return incident

    async def _send_alert(self, incident: Dict[str, Any]):
        """Format and emit an alert message for *incident*."""
        severity = incident['severity']
        team = incident['assigned_team']
        message = f"""
🚨 故障告警 - {severity.value}
-----------------------
ID: {incident['id']}
严重程度: {severity.value}
描述: {incident['description']}
影响用户: {incident['metrics'].affected_users}
错误率: {incident['metrics'].error_rate:.1f}%
可用性: {incident['metrics'].availability:.1f}%
响应团队: {', '.join(team)}
SLA截止: {time.ctime(incident['sla_deadline'])}
"""
        # Integrate a real notification system here (Slack, PagerDuty, ...).
        logging.critical(message)
        print(message)

    async def _initiate_auto_recovery(self, incident: Dict[str, Any]):
        """Start automated recovery according to the incident severity."""
        severity = incident['severity']
        if severity == IncidentSeverity.P0_CRITICAL:
            # P0: attempt recovery immediately.
            await self._attempt_auto_recovery(incident)
        elif severity == IncidentSeverity.P1_HIGH:
            # P1: wait 5 minutes before attempting recovery.
            # NOTE(review): this sleep blocks handle_incident for 5 minutes;
            # consider scheduling it as a background task instead.
            await asyncio.sleep(300)
            await self._attempt_auto_recovery(incident)

    async def _attempt_auto_recovery(self, incident: Dict[str, Any]):
        """Try each recovery action in order until one succeeds."""
        print(f"🔄 尝试自动恢复故障: {incident['id']}")
        recovery_actions = [
            self._restart_unhealthy_pods,
            self._scale_up_resources,
            self._switch_to_backup_database,
            self._clear_cache,
            self._restart_load_balancer
        ]
        for action in recovery_actions:
            try:
                success = await action(incident)
                if success:
                    incident['status'] = 'auto_recovered'
                    print(f"✅ 自动恢复成功: {action.__name__}")
                    break
            except Exception as e:
                print(f"❌ 自动恢复失败 {action.__name__}: {e}")
        if incident['status'] != 'auto_recovered':
            print("⚠️ 自动恢复失败,需要人工介入")

    async def _restart_unhealthy_pods(self, incident: Dict[str, Any]) -> bool:
        """Restart unhealthy pods (simulated K8s pod restart)."""
        print("🔄 重启不健康的Pod...")
        await asyncio.sleep(2)
        return True

    async def _scale_up_resources(self, incident: Dict[str, Any]) -> bool:
        """Scale up resources (simulated)."""
        print("📈 自动扩容资源...")
        await asyncio.sleep(3)
        return True

    async def _switch_to_backup_database(self, incident: Dict[str, Any]) -> bool:
        """Fail over to the standby database (simulated)."""
        print("🔄 切换到备用数据库...")
        await asyncio.sleep(5)
        return True

    async def _clear_cache(self, incident: Dict[str, Any]) -> bool:
        """Flush caches (simulated)."""
        print("🧹 清理缓存...")
        await asyncio.sleep(1)
        return True

    async def _restart_load_balancer(self, incident: Dict[str, Any]) -> bool:
        """Restart the load balancer (simulated; always fails)."""
        print("🔄 重启负载均衡器...")
        await asyncio.sleep(4)
        return False  # Simulated failure
🔧 诊断工具集
系统健康检查器
python
# src/troubleshooting/health_checker.py
import asyncio
import aiohttp
import psutil
import asyncpg
from typing import Dict, List, Any, Optional
import time
import json
class HealthChecker:
    """Runs a suite of system health checks and aggregates the results.

    Every individual check coroutine returns a dict with keys:
    ``healthy`` (bool), ``message`` (str) and ``details`` (dict).
    """

    def __init__(self, config: Dict[str, Any]):
        # Config keys used: health_endpoint, database_url, redis_url,
        # external_apis (list of {'name': ..., 'url': ...} dicts).
        self.config = config
        # Registry: check name -> coroutine implementing the check.
        self.checks = {
            'application': self._check_application_health,
            'database': self._check_database_health,
            'redis': self._check_redis_health,
            'external_api': self._check_external_apis,
            'disk_space': self._check_disk_space,
            'memory_usage': self._check_memory_usage,
            'cpu_usage': self._check_cpu_usage,
            'network': self._check_network_connectivity
        }

    async def run_all_checks(self) -> Dict[str, Any]:
        """Run every registered check sequentially and build a report."""
        print("🔍 开始系统健康检查...")
        start_time = time.time()
        results = {}
        for check_name, check_func in self.checks.items():
            try:
                result = await check_func()
                results[check_name] = result
                status = "✅" if result['healthy'] else "❌"
                print(f"{status} {check_name}: {result['message']}")
            except Exception as e:
                # A crashing check is reported as unhealthy rather than
                # aborting the whole run.
                results[check_name] = {
                    'healthy': False,
                    'message': f"检查失败: {str(e)}",
                    'details': {}
                }
                print(f"❌ {check_name}: 检查失败 - {e}")
        # Overall health requires every individual check to pass.
        overall_health = all(result['healthy'] for result in results.values())
        total_time = time.time() - start_time
        health_report = {
            'overall_healthy': overall_health,
            'check_duration': total_time,
            'timestamp': time.time(),
            'details': results
        }
        print(f"\n📊 健康检查完成 ({total_time:.2f}s)")
        print(f"整体状态: {'✅ 健康' if overall_health else '❌ 异常'}")
        return health_report

    async def _check_application_health(self) -> Dict[str, Any]:
        """Probe the application's HTTP health endpoint."""
        health_endpoint = self.config.get('health_endpoint', 'http://localhost:8000/health')
        try:
            async with aiohttp.ClientSession() as session:
                # NOTE(review): numeric timeout is deprecated in newer
                # aiohttp; consider aiohttp.ClientTimeout(total=5).
                async with session.get(health_endpoint, timeout=5) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            'healthy': True,
                            'message': '应用程序运行正常',
                            'details': {
                                'status_code': response.status,
                                'response_time': response.headers.get('X-Response-Time'),
                                'version': data.get('version'),
                                'uptime': data.get('uptime')
                            }
                        }
                    else:
                        return {
                            'healthy': False,
                            'message': f'应用程序响应异常: HTTP {response.status}',
                            'details': {'status_code': response.status}
                        }
        except Exception as e:
            return {
                'healthy': False,
                'message': f'无法连接到应用程序: {str(e)}',
                'details': {}
            }

    async def _check_database_health(self) -> Dict[str, Any]:
        """Connect to PostgreSQL, run a probe query and count connections."""
        db_url = self.config.get('database_url')
        if not db_url:
            # No database configured counts as healthy (nothing to check).
            return {
                'healthy': True,
                'message': '未配置数据库',
                'details': {}
            }
        try:
            conn = await asyncpg.connect(db_url)
            # Probe query to measure round-trip latency.
            start_time = time.time()
            result = await conn.fetchval('SELECT 1')
            query_time = time.time() - start_time
            # Active connection count from pg_stat_activity.
            connections = await conn.fetchval(
                'SELECT count(*) FROM pg_stat_activity'
            )
            await conn.close()
            return {
                'healthy': True,
                'message': '数据库连接正常',
                'details': {
                    'query_time': query_time,
                    'active_connections': connections,
                    'test_result': result
                }
            }
        except Exception as e:
            return {
                'healthy': False,
                'message': f'数据库连接失败: {str(e)}',
                'details': {}
            }

    async def _check_redis_health(self) -> Dict[str, Any]:
        """Ping Redis and collect basic server statistics."""
        redis_url = self.config.get('redis_url')
        if not redis_url:
            return {
                'healthy': True,
                'message': '未配置Redis',
                'details': {}
            }
        try:
            import aioredis
            redis = aioredis.from_url(redis_url)
            # Round-trip latency of a PING.
            start_time = time.time()
            pong = await redis.ping()
            ping_time = time.time() - start_time
            # Server statistics (memory, clients, version).
            info = await redis.info()
            await redis.close()
            return {
                'healthy': pong,
                'message': 'Redis连接正常' if pong else 'Redis连接失败',
                'details': {
                    'ping_time': ping_time,
                    'used_memory': info.get('used_memory_human'),
                    'connected_clients': info.get('connected_clients'),
                    'version': info.get('redis_version')
                }
            }
        except Exception as e:
            return {
                'healthy': False,
                'message': f'Redis连接失败: {str(e)}',
                'details': {}
            }

    async def _check_external_apis(self) -> Dict[str, Any]:
        """Probe each configured external API; all must respond < 400."""
        external_apis = self.config.get('external_apis', [])
        if not external_apis:
            return {
                'healthy': True,
                'message': '未配置外部API',
                'details': {}
            }
        api_results = {}
        all_healthy = True
        async with aiohttp.ClientSession() as session:
            for api in external_apis:
                try:
                    start_time = time.time()
                    async with session.get(api['url'], timeout=10) as response:
                        response_time = time.time() - start_time
                        api_results[api['name']] = {
                            'healthy': response.status < 400,
                            'status_code': response.status,
                            'response_time': response_time
                        }
                        if response.status >= 400:
                            all_healthy = False
                except Exception as e:
                    api_results[api['name']] = {
                        'healthy': False,
                        'error': str(e),
                        'response_time': None
                    }
                    all_healthy = False
        return {
            'healthy': all_healthy,
            'message': '外部API检查完成',
            'details': api_results
        }

    async def _check_disk_space(self) -> Dict[str, Any]:
        """Check free disk space on the root filesystem."""
        disk_usage = psutil.disk_usage('/')
        free_percent = (disk_usage.free / disk_usage.total) * 100
        healthy = free_percent > 10  # healthy when > 10% free
        return {
            'healthy': healthy,
            'message': f'磁盘空间剩余 {free_percent:.1f}%',
            'details': {
                'total_gb': disk_usage.total / (1024**3),
                'used_gb': disk_usage.used / (1024**3),
                'free_gb': disk_usage.free / (1024**3),
                'free_percent': free_percent
            }
        }

    async def _check_memory_usage(self) -> Dict[str, Any]:
        """Check virtual memory utilization."""
        memory = psutil.virtual_memory()
        healthy = memory.percent < 85  # healthy when usage < 85%
        return {
            'healthy': healthy,
            'message': f'内存使用率 {memory.percent:.1f}%',
            'details': {
                'total_gb': memory.total / (1024**3),
                'used_gb': memory.used / (1024**3),
                'available_gb': memory.available / (1024**3),
                'percent': memory.percent
            }
        }

    async def _check_cpu_usage(self) -> Dict[str, Any]:
        """Check average CPU utilization over a 5-second sample."""
        # NOTE(review): psutil.cpu_percent(interval=5) blocks the event
        # loop for 5 seconds; consider running it in an executor.
        cpu_percent = psutil.cpu_percent(interval=5)
        healthy = cpu_percent < 80  # healthy when usage < 80%
        return {
            'healthy': healthy,
            'message': f'CPU使用率 {cpu_percent:.1f}%',
            'details': {
                'percent': cpu_percent,
                'core_count': psutil.cpu_count(),
                # getloadavg is unavailable on some platforms (e.g. Windows).
                'load_average': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else None
            }
        }

    async def _check_network_connectivity(self) -> Dict[str, Any]:
        """Probe a fixed set of hosts over HTTP to gauge connectivity."""
        test_hosts = ['8.8.8.8', 'google.com', 'github.com']
        connectivity_results = {}
        for host in test_hosts:
            try:
                start_time = time.time()
                async with aiohttp.ClientSession() as session:
                    async with session.get(f'http://{host}', timeout=5) as response:
                        response_time = time.time() - start_time
                        connectivity_results[host] = {
                            'reachable': True,
                            'response_time': response_time
                        }
            except Exception:
                # Bug fix: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit.
                connectivity_results[host] = {
                    'reachable': False,
                    'response_time': None
                }
        reachable_count = sum(1 for result in connectivity_results.values() if result['reachable'])
        # Quorum via floor division: with 3 hosts this means at least 1
        # reachable host is treated as healthy.
        healthy = reachable_count >= len(test_hosts) // 2
        return {
            'healthy': healthy,
            'message': f'网络连接正常 ({reachable_count}/{len(test_hosts)})',
            'details': connectivity_results
        }
日志分析器
python
# src/troubleshooting/log_analyzer.py
import re
import json
from typing import Dict, List, Any, Optional
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import asyncio
class LogAnalyzer:
    """Parses application log files and summarizes errors and patterns."""

    def __init__(self):
        # Symptom name -> case-insensitive regex matched against messages.
        self.error_patterns = {
            'connection_timeout': r'connection.*timeout|timeout.*connection',
            'database_error': r'database.*error|connection.*refused|deadlock',
            'memory_error': r'out of memory|memory.*error|heap.*overflow',
            'permission_error': r'permission.*denied|access.*denied|unauthorized',
            'file_not_found': r'file.*not.*found|no such file',
            'network_error': r'network.*error|connection.*reset|socket.*error',
            'rate_limit': r'rate.*limit|too many requests|429',
            'authentication_error': r'auth.*fail|invalid.*token|unauthorized',
            'validation_error': r'validation.*error|invalid.*input|bad.*request'
        }
        self.log_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

    async def analyze_logs(self, log_files: List[str],
                           time_range: Optional[timedelta] = None) -> Dict[str, Any]:
        """Analyze *log_files*, keeping entries within *time_range* of now."""
        print("📊 开始日志分析...")
        if time_range is None:
            time_range = timedelta(hours=1)  # default: last hour
        cutoff_time = datetime.now() - time_range
        analysis_results = {
            'summary': {
                'total_lines': 0,
                'error_count': 0,
                'warning_count': 0,
                'time_range': str(time_range),
                'analysis_time': datetime.now().isoformat()
            },
            'error_patterns': defaultdict(int),
            'error_timeline': [],
            'top_errors': [],
            'recommendations': []
        }
        all_log_entries = []
        # Parse every file; a failure on one file does not abort the run.
        for log_file in log_files:
            try:
                entries = await self._parse_log_file(log_file, cutoff_time)
                all_log_entries.extend(entries)
                print(f"📄 已解析 {log_file}: {len(entries)} 条记录")
            except Exception as e:
                print(f"❌ 解析日志文件失败 {log_file}: {e}")
        # Aggregate statistics over all parsed entries.
        await self._analyze_entries(all_log_entries, analysis_results)
        # Derive actionable recommendations from the aggregates.
        self._generate_recommendations(analysis_results)
        print(f"✅ 日志分析完成,共分析 {analysis_results['summary']['total_lines']} 条记录")
        return analysis_results

    async def _parse_log_file(self, log_file: str,
                              cutoff_time: datetime) -> List[Dict[str, Any]]:
        """Parse one file, keeping entries with timestamp >= cutoff_time."""
        entries = []
        with open(log_file, 'r', encoding='utf-8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                entry = self._parse_log_line(line)
                # _parse_log_line guarantees 'timestamp' is a datetime,
                # so this comparison cannot raise TypeError.
                if entry and entry.get('timestamp', datetime.min) >= cutoff_time:
                    entries.append(entry)
        return entries

    def _parse_log_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse a single log line into a normalized entry dict.

        Supports JSON-formatted lines and the standard
        '2024-01-26 10:30:45 [ERROR] module: message' format. The returned
        'timestamp' is always a datetime (bug fix: JSON entries used to keep
        the raw string, which made the datetime comparison in
        _parse_log_file raise TypeError).
        """
        # Try JSON-formatted logs first.
        if line.startswith('{'):
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                entry = None
            if isinstance(entry, dict):
                ts = entry.get('timestamp')
                if isinstance(ts, str):
                    try:
                        entry['timestamp'] = datetime.fromisoformat(ts)
                    except ValueError:
                        entry['timestamp'] = datetime.now()
                elif not isinstance(ts, datetime):
                    entry['timestamp'] = datetime.now()
                # Normalize the keys the analyzer relies on.
                entry.setdefault('level', 'UNKNOWN')
                entry.setdefault('module', 'unknown')
                entry.setdefault('message', '')
                entry.setdefault('raw_line', line)
                return entry
        # Standard text format: 2024-01-26 10:30:45 [ERROR] module: message
        pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\s*\[(\w+)\]\s*(\w+)?\s*:?\s*(.*)'
        match = re.match(pattern, line)
        if match:
            timestamp_str, level, module, message = match.groups()
            try:
                timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
                return {
                    'timestamp': timestamp,
                    'level': level,
                    'module': module or 'unknown',
                    'message': message,
                    'raw_line': line
                }
            except ValueError:
                pass
        # Unparseable line: keep it with current time and UNKNOWN level.
        return {
            'timestamp': datetime.now(),
            'level': 'UNKNOWN',
            'module': 'unknown',
            'message': line,
            'raw_line': line
        }

    async def _analyze_entries(self, entries: List[Dict[str, Any]],
                               results: Dict[str, Any]):
        """Aggregate level counts, error patterns and an hourly timeline."""
        results['summary']['total_lines'] = len(entries)
        error_messages = []
        hourly_errors = defaultdict(int)
        for entry in entries:
            level = entry.get('level', 'UNKNOWN')
            message = entry.get('message', '')
            timestamp = entry.get('timestamp', datetime.now())
            # Count errors and warnings.
            if level in ['ERROR', 'CRITICAL']:
                results['summary']['error_count'] += 1
                error_messages.append(message)
                # Bucket errors by hour for the timeline.
                hour_key = timestamp.strftime('%Y-%m-%d %H:00')
                hourly_errors[hour_key] += 1
            elif level == 'WARNING':
                results['summary']['warning_count'] += 1
            # Tally known error patterns in the message.
            for pattern_name, pattern_regex in self.error_patterns.items():
                if re.search(pattern_regex, message, re.IGNORECASE):
                    results['error_patterns'][pattern_name] += 1
        # Chronological error timeline.
        results['error_timeline'] = [
            {'time': hour, 'count': count}
            for hour, count in sorted(hourly_errors.items())
        ]
        # Ten most frequent error messages.
        error_counter = Counter(error_messages)
        results['top_errors'] = [
            {'message': msg, 'count': count}
            for msg, count in error_counter.most_common(10)
        ]

    def _generate_recommendations(self, results: Dict[str, Any]):
        """Derive remediation recommendations from the aggregated results."""
        recommendations = []
        error_patterns = results['error_patterns']
        # Pattern-driven recommendations.
        if error_patterns['connection_timeout'] > 0:
            recommendations.append({
                'issue': '连接超时',
                'recommendation': '检查网络配置,增加连接超时时间,考虑使用连接池',
                'priority': 'high'
            })
        if error_patterns['database_error'] > 0:
            recommendations.append({
                'issue': '数据库错误',
                'recommendation': '检查数据库连接配置,优化查询性能,监控数据库资源',
                'priority': 'high'
            })
        if error_patterns['memory_error'] > 0:
            recommendations.append({
                'issue': '内存错误',
                'recommendation': '优化内存使用,增加服务器内存,检查内存泄漏',
                'priority': 'critical'
            })
        if error_patterns['rate_limit'] > 0:
            recommendations.append({
                'issue': '频率限制',
                'recommendation': '实施请求缓存,优化API调用频率,考虑升级API配额',
                'priority': 'medium'
            })
        # Volume-driven recommendation; max(..., 1) avoids division by zero.
        error_rate = results['summary']['error_count'] / max(results['summary']['total_lines'], 1)
        if error_rate > 0.1:  # error rate above 10%
            recommendations.append({
                'issue': '高错误率',
                'recommendation': f'当前错误率 {error_rate:.1%},需要紧急调查根本原因',
                'priority': 'critical'
            })
        results['recommendations'] = recommendations

    def print_analysis_report(self, results: Dict[str, Any]):
        """Pretty-print the analysis results to stdout."""
        print("\n" + "="*60)
        print("📊 日志分析报告")
        print("="*60)
        summary = results['summary']
        print(f"📈 概览信息:")
        print(f"  分析时间范围: {summary['time_range']}")
        print(f"  总日志条数: {summary['total_lines']}")
        print(f"  错误数量: {summary['error_count']}")
        print(f"  警告数量: {summary['warning_count']}")
        if summary['total_lines'] > 0:
            error_rate = summary['error_count'] / summary['total_lines']
            print(f"  错误率: {error_rate:.1%}")
        # Error patterns, most frequent first.
        if results['error_patterns']:
            print(f"\n🔍 错误模式分析:")
            for pattern, count in sorted(results['error_patterns'].items(),
                                         key=lambda x: x[1], reverse=True):
                if count > 0:
                    print(f"  {pattern}: {count} 次")
        # Top error messages.
        if results['top_errors']:
            print(f"\n❌ 最频繁错误 (Top 5):")
            for i, error in enumerate(results['top_errors'][:5], 1):
                print(f"  {i}. ({error['count']}次) {error['message'][:80]}...")
        # Recommendations with priority markers.
        if results['recommendations']:
            print(f"\n💡 修复建议:")
            for rec in results['recommendations']:
                priority_icon = {'critical': '🔴', 'high': '🟠', 'medium': '🟡'}.get(rec['priority'], '🔵')
                print(f"  {priority_icon} {rec['issue']}: {rec['recommendation']}")
🔄 自动恢复机制
自愈系统
python
# src/troubleshooting/self_healing.py
import asyncio
import time
from typing import Dict, List, Any, Callable
from enum import Enum
import logging
class RecoveryAction(Enum):
    """Types of automated recovery actions the self-healing system can run."""
    RESTART_SERVICE = "restart_service"
    SCALE_UP = "scale_up"
    CLEAR_CACHE = "clear_cache"
    FAILOVER = "failover"
    ROLLBACK = "rollback"


class SelfHealingSystem:
    """Maps known issue types to ordered recovery strategies and runs them."""

    def __init__(self):
        # issue type -> ordered list of actions to try (cheapest first).
        self.recovery_strategies = {
            'high_error_rate': [
                RecoveryAction.CLEAR_CACHE,
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.SCALE_UP
            ],
            'high_latency': [
                RecoveryAction.SCALE_UP,
                RecoveryAction.CLEAR_CACHE
            ],
            'service_unavailable': [
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.FAILOVER
            ],
            'database_connection_error': [
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.FAILOVER
            ],
            'memory_exhaustion': [
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.SCALE_UP
            ]
        }
        # action -> coroutine implementing it.
        self.recovery_actions = {
            RecoveryAction.RESTART_SERVICE: self._restart_service,
            RecoveryAction.SCALE_UP: self._scale_up_service,
            RecoveryAction.CLEAR_CACHE: self._clear_cache,
            RecoveryAction.FAILOVER: self._perform_failover,
            RecoveryAction.ROLLBACK: self._rollback_deployment
        }
        # Chronological list of attempted recoveries (dict records).
        self.recovery_history = []
        self.cooldown_period = 300  # 5-minute cooldown per issue type

    async def trigger_recovery(self, issue_type: str, context: Dict[str, Any]) -> bool:
        """Run the recovery strategy for *issue_type*; True on success."""
        print(f"🔄 触发自动恢复: {issue_type}")
        # Skip if this issue type recovered too recently.
        if self._is_in_cooldown(issue_type):
            print(f"⏳ 恢复动作在冷却期内,跳过: {issue_type}")
            return False
        strategies = self.recovery_strategies.get(issue_type, [])
        if not strategies:
            print(f"❌ 未找到恢复策略: {issue_type}")
            return False
        # Try actions in order until one succeeds.
        for action in strategies:
            try:
                print(f"🛠️ 执行恢复动作: {action.value}")
                recovery_func = self.recovery_actions[action]
                success = await recovery_func(context)
                # Record the attempt regardless of outcome.
                self.recovery_history.append({
                    'timestamp': time.time(),
                    'issue_type': issue_type,
                    'action': action.value,
                    'success': success,
                    'context': context
                })
                if success:
                    print(f"✅ 恢复成功: {action.value}")
                    return True
                else:
                    print(f"❌ 恢复失败: {action.value}")
            except Exception as e:
                print(f"❌ 恢复动作异常 {action.value}: {e}")
                self.recovery_history.append({
                    'timestamp': time.time(),
                    'issue_type': issue_type,
                    'action': action.value,
                    'success': False,
                    'error': str(e),
                    'context': context
                })
        print(f"❌ 所有恢复动作均失败: {issue_type}")
        return False

    def _is_in_cooldown(self, issue_type: str) -> bool:
        """True if *issue_type* was attempted within the cooldown period.

        Bug fix: previously only the last 10 history entries were checked,
        so a recent recovery of this type could be missed once other issue
        types pushed it out of that window. The whole history is scanned
        now; any() short-circuits on the first hit.
        """
        current_time = time.time()
        return any(
            record['issue_type'] == issue_type and
            current_time - record['timestamp'] < self.cooldown_period
            for record in self.recovery_history
        )

    async def _restart_service(self, context: Dict[str, Any]) -> bool:
        """Restart a service (simulated; real impl would call K8s/Docker)."""
        service_name = context.get('service_name', 'mcp-server')
        try:
            print(f"🔄 重启服务: {service_name}")
            await asyncio.sleep(5)  # simulated restart time
            # Confirm the service came back healthy.
            health_check = await self._verify_service_health(service_name)
            return health_check
        except Exception as e:
            print(f"❌ 重启服务失败: {e}")
            return False

    async def _scale_up_service(self, context: Dict[str, Any]) -> bool:
        """Double the replica count, capped at 10 instances (simulated)."""
        service_name = context.get('service_name', 'mcp-server')
        current_replicas = context.get('current_replicas', 3)
        target_replicas = min(current_replicas * 2, 10)  # cap at 10 replicas
        try:
            print(f"📈 扩容服务 {service_name}: {current_replicas} -> {target_replicas}")
            await asyncio.sleep(10)  # simulated scale-up time
            # Success only when scaling actually increased the count.
            return target_replicas > current_replicas
        except Exception as e:
            print(f"❌ 扩容服务失败: {e}")
            return False

    async def _clear_cache(self, context: Dict[str, Any]) -> bool:
        """Flush the configured cache backend (simulated)."""
        cache_type = context.get('cache_type', 'redis')
        try:
            print(f"🧹 清理缓存: {cache_type}")
            if cache_type == 'redis':
                await asyncio.sleep(2)  # simulated Redis flush
            elif cache_type == 'memory':
                await asyncio.sleep(1)  # simulated in-memory flush
            return True
        except Exception as e:
            print(f"❌ 清理缓存失败: {e}")
            return False

    async def _perform_failover(self, context: Dict[str, Any]) -> bool:
        """Fail over from the primary to the backup service (simulated)."""
        primary_service = context.get('primary_service')
        backup_service = context.get('backup_service')
        try:
            print(f"🔄 执行故障转移: {primary_service} -> {backup_service}")
            await asyncio.sleep(8)  # simulated failover
            # Confirm the backup is healthy before declaring success.
            health_check = await self._verify_service_health(backup_service)
            return health_check
        except Exception as e:
            print(f"❌ 故障转移失败: {e}")
            return False

    async def _rollback_deployment(self, context: Dict[str, Any]) -> bool:
        """Roll a service back to its previous version (simulated)."""
        service_name = context.get('service_name')
        previous_version = context.get('previous_version')
        try:
            print(f"⏪ 回滚部署 {service_name} 到版本: {previous_version}")
            await asyncio.sleep(15)  # simulated rollback
            health_check = await self._verify_service_health(service_name)
            return health_check
        except Exception as e:
            print(f"❌ 回滚部署失败: {e}")
            return False

    async def _verify_service_health(self, service_name: str) -> bool:
        """Verify a service's health (simulated; always healthy)."""
        try:
            print(f"🔍 验证服务健康状态: {service_name}")
            await asyncio.sleep(3)
            # A real implementation would hit the service's health endpoint.
            return True
        except Exception as e:
            print(f"❌ 健康检查失败: {e}")
            return False

    def get_recovery_stats(self) -> Dict[str, Any]:
        """Summarize recovery attempts over the last 24 hours."""
        if not self.recovery_history:
            return {'total_recoveries': 0}
        recent_history = [
            r for r in self.recovery_history
            if time.time() - r['timestamp'] < 86400  # last 24 hours
        ]
        success_count = sum(1 for r in recent_history if r['success'])
        total_count = len(recent_history)
        return {
            'total_recoveries': total_count,
            'successful_recoveries': success_count,
            'success_rate': success_count / total_count if total_count > 0 else 0,
            'recent_history': recent_history[-10:]  # last 10 records
        }
📋 运维手册模板
故障处理标准流程
markdown
# MCP服务器故障处理手册
## 🚨 应急响应流程
### 1. 故障接收 (0-5分钟)
- [ ] 确认告警信息和故障级别
- [ ] 通知相关人员(根据故障级别)
- [ ] 创建故障记录单
### 2. 初步评估 (5-15分钟)
- [ ] 检查系统整体状态
- [ ] 确定影响范围
- [ ] 评估业务影响
- [ ] 确定是否需要立即缓解措施
### 3. 故障缓解 (15-30分钟)
- [ ] 执行应急措施降低影响
- [ ] 尝试自动/手动恢复
- [ ] 更新故障状态
- [ ] 通知相关方进展
### 4. 根因分析 (30分钟-2小时)
- [ ] 收集相关日志和指标
- [ ] 分析故障原因
- [ ] 确定根本原因
- [ ] 制定永久修复方案
### 5. 永久修复 (时间因情况而定)
- [ ] 实施修复方案
- [ ] 验证修复效果
- [ ] 更新监控和告警
- [ ] 更新文档
### 6. 故障复盘 (故障解决后24小时内)
- [ ] 组织复盘会议
- [ ] 分析故障原因和处理过程
- [ ] 识别改进机会
- [ ] 制定预防措施
- [ ] 更新流程和文档
常见故障处理指南
python
# src/troubleshooting/playbook.py
class TroubleshootingPlaybook:
    """Lookup table of step-by-step troubleshooting guides per issue type.

    Bug fix: the original annotations used typing.List but this module has
    no `from typing import List`, so defining the class raised NameError.
    Builtin generics (list[str], Python 3.9+) are used instead.
    """

    def __init__(self):
        # issue type -> handler returning that issue's step list.
        self.playbooks = {
            'service_down': self._handle_service_down,
            'high_latency': self._handle_high_latency,
            'database_issues': self._handle_database_issues,
            'memory_leak': self._handle_memory_leak,
            'disk_full': self._handle_disk_full,
            'network_issues': self._handle_network_issues
        }

    def get_troubleshooting_steps(self, issue_type: str) -> list[str]:
        """Return the step list for *issue_type*, or a not-found message."""
        handler = self.playbooks.get(issue_type)
        if handler:
            return handler()
        else:
            return ["未找到对应的故障处理指南"]

    def _handle_service_down(self) -> list[str]:
        """Steps for a service that is completely down."""
        return [
            "1. 检查服务状态: kubectl get pods -n production",
            "2. 查看Pod日志: kubectl logs <pod-name> -n production",
            "3. 检查服务配置: kubectl describe service mcp-server",
            "4. 验证健康检查端点: curl http://service/health",
            "5. 重启服务: kubectl rollout restart deployment/mcp-server",
            "6. 监控服务恢复: kubectl get pods -w",
            "7. 验证服务功能: 执行端到端测试",
            "8. 更新故障状态并通知相关方"
        ]

    def _handle_high_latency(self) -> list[str]:
        """Steps for elevated response latency."""
        return [
            "1. 检查系统资源使用情况",
            "2. 分析慢查询日志",
            "3. 检查缓存命中率",
            "4. 监控网络延迟",
            "5. 检查数据库连接池状态",
            "6. 分析应用程序性能指标",
            "7. 考虑临时扩容",
            "8. 优化慢查询或增加缓存"
        ]

    def _handle_database_issues(self) -> list[str]:
        """Steps for database connectivity or performance problems."""
        return [
            "1. 检查数据库连接状态",
            "2. 查看数据库错误日志",
            "3. 检查数据库资源使用",
            "4. 分析慢查询和锁等待",
            "5. 检查数据库配置",
            "6. 考虑读写分离或主从切换",
            "7. 优化数据库查询",
            "8. 重启数据库服务(最后手段)"
        ]

    def _handle_memory_leak(self) -> list[str]:
        """Steps for diagnosing and mitigating a memory leak."""
        return [
            "1. 监控内存使用趋势",
            "2. 生成内存dump文件",
            "3. 分析内存使用模式",
            "4. 检查应用程序内存管理",
            "5. 临时重启服务释放内存",
            "6. 调整JVM堆大小(如适用)",
            "7. 代码审查和内存优化",
            "8. 增强内存监控告警"
        ]

    def _handle_disk_full(self) -> list[str]:
        """Steps for recovering from exhausted disk space."""
        return [
            "1. 检查磁盘使用情况: df -h",
            "2. 找出占用空间最大的文件: du -sh /* | sort -hr",
            "3. 清理日志文件和临时文件",
            "4. 压缩或归档旧数据",
            "5. 清理Docker镜像和容器: docker system prune",
            "6. 扩展磁盘空间",
            "7. 配置日志轮转和自动清理",
            "8. 设置磁盘使用监控告警"
        ]

    def _handle_network_issues(self) -> list[str]:
        """Steps for diagnosing network connectivity problems."""
        return [
            "1. 检查网络连通性: ping, telnet, curl",
            "2. 检查DNS解析: nslookup, dig",
            "3. 监控网络延迟和丢包率",
            "4. 检查防火墙和安全组配置",
            "5. 检查负载均衡器状态",
            "6. 验证SSL证书有效性",
            "7. 重启网络服务",
            "8. 联系网络团队或云服务商"
        ]
🎯 最佳实践总结
1. 预防优于治疗
- 实施全面的监控和告警
- 定期进行健康检查和维护
- 建立预防性的运维流程
2. 快速响应
- 建立明确的故障分级制度
- 制定标准化的应急响应流程
- 培训团队快速诊断能力
3. 持续改进
- 定期进行故障复盘
- 不断优化监控和告警
- 更新故障处理手册
4. 团队协作
- 建立高效的沟通机制
- 明确角色和责任分工
- 定期进行应急演练
🚀 下一步
完成故障排除与维护后,你已经掌握了生产级MCP服务器的完整运维技能!现在可以:
- 学习最佳实践指南 → 第8章:最佳实践
- 查看案例研究 → 第9章:案例研究
- 探索AI工作流 → 第10章:AI工作流
📚 扩展阅读: