
7.4 性能调优与扩容

核心目标:构建高性能、可扩展的MCP服务器,应对大规模并发和负载挑战
⏱️ 预计时长:60分钟
📊 难度级别:⭐⭐⭐⭐⭐

🎯 学习目标

通过本节学习,你将掌握:

  • 识别和解决性能瓶颈
  • 实现水平和垂直扩容策略
  • 优化数据库和缓存性能
  • 构建弹性伸缩架构
  • 进行压力测试和容量规划

📊 性能优化架构

性能优化层次图

⚡ 应用层性能优化

异步处理优化

python
# src/performance/async_optimization.py
import asyncio
import aiohttp
import uvloop
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from functools import wraps, partial
import time
import psutil

class AsyncPerformanceOptimizer:
    """异步性能优化器"""
    
    def __init__(self):
        # 使用uvloop替代默认事件循环
        if hasattr(uvloop, 'install'):
            uvloop.install()
        
        # 线程池配置
        self.thread_pool = ThreadPoolExecutor(
            max_workers=min(32, (mp.cpu_count() or 1) + 4)
        )
        
        # 进程池配置  
        self.process_pool = ProcessPoolExecutor(
            max_workers=mp.cpu_count() or 1
        )
        
        # 连接池配置
        self.connector = aiohttp.TCPConnector(
            limit=100,  # 总连接数限制
            limit_per_host=30,  # 每个主机连接数限制
            keepalive_timeout=30,
            enable_cleanup_closed=True
        )
        
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30)
        )
    
    async def batch_process(self, items: List[Any], 
                          processor: callable, batch_size: int = 100,
                          max_concurrency: int = 10) -> List[Any]:
        """批量异步处理"""
        semaphore = asyncio.Semaphore(max_concurrency)
        
        async def process_batch(batch):
            async with semaphore:
                return await processor(batch)
        
        # 分批处理
        batches = [items[i:i + batch_size] 
                  for i in range(0, len(items), batch_size)]
        
        tasks = [process_batch(batch) for batch in batches]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 合并结果
        merged_results = []
        for result in results:
            if isinstance(result, Exception):
                print(f"批处理错误: {result}")
                continue
            merged_results.extend(result)
        
        return merged_results
    
    async def parallel_http_requests(self, urls: List[str]) -> List[Dict]:
        """并行HTTP请求"""
        async def fetch_url(url: str) -> Dict:
            try:
                async with self.session.get(url) as response:
                    return {
                        'url': url,
                        'status': response.status,
                        'data': await response.json() if response.content_type == 'application/json' else await response.text(),
                        'headers': dict(response.headers)
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'status': None,
                    'data': None
                }
        
        tasks = [fetch_url(url) for url in urls]
        return await asyncio.gather(*tasks)
    
    def cpu_intensive_task(self, func: callable):
        """CPU密集型任务装饰器:转交进程池执行,避免阻塞事件循环"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # run_in_executor 不支持关键字参数,用 partial 先行绑定
            return await loop.run_in_executor(
                self.process_pool, partial(func, *args, **kwargs)
            )
        return wrapper
    
    def io_intensive_task(self, func: callable):
        """IO密集型任务装饰器:转交线程池执行"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                self.thread_pool, partial(func, *args, **kwargs)
            )
        return wrapper
    
    async def cleanup(self):
        """清理资源"""
        await self.session.close()
        self.thread_pool.shutdown(wait=True)
        self.process_pool.shutdown(wait=True)

# 缓存优化
class CacheOptimizer:
    """缓存优化器"""
    
    def __init__(self):
        self.cache_stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0
        }
    
    def smart_cache(self, maxsize: int = 128, ttl: int = 300):
        """智能缓存装饰器"""
        def decorator(func):
            cache = {}
            cache_times = {}
            
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # 生成缓存键
                cache_key = self._generate_cache_key(args, kwargs)
                current_time = time.time()
                
                # 检查缓存是否存在且未过期
                if (cache_key in cache and 
                    current_time - cache_times.get(cache_key, 0) < ttl):
                    self.cache_stats['hits'] += 1
                    return cache[cache_key]
                
                # 执行函数
                result = await func(*args, **kwargs)
                
                # 缓存管理
                if len(cache) >= maxsize:
                    # 淘汰最早写入的键(近似LRU:命中时不刷新时间戳)
                    oldest_key = min(cache_times.keys(), 
                                   key=lambda k: cache_times[k])
                    del cache[oldest_key]
                    del cache_times[oldest_key]
                    self.cache_stats['evictions'] += 1
                
                # 存储结果
                cache[cache_key] = result
                cache_times[cache_key] = current_time
                self.cache_stats['misses'] += 1
                
                return result
            
            wrapper.cache_info = lambda: self.cache_stats.copy()
            wrapper.cache_clear = lambda: (cache.clear(), cache_times.clear())
            
            return wrapper
        return decorator
    
    def _generate_cache_key(self, args, kwargs) -> str:
        """生成缓存键"""
        import hashlib
        import json
        
        key_data = {
            'args': str(args),
            'kwargs': json.dumps(kwargs, sort_keys=True, default=str)
        }
        key_string = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_string.encode()).hexdigest()

# 内存优化
class MemoryOptimizer:
    """内存优化器"""
    
    def __init__(self):
        self.memory_threshold = 0.8  # 80%内存阈值
    
    def memory_monitor(self, func):
        """内存监控装饰器"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 执行前检查内存
            memory_before = psutil.virtual_memory().percent
            
            if memory_before > self.memory_threshold * 100:
                print(f"⚠️ 内存使用率过高: {memory_before:.1f}%")
                # 触发垃圾回收
                import gc
                gc.collect()
            
            # 执行函数
            result = await func(*args, **kwargs)
            
            # 执行后检查内存
            memory_after = psutil.virtual_memory().percent
            memory_diff = memory_after - memory_before
            
            if memory_diff > 10:  # 内存增长超过10%
                print(f"📊 内存增长: {memory_diff:.1f}%")
            
            return result
        return wrapper
    
    def optimize_data_structures(self, data: Any) -> Any:
        """优化数据结构"""
        if isinstance(data, list):
            # 大列表转为生成器以降低峰值内存(注意:生成器只能遍历一次)
            if len(data) > 10000:
                return (item for item in data)
        
        elif isinstance(data, dict):
            # 大字典转为带__slots__的对象以降低内存开销
            # (注意:要求所有键都是合法的Python标识符)
            if len(data) > 1000:
                class OptimizedDict:
                    __slots__ = list(data.keys())
                    
                    def __init__(self, data_dict):
                        for key, value in data_dict.items():
                            setattr(self, key, value)
                
                return OptimizedDict(data)
        
        return data

# 全局优化器实例
# 注意:AsyncPerformanceOptimizer 构造时会创建 aiohttp 会话并绑定事件循环,
# 更稳妥的做法是在应用启动、事件循环已运行后再创建实例
async_optimizer = AsyncPerformanceOptimizer()
cache_optimizer = CacheOptimizer()
memory_optimizer = MemoryOptimizer()
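
下面是一个示意性的使用片段(其中 fetch_user 与 fetch_user_batch 均为虚构函数,并非真实接口),演示在事件循环内创建优化器,并把 smart_cache 缓存与 batch_process 分批并发处理组合起来:

python
# 使用示例(示意):组合异步批处理与智能缓存
import asyncio

@cache_optimizer.smart_cache(maxsize=256, ttl=60)
async def fetch_user(user_id: int) -> dict:
    """虚构函数:模拟按ID获取用户数据"""
    await asyncio.sleep(0.01)  # 模拟IO延迟
    return {'id': user_id, 'name': f'user-{user_id}'}

async def fetch_user_batch(user_ids: list) -> list:
    """批处理函数,供 batch_process 作为 processor 调用"""
    return await asyncio.gather(*[fetch_user(uid) for uid in user_ids])

async def demo():
    # 在事件循环内创建优化器,确保 aiohttp 会话绑定到正确的事件循环
    optimizer = AsyncPerformanceOptimizer()
    try:
        users = await optimizer.batch_process(
            items=list(range(1000)),
            processor=fetch_user_batch,
            batch_size=100,
            max_concurrency=10
        )
        print(f"处理完成: {len(users)} 条, 缓存统计: {fetch_user.cache_info()}")
    finally:
        await optimizer.cleanup()

if __name__ == '__main__':
    asyncio.run(demo())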

数据库性能优化

python
# src/performance/database_optimization.py
import asyncpg
import asyncio
from typing import List, Dict, Any, Optional
import time
import psutil
from contextlib import asynccontextmanager

class DatabaseOptimizer:
    """数据库性能优化器"""
    
    def __init__(self, database_url: str):
        self.database_url = database_url
        self.pool = None
        self.query_stats = {}
        
    async def initialize(self):
        """初始化连接池"""
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=10,
            max_size=50,
            max_queries=50000,
            max_inactive_connection_lifetime=300,
            command_timeout=60,
            server_settings={
                'application_name': 'mcp-server',
                'tcp_keepalives_idle': '600',
                'tcp_keepalives_interval': '30',
                'tcp_keepalives_count': '3',
            }
        )
    
    @asynccontextmanager
    async def get_connection(self):
        """获取数据库连接"""
        async with self.pool.acquire() as connection:
            yield connection
    
    async def execute_query(self, query: str, *args, 
                          timeout: int = 30) -> List[Dict]:
        """执行查询并记录性能"""
        start_time = time.time()
        
        try:
            async with self.get_connection() as conn:
                result = await conn.fetch(query, *args, timeout=timeout)
                
                # 记录查询统计
                execution_time = time.time() - start_time
                self._record_query_stats(query, execution_time, True)
                
                return [dict(row) for row in result]
                
        except Exception as e:
            execution_time = time.time() - start_time
            self._record_query_stats(query, execution_time, False)
            raise
    
    def _record_query_stats(self, query: str, execution_time: float, 
                           success: bool):
        """记录查询统计"""
        # 简化查询用于统计
        query_template = self._normalize_query(query)
        
        if query_template not in self.query_stats:
            self.query_stats[query_template] = {
                'count': 0,
                'total_time': 0,
                'avg_time': 0,
                'max_time': 0,
                'min_time': float('inf'),
                'errors': 0
            }
        
        stats = self.query_stats[query_template]
        stats['count'] += 1
        stats['total_time'] += execution_time
        stats['avg_time'] = stats['total_time'] / stats['count']
        stats['max_time'] = max(stats['max_time'], execution_time)
        stats['min_time'] = min(stats['min_time'], execution_time)
        
        if not success:
            stats['errors'] += 1
    
    def _normalize_query(self, query: str) -> str:
        """标准化查询语句"""
        import re
        # 移除参数
        normalized = re.sub(r'\$\d+', '?', query)
        # 移除多余空格
        normalized = re.sub(r'\s+', ' ', normalized).strip()
        return normalized
    
    async def batch_insert(self, table: str, columns: List[str], 
                          data: List[List[Any]], batch_size: int = 1000):
        """批量插入优化"""
        if not data:
            return
        
        # 构建插入语句
        placeholders = ', '.join([f'${i+1}' for i in range(len(columns))])
        query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        
        async with self.get_connection() as conn:
            # 使用事务 + executemany 批量写入
            # (同一连接上不能并发执行多条语句,因此按批次顺序执行)
            async with conn.transaction():
                for i in range(0, len(data), batch_size):
                    batch = data[i:i + batch_size]
                    await conn.executemany(query, batch)
    
    async def optimize_indexes(self) -> Dict[str, Any]:
        """索引优化建议"""
        suggestions = []
        
        # 查找慢查询
        slow_queries = [
            query for query, stats in self.query_stats.items()
            if stats['avg_time'] > 1.0  # 超过1秒的查询
        ]
        
        if slow_queries:
            suggestions.append({
                'type': 'slow_queries',
                'description': '发现慢查询,建议添加索引',
                'queries': slow_queries
            })
        
        # 查找缺失索引
        missing_indexes = await self._analyze_missing_indexes()
        if missing_indexes:
            suggestions.append({
                'type': 'missing_indexes',
                'description': '建议添加以下索引',
                'indexes': missing_indexes
            })
        
        return {
            'suggestions': suggestions,
            'query_stats': self.query_stats
        }
    
    async def _analyze_missing_indexes(self) -> List[Dict]:
        """分析缺失的索引"""
        # 查询pg_stat_user_tables获取表统计信息
        query = """
        SELECT 
            schemaname,
            tablename,
            seq_scan,
            seq_tup_read,
            idx_scan,
            idx_tup_fetch,
            n_tup_ins,
            n_tup_upd,
            n_tup_del
        FROM pg_stat_user_tables
        WHERE seq_scan > COALESCE(idx_scan, 0) * 2  -- 顺序扫描远多于索引扫描
        """
        
        async with self.get_connection() as conn:
            results = await conn.fetch(query)
            
        missing_indexes = []
        for row in results:
            missing_indexes.append({
                'table': f"{row['schemaname']}.{row['tablename']}",
                'seq_scan': row['seq_scan'],
                'idx_scan': row['idx_scan'],
                'recommendation': '考虑为常用查询字段添加索引'
            })
        
        return missing_indexes

# 缓存层优化
class CacheLayerOptimizer:
    """缓存层优化器"""
    
    def __init__(self, redis_url: str):
        self.redis_url = redis_url
        self.redis = None
        self._memory_cache: Dict[str, Any] = {}  # L1内存缓存
        self.cache_hit_rate = 0.0
        self.total_requests = 0
        self.cache_hits = 0
    
    async def initialize(self):
        """初始化Redis连接"""
        import aioredis
        self.redis = aioredis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True,
            max_connections=50
        )
    
    async def multi_level_cache_get(self, key: str) -> Optional[Any]:
        """多级缓存获取"""
        self.total_requests += 1
        
        # L1缓存:内存缓存
        if hasattr(self, '_memory_cache') and key in self._memory_cache:
            self.cache_hits += 1
            return self._memory_cache[key]
        
        # L2缓存:Redis缓存
        value = await self.redis.get(key)
        if value is not None:
            # 回填L1缓存
            if not hasattr(self, '_memory_cache'):
                self._memory_cache = {}
            self._memory_cache[key] = value
            self.cache_hits += 1
            return value
        
        return None
    
    async def multi_level_cache_set(self, key: str, value: Any, 
                                  ttl: int = 300):
        """多级缓存设置"""
        # 设置Redis缓存
        await self.redis.setex(key, ttl, value)
        
        # 设置内存缓存(较短TTL)
        if not hasattr(self, '_memory_cache'):
            self._memory_cache = {}
        self._memory_cache[key] = value
        
        # 定时清理内存缓存
        asyncio.create_task(self._cleanup_memory_cache(key, ttl // 10))
    
    async def _cleanup_memory_cache(self, key: str, delay: int):
        """清理内存缓存"""
        await asyncio.sleep(delay)
        if hasattr(self, '_memory_cache') and key in self._memory_cache:
            del self._memory_cache[key]
    
    async def cache_warming(self, warm_keys: List[str]):
        """缓存预热"""
        print("🔥 开始缓存预热...")
        
        for key in warm_keys:
            try:
                # 这里应该调用实际的数据获取函数
                # value = await get_data_for_key(key)
                # await self.multi_level_cache_set(key, value)
                pass
            except Exception as e:
                print(f"缓存预热失败 {key}: {e}")
        
        print("✅ 缓存预热完成")
    
    def get_cache_stats(self) -> Dict[str, Any]:
        """获取缓存统计"""
        hit_rate = (self.cache_hits / self.total_requests * 100 
                   if self.total_requests > 0 else 0)
        
        return {
            'hit_rate': hit_rate,
            'total_requests': self.total_requests,
            'cache_hits': self.cache_hits,
            'cache_misses': self.total_requests - self.cache_hits
        }

# 全局数据库优化器
db_optimizer = None
cache_optimizer_db = None

async def initialize_optimizers(database_url: str, redis_url: str):
    """初始化优化器"""
    global db_optimizer, cache_optimizer_db
    
    db_optimizer = DatabaseOptimizer(database_url)
    await db_optimizer.initialize()
    
    cache_optimizer_db = CacheLayerOptimizer(redis_url)
    await cache_optimizer_db.initialize()
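
下面是一个示意性的使用片段(连接串为占位值,tools 表及其字段仅作演示),展示初始化优化器之后先查多级缓存、未命中再查库并回填,并查看缓存统计:

python
# 使用示例(示意):初始化优化器并结合多级缓存执行查询
import json

async def demo():
    await initialize_optimizers(
        database_url="postgresql://user:pass@localhost:5432/mcp",  # 占位连接串
        redis_url="redis://localhost:6379/0"                       # 占位连接串
    )

    cache_key = "tools:popular"
    # 先查多级缓存,未命中再查数据库并回填
    tools = await cache_optimizer_db.multi_level_cache_get(cache_key)
    if tools is None:
        rows = await db_optimizer.execute_query(
            "SELECT name, call_count FROM tools "
            "ORDER BY call_count DESC LIMIT $1", 10
        )
        tools = json.dumps(rows, default=str)
        await cache_optimizer_db.multi_level_cache_set(cache_key, tools, ttl=300)

    print("缓存统计:", cache_optimizer_db.get_cache_stats())

if __name__ == '__main__':
    asyncio.run(demo())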

🚀 水平扩容架构

Kubernetes自动扩容

yaml
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: mcp-server-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  minReplicas: 3
  maxReplicas: 20
  metrics:
  # CPU使用率
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  
  # 内存使用率
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 75
  
  # 自定义指标:QPS
  - type: Pods
    pods:
      metric:
        name: http_requests_per_second
      target:
        type: AverageValue
        averageValue: "100"
  
  # 外部指标:队列长度
  - type: External
    external:
      metric:
        name: queue_length
        selector:
          matchLabels:
            queue: mcp-tasks
      target:
        type: AverageValue
        averageValue: "50"

  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
      - type: Pods
        value: 2
        periodSeconds: 60
      selectPolicy: Min
    
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30
      - type: Pods
        value: 5
        periodSeconds: 30
      selectPolicy: Max

---
# 垂直Pod自动扩容
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
  name: mcp-server-vpa
  namespace: production
spec:
  targetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: mcp-server
  updatePolicy:
    updateMode: "Auto"
  resourcePolicy:
    containerPolicies:
    - containerName: mcp-server
      minAllowed:
        cpu: 100m
        memory: 128Mi
      maxAllowed:
        cpu: 2
        memory: 4Gi
      controlledResources: ["cpu", "memory"]
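
上面 HPA 里的自定义 Pods 指标 http_requests_per_second 不会凭空出现:应用需要先暴露请求计数等原始指标,再由 Prometheus 抓取,并通过 Prometheus Adapter 等组件换算为速率提供给 HPA。下面是一个假设使用 prometheus_client 的最小示意,只演示应用侧的指标暴露,Adapter 的换算规则需另行配置:

python
# 示意:在应用侧暴露请求计数与延迟指标,供Prometheus抓取
import time
from prometheus_client import Counter, Histogram, start_http_server

REQUEST_COUNT = Counter(
    'http_requests_total', 'HTTP请求总数', ['method', 'path', 'status']
)
REQUEST_LATENCY = Histogram(
    'http_request_duration_seconds', 'HTTP请求耗时(秒)', ['path']
)

def record_request(method: str, path: str, status: int, duration: float):
    """在每个请求处理完成后调用,记录计数与耗时"""
    REQUEST_COUNT.labels(method=method, path=path, status=str(status)).inc()
    REQUEST_LATENCY.labels(path=path).observe(duration)

if __name__ == '__main__':
    start_http_server(9100)  # 在9100端口暴露 /metrics
    for _ in range(60):      # 模拟一段时间的请求流量
        start = time.time()
        # ... 实际的请求处理逻辑 ...
        record_request('GET', '/health', 200, time.time() - start)
        time.sleep(1)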

负载均衡配置

yaml
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: mcp-server-ingress
  namespace: production
  annotations:
    kubernetes.io/ingress.class: nginx
    nginx.ingress.kubernetes.io/rewrite-target: /
    nginx.ingress.kubernetes.io/ssl-redirect: "true"
    nginx.ingress.kubernetes.io/use-regex: "true"
    
    # 负载均衡配置
    nginx.ingress.kubernetes.io/load-balance: "ewma"
    nginx.ingress.kubernetes.io/upstream-hash-by: "$remote_addr"
    
    # 连接限制
    nginx.ingress.kubernetes.io/limit-connections: "100"
    nginx.ingress.kubernetes.io/limit-rps: "1000"
    
    # 健康检查
    nginx.ingress.kubernetes.io/health-check-path: "/health"
    nginx.ingress.kubernetes.io/health-check-interval: "10s"
    
    # 缓存配置
    nginx.ingress.kubernetes.io/proxy-cache-valid: "200 302 10m"
    nginx.ingress.kubernetes.io/proxy-cache-valid-error: "404 1m"

spec:
  tls:
  - hosts:
    - api.example.com
    secretName: mcp-server-tls
  
  rules:
  - host: api.example.com
    http:
      paths:
      - path: /
        pathType: Prefix
        backend:
          service:
            name: mcp-server
            port:
              number: 80

---
# 服务配置
apiVersion: v1
kind: Service
metadata:
  name: mcp-server
  namespace: production
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: nlb
    service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled: "true"
spec:
  type: LoadBalancer
  sessionAffinity: ClientIP
  sessionAffinityConfig:
    clientIP:
      timeoutSeconds: 10800
  
  ports:
  - port: 80
    targetPort: 8000
    protocol: TCP
    name: http
  - port: 443
    targetPort: 8443
    protocol: TCP
    name: https
  
  selector:
    app: mcp-server

数据库分片策略

python
# src/scaling/database_sharding.py
import asyncio
import hashlib
from typing import Dict, List, Any, Optional
from enum import Enum
import asyncpg

class ShardingStrategy(Enum):
    HASH = "hash"
    RANGE = "range"
    DIRECTORY = "directory"

class DatabaseSharding:
    """数据库分片管理器"""
    
    def __init__(self, shard_configs: List[Dict[str, str]], 
                 strategy: ShardingStrategy = ShardingStrategy.HASH):
        self.shard_configs = shard_configs
        self.strategy = strategy
        self.shard_pools = {}
        self.shard_count = len(shard_configs)
        
        # 分片映射表(用于目录策略)
        self.shard_directory = {}
    
    async def initialize(self):
        """初始化分片连接池"""
        for i, config in enumerate(self.shard_configs):
            pool = await asyncpg.create_pool(
                config['url'],
                min_size=5,
                max_size=20,
                command_timeout=60
            )
            self.shard_pools[f'shard_{i}'] = pool
    
    def get_shard_key(self, key: Any) -> str:
        """获取分片键"""
        if self.strategy == ShardingStrategy.HASH:
            # 哈希分片
            hash_value = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
            shard_index = hash_value % self.shard_count
            return f'shard_{shard_index}'
        
        elif self.strategy == ShardingStrategy.RANGE:
            # 范围分片(假设key是数字)
            if isinstance(key, (int, float)):
                shard_index = int(key) // 10000  # 每1万条记录一个分片
                shard_index = min(shard_index, self.shard_count - 1)
                return f'shard_{shard_index}'
        
        elif self.strategy == ShardingStrategy.DIRECTORY:
            # 目录分片
            return self.shard_directory.get(str(key), 'shard_0')
        
        return 'shard_0'
    
    async def execute_query(self, shard_key: Any, query: str, 
                           *args) -> List[Dict]:
        """在特定分片执行查询"""
        shard_name = self.get_shard_key(shard_key)
        pool = self.shard_pools[shard_name]
        
        async with pool.acquire() as conn:
            result = await conn.fetch(query, *args)
            return [dict(row) for row in result]
    
    async def execute_cross_shard_query(self, query: str, 
                                      *args) -> List[Dict]:
        """跨分片查询"""
        async def query_shard(shard_name: str, pool) -> List[Dict]:
            async with pool.acquire() as conn:
                try:
                    result = await conn.fetch(query, *args)
                    return [dict(row) for row in result]
                except Exception as e:
                    print(f"分片 {shard_name} 查询失败: {e}")
                    return []
        
        # 为每个分片创建一个查询任务(显式传入分片名,避免闭包晚绑定问题)
        tasks = [
            query_shard(shard_name, pool)
            for shard_name, pool in self.shard_pools.items()
        ]
        
        # 并行执行所有分片查询
        results = await asyncio.gather(*tasks)
        
        # 合并结果
        merged_results = []
        for result in results:
            merged_results.extend(result)
        
        return merged_results
    
    async def distributed_transaction(self, operations: List[Dict]):
        """跨分片事务(简化实现:各分片先执行、最后统一提交;
        并非基于 PREPARE TRANSACTION 的真正两阶段提交)"""
        # 记录每个分片已占用的连接与事务,提交前不能释放连接
        active: Dict[str, Any] = {}
        
        try:
            # 第一阶段:在各分片上执行操作但不提交
            for operation in operations:
                shard_name = self.get_shard_key(operation['shard_key'])
                pool = self.shard_pools[shard_name]
                
                if shard_name not in active:
                    conn = await pool.acquire()
                    tx = conn.transaction()
                    await tx.start()
                    active[shard_name] = (conn, tx)
                
                conn, _ = active[shard_name]
                await conn.execute(operation['query'], *operation['args'])
            
            # 第二阶段:提交所有分片上的事务
            for conn, tx in active.values():
                await tx.commit()
                
        except Exception:
            # 任一分片失败,回滚所有已开启的事务
            for conn, tx in active.values():
                try:
                    await tx.rollback()
                except Exception:
                    pass
            raise
        finally:
            # 无论成功与否,释放所有连接
            for shard_name, (conn, _) in active.items():
                await self.shard_pools[shard_name].release(conn)
    
    async def rebalance_shards(self, new_shard_count: int):
        """分片重平衡"""
        if new_shard_count <= self.shard_count:
            raise ValueError("新分片数量必须大于当前分片数量")
        
        print(f"🔄 开始分片重平衡: {self.shard_count} -> {new_shard_count}")
        
        # 创建新分片
        for i in range(self.shard_count, new_shard_count):
            # 这里需要添加新分片的配置
            pass
        
        # 数据迁移逻辑
        # 这是一个复杂的过程,需要:
        # 1. 计算新的分片映射
        # 2. 迁移数据
        # 3. 更新路由信息
        # 4. 验证数据一致性
        
        print("✅ 分片重平衡完成")

# 读写分离
class ReadWriteSplitter:
    """读写分离管理器"""
    
    def __init__(self, master_url: str, slave_urls: List[str]):
        self.master_url = master_url
        self.slave_urls = slave_urls
        self.master_pool = None
        self.slave_pools = []
        self.current_slave_index = 0
    
    async def initialize(self):
        """初始化连接池"""
        # 主库连接池
        self.master_pool = await asyncpg.create_pool(
            self.master_url,
            min_size=10,
            max_size=30
        )
        
        # 从库连接池
        for slave_url in self.slave_urls:
            pool = await asyncpg.create_pool(
                slave_url,
                min_size=5,
                max_size=20
            )
            self.slave_pools.append(pool)
    
    async def execute_write(self, query: str, *args) -> Any:
        """执行写操作(主库)"""
        async with self.master_pool.acquire() as conn:
            return await conn.execute(query, *args)
    
    async def execute_read(self, query: str, *args) -> List[Dict]:
        """执行读操作(从库,负载均衡)"""
        if not self.slave_pools:
            # 没有从库,使用主库
            async with self.master_pool.acquire() as conn:
                result = await conn.fetch(query, *args)
                return [dict(row) for row in result]
        
        # 轮询选择从库
        slave_pool = self.slave_pools[self.current_slave_index]
        self.current_slave_index = (self.current_slave_index + 1) % len(self.slave_pools)
        
        try:
            async with slave_pool.acquire() as conn:
                result = await conn.fetch(query, *args)
                return [dict(row) for row in result]
        except Exception as e:
            # 从库失败,回退到主库
            print(f"从库查询失败,回退到主库: {e}")
            async with self.master_pool.acquire() as conn:
                result = await conn.fetch(query, *args)
                return [dict(row) for row in result]
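
下面是一个示意性的使用片段(连接串为占位值,user_sessions 表仅作演示),展示按分片键路由查询,以及通过读写分离器写主库、读从库:

python
# 使用示例(示意):分片路由查询 + 读写分离
async def demo():
    # 初始化两个分片(连接串为占位值)
    sharding = DatabaseSharding(
        shard_configs=[
            {'url': 'postgresql://user:pass@shard0:5432/mcp'},
            {'url': 'postgresql://user:pass@shard1:5432/mcp'},
        ],
        strategy=ShardingStrategy.HASH
    )
    await sharding.initialize()

    # 以 user_id 作为分片键,路由到对应分片执行查询
    rows = await sharding.execute_query(
        12345, "SELECT * FROM user_sessions WHERE user_id = $1", 12345
    )
    print(f"分片查询结果: {len(rows)} 条")

    # 读写分离:写操作走主库,读操作轮询从库
    splitter = ReadWriteSplitter(
        master_url='postgresql://user:pass@master:5432/mcp',
        slave_urls=['postgresql://user:pass@replica1:5432/mcp']
    )
    await splitter.initialize()
    await splitter.execute_write(
        "UPDATE user_sessions SET last_seen = now() WHERE user_id = $1", 12345
    )
    sessions = await splitter.execute_read(
        "SELECT * FROM user_sessions WHERE user_id = $1", 12345
    )
    print(f"读取到 {len(sessions)} 条会话")

if __name__ == '__main__':
    asyncio.run(demo())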

🧪 压力测试和性能基准

压力测试脚本

python
# tests/performance/load_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict, Any
import json
from dataclasses import dataclass
import argparse

@dataclass
class TestResult:
    """测试结果"""
    total_requests: int
    successful_requests: int
    failed_requests: int
    avg_response_time: float
    p95_response_time: float
    p99_response_time: float
    requests_per_second: float
    errors: List[str]

class LoadTester:
    """负载测试器"""
    
    def __init__(self, base_url: str, max_connections: int = 100):
        self.base_url = base_url
        self.max_connections = max_connections
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=max_connections
        )
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        
        self.response_times = []
        self.errors = []
        self.successful_requests = 0
        self.failed_requests = 0
    
    async def single_request(self, endpoint: str, method: str = 'GET',
                           data: Dict = None) -> float:
        """单次请求"""
        start_time = time.time()
        
        try:
            async with self.session.request(
                method, 
                f"{self.base_url}{endpoint}",
                json=data
            ) as response:
                await response.text()  # 读取响应内容
                
                if response.status < 400:
                    self.successful_requests += 1
                else:
                    self.failed_requests += 1
                    self.errors.append(f"HTTP {response.status}: {endpoint}")
                
                response_time = time.time() - start_time
                self.response_times.append(response_time)
                return response_time
                
        except Exception as e:
            self.failed_requests += 1
            self.errors.append(f"Exception: {str(e)}")
            response_time = time.time() - start_time
            self.response_times.append(response_time)
            return response_time
    
    async def concurrent_test(self, endpoint: str, 
                            concurrent_users: int, 
                            requests_per_user: int,
                            method: str = 'GET',
                            data: Dict = None) -> TestResult:
        """并发测试"""
        print(f"🚀 开始负载测试:")
        print(f"   端点: {endpoint}")
        print(f"   并发用户: {concurrent_users}")
        print(f"   每用户请求数: {requests_per_user}")
        print(f"   总请求数: {concurrent_users * requests_per_user}")
        
        start_time = time.time()
        
        # 每个"用户"顺序发送自己的请求,用户之间并发执行
        async def user_session():
            for _ in range(requests_per_user):
                await self.single_request(endpoint, method, data)
        
        # 并发执行所有用户会话
        await asyncio.gather(*[user_session() for _ in range(concurrent_users)])
        
        total_time = time.time() - start_time
        total_requests = len(self.response_times)
        
        # 计算统计数据
        if self.response_times:
            avg_response_time = statistics.mean(self.response_times)
            p95_response_time = statistics.quantiles(self.response_times, n=20)[18]  # 95th percentile
            p99_response_time = statistics.quantiles(self.response_times, n=100)[98]  # 99th percentile
        else:
            avg_response_time = p95_response_time = p99_response_time = 0
        
        requests_per_second = total_requests / total_time if total_time > 0 else 0
        
        return TestResult(
            total_requests=total_requests,
            successful_requests=self.successful_requests,
            failed_requests=self.failed_requests,
            avg_response_time=avg_response_time,
            p95_response_time=p95_response_time,
            p99_response_time=p99_response_time,
            requests_per_second=requests_per_second,
            errors=self.errors[:10]  # 只保留前10个错误
        )
    
    async def ramp_up_test(self, endpoint: str, 
                          max_users: int, 
                          ramp_duration: int = 60) -> List[TestResult]:
        """渐进负载测试"""
        results = []
        step_size = max(1, max_users // 10)  # 分10个步骤递增
        step_duration = max(1, ramp_duration // 10)
        
        for step in range(1, 11):
            users = step * step_size
            print(f"📊 步骤 {step}/10: {users} 并发用户")
            
            # 重置统计
            self.response_times = []
            self.errors = []
            self.successful_requests = 0
            self.failed_requests = 0
            
            # 执行测试
            result = await self.concurrent_test(endpoint, users, 1)
            results.append(result)
            
            # 等待下一步
            if step < 10:
                await asyncio.sleep(step_duration)
        
        return results
    
    async def cleanup(self):
        """清理资源"""
        await self.session.close()
    
    def print_results(self, result: TestResult):
        """打印测试结果"""
        print("\n" + "="*60)
        print("📊 负载测试结果")
        print("="*60)
        print(f"总请求数:        {result.total_requests}")
        print(f"成功请求:        {result.successful_requests}")
        print(f"失败请求:        {result.failed_requests}")
        print(f"成功率:          {result.successful_requests/result.total_requests*100:.1f}%")
        print(f"平均响应时间:    {result.avg_response_time*1000:.2f}ms")
        print(f"95%响应时间:     {result.p95_response_time*1000:.2f}ms")
        print(f"99%响应时间:     {result.p99_response_time*1000:.2f}ms")
        print(f"每秒请求数:      {result.requests_per_second:.2f} RPS")
        
        if result.errors:
            print(f"\n❌ 错误示例 (前10个):")
            for error in result.errors:
                print(f"   {error}")

async def main():
    """主函数"""
    parser = argparse.ArgumentParser(description='MCP服务器负载测试')
    parser.add_argument('--url', default='http://localhost:8000', help='服务器URL')
    parser.add_argument('--endpoint', default='/health', help='测试端点')
    parser.add_argument('--users', type=int, default=50, help='并发用户数')
    parser.add_argument('--requests', type=int, default=10, help='每用户请求数')
    parser.add_argument('--ramp', action='store_true', help='渐进负载测试')
    
    args = parser.parse_args()
    
    tester = LoadTester(args.url)
    
    try:
        if args.ramp:
            results = await tester.ramp_up_test(args.endpoint, args.users)
            print("\n📈 渐进负载测试结果:")
            for i, result in enumerate(results, 1):
                print(f"\n步骤 {i}: {i * args.users // 10} 用户")
                tester.print_results(result)
        else:
            result = await tester.concurrent_test(
                args.endpoint, 
                args.users, 
                args.requests
            )
            tester.print_results(result)
            
    finally:
        await tester.cleanup()

if __name__ == '__main__':
    asyncio.run(main())

性能基准测试

python
# tests/performance/benchmark.py
import asyncio
import time
import psutil
import json
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
import statistics

@dataclass
class BenchmarkResult:
    """基准测试结果"""
    test_name: str
    duration: float
    operations_per_second: float
    memory_usage_mb: float
    cpu_usage_percent: float
    success_rate: float
    error_count: int

class PerformanceBenchmark:
    """性能基准测试"""
    
    def __init__(self):
        self.results: List[BenchmarkResult] = []
    
    async def benchmark_database_operations(self, db_optimizer) -> BenchmarkResult:
        """数据库操作基准测试"""
        print("🔍 数据库操作基准测试...")
        
        start_time = time.time()
        start_memory = psutil.virtual_memory().used / 1024 / 1024
        process = psutil.Process()
        process.cpu_percent()  # 预热采样,使结束时的读数反映本次测试区间内的CPU占用
        
        operations = 0
        errors = 0
        
        # 执行1000次数据库查询
        for i in range(1000):
            try:
                await db_optimizer.execute_query(
                    "SELECT $1::int as test_value", i
                )
                operations += 1
            except Exception as e:
                errors += 1
        
        end_time = time.time()
        end_memory = psutil.virtual_memory().used / 1024 / 1024
        duration = end_time - start_time
        
        return BenchmarkResult(
            test_name="database_operations",
            duration=duration,
            operations_per_second=operations / duration,
            memory_usage_mb=end_memory - start_memory,
            cpu_usage_percent=process.cpu_percent(),
            success_rate=operations / (operations + errors) * 100,
            error_count=errors
        )
    
    async def benchmark_cache_operations(self, cache_optimizer) -> BenchmarkResult:
        """缓存操作基准测试"""
        print("🔍 缓存操作基准测试...")
        
        start_time = time.time()
        start_memory = psutil.virtual_memory().used / 1024 / 1024
        process = psutil.Process()
        process.cpu_percent()  # 预热采样
        
        operations = 0
        errors = 0
        
        # 执行10000次缓存操作
        for i in range(10000):
            try:
                key = f"test_key_{i % 100}"  # 100个不同的键
                value = f"test_value_{i}"
                
                await cache_optimizer.multi_level_cache_set(key, value)
                await cache_optimizer.multi_level_cache_get(key)
                operations += 2
            except Exception as e:
                errors += 1
        
        end_time = time.time()
        end_memory = psutil.virtual_memory().used / 1024 / 1024
        duration = end_time - start_time
        
        return BenchmarkResult(
            test_name="cache_operations",
            duration=duration,
            operations_per_second=operations / duration,
            memory_usage_mb=end_memory - start_memory,
            cpu_usage_percent=process.cpu_percent(),
            success_rate=operations / (operations + errors) * 100,
            error_count=errors
        )
    
    async def benchmark_concurrent_processing(self) -> BenchmarkResult:
        """并发处理基准测试"""
        print("🔍 并发处理基准测试...")
        
        start_time = time.time()
        start_memory = psutil.virtual_memory().used / 1024 / 1024
        process = psutil.Process()
        process.cpu_percent()  # 预热采样
        
        async def dummy_task(task_id: int) -> int:
            """模拟任务"""
            await asyncio.sleep(0.01)  # 模拟IO操作
            return task_id * 2
        
        # 并发执行1000个任务
        tasks = [dummy_task(i) for i in range(1000)]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        operations = len([r for r in results if not isinstance(r, Exception)])
        errors = len([r for r in results if isinstance(r, Exception)])
        
        end_time = time.time()
        end_memory = psutil.virtual_memory().used / 1024 / 1024
        duration = end_time - start_time
        
        return BenchmarkResult(
            test_name="concurrent_processing",
            duration=duration,
            operations_per_second=operations / duration,
            memory_usage_mb=end_memory - start_memory,
            cpu_usage_percent=process.cpu_percent(),
            success_rate=operations / (operations + errors) * 100,
            error_count=errors
        )
    
    async def run_all_benchmarks(self, db_optimizer=None, 
                                cache_optimizer=None) -> List[BenchmarkResult]:
        """运行所有基准测试"""
        print("🚀 开始性能基准测试...")
        
        if db_optimizer:
            result = await self.benchmark_database_operations(db_optimizer)
            self.results.append(result)
        
        if cache_optimizer:
            result = await self.benchmark_cache_operations(cache_optimizer)
            self.results.append(result)
        
        result = await self.benchmark_concurrent_processing()
        self.results.append(result)
        
        return self.results
    
    def print_results(self):
        """打印基准测试结果"""
        print("\n" + "="*80)
        print("📊 性能基准测试结果")
        print("="*80)
        
        for result in self.results:
            print(f"\n🔍 {result.test_name}")
            print(f"   持续时间:        {result.duration:.2f}s")
            print(f"   操作/秒:         {result.operations_per_second:.2f}")
            print(f"   内存使用:        {result.memory_usage_mb:.2f}MB")
            print(f"   CPU使用:         {result.cpu_usage_percent:.1f}%")
            print(f"   成功率:          {result.success_rate:.1f}%")
            print(f"   错误数:          {result.error_count}")
    
    def save_results(self, filename: str):
        """保存结果到文件"""
        with open(filename, 'w') as f:
            json.dump([asdict(result) for result in self.results], f, indent=2)
        print(f"📄 结果已保存到 {filename}")

📈 容量规划工具

python
# src/scaling/capacity_planning.py
import math
from typing import Dict, Any, List
from dataclasses import dataclass
import json

@dataclass
class ResourceRequirement:
    """资源需求"""
    cpu_cores: float
    memory_gb: float
    storage_gb: float
    network_mbps: float
    connections: int

@dataclass
class ScalingPlan:
    """扩容计划"""
    current_capacity: ResourceRequirement
    target_capacity: ResourceRequirement
    scaling_factor: float
    estimated_cost: float
    recommendations: List[str]

class CapacityPlanner:
    """容量规划器"""
    
    def __init__(self):
        # 基准资源使用率(每RPS)
        self.base_resource_per_rps = {
            'cpu_cores': 0.001,    # 每RPS需要0.001核心
            'memory_mb': 2,        # 每RPS需要2MB内存
            'storage_mb': 1,       # 每RPS需要1MB存储
            'network_kbps': 10,    # 每RPS需要10KB/s网络
            'connections': 0.1     # 每RPS需要0.1个连接
        }
        
        # 资源成本(每月)
        self.resource_costs = {
            'cpu_core': 30,        # 每核心每月30美元
            'memory_gb': 5,        # 每GB内存每月5美元
            'storage_gb': 0.1,     # 每GB存储每月0.1美元
            'network_gb': 0.05     # 每GB流量0.05美元
        }
    
    def calculate_resource_requirements(self, target_rps: int,
                                     peak_multiplier: float = 2.0,
                                     safety_margin: float = 0.3) -> ResourceRequirement:
        """计算资源需求"""
        # 考虑峰值和安全余量
        effective_rps = target_rps * peak_multiplier * (1 + safety_margin)
        
        cpu_cores = effective_rps * self.base_resource_per_rps['cpu_cores']
        memory_gb = effective_rps * self.base_resource_per_rps['memory_mb'] / 1024
        storage_gb = effective_rps * self.base_resource_per_rps['storage_mb'] / 1024
        network_mbps = effective_rps * self.base_resource_per_rps['network_kbps'] / 1024
        connections = int(effective_rps * self.base_resource_per_rps['connections'])
        
        return ResourceRequirement(
            cpu_cores=math.ceil(cpu_cores * 4) / 4,  # 向上取整到0.25核心
            memory_gb=math.ceil(memory_gb),
            storage_gb=math.ceil(storage_gb),
            network_mbps=math.ceil(network_mbps),
            connections=connections
        )
    
    def estimate_costs(self, requirements: ResourceRequirement) -> float:
        """估算成本"""
        cpu_cost = requirements.cpu_cores * self.resource_costs['cpu_core']
        memory_cost = requirements.memory_gb * self.resource_costs['memory_gb']
        storage_cost = requirements.storage_gb * self.resource_costs['storage_gb']
        
        # 网络成本按月估算:Mbps × 0.125 换算为 MB/s,再乘以一个月的秒数并折算为GB
        monthly_network_gb = requirements.network_mbps * 0.125 * 86400 * 30 / 1024
        network_cost = monthly_network_gb * self.resource_costs['network_gb']
        
        return cpu_cost + memory_cost + storage_cost + network_cost
    
    def create_scaling_plan(self, current_rps: int, target_rps: int,
                          current_resources: ResourceRequirement = None) -> ScalingPlan:
        """创建扩容计划"""
        target_requirements = self.calculate_resource_requirements(target_rps)
        
        if current_resources is None:
            current_requirements = self.calculate_resource_requirements(current_rps)
        else:
            current_requirements = current_resources
        
        scaling_factor = target_rps / current_rps if current_rps > 0 else 1
        estimated_cost = self.estimate_costs(target_requirements)
        
        # 生成建议
        recommendations = []
        
        if scaling_factor > 2:
            recommendations.append("建议分阶段扩容,避免一次性大幅扩展")
        
        if target_requirements.cpu_cores > 16:
            recommendations.append("考虑使用多个较小实例而非单个大实例")
        
        if target_requirements.memory_gb > 32:
            recommendations.append("考虑实施缓存策略以减少内存需求")
        
        if target_requirements.connections > 10000:
            recommendations.append("考虑实施连接池和负载均衡")
        
        return ScalingPlan(
            current_capacity=current_requirements,
            target_capacity=target_requirements,
            scaling_factor=scaling_factor,
            estimated_cost=estimated_cost,
            recommendations=recommendations
        )
    
    def generate_scaling_report(self, plan: ScalingPlan) -> str:
        """生成扩容报告"""
        report = f"""
📊 容量规划报告
{'='*50}

📈 扩容概览
- 扩容倍数: {plan.scaling_factor:.1f}x
- 预估成本: ${plan.estimated_cost:.2f}/月

🔧 当前资源配置
- CPU核心: {plan.current_capacity.cpu_cores}
- 内存: {plan.current_capacity.memory_gb}GB
- 存储: {plan.current_capacity.storage_gb}GB
- 网络: {plan.current_capacity.network_mbps}Mbps
- 连接数: {plan.current_capacity.connections}

🚀 目标资源配置
- CPU核心: {plan.target_capacity.cpu_cores}
- 内存: {plan.target_capacity.memory_gb}GB
- 存储: {plan.target_capacity.storage_gb}GB
- 网络: {plan.target_capacity.network_mbps}Mbps
- 连接数: {plan.target_capacity.connections}

💡 扩容建议
"""
        
        for i, recommendation in enumerate(plan.recommendations, 1):
            report += f"{i}. {recommendation}\n"
        
        return report

# 使用示例
def main():
    planner = CapacityPlanner()
    
    # 创建扩容计划:从1000 RPS扩展到5000 RPS
    plan = planner.create_scaling_plan(
        current_rps=1000,
        target_rps=5000
    )
    
    # 生成报告
    report = planner.generate_scaling_report(plan)
    print(report)
    
    # 保存计划
    with open('scaling_plan.json', 'w') as f:
        json.dump({
            'current_capacity': plan.current_capacity.__dict__,
            'target_capacity': plan.target_capacity.__dict__,
            'scaling_factor': plan.scaling_factor,
            'estimated_cost': plan.estimated_cost,
            'recommendations': plan.recommendations
        }, f, indent=2)

if __name__ == '__main__':
    main()

🎯 最佳实践

1. 性能优化原则

  • 测量优先:先测量再优化,避免过早优化(计时示例见下)
  • 瓶颈定位:找到系统的真正瓶颈
  • 渐进优化:分步骤进行优化,验证效果
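
"先测量再优化"可以落到一个很小的工具上:先给可疑函数加上计时,拿到真实耗时数据后再决定优化方向。下面是一个最小示意(仅适用于异步函数):

python
# 示意:最小化的耗时测量装饰器,用于"先测量再优化"
import time
import functools

def timed(func):
    """打印被装饰的异步函数每次调用的耗时,帮助定位真正的瓶颈"""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return await func(*args, **kwargs)
        finally:
            elapsed_ms = (time.perf_counter() - start) * 1000
            print(f"[timing] {func.__name__}: {elapsed_ms:.2f}ms")
    return wrapper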

2. 扩容策略

  • 水平扩容优先:通过增加实例数量扩容
  • 无状态设计:确保应用无状态,便于扩容(示例见下)
  • 渐进扩容:避免一次性大幅扩容
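
无状态设计的关键是把会话等状态从进程内存移到外部存储,这样任何实例都能处理任何请求、实例可以随时增减。下面是一个假设使用 aioredis(与前文缓存层一致)的最小示意,键名与连接串均为占位:

python
# 示意:会话状态存入Redis而非进程内存,支撑无状态水平扩容
import json
import aioredis

redis = aioredis.from_url("redis://localhost:6379/0", decode_responses=True)

async def save_session(session_id: str, state: dict, ttl: int = 1800):
    """写入会话状态并设置过期时间,任意实例都可写"""
    await redis.setex(f"session:{session_id}", ttl, json.dumps(state))

async def load_session(session_id: str) -> dict:
    """任意实例均可读取同一份会话状态"""
    raw = await redis.get(f"session:{session_id}")
    return json.loads(raw) if raw else {}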

3. 监控指标

  • 关键指标:QPS、响应时间、错误率、资源使用率
  • 业务指标:工具调用成功率、用户满意度
  • 趋势分析:观察长期趋势,预测扩容需求

4. 容量规划

  • 基于数据:使用真实的性能数据进行规划
  • 安全余量:预留足够的安全余量
  • 成本考虑:平衡性能和成本

🚀 下一步

完成性能调优与扩容后,你可以:

  1. 学习故障排除技巧7.5 故障排除与维护
  2. 了解最佳实践第8章:最佳实践
  3. 查看案例研究第9章:案例研究

📚 扩展阅读

🏠 返回教程首页 | 📖 查看完整目录 | ▶️ 下一节: 故障排除