7.4 Performance Tuning and Scaling
⚡ Core goal: build a high-performance, scalable MCP server that can handle large-scale concurrency and heavy load
⏱️ Estimated time: 60 minutes
📊 Difficulty: ⭐⭐⭐⭐⭐
🎯 Learning Objectives
After working through this section you will be able to:
- Identify and resolve performance bottlenecks
- Implement horizontal and vertical scaling strategies
- Optimize database and cache performance
- Build an elastically scalable architecture
- Run load tests and plan capacity
📊 Performance Optimization Architecture
Performance optimization layers (diagram)
⚡ Application-Layer Performance Optimization
Asynchronous processing optimization
python
# src/performance/async_optimization.py
import asyncio
import aiohttp
import uvloop
from typing import List, Dict, Any, Optional
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from functools import lru_cache, wraps, partial
import time
import psutil
class AsyncPerformanceOptimizer:
"""异步性能优化器"""
def __init__(self):
# 使用uvloop替代默认事件循环
if hasattr(uvloop, 'install'):
uvloop.install()
# 线程池配置
self.thread_pool = ThreadPoolExecutor(
max_workers=min(32, (mp.cpu_count() or 1) + 4)
)
# 进程池配置
self.process_pool = ProcessPoolExecutor(
max_workers=mp.cpu_count() or 1
)
# 连接池配置
self.connector = aiohttp.TCPConnector(
limit=100, # 总连接数限制
limit_per_host=30, # 每个主机连接数限制
keepalive_timeout=30,
enable_cleanup_closed=True
)
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=aiohttp.ClientTimeout(total=30)
)
async def batch_process(self, items: List[Any],
processor: callable, batch_size: int = 100,
max_concurrency: int = 10) -> List[Any]:
"""批量异步处理"""
semaphore = asyncio.Semaphore(max_concurrency)
async def process_batch(batch):
async with semaphore:
return await processor(batch)
# 分批处理
batches = [items[i:i + batch_size]
for i in range(0, len(items), batch_size)]
tasks = [process_batch(batch) for batch in batches]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 合并结果
merged_results = []
for result in results:
if isinstance(result, Exception):
print(f"批处理错误: {result}")
continue
merged_results.extend(result)
return merged_results
async def parallel_http_requests(self, urls: List[str]) -> List[Dict]:
"""并行HTTP请求"""
async def fetch_url(url: str) -> Dict:
try:
async with self.session.get(url) as response:
return {
'url': url,
'status': response.status,
'data': await response.json() if response.content_type == 'application/json' else await response.text(),
'headers': dict(response.headers)
}
except Exception as e:
return {
'url': url,
'error': str(e),
'status': None,
'data': None
}
tasks = [fetch_url(url) for url in urls]
return await asyncio.gather(*tasks)
    def cpu_intensive_task(self, func: callable):
        """CPU密集型任务装饰器"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # run_in_executor does not accept keyword arguments,
            # so bind them with functools.partial first
            return await loop.run_in_executor(
                self.process_pool, partial(func, *args, **kwargs)
            )
        return wrapper

    def io_intensive_task(self, func: callable):
        """IO密集型任务装饰器"""
        @wraps(func)
        async def wrapper(*args, **kwargs):
            loop = asyncio.get_running_loop()
            # Same pattern as above, but dispatched to the thread pool
            return await loop.run_in_executor(
                self.thread_pool, partial(func, *args, **kwargs)
            )
        return wrapper
async def cleanup(self):
"""清理资源"""
await self.session.close()
self.thread_pool.shutdown(wait=True)
self.process_pool.shutdown(wait=True)
# 缓存优化
class CacheOptimizer:
"""缓存优化器"""
def __init__(self):
self.cache_stats = {
'hits': 0,
'misses': 0,
'evictions': 0
}
def smart_cache(self, maxsize: int = 128, ttl: int = 300):
"""智能缓存装饰器"""
def decorator(func):
cache = {}
cache_times = {}
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = self._generate_cache_key(args, kwargs)
current_time = time.time()
# 检查缓存是否存在且未过期
if (cache_key in cache and
current_time - cache_times.get(cache_key, 0) < ttl):
self.cache_stats['hits'] += 1
return cache[cache_key]
# 执行函数
result = await func(*args, **kwargs)
# 缓存管理
if len(cache) >= maxsize:
                    # Evict the entry with the oldest write time (simple time-based eviction, not strict LRU)
oldest_key = min(cache_times.keys(),
key=lambda k: cache_times[k])
del cache[oldest_key]
del cache_times[oldest_key]
self.cache_stats['evictions'] += 1
# 存储结果
cache[cache_key] = result
cache_times[cache_key] = current_time
self.cache_stats['misses'] += 1
return result
wrapper.cache_info = lambda: self.cache_stats.copy()
wrapper.cache_clear = lambda: (cache.clear(), cache_times.clear())
return wrapper
return decorator
def _generate_cache_key(self, args, kwargs) -> str:
"""生成缓存键"""
import hashlib
import json
key_data = {
'args': str(args),
'kwargs': json.dumps(kwargs, sort_keys=True, default=str)
}
key_string = json.dumps(key_data, sort_keys=True)
return hashlib.md5(key_string.encode()).hexdigest()
# 内存优化
class MemoryOptimizer:
"""内存优化器"""
def __init__(self):
self.memory_threshold = 0.8 # 80%内存阈值
def memory_monitor(self, func):
"""内存监控装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
# 执行前检查内存
memory_before = psutil.virtual_memory().percent
if memory_before > self.memory_threshold * 100:
print(f"⚠️ 内存使用率过高: {memory_before:.1f}%")
# 触发垃圾回收
import gc
gc.collect()
# 执行函数
result = await func(*args, **kwargs)
# 执行后检查内存
memory_after = psutil.virtual_memory().percent
memory_diff = memory_after - memory_before
if memory_diff > 10: # 内存增长超过10%
print(f"📊 内存增长: {memory_diff:.1f}%")
return result
return wrapper
def optimize_data_structures(self, data: Any) -> Any:
"""优化数据结构"""
if isinstance(data, list):
# 大列表使用生成器
if len(data) > 10000:
return (item for item in data)
elif isinstance(data, dict):
            # Use __slots__ for very large dicts (only safe when every key is a valid identifier string)
            if len(data) > 1000 and all(isinstance(k, str) and k.isidentifier() for k in data):
class OptimizedDict:
__slots__ = list(data.keys())
def __init__(self, data_dict):
for key, value in data_dict.items():
setattr(self, key, value)
return OptimizedDict(data)
return data
# 全局优化器实例
async_optimizer = AsyncPerformanceOptimizer()
cache_optimizer = CacheOptimizer()
memory_optimizer = MemoryOptimizer()
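How the pieces above fit together in practice: the sketch below decorates a hypothetical data-fetching coroutine with `smart_cache` and reuses the global `async_optimizer` for the outgoing HTTP calls. The `fetch_report` function and the URL are illustrative placeholders. Note that `AsyncPerformanceOptimizer.__init__` creates the `aiohttp.ClientSession` immediately, so depending on the aiohttp version you may see a warning when the module-level instance is built outside a running event loop.
python
# Illustrative usage of the optimizers defined above (fetch_report and the URL are placeholders)
import asyncio

async def main():
    @cache_optimizer.smart_cache(maxsize=256, ttl=60)
    async def fetch_report(report_id: str) -> dict:
        # Stand-in for a real MCP tool call
        results = await async_optimizer.parallel_http_requests(
            [f"https://example.com/reports/{report_id}"]
        )
        return results[0]

    await fetch_report("daily")        # cache miss, hits the network
    await fetch_report("daily")        # served from the in-process cache
    print(fetch_report.cache_info())   # e.g. {'hits': 1, 'misses': 1, 'evictions': 0}

    await async_optimizer.cleanup()

if __name__ == "__main__":
    asyncio.run(main())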
Database performance optimization
python
# src/performance/database_optimization.py
import asyncpg
import asyncio
from typing import List, Dict, Any, Optional
import time
import psutil
from contextlib import asynccontextmanager
class DatabaseOptimizer:
"""数据库性能优化器"""
def __init__(self, database_url: str):
self.database_url = database_url
self.pool = None
self.query_stats = {}
async def initialize(self):
"""初始化连接池"""
self.pool = await asyncpg.create_pool(
self.database_url,
min_size=10,
max_size=50,
max_queries=50000,
max_inactive_connection_lifetime=300,
command_timeout=60,
server_settings={
'application_name': 'mcp-server',
'tcp_keepalives_idle': '600',
'tcp_keepalives_interval': '30',
'tcp_keepalives_count': '3',
}
)
@asynccontextmanager
async def get_connection(self):
"""获取数据库连接"""
async with self.pool.acquire() as connection:
yield connection
async def execute_query(self, query: str, *args,
timeout: int = 30) -> List[Dict]:
"""执行查询并记录性能"""
start_time = time.time()
try:
async with self.get_connection() as conn:
result = await conn.fetch(query, *args, timeout=timeout)
# 记录查询统计
execution_time = time.time() - start_time
self._record_query_stats(query, execution_time, True)
return [dict(row) for row in result]
except Exception as e:
execution_time = time.time() - start_time
self._record_query_stats(query, execution_time, False)
raise
def _record_query_stats(self, query: str, execution_time: float,
success: bool):
"""记录查询统计"""
# 简化查询用于统计
query_template = self._normalize_query(query)
if query_template not in self.query_stats:
self.query_stats[query_template] = {
'count': 0,
'total_time': 0,
'avg_time': 0,
'max_time': 0,
'min_time': float('inf'),
'errors': 0
}
stats = self.query_stats[query_template]
stats['count'] += 1
stats['total_time'] += execution_time
stats['avg_time'] = stats['total_time'] / stats['count']
stats['max_time'] = max(stats['max_time'], execution_time)
stats['min_time'] = min(stats['min_time'], execution_time)
if not success:
stats['errors'] += 1
def _normalize_query(self, query: str) -> str:
"""标准化查询语句"""
import re
# 移除参数
normalized = re.sub(r'\$\d+', '?', query)
# 移除多余空格
normalized = re.sub(r'\s+', ' ', normalized).strip()
return normalized
async def batch_insert(self, table: str, columns: List[str],
data: List[List[Any]], batch_size: int = 1000):
"""批量插入优化"""
if not data:
return
# 构建插入语句
placeholders = ', '.join([f'${i+1}' for i in range(len(columns))])
query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
        async with self.get_connection() as conn:
            # Batch everything inside a single transaction
            async with conn.transaction():
                # asyncpg does not allow concurrent queries on one connection,
                # so insert each batch with executemany instead of asyncio.gather
                for i in range(0, len(data), batch_size):
                    batch = data[i:i + batch_size]
                    await conn.executemany(query, batch)
async def optimize_indexes(self) -> Dict[str, Any]:
"""索引优化建议"""
suggestions = []
# 查找慢查询
slow_queries = [
query for query, stats in self.query_stats.items()
if stats['avg_time'] > 1.0 # 超过1秒的查询
]
if slow_queries:
suggestions.append({
'type': 'slow_queries',
'description': '发现慢查询,建议添加索引',
'queries': slow_queries
})
# 查找缺失索引
missing_indexes = await self._analyze_missing_indexes()
if missing_indexes:
suggestions.append({
'type': 'missing_indexes',
'description': '建议添加以下索引',
'indexes': missing_indexes
})
return {
'suggestions': suggestions,
'query_stats': self.query_stats
}
async def _analyze_missing_indexes(self) -> List[Dict]:
"""分析缺失的索引"""
# 查询pg_stat_user_tables获取表统计信息
query = """
SELECT
schemaname,
tablename,
seq_scan,
seq_tup_read,
idx_scan,
idx_tup_fetch,
n_tup_ins,
n_tup_upd,
n_tup_del
FROM pg_stat_user_tables
WHERE seq_scan > idx_scan * 2 -- 顺序扫描远多于索引扫描
"""
async with self.get_connection() as conn:
results = await conn.fetch(query)
missing_indexes = []
for row in results:
missing_indexes.append({
'table': f"{row['schemaname']}.{row['tablename']}",
'seq_scan': row['seq_scan'],
'idx_scan': row['idx_scan'],
'recommendation': '考虑为常用查询字段添加索引'
})
return missing_indexes
# 缓存层优化
class CacheLayerOptimizer:
"""缓存层优化器"""
def __init__(self, redis_url: str):
import aioredis
self.redis_url = redis_url
self.redis = None
self.cache_hit_rate = 0.0
self.total_requests = 0
self.cache_hits = 0
async def initialize(self):
"""初始化Redis连接"""
import aioredis
self.redis = aioredis.from_url(
self.redis_url,
encoding="utf-8",
decode_responses=True,
max_connections=50
)
async def multi_level_cache_get(self, key: str) -> Optional[Any]:
"""多级缓存获取"""
self.total_requests += 1
# L1缓存:内存缓存
if hasattr(self, '_memory_cache') and key in self._memory_cache:
self.cache_hits += 1
return self._memory_cache[key]
# L2缓存:Redis缓存
value = await self.redis.get(key)
if value is not None:
# 回填L1缓存
if not hasattr(self, '_memory_cache'):
self._memory_cache = {}
self._memory_cache[key] = value
self.cache_hits += 1
return value
return None
async def multi_level_cache_set(self, key: str, value: Any,
ttl: int = 300):
"""多级缓存设置"""
# 设置Redis缓存
await self.redis.setex(key, ttl, value)
# 设置内存缓存(较短TTL)
if not hasattr(self, '_memory_cache'):
self._memory_cache = {}
self._memory_cache[key] = value
# 定时清理内存缓存
asyncio.create_task(self._cleanup_memory_cache(key, ttl // 10))
async def _cleanup_memory_cache(self, key: str, delay: int):
"""清理内存缓存"""
await asyncio.sleep(delay)
if hasattr(self, '_memory_cache') and key in self._memory_cache:
del self._memory_cache[key]
async def cache_warming(self, warm_keys: List[str]):
"""缓存预热"""
print("🔥 开始缓存预热...")
for key in warm_keys:
try:
# 这里应该调用实际的数据获取函数
# value = await get_data_for_key(key)
# await self.multi_level_cache_set(key, value)
pass
except Exception as e:
print(f"缓存预热失败 {key}: {e}")
print("✅ 缓存预热完成")
def get_cache_stats(self) -> Dict[str, Any]:
"""获取缓存统计"""
hit_rate = (self.cache_hits / self.total_requests * 100
if self.total_requests > 0 else 0)
return {
'hit_rate': hit_rate,
'total_requests': self.total_requests,
'cache_hits': self.cache_hits,
'cache_misses': self.total_requests - self.cache_hits
}
# 全局数据库优化器
db_optimizer = None
cache_optimizer_db = None
async def initialize_optimizers(database_url: str, redis_url: str):
"""初始化优化器"""
global db_optimizer, cache_optimizer_db
db_optimizer = DatabaseOptimizer(database_url)
await db_optimizer.initialize()
cache_optimizer_db = CacheLayerOptimizer(redis_url)
await cache_optimizer_db.initialize()
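A common way to combine the two optimizers is a cache-aside read path: check the multi-level cache first, fall back to PostgreSQL on a miss, then backfill the cache. The sketch below assumes the module-level `db_optimizer` / `cache_optimizer_db` globals have been set up via `initialize_optimizers`; the connection URLs and the `users` table are illustrative.
python
# Sketch: cache-aside read built on the optimizers above (DSN, Redis URL and table are assumptions)
import asyncio
import json

async def get_user(user_id: int) -> dict:
    cache_key = f"user:{user_id}"
    cached = await cache_optimizer_db.multi_level_cache_get(cache_key)
    if cached is not None:
        return json.loads(cached)

    rows = await db_optimizer.execute_query(
        "SELECT id, name FROM users WHERE id = $1", user_id
    )
    user = rows[0] if rows else {}
    # Redis stores strings, so serialize before caching
    await cache_optimizer_db.multi_level_cache_set(cache_key, json.dumps(user), ttl=120)
    return user

async def main():
    await initialize_optimizers(
        "postgresql://mcp:secret@localhost:5432/mcp",  # assumed DSN
        "redis://localhost:6379/0",                    # assumed Redis URL
    )
    print(await get_user(1))

if __name__ == "__main__":
    asyncio.run(main())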
🚀 Horizontal Scaling Architecture
Kubernetes autoscaling
yaml
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: mcp-server-hpa
namespace: production
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-server
minReplicas: 3
maxReplicas: 20
metrics:
# CPU使用率
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
# 内存使用率
- type: Resource
resource:
name: memory
target:
type: Utilization
averageUtilization: 75
# 自定义指标:QPS
- type: Pods
pods:
metric:
name: http_requests_per_second
target:
type: AverageValue
averageValue: "100"
# 外部指标:队列长度
- type: External
external:
metric:
name: queue_length
selector:
matchLabels:
queue: mcp-tasks
target:
type: AverageValue
averageValue: "50"
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 60
- type: Pods
value: 2
periodSeconds: 60
selectPolicy: Min
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 30
- type: Pods
value: 5
periodSeconds: 30
selectPolicy: Max
---
# 垂直Pod自动扩容
apiVersion: autoscaling.k8s.io/v1
kind: VerticalPodAutoscaler
metadata:
name: mcp-server-vpa
namespace: production
spec:
targetRef:
apiVersion: apps/v1
kind: Deployment
name: mcp-server
updatePolicy:
updateMode: "Auto"
resourcePolicy:
containerPolicies:
- containerName: mcp-server
minAllowed:
cpu: 100m
memory: 128Mi
maxAllowed:
cpu: 2
memory: 4Gi
controlledResources: ["cpu", "memory"]
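The `http_requests_per_second` Pods metric referenced in the HPA is not something Kubernetes collects on its own: the application has to expose a request counter, and a metrics adapter (for example prometheus-adapter) has to translate its rate into that custom metric name. Below is a minimal sketch of the application side, assuming `prometheus_client` is installed and the adapter mapping is configured separately.
python
# Sketch: exposing a request counter that a metrics adapter (e.g. prometheus-adapter)
# can turn into the http_requests_per_second Pods metric used by the HPA above.
from prometheus_client import Counter, start_http_server

HTTP_REQUESTS = Counter(
    "http_requests_total",
    "Total HTTP requests handled by the MCP server",
    ["method", "path"],
)

def record_request(method: str, path: str) -> None:
    # Call this from your request handler / middleware
    HTTP_REQUESTS.labels(method=method, path=path).inc()

if __name__ == "__main__":
    # Serve /metrics on :9090 for Prometheus to scrape; the adapter then maps
    # rate(http_requests_total) onto the custom metric name in hpa.yaml
    start_http_server(9090)
    record_request("GET", "/health")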
Load balancing configuration
yaml
# k8s/ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: mcp-server-ingress
namespace: production
annotations:
kubernetes.io/ingress.class: nginx
nginx.ingress.kubernetes.io/rewrite-target: /
nginx.ingress.kubernetes.io/ssl-redirect: "true"
nginx.ingress.kubernetes.io/use-regex: "true"
# 负载均衡配置
nginx.ingress.kubernetes.io/load-balance: "ewma"
nginx.ingress.kubernetes.io/upstream-hash-by: "$remote_addr"
# 连接限制
nginx.ingress.kubernetes.io/limit-connections: "100"
nginx.ingress.kubernetes.io/limit-rps: "1000"
# 健康检查
nginx.ingress.kubernetes.io/health-check-path: "/health"
nginx.ingress.kubernetes.io/health-check-interval: "10s"
# 缓存配置
nginx.ingress.kubernetes.io/proxy-cache-valid: "200 302 10m"
nginx.ingress.kubernetes.io/proxy-cache-valid-error: "404 1m"
spec:
tls:
- hosts:
- api.example.com
secretName: mcp-server-tls
rules:
- host: api.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: mcp-server
port:
number: 80
---
# 服务配置
apiVersion: v1
kind: Service
metadata:
name: mcp-server
namespace: production
annotations:
service.beta.kubernetes.io/aws-load-balancer-type: nlb
service.beta.kubernetes.io/aws-load-balancer-cross-zone-load-balancing-enabled: "true"
spec:
type: LoadBalancer
sessionAffinity: ClientIP
sessionAffinityConfig:
clientIP:
timeoutSeconds: 10800
ports:
- port: 80
targetPort: 8000
protocol: TCP
name: http
- port: 443
targetPort: 8443
protocol: TCP
name: https
selector:
app: mcp-server
Database sharding strategy
python
# src/scaling/database_sharding.py
import hashlib
from typing import Dict, List, Any, Optional
from enum import Enum
import asyncio
import asyncpg
class ShardingStrategy(Enum):
HASH = "hash"
RANGE = "range"
DIRECTORY = "directory"
class DatabaseSharding:
"""数据库分片管理器"""
def __init__(self, shard_configs: List[Dict[str, str]],
strategy: ShardingStrategy = ShardingStrategy.HASH):
self.shard_configs = shard_configs
self.strategy = strategy
self.shard_pools = {}
self.shard_count = len(shard_configs)
# 分片映射表(用于目录策略)
self.shard_directory = {}
async def initialize(self):
"""初始化分片连接池"""
for i, config in enumerate(self.shard_configs):
pool = await asyncpg.create_pool(
config['url'],
min_size=5,
max_size=20,
command_timeout=60
)
self.shard_pools[f'shard_{i}'] = pool
def get_shard_key(self, key: Any) -> str:
"""获取分片键"""
if self.strategy == ShardingStrategy.HASH:
# 哈希分片
hash_value = int(hashlib.md5(str(key).encode()).hexdigest(), 16)
shard_index = hash_value % self.shard_count
return f'shard_{shard_index}'
elif self.strategy == ShardingStrategy.RANGE:
# 范围分片(假设key是数字)
if isinstance(key, (int, float)):
shard_index = int(key) // 10000 # 每1万条记录一个分片
shard_index = min(shard_index, self.shard_count - 1)
return f'shard_{shard_index}'
elif self.strategy == ShardingStrategy.DIRECTORY:
# 目录分片
return self.shard_directory.get(str(key), 'shard_0')
return 'shard_0'
async def execute_query(self, shard_key: Any, query: str,
*args) -> List[Dict]:
"""在特定分片执行查询"""
shard_name = self.get_shard_key(shard_key)
pool = self.shard_pools[shard_name]
async with pool.acquire() as conn:
result = await conn.fetch(query, *args)
return [dict(row) for row in result]
async def execute_cross_shard_query(self, query: str,
*args) -> List[Dict]:
"""跨分片查询"""
        tasks = []
        for shard_name, pool in self.shard_pools.items():
            async def query_shard(pool, name):
                async with pool.acquire() as conn:
                    try:
                        result = await conn.fetch(query, *args)
                        return [dict(row) for row in result]
                    except Exception as e:
                        # The shard name is passed in as a parameter so the error
                        # message is not affected by the closure's late binding
                        print(f"Shard {name} query failed: {e}")
                        return []
            tasks.append(query_shard(pool, shard_name))
# 并行执行所有分片查询
results = await asyncio.gather(*tasks)
# 合并结果
merged_results = []
for result in results:
merged_results.extend(result)
return merged_results
    async def distributed_transaction(self, operations: List[Dict]):
        """Distributed transaction across shards (best-effort two-phase commit)"""
        # Phase 1: open a transaction on each involved shard and run its operation.
        # Connections must stay checked out until commit/rollback, so acquire them
        # explicitly instead of using the pool's context manager.
        active = []  # (pool, connection, transaction) triples
        try:
            for operation in operations:
                shard_name = self.get_shard_key(operation['shard_key'])
                pool = self.shard_pools[shard_name]
                conn = await pool.acquire()
                tx = conn.transaction()
                await tx.start()
                active.append((pool, conn, tx))
                await conn.execute(operation['query'], *operation['args'])
            # Phase 2: commit on every shard
            for _, _, tx in active:
                await tx.commit()
        except Exception:
            # Roll back every transaction that was started
            for _, _, tx in active:
                try:
                    await tx.rollback()
                except Exception:
                    pass
            raise
        finally:
            # Always return the connections to their pools
            for pool, conn, _ in active:
                await pool.release(conn)
async def rebalance_shards(self, new_shard_count: int):
"""分片重平衡"""
if new_shard_count <= self.shard_count:
raise ValueError("新分片数量必须大于当前分片数量")
print(f"🔄 开始分片重平衡: {self.shard_count} -> {new_shard_count}")
# 创建新分片
for i in range(self.shard_count, new_shard_count):
# 这里需要添加新分片的配置
pass
# 数据迁移逻辑
# 这是一个复杂的过程,需要:
# 1. 计算新的分片映射
# 2. 迁移数据
# 3. 更新路由信息
# 4. 验证数据一致性
print("✅ 分片重平衡完成")
# 读写分离
class ReadWriteSplitter:
"""读写分离管理器"""
def __init__(self, master_url: str, slave_urls: List[str]):
self.master_url = master_url
self.slave_urls = slave_urls
self.master_pool = None
self.slave_pools = []
self.current_slave_index = 0
async def initialize(self):
"""初始化连接池"""
# 主库连接池
self.master_pool = await asyncpg.create_pool(
self.master_url,
min_size=10,
max_size=30
)
# 从库连接池
for slave_url in self.slave_urls:
pool = await asyncpg.create_pool(
slave_url,
min_size=5,
max_size=20
)
self.slave_pools.append(pool)
async def execute_write(self, query: str, *args) -> Any:
"""执行写操作(主库)"""
async with self.master_pool.acquire() as conn:
return await conn.execute(query, *args)
async def execute_read(self, query: str, *args) -> List[Dict]:
"""执行读操作(从库,负载均衡)"""
if not self.slave_pools:
# 没有从库,使用主库
async with self.master_pool.acquire() as conn:
result = await conn.fetch(query, *args)
return [dict(row) for row in result]
# 轮询选择从库
slave_pool = self.slave_pools[self.current_slave_index]
self.current_slave_index = (self.current_slave_index + 1) % len(self.slave_pools)
try:
async with slave_pool.acquire() as conn:
result = await conn.fetch(query, *args)
return [dict(row) for row in result]
except Exception as e:
# 从库失败,回退到主库
print(f"从库查询失败,回退到主库: {e}")
async with self.master_pool.acquire() as conn:
result = await conn.fetch(query, *args)
return [dict(row) for row in result]
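A short wiring example for the two classes above. The shard and replica DSNs and the `events` table are assumptions; the point is that `execute_query` routes by the shard key, while `ReadWriteSplitter` sends writes to the primary and round-robins reads across replicas.
python
# Illustrative wiring of sharding and read/write splitting (DSNs and table are assumptions)
import asyncio

async def main():
    sharding = DatabaseSharding(
        shard_configs=[
            {"url": "postgresql://mcp@shard0:5432/mcp"},
            {"url": "postgresql://mcp@shard1:5432/mcp"},
        ],
        strategy=ShardingStrategy.HASH,
    )
    await sharding.initialize()

    # The same shard key always routes to the same shard
    rows = await sharding.execute_query(
        42, "SELECT * FROM events WHERE user_id = $1", 42
    )
    print(f"rows on shard for user 42: {len(rows)}")

    splitter = ReadWriteSplitter(
        master_url="postgresql://mcp@primary:5432/mcp",
        slave_urls=["postgresql://mcp@replica1:5432/mcp"],
    )
    await splitter.initialize()
    await splitter.execute_write("UPDATE events SET seen = true WHERE user_id = $1", 42)
    print(await splitter.execute_read("SELECT count(*) AS n FROM events"))

if __name__ == "__main__":
    asyncio.run(main())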
🧪 Load Testing and Performance Benchmarks
Load testing script
python
# tests/performance/load_test.py
import asyncio
import aiohttp
import time
import statistics
from typing import List, Dict, Any
import json
from dataclasses import dataclass
import argparse
@dataclass
class TestResult:
"""测试结果"""
total_requests: int
successful_requests: int
failed_requests: int
avg_response_time: float
p95_response_time: float
p99_response_time: float
requests_per_second: float
errors: List[str]
class LoadTester:
"""负载测试器"""
def __init__(self, base_url: str, max_connections: int = 100):
self.base_url = base_url
self.max_connections = max_connections
self.connector = aiohttp.TCPConnector(
limit=max_connections,
limit_per_host=max_connections
)
self.session = aiohttp.ClientSession(
connector=self.connector,
timeout=aiohttp.ClientTimeout(total=30)
)
self.response_times = []
self.errors = []
self.successful_requests = 0
self.failed_requests = 0
async def single_request(self, endpoint: str, method: str = 'GET',
data: Dict = None) -> float:
"""单次请求"""
start_time = time.time()
try:
async with self.session.request(
method,
f"{self.base_url}{endpoint}",
json=data
) as response:
await response.text() # 读取响应内容
if response.status < 400:
self.successful_requests += 1
else:
self.failed_requests += 1
self.errors.append(f"HTTP {response.status}: {endpoint}")
response_time = time.time() - start_time
self.response_times.append(response_time)
return response_time
except Exception as e:
self.failed_requests += 1
self.errors.append(f"Exception: {str(e)}")
response_time = time.time() - start_time
self.response_times.append(response_time)
return response_time
async def concurrent_test(self, endpoint: str,
concurrent_users: int,
requests_per_user: int,
method: str = 'GET',
data: Dict = None) -> TestResult:
"""并发测试"""
print(f"🚀 开始负载测试:")
print(f" 端点: {endpoint}")
print(f" 并发用户: {concurrent_users}")
print(f" 每用户请求数: {requests_per_user}")
print(f" 总请求数: {concurrent_users * requests_per_user}")
start_time = time.time()
# 创建任务
tasks = []
for user in range(concurrent_users):
for request in range(requests_per_user):
task = self.single_request(endpoint, method, data)
tasks.append(task)
# 执行所有任务
await asyncio.gather(*tasks)
total_time = time.time() - start_time
total_requests = len(self.response_times)
# 计算统计数据
if self.response_times:
avg_response_time = statistics.mean(self.response_times)
p95_response_time = statistics.quantiles(self.response_times, n=20)[18] # 95th percentile
p99_response_time = statistics.quantiles(self.response_times, n=100)[98] # 99th percentile
else:
avg_response_time = p95_response_time = p99_response_time = 0
requests_per_second = total_requests / total_time if total_time > 0 else 0
return TestResult(
total_requests=total_requests,
successful_requests=self.successful_requests,
failed_requests=self.failed_requests,
avg_response_time=avg_response_time,
p95_response_time=p95_response_time,
p99_response_time=p99_response_time,
requests_per_second=requests_per_second,
errors=self.errors[:10] # 只保留前10个错误
)
async def ramp_up_test(self, endpoint: str,
max_users: int,
ramp_duration: int = 60) -> List[TestResult]:
"""渐进负载测试"""
results = []
        step_size = max(1, max_users // 10)  # 10 steps, at least one user per step
step_duration = ramp_duration // 10
for step in range(1, 11):
users = step * step_size
print(f"📊 步骤 {step}/10: {users} 并发用户")
# 重置统计
self.response_times = []
self.errors = []
self.successful_requests = 0
self.failed_requests = 0
# 执行测试
result = await self.concurrent_test(endpoint, users, 1)
results.append(result)
# 等待下一步
if step < 10:
await asyncio.sleep(step_duration)
return results
async def cleanup(self):
"""清理资源"""
await self.session.close()
def print_results(self, result: TestResult):
"""打印测试结果"""
print("\n" + "="*60)
print("📊 负载测试结果")
print("="*60)
print(f"总请求数: {result.total_requests}")
print(f"成功请求: {result.successful_requests}")
print(f"失败请求: {result.failed_requests}")
print(f"成功率: {result.successful_requests/result.total_requests*100:.1f}%")
print(f"平均响应时间: {result.avg_response_time*1000:.2f}ms")
print(f"95%响应时间: {result.p95_response_time*1000:.2f}ms")
print(f"99%响应时间: {result.p99_response_time*1000:.2f}ms")
print(f"每秒请求数: {result.requests_per_second:.2f} RPS")
if result.errors:
print(f"\n❌ 错误示例 (前10个):")
for error in result.errors:
print(f" {error}")
async def main():
"""主函数"""
parser = argparse.ArgumentParser(description='MCP服务器负载测试')
parser.add_argument('--url', default='http://localhost:8000', help='服务器URL')
parser.add_argument('--endpoint', default='/health', help='测试端点')
parser.add_argument('--users', type=int, default=50, help='并发用户数')
parser.add_argument('--requests', type=int, default=10, help='每用户请求数')
parser.add_argument('--ramp', action='store_true', help='渐进负载测试')
args = parser.parse_args()
tester = LoadTester(args.url)
try:
if args.ramp:
results = await tester.ramp_up_test(args.endpoint, args.users)
print("\n📈 渐进负载测试结果:")
for i, result in enumerate(results, 1):
print(f"\n步骤 {i}: {i * args.users // 10} 用户")
tester.print_results(result)
else:
result = await tester.concurrent_test(
args.endpoint,
args.users,
args.requests
)
tester.print_results(result)
finally:
await tester.cleanup()
if __name__ == '__main__':
asyncio.run(main())
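Besides the CLI entry point, the tester can be driven programmatically, for example as a smoke test in CI. A minimal sketch follows; the URL and the 200 ms p95 threshold are illustrative.
python
# Sketch: running LoadTester from code instead of the argparse CLI
import asyncio

async def smoke_load_test():
    tester = LoadTester("http://localhost:8000")   # assumed local server
    try:
        result = await tester.concurrent_test(
            "/health", concurrent_users=20, requests_per_user=5
        )
        tester.print_results(result)
        # Fail the job if p95 latency regresses beyond 200 ms (example threshold)
        assert result.p95_response_time < 0.2
    finally:
        await tester.cleanup()

if __name__ == "__main__":
    asyncio.run(smoke_load_test())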
Performance benchmarking
python
# tests/performance/benchmark.py
import asyncio
import time
import psutil
import json
from typing import Dict, Any, List
from dataclasses import dataclass, asdict
import statistics
@dataclass
class BenchmarkResult:
"""基准测试结果"""
test_name: str
duration: float
operations_per_second: float
memory_usage_mb: float
cpu_usage_percent: float
success_rate: float
error_count: int
class PerformanceBenchmark:
"""性能基准测试"""
def __init__(self):
self.results: List[BenchmarkResult] = []
async def benchmark_database_operations(self, db_optimizer) -> BenchmarkResult:
"""数据库操作基准测试"""
print("🔍 数据库操作基准测试...")
start_time = time.time()
start_memory = psutil.virtual_memory().used / 1024 / 1024
process = psutil.Process()
operations = 0
errors = 0
# 执行1000次数据库查询
for i in range(1000):
try:
await db_optimizer.execute_query(
"SELECT $1::int as test_value", i
)
operations += 1
except Exception as e:
errors += 1
end_time = time.time()
end_memory = psutil.virtual_memory().used / 1024 / 1024
duration = end_time - start_time
return BenchmarkResult(
test_name="database_operations",
duration=duration,
operations_per_second=operations / duration,
memory_usage_mb=end_memory - start_memory,
cpu_usage_percent=process.cpu_percent(),
success_rate=operations / (operations + errors) * 100,
error_count=errors
)
async def benchmark_cache_operations(self, cache_optimizer) -> BenchmarkResult:
"""缓存操作基准测试"""
print("🔍 缓存操作基准测试...")
start_time = time.time()
start_memory = psutil.virtual_memory().used / 1024 / 1024
process = psutil.Process()
operations = 0
errors = 0
# 执行10000次缓存操作
for i in range(10000):
try:
key = f"test_key_{i % 100}" # 100个不同的键
value = f"test_value_{i}"
await cache_optimizer.multi_level_cache_set(key, value)
await cache_optimizer.multi_level_cache_get(key)
operations += 2
except Exception as e:
errors += 1
end_time = time.time()
end_memory = psutil.virtual_memory().used / 1024 / 1024
duration = end_time - start_time
return BenchmarkResult(
test_name="cache_operations",
duration=duration,
operations_per_second=operations / duration,
memory_usage_mb=end_memory - start_memory,
cpu_usage_percent=process.cpu_percent(),
success_rate=operations / (operations + errors) * 100,
error_count=errors
)
async def benchmark_concurrent_processing(self) -> BenchmarkResult:
"""并发处理基准测试"""
print("🔍 并发处理基准测试...")
start_time = time.time()
start_memory = psutil.virtual_memory().used / 1024 / 1024
process = psutil.Process()
async def dummy_task(task_id: int) -> int:
"""模拟任务"""
await asyncio.sleep(0.01) # 模拟IO操作
return task_id * 2
# 并发执行1000个任务
tasks = [dummy_task(i) for i in range(1000)]
results = await asyncio.gather(*tasks, return_exceptions=True)
operations = len([r for r in results if not isinstance(r, Exception)])
errors = len([r for r in results if isinstance(r, Exception)])
end_time = time.time()
end_memory = psutil.virtual_memory().used / 1024 / 1024
duration = end_time - start_time
return BenchmarkResult(
test_name="concurrent_processing",
duration=duration,
operations_per_second=operations / duration,
memory_usage_mb=end_memory - start_memory,
cpu_usage_percent=process.cpu_percent(),
success_rate=operations / (operations + errors) * 100,
error_count=errors
)
async def run_all_benchmarks(self, db_optimizer=None,
cache_optimizer=None) -> List[BenchmarkResult]:
"""运行所有基准测试"""
print("🚀 开始性能基准测试...")
if db_optimizer:
result = await self.benchmark_database_operations(db_optimizer)
self.results.append(result)
if cache_optimizer:
result = await self.benchmark_cache_operations(cache_optimizer)
self.results.append(result)
result = await self.benchmark_concurrent_processing()
self.results.append(result)
return self.results
def print_results(self):
"""打印基准测试结果"""
print("\n" + "="*80)
print("📊 性能基准测试结果")
print("="*80)
for result in self.results:
print(f"\n🔍 {result.test_name}")
print(f" 持续时间: {result.duration:.2f}s")
print(f" 操作/秒: {result.operations_per_second:.2f}")
print(f" 内存使用: {result.memory_usage_mb:.2f}MB")
print(f" CPU使用: {result.cpu_usage_percent:.1f}%")
print(f" 成功率: {result.success_rate:.1f}%")
print(f" 错误数: {result.error_count}")
def save_results(self, filename: str):
"""保存结果到文件"""
with open(filename, 'w') as f:
json.dump([asdict(result) for result in self.results], f, indent=2)
print(f"📄 结果已保存到 {filename}")
📈 Capacity Planning Tool
python
# src/scaling/capacity_planning.py
import math
from typing import Dict, Any, List
from dataclasses import dataclass
import json
@dataclass
class ResourceRequirement:
"""资源需求"""
cpu_cores: float
memory_gb: float
storage_gb: float
network_mbps: float
connections: int
@dataclass
class ScalingPlan:
"""扩容计划"""
current_capacity: ResourceRequirement
target_capacity: ResourceRequirement
scaling_factor: float
estimated_cost: float
recommendations: List[str]
class CapacityPlanner:
"""容量规划器"""
def __init__(self):
# 基准资源使用率(每RPS)
self.base_resource_per_rps = {
'cpu_cores': 0.001, # 每RPS需要0.001核心
'memory_mb': 2, # 每RPS需要2MB内存
'storage_mb': 1, # 每RPS需要1MB存储
'network_kbps': 10, # 每RPS需要10KB/s网络
'connections': 0.1 # 每RPS需要0.1个连接
}
# 资源成本(每月)
self.resource_costs = {
'cpu_core': 30, # 每核心每月30美元
'memory_gb': 5, # 每GB内存每月5美元
'storage_gb': 0.1, # 每GB存储每月0.1美元
'network_gb': 0.05 # 每GB流量0.05美元
}
def calculate_resource_requirements(self, target_rps: int,
peak_multiplier: float = 2.0,
safety_margin: float = 0.3) -> ResourceRequirement:
"""计算资源需求"""
# 考虑峰值和安全余量
effective_rps = target_rps * peak_multiplier * (1 + safety_margin)
cpu_cores = effective_rps * self.base_resource_per_rps['cpu_cores']
memory_gb = effective_rps * self.base_resource_per_rps['memory_mb'] / 1024
storage_gb = effective_rps * self.base_resource_per_rps['storage_mb'] / 1024
network_mbps = effective_rps * self.base_resource_per_rps['network_kbps'] / 1024
connections = int(effective_rps * self.base_resource_per_rps['connections'])
return ResourceRequirement(
            cpu_cores=math.ceil(cpu_cores * 4) / 4,  # round up to the nearest 0.25 core
memory_gb=math.ceil(memory_gb),
storage_gb=math.ceil(storage_gb),
network_mbps=math.ceil(network_mbps),
connections=connections
)
def estimate_costs(self, requirements: ResourceRequirement) -> float:
"""估算成本"""
cpu_cost = requirements.cpu_cores * self.resource_costs['cpu_core']
memory_cost = requirements.memory_gb * self.resource_costs['memory_gb']
storage_cost = requirements.storage_gb * self.resource_costs['storage_gb']
        # Estimate monthly traffic from the sustained bandwidth requirement (Mbps → GB/month)
monthly_network_gb = requirements.network_mbps * 0.125 * 86400 * 30 / 1024
network_cost = monthly_network_gb * self.resource_costs['network_gb']
return cpu_cost + memory_cost + storage_cost + network_cost
def create_scaling_plan(self, current_rps: int, target_rps: int,
current_resources: ResourceRequirement = None) -> ScalingPlan:
"""创建扩容计划"""
target_requirements = self.calculate_resource_requirements(target_rps)
if current_resources is None:
current_requirements = self.calculate_resource_requirements(current_rps)
else:
current_requirements = current_resources
scaling_factor = target_rps / current_rps if current_rps > 0 else 1
estimated_cost = self.estimate_costs(target_requirements)
# 生成建议
recommendations = []
if scaling_factor > 2:
recommendations.append("建议分阶段扩容,避免一次性大幅扩展")
if target_requirements.cpu_cores > 16:
recommendations.append("考虑使用多个较小实例而非单个大实例")
if target_requirements.memory_gb > 32:
recommendations.append("考虑实施缓存策略以减少内存需求")
if target_requirements.connections > 10000:
recommendations.append("考虑实施连接池和负载均衡")
return ScalingPlan(
current_capacity=current_requirements,
target_capacity=target_requirements,
scaling_factor=scaling_factor,
estimated_cost=estimated_cost,
recommendations=recommendations
)
def generate_scaling_report(self, plan: ScalingPlan) -> str:
"""生成扩容报告"""
report = f"""
📊 容量规划报告
{'='*50}
📈 扩容概览
- 扩容倍数: {plan.scaling_factor:.1f}x
- 预估成本: ${plan.estimated_cost:.2f}/月
🔧 当前资源配置
- CPU核心: {plan.current_capacity.cpu_cores}
- 内存: {plan.current_capacity.memory_gb}GB
- 存储: {plan.current_capacity.storage_gb}GB
- 网络: {plan.current_capacity.network_mbps}Mbps
- 连接数: {plan.current_capacity.connections}
🚀 目标资源配置
- CPU核心: {plan.target_capacity.cpu_cores}
- 内存: {plan.target_capacity.memory_gb}GB
- 存储: {plan.target_capacity.storage_gb}GB
- 网络: {plan.target_capacity.network_mbps}Mbps
- 连接数: {plan.target_capacity.connections}
💡 扩容建议
"""
for i, recommendation in enumerate(plan.recommendations, 1):
report += f"{i}. {recommendation}\n"
return report
# 使用示例
def main():
planner = CapacityPlanner()
# 创建扩容计划:从1000 RPS扩展到5000 RPS
plan = planner.create_scaling_plan(
current_rps=1000,
target_rps=5000
)
# 生成报告
report = planner.generate_scaling_report(plan)
print(report)
# 保存计划
with open('scaling_plan.json', 'w') as f:
json.dump({
'current_capacity': plan.current_capacity.__dict__,
'target_capacity': plan.target_capacity.__dict__,
'scaling_factor': plan.scaling_factor,
'estimated_cost': plan.estimated_cost,
'recommendations': plan.recommendations
}, f, indent=2)
if __name__ == '__main__':
main()
🎯 Best Practices
1. Performance optimization principles
- Measure first: profile before you optimize and avoid premature optimization (see the sketch below)
- Locate the bottleneck: find the system's real constraint before tuning anything
- Optimize incrementally: change one thing at a time and verify the effect
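A minimal sketch of the measure-first principle: time a suspect coroutine over many runs and compare medians before and after a change, so any optimization is justified by data. The `handler` below is a stand-in for a real MCP tool call.
python
# Sketch: measure before optimizing — compare median latency across runs
import asyncio
import statistics
import time

async def measure(handler, runs: int = 50) -> float:
    timings = []
    for _ in range(runs):
        start = time.perf_counter()
        await handler()
        timings.append(time.perf_counter() - start)
    return statistics.median(timings)

async def main():
    async def handler():  # stand-in for a real MCP tool call
        await asyncio.sleep(0.01)

    baseline = await measure(handler)
    print(f"median latency: {baseline * 1000:.2f} ms")

if __name__ == "__main__":
    asyncio.run(main())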
2. Scaling strategy
- Prefer horizontal scaling: grow by adding instances rather than enlarging a single one
- Design for statelessness: keep the application stateless so replicas are interchangeable (see the sketch below)
- Scale gradually: avoid one large jump in capacity
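One concrete way to keep the application stateless is to move per-session state out of process memory and into Redis, so any replica behind the load balancer can serve any request. A minimal sketch using the same aioredis client as the cache layer above; the Redis URL and key layout are assumptions.
python
# Sketch: externalize session state to Redis so replicas stay interchangeable
import json
import aioredis

redis_client = aioredis.from_url("redis://localhost:6379/0")  # assumed Redis URL

async def save_session(session_id: str, state: dict, ttl: int = 1800) -> None:
    await redis_client.setex(f"session:{session_id}", ttl, json.dumps(state))

async def load_session(session_id: str) -> dict:
    raw = await redis_client.get(f"session:{session_id}")
    return json.loads(raw) if raw else {}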
3. Monitoring metrics
- Key metrics: QPS, response time, error rate, and resource utilization (a tracking sketch follows this list)
- Business metrics: tool-call success rate and user satisfaction
- Trend analysis: watch long-term trends to anticipate scaling needs
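For the key metrics, a small in-process sliding window is often enough to drive alerts and autoscaling decisions before a full monitoring stack is in place. A minimal sketch:
python
# Sketch: sliding-window tracking of QPS, average latency and error rate
import time
from collections import deque

class MetricsWindow:
    def __init__(self, window_seconds: int = 60):
        self.window = window_seconds
        self.samples = deque()  # (timestamp, latency_seconds, ok)

    def record(self, latency: float, ok: bool) -> None:
        now = time.time()
        self.samples.append((now, latency, ok))
        # Drop samples that have fallen out of the window
        while self.samples and self.samples[0][0] < now - self.window:
            self.samples.popleft()

    def snapshot(self) -> dict:
        if not self.samples:
            return {"qps": 0.0, "avg_latency_ms": 0.0, "error_rate": 0.0}
        latencies = [s[1] for s in self.samples]
        errors = sum(1 for s in self.samples if not s[2])
        return {
            "qps": len(self.samples) / self.window,
            "avg_latency_ms": sum(latencies) / len(latencies) * 1000,
            "error_rate": errors / len(self.samples),
        }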
4. Capacity planning
- Plan from data: base the plan on real performance measurements
- Keep a safety margin: leave enough headroom for traffic peaks
- Mind the cost: balance performance against spend
🚀 Next Steps
With performance tuning and scaling in place, you can:
- Learn troubleshooting techniques → 7.5 Troubleshooting and Maintenance
- Review best practices → Chapter 8: Best Practices
- Study real-world examples → Chapter 9: Case Studies
📚 Further reading: