Skip to content

7.5 故障排除与维护

🔧 核心目标:建立完善的故障排除体系和预防性维护机制,确保系统稳定运行
⏱️ 预计时长:50分钟
📊 难度级别:⭐⭐⭐⭐⭐

🎯 学习目标

通过本节学习,你将掌握:

  • 系统故障诊断和根因分析
  • 应急响应流程和回滚策略
  • 预防性维护和健康检查
  • 故障预警和自动恢复机制
  • 运维文档和团队培训

🔍 故障诊断体系

故障诊断流程图

故障分级标准

python
# src/troubleshooting/incident_classifier.py
import asyncio
import logging
import time
from dataclasses import dataclass
from enum import Enum
from typing import Dict, List, Any

class IncidentSeverity(Enum):
    """Severity tiers for an incident, declared from most to least severe."""

    # The system is completely unavailable.
    P0_CRITICAL = "P0"
    # Core functionality is affected.
    P1_HIGH = "P1"
    # Part of the functionality is affected.
    P2_MEDIUM = "P2"
    # Degraded performance, or a problem in a non-core feature.
    P3_LOW = "P3"

@dataclass
class IncidentMetrics:
    """Measurements that describe the impact of an incident."""

    # Error rate; the classifier thresholds and the alert text treat it as
    # a percentage (0-100).
    error_rate: float
    # Response time; the classifier thresholds are annotated in seconds
    # against values such as 30000, i.e. the unit is milliseconds.
    response_time: float
    # Availability; compared against percentage thresholds such as 99.5.
    availability: float
    # Number of users affected by the incident.
    affected_users: int
    # Free-form description of the business impact.
    business_impact: str

class IncidentClassifier:
    """Map incident metrics and a free-text description to a severity tier.

    Every tier owns a set of numeric threshold predicates (one per metric
    attribute) plus a list of keywords.  Tiers are evaluated from most to
    least severe and the first tier whose rules are satisfied wins.
    """

    def __init__(self):
        # Declared from most to least severe: classify_incident walks this
        # dict in insertion order and returns the first tier that fits.
        self.classification_rules = {
            IncidentSeverity.P0_CRITICAL: {
                'error_rate': lambda x: x > 50,
                'response_time': lambda x: x > 30000,  # 30 seconds
                'availability': lambda x: x < 90,
                'affected_users': lambda x: x > 1000,
                'keywords': ['down', 'outage', 'critical', '5xx']
            },
            IncidentSeverity.P1_HIGH: {
                'error_rate': lambda x: x > 20,
                'response_time': lambda x: x > 10000,  # 10 seconds
                'availability': lambda x: x < 95,
                'affected_users': lambda x: x > 100,
                'keywords': ['error', 'timeout', 'failure']
            },
            IncidentSeverity.P2_MEDIUM: {
                'error_rate': lambda x: x > 5,
                'response_time': lambda x: x > 5000,   # 5 seconds
                'availability': lambda x: x < 98,
                'affected_users': lambda x: x > 10,
                'keywords': ['slow', 'degraded', 'warning']
            },
            IncidentSeverity.P3_LOW: {
                'error_rate': lambda x: x > 1,
                'response_time': lambda x: x > 2000,   # 2 seconds
                'availability': lambda x: x < 99.5,
                'affected_users': lambda x: x > 0,
                'keywords': ['minor', 'performance', 'optimization']
            }
        }

    def classify_incident(self, metrics: "IncidentMetrics",
                         description: str = "") -> "IncidentSeverity":
        """Return the most severe tier whose rules the incident satisfies.

        A tier matches when the number of satisfied rules is at least half
        of its numeric rules.  A keyword found in ``description`` counts as
        one additional satisfied rule, so it can tip a borderline incident
        into the tier; classifications without a keyword hit are unaffected.

        Args:
            metrics: Object exposing one attribute per numeric rule name.
            description: Free-text description searched case-insensitively
                for the tier keywords.  ``None`` is treated as empty.

        Returns:
            The first matching tier, or ``IncidentSeverity.P3_LOW`` when no
            tier matches.
        """
        # Lower-case once instead of once per keyword; tolerate None.
        text = (description or "").lower()

        for severity, rules in self.classification_rules.items():
            numeric_rules = [
                (name, predicate)
                for name, predicate in rules.items()
                if name != 'keywords'
            ]
            score = sum(
                1 for name, predicate in numeric_rules
                if predicate(getattr(metrics, name))
            )
            keyword_hit = any(
                keyword in text for keyword in rules.get('keywords', [])
            )

            if not numeric_rules:
                # A keyword-only tier can only be decided by its keywords.
                if keyword_hit:
                    return severity
                continue

            # The keyword is a bonus point measured against the numeric
            # rules only.  Adding it to the denominator as well (as before)
            # meant a keyword hit could never change the outcome.
            if keyword_hit:
                score += 1

            if score / len(numeric_rules) >= 0.5:
                return severity

        return IncidentSeverity.P3_LOW

# 故障响应协调器
class IncidentCoordinator:
    """Coordinate the response to an incident: classify, alert, auto-recover."""

    def __init__(self, p1_recovery_delay: float = 300):
        """Set up the response teams, SLAs and the incident registry.

        Args:
            p1_recovery_delay: Seconds to wait before attempting automatic
                recovery of a P1 incident.  Defaults to 300 (five minutes).
        """
        # Incidents currently being handled, keyed by incident id.
        self.active_incidents = {}
        self.response_teams = {
            IncidentSeverity.P0_CRITICAL: ["on-call-engineer", "engineering-lead", "product-manager"],
            IncidentSeverity.P1_HIGH: ["on-call-engineer", "team-lead"],
            IncidentSeverity.P2_MEDIUM: ["on-call-engineer"],
            IncidentSeverity.P3_LOW: ["engineer"]
        }

        # Response SLA per severity, in minutes.
        self.response_sla = {
            IncidentSeverity.P0_CRITICAL: 5,   # 5 minutes
            IncidentSeverity.P1_HIGH: 15,      # 15 minutes
            IncidentSeverity.P2_MEDIUM: 60,    # 1 hour
            IncidentSeverity.P3_LOW: 240       # 4 hours
        }

        self.p1_recovery_delay = p1_recovery_delay

    async def handle_incident(self, incident_id: str, metrics: "IncidentMetrics",
                            description: str) -> Dict[str, Any]:
        """Classify an incident, register it, alert and start auto-recovery.

        Args:
            incident_id: Identifier used as the key in ``active_incidents``.
            metrics: Impact measurements handed to the classifier.
            description: Free-text description of the incident.

        Returns:
            The incident record (the same dict stored in
            ``active_incidents``).  For P1 incidents this coroutine only
            returns after the recovery delay has elapsed.
        """
        severity = IncidentClassifier().classify_incident(metrics, description)

        # Take one timestamp so start_time and sla_deadline are consistent.
        now = time.time()
        incident = {
            'id': incident_id,
            'severity': severity,
            'metrics': metrics,
            'description': description,
            'start_time': now,
            'status': 'investigating',
            'assigned_team': self.response_teams[severity],
            'sla_deadline': now + self.response_sla[severity] * 60
        }

        self.active_incidents[incident_id] = incident

        # Notify the response team first, then try to self-heal.
        await self._send_alert(incident)
        await self._initiate_auto_recovery(incident)

        return incident

    async def _send_alert(self, incident: Dict[str, Any]):
        """Format the alert for an incident, then log and print it."""
        severity = incident['severity']
        team = incident['assigned_team']

        message = f"""
🚨 故障告警 - {severity.value}
-----------------------
ID: {incident['id']}
严重程度: {severity.value}
描述: {incident['description']}
影响用户: {incident['metrics'].affected_users}
错误率: {incident['metrics'].error_rate:.1f}%
可用性: {incident['metrics'].availability:.1f}%

响应团队: {', '.join(team)}
SLA截止: {time.ctime(incident['sla_deadline'])}
"""

        # Integration point for a real notification system (Slack,
        # PagerDuty, ...).
        logging.critical(message)
        print(message)

    async def _initiate_auto_recovery(self, incident: Dict[str, Any]):
        """Start automatic recovery according to the incident severity.

        P0 incidents are recovered immediately, P1 incidents after
        ``p1_recovery_delay`` seconds; lower severities are left to humans.
        """
        severity = incident['severity']

        if severity == IncidentSeverity.P0_CRITICAL:
            await self._attempt_auto_recovery(incident)
        elif severity == IncidentSeverity.P1_HIGH:
            await asyncio.sleep(self.p1_recovery_delay)
            await self._attempt_auto_recovery(incident)

    async def _attempt_auto_recovery(self, incident: Dict[str, Any]):
        """Run the recovery actions in order until one of them succeeds.

        On the first success the incident status becomes
        ``'auto_recovered'``.  A failing or raising action is reported and
        the next one is tried.
        """
        print(f"🔄 尝试自动恢复故障: {incident['id']}")

        recovery_actions = [
            self._restart_unhealthy_pods,
            self._scale_up_resources,
            self._switch_to_backup_database,
            self._clear_cache,
            self._restart_load_balancer
        ]

        for action in recovery_actions:
            try:
                success = await action(incident)
                if success:
                    incident['status'] = 'auto_recovered'
                    print(f"✅ 自动恢复成功: {action.__name__}")
                    break
            except Exception as e:
                # Best effort: one broken action must not stop the others.
                print(f"❌ 自动恢复失败 {action.__name__}: {e}")

        if incident['status'] != 'auto_recovered':
            print("⚠️ 自动恢复失败,需要人工介入")

    async def _restart_unhealthy_pods(self, incident: Dict[str, Any]) -> bool:
        """Restart unhealthy pods (simulated Kubernetes restart)."""
        print("🔄 重启不健康的Pod...")
        await asyncio.sleep(2)
        return True

    async def _scale_up_resources(self, incident: Dict[str, Any]) -> bool:
        """Scale up resources (simulated)."""
        print("📈 自动扩容资源...")
        await asyncio.sleep(3)
        return True

    async def _switch_to_backup_database(self, incident: Dict[str, Any]) -> bool:
        """Switch over to the backup database (simulated)."""
        print("🔄 切换到备用数据库...")
        await asyncio.sleep(5)
        return True

    async def _clear_cache(self, incident: Dict[str, Any]) -> bool:
        """Clear the cache (simulated)."""
        print("🧹 清理缓存...")
        await asyncio.sleep(1)
        return True

    async def _restart_load_balancer(self, incident: Dict[str, Any]) -> bool:
        """Restart the load balancer (simulated; always reports failure)."""
        print("🔄 重启负载均衡器...")
        await asyncio.sleep(4)
        return False  # simulated failure

🔧 诊断工具集

系统健康检查器

python
# src/troubleshooting/health_checker.py
import asyncio
import aiohttp
import psutil
import asyncpg
from typing import Dict, List, Any, Optional
import time
import json

class HealthChecker:
    """Run a set of health probes against the application and its host.

    Every probe is a coroutine returning a dict with the keys ``healthy``
    (bool), ``message`` (str) and ``details`` (dict).

    Configuration keys read from ``config``: ``health_endpoint``,
    ``database_url``, ``redis_url``, ``external_apis`` (list of dicts with
    ``name`` and ``url``) and ``network_test_hosts`` (optional list of
    hosts for the connectivity probe).
    """

    # Hosts probed over HTTP when ``network_test_hosts`` is not configured.
    DEFAULT_NETWORK_TEST_HOSTS = ('8.8.8.8', 'google.com', 'github.com')

    def __init__(self, config: Dict[str, Any]):
        self.config = config
        # Probe name -> coroutine function; run in this order.
        self.checks = {
            'application': self._check_application_health,
            'database': self._check_database_health,
            'redis': self._check_redis_health,
            'external_api': self._check_external_apis,
            'disk_space': self._check_disk_space,
            'memory_usage': self._check_memory_usage,
            'cpu_usage': self._check_cpu_usage,
            'network': self._check_network_connectivity
        }

    async def run_all_checks(self) -> Dict[str, Any]:
        """Run every registered probe and build the overall health report.

        A probe that raises is recorded as unhealthy instead of aborting
        the run.

        Returns:
            Dict with ``overall_healthy`` (True only if every probe is
            healthy), ``check_duration`` in seconds, ``timestamp`` and the
            per-probe results under ``details``.
        """
        print("🔍 开始系统健康检查...")
        start_time = time.time()

        results = {}
        for check_name, check_func in self.checks.items():
            try:
                result = await check_func()
                results[check_name] = result
                status = "✅" if result['healthy'] else "❌"
                print(f"{status} {check_name}: {result['message']}")
            except Exception as e:
                results[check_name] = {
                    'healthy': False,
                    'message': f"检查失败: {str(e)}",
                    'details': {}
                }
                print(f"❌ {check_name}: 检查失败 - {e}")

        overall_health = all(result['healthy'] for result in results.values())
        total_time = time.time() - start_time

        health_report = {
            'overall_healthy': overall_health,
            'check_duration': total_time,
            'timestamp': time.time(),
            'details': results
        }

        print(f"\n📊 健康检查完成 ({total_time:.2f}s)")
        print(f"整体状态: {'✅ 健康' if overall_health else '❌ 异常'}")

        return health_report

    async def _check_application_health(self) -> Dict[str, Any]:
        """Probe the application's HTTP health endpoint (5 s timeout)."""
        health_endpoint = self.config.get('health_endpoint', 'http://localhost:8000/health')

        try:
            # A ClientTimeout object replaces the deprecated numeric timeout.
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession() as session:
                async with session.get(health_endpoint, timeout=timeout) as response:
                    if response.status != 200:
                        return {
                            'healthy': False,
                            'message': f'应用程序响应异常: HTTP {response.status}',
                            'details': {'status_code': response.status}
                        }

                    data = await response.json()
                    return {
                        'healthy': True,
                        'message': '应用程序运行正常',
                        'details': {
                            'status_code': response.status,
                            'response_time': response.headers.get('X-Response-Time'),
                            'version': data.get('version'),
                            'uptime': data.get('uptime')
                        }
                    }
        except Exception as e:
            return {
                'healthy': False,
                'message': f'无法连接到应用程序: {str(e)}',
                'details': {}
            }

    async def _check_database_health(self) -> Dict[str, Any]:
        """Probe the database with a trivial query and a connection count.

        Reports healthy when no ``database_url`` is configured.
        """
        db_url = self.config.get('database_url')
        if not db_url:
            return {
                'healthy': True,
                'message': '未配置数据库',
                'details': {}
            }

        try:
            conn = await asyncpg.connect(db_url)
            try:
                start_time = time.time()
                result = await conn.fetchval('SELECT 1')
                query_time = time.time() - start_time

                connections = await conn.fetchval(
                    'SELECT count(*) FROM pg_stat_activity'
                )
            finally:
                # Close even when a query fails so the connection cannot leak.
                await conn.close()

            return {
                'healthy': True,
                'message': '数据库连接正常',
                'details': {
                    'query_time': query_time,
                    'active_connections': connections,
                    'test_result': result
                }
            }
        except Exception as e:
            return {
                'healthy': False,
                'message': f'数据库连接失败: {str(e)}',
                'details': {}
            }

    async def _check_redis_health(self) -> Dict[str, Any]:
        """Probe Redis with PING and collect a few INFO fields.

        Reports healthy when no ``redis_url`` is configured.
        """
        redis_url = self.config.get('redis_url')
        if not redis_url:
            return {
                'healthy': True,
                'message': '未配置Redis',
                'details': {}
            }

        try:
            import aioredis
            redis = aioredis.from_url(redis_url)
            try:
                start_time = time.time()
                pong = await redis.ping()
                ping_time = time.time() - start_time

                info = await redis.info()
            finally:
                # Close even when PING/INFO fails so the client cannot leak.
                await redis.close()

            return {
                'healthy': pong,
                'message': 'Redis连接正常' if pong else 'Redis连接失败',
                'details': {
                    'ping_time': ping_time,
                    'used_memory': info.get('used_memory_human'),
                    'connected_clients': info.get('connected_clients'),
                    'version': info.get('redis_version')
                }
            }
        except Exception as e:
            return {
                'healthy': False,
                'message': f'Redis连接失败: {str(e)}',
                'details': {}
            }

    async def _check_external_apis(self) -> Dict[str, Any]:
        """Probe every configured external API (10 s timeout each).

        An API is healthy when it answers with a status below 400.  Reports
        healthy when no ``external_apis`` are configured.
        """
        external_apis = self.config.get('external_apis', [])
        if not external_apis:
            return {
                'healthy': True,
                'message': '未配置外部API',
                'details': {}
            }

        api_results = {}
        all_healthy = True

        async with aiohttp.ClientSession() as session:
            for api in external_apis:
                try:
                    start_time = time.time()
                    timeout = aiohttp.ClientTimeout(total=10)
                    async with session.get(api['url'], timeout=timeout) as response:
                        response_time = time.time() - start_time
                        api_ok = response.status < 400

                        api_results[api['name']] = {
                            'healthy': api_ok,
                            'status_code': response.status,
                            'response_time': response_time
                        }

                        if not api_ok:
                            all_healthy = False

                except Exception as e:
                    api_results[api['name']] = {
                        'healthy': False,
                        'error': str(e),
                        'response_time': None
                    }
                    all_healthy = False

        return {
            'healthy': all_healthy,
            'message': '外部API检查完成',
            'details': api_results
        }

    async def _check_disk_space(self) -> Dict[str, Any]:
        """Check that more than 10% of the root filesystem is free."""
        disk_usage = psutil.disk_usage('/')
        free_percent = (disk_usage.free / disk_usage.total) * 100

        return {
            'healthy': free_percent > 10,
            'message': f'磁盘空间剩余 {free_percent:.1f}%',
            'details': {
                'total_gb': disk_usage.total / (1024**3),
                'used_gb': disk_usage.used / (1024**3),
                'free_gb': disk_usage.free / (1024**3),
                'free_percent': free_percent
            }
        }

    async def _check_memory_usage(self) -> Dict[str, Any]:
        """Check that memory utilisation is below 85%."""
        memory = psutil.virtual_memory()

        return {
            'healthy': memory.percent < 85,
            'message': f'内存使用率 {memory.percent:.1f}%',
            'details': {
                'total_gb': memory.total / (1024**3),
                'used_gb': memory.used / (1024**3),
                'available_gb': memory.available / (1024**3),
                'percent': memory.percent
            }
        }

    async def _check_cpu_usage(self) -> Dict[str, Any]:
        """Check that the 5-second average CPU utilisation is below 80%."""
        # cpu_percent(interval=5) blocks for five seconds; run it in the
        # default executor so the event loop stays responsive meanwhile.
        loop = asyncio.get_running_loop()
        cpu_percent = await loop.run_in_executor(None, psutil.cpu_percent, 5)

        return {
            'healthy': cpu_percent < 80,
            'message': f'CPU使用率 {cpu_percent:.1f}%',
            'details': {
                'percent': cpu_percent,
                'core_count': psutil.cpu_count(),
                'load_average': psutil.getloadavg() if hasattr(psutil, 'getloadavg') else None
            }
        }

    @staticmethod
    def _at_least_half_reachable(reachable: int, total: int) -> bool:
        """Return True when at least half of ``total`` hosts are reachable."""
        # Multiply instead of floor-dividing: ``total // 2`` accepted one
        # reachable host out of three as "at least half".
        return reachable * 2 >= total

    async def _check_network_connectivity(self) -> Dict[str, Any]:
        """Probe outbound connectivity with HTTP requests (5 s timeout each).

        Hosts come from the ``network_test_hosts`` config key, falling back
        to ``DEFAULT_NETWORK_TEST_HOSTS``.  The probe is healthy when at
        least half of the hosts answer.
        """
        test_hosts = list(
            self.config.get('network_test_hosts') or self.DEFAULT_NETWORK_TEST_HOSTS
        )
        connectivity_results = {}

        async with aiohttp.ClientSession() as session:
            for host in test_hosts:
                start_time = time.time()
                try:
                    timeout = aiohttp.ClientTimeout(total=5)
                    async with session.get(f'http://{host}', timeout=timeout):
                        connectivity_results[host] = {
                            'reachable': True,
                            'response_time': time.time() - start_time
                        }
                except Exception:
                    # Any request failure counts as unreachable; cancellation
                    # and interpreter exit are deliberately not swallowed.
                    connectivity_results[host] = {
                        'reachable': False,
                        'response_time': None
                    }

        reachable_count = sum(1 for result in connectivity_results.values() if result['reachable'])
        healthy = self._at_least_half_reachable(reachable_count, len(test_hosts))
        status_text = '网络连接正常' if healthy else '网络连接异常'

        return {
            'healthy': healthy,
            'message': f'{status_text} ({reachable_count}/{len(test_hosts)})',
            'details': connectivity_results
        }

日志分析器

python
# src/troubleshooting/log_analyzer.py
import re
import json
from typing import Dict, List, Any, Optional
from collections import defaultdict, Counter
from datetime import datetime, timedelta
import asyncio

class LogAnalyzer:
    """Parse log files and summarise errors, patterns and recommendations.

    Two line formats are understood: JSON objects (one per line) and
    ``YYYY-MM-DD HH:MM:SS [LEVEL] module: message`` text lines.
    """

    def __init__(self):
        # Pattern name -> regular expression searched (case-insensitively)
        # in every log message.
        self.error_patterns = {
            'connection_timeout': r'connection.*timeout|timeout.*connection',
            'database_error': r'database.*error|connection.*refused|deadlock',
            'memory_error': r'out of memory|memory.*error|heap.*overflow',
            'permission_error': r'permission.*denied|access.*denied|unauthorized',
            'file_not_found': r'file.*not.*found|no such file',
            'network_error': r'network.*error|connection.*reset|socket.*error',
            'rate_limit': r'rate.*limit|too many requests|429',
            'authentication_error': r'auth.*fail|invalid.*token|unauthorized',
            'validation_error': r'validation.*error|invalid.*input|bad.*request'
        }

        self.log_levels = ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']

    async def analyze_logs(self, log_files: List[str],
                          time_range: Optional[timedelta] = None) -> Dict[str, Any]:
        """Analyse the given log files over a recent time window.

        Args:
            log_files: Paths of the log files to read.  A file that cannot
                be parsed is reported and skipped.
            time_range: How far back from now to look; defaults to one hour.

        Returns:
            Dict with ``summary``, ``error_patterns``, ``error_timeline``,
            ``top_errors`` and ``recommendations``.
        """
        print("📊 开始日志分析...")

        if time_range is None:
            time_range = timedelta(hours=1)  # default: the last hour

        cutoff_time = datetime.now() - time_range

        analysis_results = {
            'summary': {
                'total_lines': 0,
                'error_count': 0,
                'warning_count': 0,
                'time_range': str(time_range),
                'analysis_time': datetime.now().isoformat()
            },
            'error_patterns': defaultdict(int),
            'error_timeline': [],
            'top_errors': [],
            'recommendations': []
        }

        all_log_entries = []

        for log_file in log_files:
            try:
                entries = await self._parse_log_file(log_file, cutoff_time)
                all_log_entries.extend(entries)
                print(f"📄 已解析 {log_file}: {len(entries)} 条记录")
            except Exception as e:
                print(f"❌ 解析日志文件失败 {log_file}: {e}")

        await self._analyze_entries(all_log_entries, analysis_results)
        self._generate_recommendations(analysis_results)

        print(f"✅ 日志分析完成,共分析 {analysis_results['summary']['total_lines']} 条记录")
        return analysis_results

    async def _parse_log_file(self, log_file: str,
                            cutoff_time: datetime) -> List[Dict[str, Any]]:
        """Return the entries of ``log_file`` dated at or after ``cutoff_time``.

        Entries without a usable timestamp are left out because they cannot
        be placed in the time window.
        """
        entries = []

        # errors='replace' keeps one undecodable byte from aborting the file.
        with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue

                entry = self._parse_log_line(line)
                if entry and entry.get('timestamp', datetime.min) >= cutoff_time:
                    entries.append(entry)

        return entries

    @staticmethod
    def _coerce_timestamp(value: Any) -> Optional[datetime]:
        """Convert a JSON timestamp value to a naive local ``datetime``.

        Accepts ISO-8601 strings (a trailing ``Z`` is read as UTC) and
        epoch seconds.  Returns ``None`` when the value cannot be converted.
        """
        if isinstance(value, datetime):
            parsed = value
        elif isinstance(value, bool):
            # bool is an int subclass but is never a meaningful timestamp.
            return None
        elif isinstance(value, (int, float)):
            try:
                parsed = datetime.fromtimestamp(value)
            except (OverflowError, OSError, ValueError):
                return None
        elif isinstance(value, str):
            text = value.strip()
            if text.endswith(('Z', 'z')):
                text = text[:-1] + '+00:00'
            try:
                parsed = datetime.fromisoformat(text)
            except ValueError:
                return None
        else:
            return None

        # The rest of the analyzer compares against naive local datetimes;
        # mixing aware and naive values would raise TypeError.
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone().replace(tzinfo=None)
        return parsed

    def _normalize_json_entry(self, payload: Dict[str, Any],
                              line: str) -> Dict[str, Any]:
        """Give a decoded JSON log object the same shape as a text entry."""
        entry = dict(payload)

        timestamp = self._coerce_timestamp(payload.get('timestamp'))
        if timestamp is None:
            # Without a usable timestamp the key is dropped, so consumers
            # fall back to their own defaults instead of comparing a str.
            entry.pop('timestamp', None)
        else:
            entry['timestamp'] = timestamp

        level = payload.get('level')
        entry['level'] = 'UNKNOWN' if level is None else str(level).upper()

        message = payload.get('message', '')
        if not isinstance(message, str):
            # Pattern matching and counting need a hashable string.
            message = json.dumps(message, ensure_ascii=False)
        entry['message'] = message

        entry.setdefault('raw_line', line)
        return entry

    def _parse_log_line(self, line: str) -> Optional[Dict[str, Any]]:
        """Parse one log line into an entry dict.

        JSON lines are decoded and normalised; text lines must look like
        ``2024-01-26 10:30:45 [ERROR] module: message`` (module optional).
        Anything else becomes an ``UNKNOWN`` entry stamped with the current
        time so that the line is still counted.
        """
        if line.startswith('{'):
            try:
                payload = json.loads(line)
            except json.JSONDecodeError:
                payload = None
            if isinstance(payload, dict):
                return self._normalize_json_entry(payload, line)

        # The module is only captured when it is followed by a colon;
        # otherwise the first word of the message would be taken for it.
        # Optional fractional seconds (",123" / ".123") are tolerated.
        pattern = (
            r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})(?:[.,]\d+)?\s*'
            r'\[(\w+)\]\s*(?:([\w.\-]+)\s*:\s*)?(.*)'
        )
        match = re.match(pattern, line)

        if match:
            timestamp_str, level, module, message = match.groups()
            try:
                timestamp = datetime.strptime(timestamp_str, '%Y-%m-%d %H:%M:%S')
                return {
                    'timestamp': timestamp,
                    'level': level.upper(),
                    'module': module or 'unknown',
                    'message': message,
                    'raw_line': line
                }
            except ValueError:
                pass

        return {
            'timestamp': datetime.now(),
            'level': 'UNKNOWN',
            'module': 'unknown',
            'message': line,
            'raw_line': line
        }

    async def _analyze_entries(self, entries: List[Dict[str, Any]],
                             results: Dict[str, Any]):
        """Fill ``results`` with counts, pattern hits, timeline and top errors."""
        results['summary']['total_lines'] = len(entries)

        error_messages = []
        hourly_errors = defaultdict(int)

        # Compile once instead of looking the patterns up for every entry.
        compiled_patterns = [
            (name, re.compile(regex, re.IGNORECASE))
            for name, regex in self.error_patterns.items()
        ]

        for entry in entries:
            level = entry.get('level', 'UNKNOWN')
            message = entry.get('message', '')
            timestamp = entry.get('timestamp', datetime.now())

            if level in ['ERROR', 'CRITICAL']:
                results['summary']['error_count'] += 1
                error_messages.append(message)

                # Bucket errors by hour for the timeline.
                hour_key = timestamp.strftime('%Y-%m-%d %H:00')
                hourly_errors[hour_key] += 1

            elif level == 'WARNING':
                results['summary']['warning_count'] += 1

            for pattern_name, compiled in compiled_patterns:
                if compiled.search(message):
                    results['error_patterns'][pattern_name] += 1

        results['error_timeline'] = [
            {'time': hour, 'count': count}
            for hour, count in sorted(hourly_errors.items())
        ]

        error_counter = Counter(error_messages)
        results['top_errors'] = [
            {'message': msg, 'count': count}
            for msg, count in error_counter.most_common(10)
        ]

    def _generate_recommendations(self, results: Dict[str, Any]):
        """Derive fix recommendations from pattern hits and the error rate."""
        recommendations = []
        error_patterns = results['error_patterns']

        # .get() avoids inserting zero entries into the defaultdict and
        # also works when a plain dict is supplied.
        if error_patterns.get('connection_timeout', 0) > 0:
            recommendations.append({
                'issue': '连接超时',
                'recommendation': '检查网络配置,增加连接超时时间,考虑使用连接池',
                'priority': 'high'
            })

        if error_patterns.get('database_error', 0) > 0:
            recommendations.append({
                'issue': '数据库错误',
                'recommendation': '检查数据库连接配置,优化查询性能,监控数据库资源',
                'priority': 'high'
            })

        if error_patterns.get('memory_error', 0) > 0:
            recommendations.append({
                'issue': '内存错误',
                'recommendation': '优化内存使用,增加服务器内存,检查内存泄漏',
                'priority': 'critical'
            })

        if error_patterns.get('rate_limit', 0) > 0:
            recommendations.append({
                'issue': '频率限制',
                'recommendation': '实施请求缓存,优化API调用频率,考虑升级API配额',
                'priority': 'medium'
            })

        error_rate = results['summary']['error_count'] / max(results['summary']['total_lines'], 1)
        if error_rate > 0.1:  # more than 10% of the lines are errors
            recommendations.append({
                'issue': '高错误率',
                'recommendation': f'当前错误率 {error_rate:.1%},需要紧急调查根本原因',
                'priority': 'critical'
            })

        results['recommendations'] = recommendations

    def print_analysis_report(self, results: Dict[str, Any]):
        """Print a human-readable report of an ``analyze_logs`` result."""
        print("\n" + "="*60)
        print("📊 日志分析报告")
        print("="*60)

        summary = results['summary']
        print(f"📈 概览信息:")
        print(f"   分析时间范围: {summary['time_range']}")
        print(f"   总日志条数: {summary['total_lines']}")
        print(f"   错误数量: {summary['error_count']}")
        print(f"   警告数量: {summary['warning_count']}")

        if summary['total_lines'] > 0:
            error_rate = summary['error_count'] / summary['total_lines']
            print(f"   错误率: {error_rate:.1%}")

        if results['error_patterns']:
            print(f"\n🔍 错误模式分析:")
            for pattern, count in sorted(results['error_patterns'].items(),
                                       key=lambda x: x[1], reverse=True):
                if count > 0:
                    print(f"   {pattern}: {count} 次")

        if results['top_errors']:
            print(f"\n❌ 最频繁错误 (Top 5):")
            for i, error in enumerate(results['top_errors'][:5], 1):
                message = error['message']
                # Only mark the message as truncated when it really is.
                shown = message if len(message) <= 80 else message[:80] + '...'
                print(f"   {i}. ({error['count']}次) {shown}")

        if results['recommendations']:
            print(f"\n💡 修复建议:")
            for rec in results['recommendations']:
                priority_icon = {'critical': '🔴', 'high': '🟠', 'medium': '🟡'}.get(rec['priority'], '🔵')
                print(f"   {priority_icon} {rec['issue']}: {rec['recommendation']}")

🔄 自动恢复机制

自愈系统

python
# src/troubleshooting/self_healing.py
import asyncio
import time
from typing import Dict, List, Any, Callable
from enum import Enum
import logging

class RecoveryAction(Enum):
    """Kinds of recovery action the self-healing system can perform."""

    # Restart the affected service.
    RESTART_SERVICE = "restart_service"
    # Add capacity to the service.
    SCALE_UP = "scale_up"
    # Drop cached data.
    CLEAR_CACHE = "clear_cache"
    # Fail over to a standby.
    FAILOVER = "failover"
    # Roll the deployment back.
    ROLLBACK = "rollback"

class SelfHealingSystem:
    """Run ordered recovery actions for known issue types.

    Each issue type maps to a list of recovery actions that are tried in
    order until one reports success. Every attempt is appended to
    ``recovery_history``, and an issue type that was attempted within the
    last ``cooldown_period`` seconds is skipped.

    The action handlers only simulate work with ``asyncio.sleep``; a real
    deployment would call its orchestration API at the marked places.
    """

    def __init__(self, cooldown_period: float = 300, max_replicas: int = 10):
        """Set up the strategy table, the action dispatch table and the history.

        Args:
            cooldown_period: Seconds during which a repeated recovery for the
                same issue type is refused (default: 5 minutes).
            max_replicas: Upper bound on the replica count a scale-up may
                request.
        """
        # Issue type -> recovery actions, in the order they are attempted.
        self.recovery_strategies = {
            'high_error_rate': [
                RecoveryAction.CLEAR_CACHE,
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.SCALE_UP
            ],
            'high_latency': [
                RecoveryAction.SCALE_UP,
                RecoveryAction.CLEAR_CACHE
            ],
            'service_unavailable': [
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.FAILOVER
            ],
            'database_connection_error': [
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.FAILOVER
            ],
            'memory_exhaustion': [
                RecoveryAction.RESTART_SERVICE,
                RecoveryAction.SCALE_UP
            ]
        }

        # Recovery action -> coroutine method implementing it.
        self.recovery_actions = {
            RecoveryAction.RESTART_SERVICE: self._restart_service,
            RecoveryAction.SCALE_UP: self._scale_up_service,
            RecoveryAction.CLEAR_CACHE: self._clear_cache,
            RecoveryAction.FAILOVER: self._perform_failover,
            RecoveryAction.ROLLBACK: self._rollback_deployment
        }

        self.recovery_history = []
        self.cooldown_period = cooldown_period
        self.max_replicas = max_replicas

    async def trigger_recovery(self, issue_type: str, context: Dict[str, Any]) -> bool:
        """Try the recovery actions registered for ``issue_type`` in order.

        Args:
            issue_type: Key into ``recovery_strategies``.
            context: Free-form parameters handed to every action handler.

        Returns:
            True as soon as one action reports success. False when the issue
            type is in cooldown, has no strategy, or every action failed.
            Exceptions raised by a handler are recorded in the history and
            treated as a failed attempt.
        """
        print(f"🔄 触发自动恢复: {issue_type}")

        if self._is_in_cooldown(issue_type):
            print(f"⏳ 恢复动作在冷却期内,跳过: {issue_type}")
            return False

        strategies = self.recovery_strategies.get(issue_type, [])
        if not strategies:
            print(f"❌ 未找到恢复策略: {issue_type}")
            return False

        for action in strategies:
            try:
                print(f"🛠️ 执行恢复动作: {action.value}")
                recovery_func = self.recovery_actions[action]
                success = await recovery_func(context)
            except Exception as e:
                # A failing handler must not abort the chain: record it and
                # move on to the next action.
                print(f"❌ 恢复动作异常 {action.value}: {e}")
                self._record_attempt(issue_type, action, False, context, error=str(e))
                continue

            self._record_attempt(issue_type, action, success, context)

            if success:
                print(f"✅ 恢复成功: {action.value}")
                return True
            print(f"❌ 恢复失败: {action.value}")

        print(f"❌ 所有恢复动作均失败: {issue_type}")
        return False

    def _record_attempt(self, issue_type, action, success, context, error=None):
        """Append one attempt to ``recovery_history``."""
        entry = {
            'timestamp': time.time(),
            'issue_type': issue_type,
            'action': action.value,
            'success': success,
            # Snapshot the context so later mutation by the caller does not
            # rewrite what the history says was passed in.
            'context': dict(context) if isinstance(context, dict) else context
        }
        if error is not None:
            entry['error'] = error
        self.recovery_history.append(entry)

    def _is_in_cooldown(self, issue_type: str) -> bool:
        """Return True if ``issue_type`` was attempted within ``cooldown_period`` seconds.

        The whole history is scanned: one trigger can append several entries,
        so limiting the scan to a fixed number of recent entries would let
        attempts for other issue types push a recent one out of view.
        """
        current_time = time.time()
        return any(
            entry['issue_type'] == issue_type
            and current_time - entry['timestamp'] < self.cooldown_period
            for entry in self.recovery_history
        )

    async def _restart_service(self, context: Dict[str, Any]) -> bool:
        """Restart ``context['service_name']`` (default ``'mcp-server'``).

        Returns:
            The result of the follow-up health check, or False on error.
        """
        service_name = context.get('service_name', 'mcp-server')

        try:
            # Simulated restart (a real environment would call the K8s or Docker API).
            print(f"🔄 重启服务: {service_name}")
            await asyncio.sleep(5)

            return await self._verify_service_health(service_name)

        except Exception as e:
            print(f"❌ 重启服务失败: {e}")
            return False

    async def _scale_up_service(self, context: Dict[str, Any]) -> bool:
        """Double the replica count of a service, capped at ``max_replicas``.

        Reads ``service_name`` (default ``'mcp-server'``) and
        ``current_replicas`` (default 3) from ``context``.

        Returns:
            True after the simulated scale-up. False, without waiting, when
            doubling would not raise the replica count (already at or above
            the cap, or currently at zero).
        """
        service_name = context.get('service_name', 'mcp-server')
        current_replicas = context.get('current_replicas', 3)
        target_replicas = min(current_replicas * 2, self.max_replicas)

        if target_replicas <= current_replicas:
            print(f"⚠️ 无法扩容 {service_name}: 当前副本数 {current_replicas}, 上限 {self.max_replicas}")
            return False

        try:
            print(f"📈 扩容服务 {service_name}: {current_replicas} -> {target_replicas}")
            await asyncio.sleep(10)  # simulated scale-up time
            return True

        except Exception as e:
            print(f"❌ 扩容服务失败: {e}")
            return False

    async def _clear_cache(self, context: Dict[str, Any]) -> bool:
        """Clear the cache named by ``context['cache_type']`` (default ``'redis'``).

        Returns:
            True for the supported types ``'redis'`` and ``'memory'``. False
            for any other type, so the caller moves on to the next recovery
            action instead of treating a no-op as a successful recovery.
        """
        cache_type = context.get('cache_type', 'redis')
        # Simulated clearing time per supported cache type, in seconds.
        clear_delays = {'redis': 2, 'memory': 1}

        if cache_type not in clear_delays:
            print(f"❌ 不支持的缓存类型: {cache_type}")
            return False

        try:
            print(f"🧹 清理缓存: {cache_type}")
            await asyncio.sleep(clear_delays[cache_type])
            return True

        except Exception as e:
            print(f"❌ 清理缓存失败: {e}")
            return False

    async def _perform_failover(self, context: Dict[str, Any]) -> bool:
        """Fail over from ``context['primary_service']`` to ``context['backup_service']``.

        Returns:
            The backup service's health-check result. False when no backup
            service is given or an error occurs.
        """
        primary_service = context.get('primary_service')
        backup_service = context.get('backup_service')

        if not backup_service:
            # Without a target there is nothing to fail over to.
            print("❌ 故障转移失败: 未指定备用服务")
            return False

        try:
            print(f"🔄 执行故障转移: {primary_service} -> {backup_service}")

            await asyncio.sleep(8)  # simulated failover

            return await self._verify_service_health(backup_service)

        except Exception as e:
            print(f"❌ 故障转移失败: {e}")
            return False

    async def _rollback_deployment(self, context: Dict[str, Any]) -> bool:
        """Roll ``context['service_name']`` back to ``context['previous_version']``.

        ``service_name`` defaults to ``'mcp-server'``, matching the other
        handlers.

        Returns:
            The service's health-check result after the rollback. False when
            no previous version is given or an error occurs.
        """
        service_name = context.get('service_name', 'mcp-server')
        previous_version = context.get('previous_version')

        if not previous_version:
            print("❌ 回滚部署失败: 未指定回滚版本")
            return False

        try:
            print(f"⏪ 回滚部署 {service_name} 到版本: {previous_version}")

            await asyncio.sleep(15)  # simulated rollback

            return await self._verify_service_health(service_name)

        except Exception as e:
            print(f"❌ 回滚部署失败: {e}")
            return False

    async def _verify_service_health(self, service_name: str) -> bool:
        """Check that ``service_name`` is healthy.

        This is a simulation that always reports healthy after a short wait;
        a real implementation would query the service's health endpoint.
        """
        try:
            print(f"🔍 验证服务健康状态: {service_name}")
            await asyncio.sleep(3)

            return True

        except Exception as e:
            print(f"❌ 健康检查失败: {e}")
            return False

    def get_recovery_stats(self) -> Dict[str, Any]:
        """Summarise the recovery attempts of the last 24 hours.

        Returns:
            A dict with ``total_recoveries``, ``successful_recoveries``,
            ``success_rate`` (0 when there were no attempts) and
            ``recent_history`` (the last 10 entries in the window). The same
            keys are present even when the history is empty.
        """
        now = time.time()
        recent_history = [
            entry for entry in self.recovery_history
            if now - entry['timestamp'] < 86400  # last 24 hours
        ]

        total_count = len(recent_history)
        success_count = sum(1 for entry in recent_history if entry['success'])

        return {
            'total_recoveries': total_count,
            'successful_recoveries': success_count,
            'success_rate': success_count / total_count if total_count > 0 else 0.0,
            'recent_history': recent_history[-10:]
        }

📋 运维手册模板

故障处理标准流程

markdown
# MCP服务器故障处理手册

## 🚨 应急响应流程

### 1. 故障接收 (0-5分钟)
- [ ] 确认告警信息和故障级别
- [ ] 通知相关人员(根据故障级别)
- [ ] 创建故障记录单

### 2. 初步评估 (5-15分钟)
- [ ] 检查系统整体状态
- [ ] 确定影响范围
- [ ] 评估业务影响
- [ ] 确定是否需要立即缓解措施

### 3. 故障缓解 (15-30分钟)
- [ ] 执行应急措施降低影响
- [ ] 尝试自动/手动恢复
- [ ] 更新故障状态
- [ ] 通知相关方进展

### 4. 根因分析 (30分钟-2小时)
- [ ] 收集相关日志和指标
- [ ] 分析故障原因
- [ ] 确定根本原因
- [ ] 制定永久修复方案

### 5. 永久修复 (时间因情况而定)
- [ ] 实施修复方案
- [ ] 验证修复效果
- [ ] 更新监控和告警
- [ ] 更新文档

### 6. 故障复盘 (故障解决后24小时内)
- [ ] 组织复盘会议
- [ ] 分析故障原因和处理过程
- [ ] 识别改进机会
- [ ] 制定预防措施
- [ ] 更新流程和文档

常见故障处理指南

python
# src/troubleshooting/playbook.py
from typing import List


class TroubleshootingPlaybook:
    """Look up step-by-step troubleshooting checklists by issue type."""

    def __init__(self):
        """Build the issue-type -> checklist-provider dispatch table."""
        self.playbooks = {
            'service_down': self._handle_service_down,
            'high_latency': self._handle_high_latency,
            'database_issues': self._handle_database_issues,
            'memory_leak': self._handle_memory_leak,
            'disk_full': self._handle_disk_full,
            'network_issues': self._handle_network_issues
        }

    def get_troubleshooting_steps(self, issue_type: str) -> List[str]:
        """Return the ordered checklist for ``issue_type``.

        Args:
            issue_type: One of the keys of ``playbooks``.

        Returns:
            A new list of numbered steps, or a single-item list with a
            "no guide found" message for an unknown issue type.
        """
        handler = self.playbooks.get(issue_type)
        if handler is None:
            return ["未找到对应的故障处理指南"]
        return handler()

    def _handle_service_down(self) -> List[str]:
        """Return the steps for an unavailable service."""
        return [
            "1. 检查服务状态: kubectl get pods -n production",
            "2. 查看Pod日志: kubectl logs <pod-name> -n production",
            "3. 检查服务配置: kubectl describe service mcp-server",
            "4. 验证健康检查端点: curl http://service/health",
            "5. 重启服务: kubectl rollout restart deployment/mcp-server",
            "6. 监控服务恢复: kubectl get pods -w",
            "7. 验证服务功能: 执行端到端测试",
            "8. 更新故障状态并通知相关方"
        ]

    def _handle_high_latency(self) -> List[str]:
        """Return the steps for high latency."""
        return [
            "1. 检查系统资源使用情况",
            "2. 分析慢查询日志",
            "3. 检查缓存命中率",
            "4. 监控网络延迟",
            "5. 检查数据库连接池状态",
            "6. 分析应用程序性能指标",
            "7. 考虑临时扩容",
            "8. 优化慢查询或增加缓存"
        ]

    def _handle_database_issues(self) -> List[str]:
        """Return the steps for database problems."""
        return [
            "1. 检查数据库连接状态",
            "2. 查看数据库错误日志",
            "3. 检查数据库资源使用",
            "4. 分析慢查询和锁等待",
            "5. 检查数据库配置",
            "6. 考虑读写分离或主从切换",
            "7. 优化数据库查询",
            "8. 重启数据库服务(最后手段)"
        ]

    def _handle_memory_leak(self) -> List[str]:
        """Return the steps for a memory leak."""
        return [
            "1. 监控内存使用趋势",
            "2. 生成内存dump文件",
            "3. 分析内存使用模式",
            "4. 检查应用程序内存管理",
            "5. 临时重启服务释放内存",
            "6. 调整JVM堆大小(如适用)",
            "7. 代码审查和内存优化",
            "8. 增强内存监控告警"
        ]

    def _handle_disk_full(self) -> List[str]:
        """Return the steps for a full disk."""
        return [
            "1. 检查磁盘使用情况: df -h",
            "2. 找出占用空间最大的文件: du -sh /* | sort -hr",
            "3. 清理日志文件和临时文件",
            "4. 压缩或归档旧数据",
            "5. 清理Docker镜像和容器: docker system prune",
            "6. 扩展磁盘空间",
            "7. 配置日志轮转和自动清理",
            "8. 设置磁盘使用监控告警"
        ]

    def _handle_network_issues(self) -> List[str]:
        """Return the steps for network problems."""
        return [
            "1. 检查网络连通性: ping, telnet, curl",
            "2. 检查DNS解析: nslookup, dig",
            "3. 监控网络延迟和丢包率",
            "4. 检查防火墙和安全组配置",
            "5. 检查负载均衡器状态",
            "6. 验证SSL证书有效性",
            "7. 重启网络服务",
            "8. 联系网络团队或云服务商"
        ]

🎯 最佳实践总结

1. 预防优于治疗

  • 实施全面的监控和告警
  • 定期进行健康检查和维护
  • 建立预防性的运维流程

2. 快速响应

  • 建立明确的故障分级制度
  • 制定标准化的应急响应流程
  • 培训团队快速诊断能力

3. 持续改进

  • 定期进行故障复盘
  • 不断优化监控和告警
  • 更新故障处理手册

4. 团队协作

  • 建立高效的沟通机制
  • 明确角色和责任分工
  • 定期进行应急演练

🚀 下一步

完成故障排除与维护后,你已经掌握了生产级MCP服务器的完整运维技能!现在可以:

  1. 学习最佳实践指南 — 第8章:最佳实践
  2. 查看案例研究 — 第9章:案例研究
  3. 探索AI工作流 — 第10章:AI工作流

📚 扩展阅读

🏠 返回教程首页 | 📖 查看完整目录 | ▶️ 下一章: 最佳实践