5.4 性能优化和监控
🎯 学习目标:构建高性能的MCP服务器,实现全面的性能监控和自动化优化
⏱️ 预计时间:55分钟
📊 难度等级:⭐⭐⭐⭐⭐
📊 性能优化策略
🎛️ 性能优化维度
⚡ 高性能核心框架
🚀 性能优化管理器
python
"""
performance.py - 性能优化和监控框架
"""
import asyncio
import cProfile
import gc
import multiprocessing
import pstats
import sys
import threading
import time
import weakref
from collections import deque
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from functools import lru_cache, wraps
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import psutil
from loguru import logger
from memory_profiler import profile as memory_profile
class PerformanceMetric(Enum):
    """Metric types collected by the performance monitor.

    The string values double as serialization keys (see
    PerformanceSnapshot.to_dict) and as the ``{metric}`` path parameter of
    the dashboard history API.
    """
    LATENCY = "latency"                  # average request latency in ms (per record_request callers)
    THROUGHPUT = "throughput"            # requests per second since monitor start
    CPU_USAGE = "cpu_usage"              # system-wide CPU percent (psutil)
    MEMORY_USAGE = "memory_usage"        # virtual-memory usage percent (psutil)
    DISK_IO = "disk_io"                  # cumulative read + write bytes
    NETWORK_IO = "network_io"            # cumulative sent + received bytes
    ERROR_RATE = "error_rate"            # failed requests as % of all requests
    CONCURRENT_CONNECTIONS = "concurrent_connections"  # NOTE(review): declared but not collected in this file
    QUEUE_SIZE = "queue_size"            # NOTE(review): declared but not collected in this file
    CACHE_HIT_RATE = "cache_hit_rate"    # cache hits as % of all cache accesses
@dataclass
class PerformanceSnapshot:
    """A single point-in-time measurement of all collected metrics."""
    timestamp: datetime
    metrics: Dict[PerformanceMetric, float]
    metadata: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize the snapshot into a JSON-friendly dictionary."""
        serialized_metrics: Dict[str, float] = {}
        for metric, value in self.metrics.items():
            serialized_metrics[metric.value] = value
        return {
            "timestamp": self.timestamp.isoformat(),
            "metrics": serialized_metrics,
            "metadata": self.metadata,
        }
@dataclass
class OptimizationResult:
    """Outcome of one applied optimization.

    Instances are appended to PerformanceOptimizer.optimization_history and
    surfaced in PerformanceManager.get_performance_report().
    """
    optimization_type: str            # e.g. "cpu_gc_optimization", "memory_gc_optimization"
    before_metrics: Dict[str, float]  # metric values sampled before the optimization
    after_metrics: Dict[str, float]   # metric values sampled after it
    improvement_percentage: float     # relative improvement in percent
    applied_at: datetime              # when the optimization was applied
    description: str                  # human-readable summary (used in logs and reports)
class PerformanceMonitor:
    """Samples system and application metrics on a fixed interval.

    A background asyncio task (started via start_monitoring) periodically
    collects psutil system counters together with the request/latency/cache
    counters fed in through record_request() and record_cache_access(), and
    keeps a bounded history of PerformanceSnapshot objects.
    """

    def __init__(self, sampling_interval: float = 1.0):
        # Seconds between two snapshots taken by the monitoring loop.
        self.sampling_interval = sampling_interval
        # Bounded histories: deque(maxlen=...) appends in O(1) and evicts the
        # oldest entry automatically, replacing the previous O(n) list.pop(0).
        self.max_snapshots = 10000
        self.snapshots: deque = deque(maxlen=self.max_snapshots)
        self.running = False
        self.monitor_task: Optional[asyncio.Task] = None
        # Request counters (fed by record_request)
        self.request_count = 0
        self.error_count = 0
        self.start_time = time.time()
        # Rolling latency window, milliseconds (see record_request)
        self.max_latency_samples = 1000
        self.latency_samples: deque = deque(maxlen=self.max_latency_samples)
        # Cache counters (fed by record_cache_access)
        self.cache_hits = 0
        self.cache_misses = 0

    async def start_monitoring(self):
        """Start the background sampling task (idempotent)."""
        if self.running:
            return
        self.running = True
        self.monitor_task = asyncio.create_task(self._monitoring_loop())
        logger.info("性能监控已启动")

    async def stop_monitoring(self):
        """Cancel the sampling task and wait for it to finish (idempotent)."""
        if not self.running:
            return
        self.running = False
        if self.monitor_task:
            self.monitor_task.cancel()
            try:
                await self.monitor_task
            except asyncio.CancelledError:
                pass
        logger.info("性能监控已停止")

    async def _monitoring_loop(self):
        """Collect one snapshot per sampling interval until stopped."""
        while self.running:
            try:
                snapshot = await self._collect_snapshot()
                self.snapshots.append(snapshot)  # maxlen bounds the history
                await asyncio.sleep(self.sampling_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Keep the monitor alive across transient collection errors.
                logger.error(f"性能监控错误: {e}")
                await asyncio.sleep(1)

    async def _collect_snapshot(self) -> PerformanceSnapshot:
        """Build a PerformanceSnapshot from system and derived app metrics."""
        # System-wide resource usage (psutil)
        cpu_percent = psutil.cpu_percent(interval=None)
        memory_info = psutil.virtual_memory()
        disk_io = psutil.disk_io_counters()    # may be None on some platforms
        network_io = psutil.net_io_counters()  # may be None on some platforms
        # Current-process stats (reported in metadata only)
        process = psutil.Process()
        process_memory = process.memory_info()
        process_cpu = process.cpu_percent()
        # Derived application metrics
        current_time = time.time()
        elapsed_time = current_time - self.start_time
        throughput = self.request_count / max(elapsed_time, 1)
        error_rate = (self.error_count / max(self.request_count, 1)) * 100
        avg_latency = sum(self.latency_samples) / max(len(self.latency_samples), 1)
        total_cache_requests = self.cache_hits + self.cache_misses
        cache_hit_rate = (self.cache_hits / max(total_cache_requests, 1)) * 100
        metrics = {
            PerformanceMetric.CPU_USAGE: cpu_percent,
            PerformanceMetric.MEMORY_USAGE: memory_info.percent,
            PerformanceMetric.DISK_IO: disk_io.read_bytes + disk_io.write_bytes if disk_io else 0,
            PerformanceMetric.NETWORK_IO: network_io.bytes_sent + network_io.bytes_recv if network_io else 0,
            PerformanceMetric.THROUGHPUT: throughput,
            PerformanceMetric.ERROR_RATE: error_rate,
            PerformanceMetric.LATENCY: avg_latency,
            PerformanceMetric.CACHE_HIT_RATE: cache_hit_rate
        }
        metadata = {
            "process_memory_rss": process_memory.rss,
            "process_memory_vms": process_memory.vms,
            "process_cpu_percent": process_cpu,
            "gc_counts": gc.get_count(),
            "thread_count": threading.active_count()
        }
        return PerformanceSnapshot(
            timestamp=datetime.now(),
            metrics=metrics,
            metadata=metadata
        )

    def record_request(self, latency: float, error: bool = False):
        """Record one handled request; latency is in milliseconds."""
        self.request_count += 1
        if error:
            self.error_count += 1
        self.latency_samples.append(latency)  # deque drops the oldest sample

    def record_cache_access(self, hit: bool):
        """Record one cache lookup outcome."""
        if hit:
            self.cache_hits += 1
        else:
            self.cache_misses += 1

    def get_current_metrics(self) -> Dict[str, float]:
        """Return the latest snapshot's metrics keyed by metric name, or {}."""
        if not self.snapshots:
            return {}
        latest_snapshot = self.snapshots[-1]
        return {k.value: v for k, v in latest_snapshot.metrics.items()}

    def get_historical_data(self,
                            metric: PerformanceMetric,
                            time_range: timedelta) -> List[Tuple[datetime, float]]:
        """Return (timestamp, value) pairs for `metric` within `time_range` of now."""
        cutoff_time = datetime.now() - time_range
        return [
            (snapshot.timestamp, snapshot.metrics.get(metric, 0))
            for snapshot in self.snapshots
            if snapshot.timestamp >= cutoff_time
        ]
class PerformanceOptimizer:
    """Applies threshold-driven optimizations based on monitor readings."""

    def __init__(self, monitor: PerformanceMonitor):
        self.monitor = monitor
        self.optimization_history: List[OptimizationResult] = []
        # Thresholds that trigger the corresponding optimization
        self.cpu_threshold = 80.0
        self.memory_threshold = 80.0
        self.latency_threshold = 1000.0  # ms
        self.error_rate_threshold = 5.0  # %
        # Feature switches for the individual optimizers
        self.gc_optimization_enabled = True
        self.connection_pool_optimization = True
        self.cache_optimization = True

    async def analyze_and_optimize(self) -> List[OptimizationResult]:
        """Check each metric against its threshold and run the matching optimizer.

        Returns the list of optimizations that actually produced an
        improvement; the same results are appended to optimization_history.
        """
        current = self.monitor.get_current_metrics()
        # (metric key, threshold, handler) — evaluated in this fixed order.
        rules = (
            ("cpu_usage", self.cpu_threshold, self._optimize_cpu_usage),
            ("memory_usage", self.memory_threshold, self._optimize_memory_usage),
            ("latency", self.latency_threshold, self._optimize_latency),
            ("error_rate", self.error_rate_threshold, self._optimize_error_rate),
        )
        applied: List[OptimizationResult] = []
        for metric_key, threshold, handler in rules:
            if current.get(metric_key, 0) > threshold:
                outcome = await handler()
                if outcome is not None:
                    applied.append(outcome)
        self.optimization_history.extend(applied)
        return applied

    async def _optimize_cpu_usage(self) -> Optional[OptimizationResult]:
        """Try to lower CPU usage via garbage collection; None if no gain."""
        baseline = self.monitor.get_current_metrics()
        try:
            if self.gc_optimization_enabled:
                collected = gc.collect()
                logger.info(f"垃圾回收清理了 {collected} 个对象")
            # Let a few monitoring cycles pass so the effect is visible.
            await asyncio.sleep(5)
            cpu_before = baseline.get("cpu_usage", 0)
            cpu_after = self.monitor.get_current_metrics().get("cpu_usage", 0)
            if cpu_after >= cpu_before:
                return None
            improvement = ((cpu_before - cpu_after) / cpu_before) * 100
            return OptimizationResult(
                optimization_type="cpu_gc_optimization",
                before_metrics={"cpu_usage": cpu_before},
                after_metrics={"cpu_usage": cpu_after},
                improvement_percentage=improvement,
                applied_at=datetime.now(),
                description=f"垃圾回收优化,CPU使用率从 {cpu_before:.1f}% 降至 {cpu_after:.1f}%"
            )
        except Exception as e:
            logger.error(f"CPU优化失败: {e}")
        return None

    async def _optimize_memory_usage(self) -> Optional[OptimizationResult]:
        """Try to lower memory usage via a full GC sweep; None if no gain."""
        baseline = self.monitor.get_current_metrics()
        try:
            # Collect every generation explicitly.
            for generation in range(3):
                gc.collect(generation)
            gc.set_debug(0)
            await asyncio.sleep(3)
            memory_before = baseline.get("memory_usage", 0)
            memory_after = self.monitor.get_current_metrics().get("memory_usage", 0)
            if memory_after >= memory_before:
                return None
            improvement = ((memory_before - memory_after) / memory_before) * 100
            return OptimizationResult(
                optimization_type="memory_gc_optimization",
                before_metrics={"memory_usage": memory_before},
                after_metrics={"memory_usage": memory_after},
                improvement_percentage=improvement,
                applied_at=datetime.now(),
                description=f"内存优化,使用率从 {memory_before:.1f}% 降至 {memory_after:.1f}%"
            )
        except Exception as e:
            logger.error(f"内存优化失败: {e}")
        return None

    async def _optimize_latency(self) -> Optional[OptimizationResult]:
        """Latency optimization placeholder — requires business-specific logic."""
        logger.info("延迟优化需要根据具体业务场景实现")
        return None

    async def _optimize_error_rate(self) -> Optional[OptimizationResult]:
        """Error-rate optimization placeholder — requires error-pattern analysis."""
        logger.info("错误率优化需要根据具体错误模式实现")
        return None
class ConnectionPool:
    """Bounded async connection pool with lazy growth and validity checks.

    _create_connection / _is_connection_valid / _close_connection are
    placeholders intended to be overridden for a concrete backend (DB,
    network client, ...). The default "connection" is a plain dict record.
    """

    def __init__(self,
                 min_connections: int = 5,
                 max_connections: int = 100,
                 connection_timeout: int = 30):
        self.min_connections = min_connections
        self.max_connections = max_connections
        # Maximum age in seconds before a connection is considered stale.
        self.connection_timeout = connection_timeout
        self.available_connections: asyncio.Queue = asyncio.Queue()
        # Checked-out connections, keyed by id(connection).
        self.active_connections: Dict[int, Any] = {}
        self.connection_count = 0
        self.lock = asyncio.Lock()

    async def initialize(self):
        """Pre-create min_connections idle connections."""
        async with self.lock:
            for _ in range(self.min_connections):
                connection = await self._create_connection()
                await self.available_connections.put(connection)
                self.connection_count += 1
        logger.info(f"连接池已初始化,创建了 {self.min_connections} 个连接")

    async def get_connection(self) -> Any:
        """Check out a connection, growing the pool up to max_connections.

        Raises:
            Exception: if no connection frees up within 5 seconds and the
                pool has already reached max_connections.
        """
        try:
            # Prefer an idle connection; wait briefly for one to appear.
            connection = await asyncio.wait_for(
                self.available_connections.get(),
                timeout=5.0
            )
            connection_id = id(connection)
            self.active_connections[connection_id] = connection
            return connection
        except asyncio.TimeoutError:
            # Nothing became available: create a new connection if allowed.
            async with self.lock:
                if self.connection_count < self.max_connections:
                    connection = await self._create_connection()
                    self.connection_count += 1
                    connection_id = id(connection)
                    self.active_connections[connection_id] = connection
                    return connection
            raise Exception("连接池已满,无法获取连接")

    async def return_connection(self, connection: Any):
        """Return a checked-out connection to the pool.

        Bug fix: connections that were never checked out from this pool (or
        are returned twice) are now ignored. Previously the validity check
        ran unconditionally, so unknown connections were re-queued and the
        pool grew past its bookkeeping counts.
        """
        connection_id = id(connection)
        if connection_id not in self.active_connections:
            return
        del self.active_connections[connection_id]
        # Healthy connections go back to the idle queue.
        if await self._is_connection_valid(connection):
            await self.available_connections.put(connection)
            return
        # Stale connection: drop it, and top the pool back up to the minimum.
        async with self.lock:
            self.connection_count -= 1
            if self.connection_count < self.min_connections:
                new_connection = await self._create_connection()
                await self.available_connections.put(new_connection)
                self.connection_count += 1

    async def _create_connection(self) -> Any:
        """Create a connection (placeholder record; override per backend)."""
        return {"id": time.time(), "created_at": datetime.now()}

    async def _is_connection_valid(self, connection: Any) -> bool:
        """A connection is valid while younger than connection_timeout seconds."""
        created_at = connection.get("created_at")
        if created_at:
            age = datetime.now() - created_at
            return age.total_seconds() < self.connection_timeout
        return False

    async def close_all(self):
        """Close every active and idle connection and reset the count."""
        for connection in self.active_connections.values():
            await self._close_connection(connection)
        self.active_connections.clear()
        while not self.available_connections.empty():
            try:
                connection = self.available_connections.get_nowait()
                await self._close_connection(connection)
            except asyncio.QueueEmpty:
                break
        self.connection_count = 0
        logger.info("所有连接已关闭")

    async def _close_connection(self, connection: Any):
        """Release backend resources (placeholder: nothing to do for dicts)."""
        pass
class AdvancedCache:
    """Async in-memory cache with TTL expiry, LRU eviction and periodic cleanup."""

    def __init__(self,
                 max_size: int = 10000,
                 ttl_seconds: int = 3600,
                 cleanup_interval: int = 300):
        self.max_size = max_size
        self.default_ttl = ttl_seconds
        self.cleanup_interval = cleanup_interval
        # key -> {"value", "created_at", "expires_at"}
        self.data: Dict[str, Dict[str, Any]] = {}
        # key -> last access time, used for LRU eviction
        self.access_times: Dict[str, datetime] = {}
        self.hit_count = 0
        self.miss_count = 0
        self.cleanup_task: Optional[asyncio.Task] = None
        self.lock = asyncio.Lock()

    async def start(self):
        """Launch the background expiry-cleanup task."""
        self.cleanup_task = asyncio.create_task(self._cleanup_loop())
        logger.info("缓存系统已启动")

    async def stop(self):
        """Cancel the cleanup task and wait for it to finish."""
        if self.cleanup_task:
            self.cleanup_task.cancel()
            try:
                await self.cleanup_task
            except asyncio.CancelledError:
                pass
        logger.info("缓存系统已停止")

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None on a miss or expired entry."""
        async with self.lock:
            entry = self.data.get(key)
            if entry is None:
                self.miss_count += 1
                return None
            if datetime.now() > entry["expires_at"]:
                # Lazily drop the expired entry and count it as a miss.
                del self.data[key]
                self.access_times.pop(key, None)
                self.miss_count += 1
                return None
            self.access_times[key] = datetime.now()
            self.hit_count += 1
            return entry["value"]

    async def set(self,
                  key: str,
                  value: Any,
                  ttl: Optional[int] = None) -> bool:
        """Store a value with a TTL (default_ttl when ttl is None)."""
        async with self.lock:
            # Make room for a brand-new key if the cache is full.
            if key not in self.data and len(self.data) >= self.max_size:
                await self._evict_lru()
            lifetime = self.default_ttl if ttl is None else ttl
            now = datetime.now()
            self.data[key] = {
                "value": value,
                "created_at": now,
                "expires_at": now + timedelta(seconds=lifetime),
            }
            self.access_times[key] = now
            return True

    async def delete(self, key: str) -> bool:
        """Remove a key; returns True if it existed."""
        async with self.lock:
            existed = key in self.data
            if existed:
                del self.data[key]
                self.access_times.pop(key, None)
            return existed

    async def clear(self):
        """Drop every cached entry."""
        async with self.lock:
            self.data.clear()
            self.access_times.clear()
        logger.info("缓存已清空")

    async def _cleanup_loop(self):
        """Periodically purge expired entries until cancelled."""
        while True:
            try:
                await asyncio.sleep(self.cleanup_interval)
                await self._cleanup_expired()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"缓存清理错误: {e}")

    async def _cleanup_expired(self):
        """Remove every entry whose TTL has elapsed."""
        async with self.lock:
            now = datetime.now()
            expired_keys = [k for k, entry in self.data.items() if now > entry["expires_at"]]
            for key in expired_keys:
                del self.data[key]
                self.access_times.pop(key, None)
            if expired_keys:
                logger.debug(f"清理了 {len(expired_keys)} 个过期缓存项")

    async def _evict_lru(self):
        """Evict the least-recently-used entry (caller must hold the lock)."""
        if not self.access_times:
            return
        lru_key, _ = min(self.access_times.items(), key=lambda item: item[1])
        del self.data[lru_key]
        del self.access_times[lru_key]
        logger.debug(f"LRU清理了缓存项: {lru_key}")

    def get_stats(self) -> Dict[str, Any]:
        """Return size/hit/miss/usage statistics for reporting."""
        requests = self.hit_count + self.miss_count
        return {
            "size": len(self.data),
            "max_size": self.max_size,
            "hit_count": self.hit_count,
            "miss_count": self.miss_count,
            "hit_rate": (self.hit_count / max(requests, 1)) * 100,
            "usage_percentage": (len(self.data) / self.max_size) * 100,
        }
def performance_timer(metric_name: str = "execution_time"):
    """Decorator that logs how long the wrapped callable takes, in ms.

    Works on both sync and async callables; the appropriate wrapper is
    chosen at decoration time.
    """
    def decorator(func):
        def _log_elapsed(started: float) -> None:
            # Convert seconds to milliseconds for the log line.
            duration = (time.time() - started) * 1000
            logger.debug(f"{func.__name__} {metric_name}: {duration:.2f}ms")

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            started = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                _log_elapsed(started)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            started = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                _log_elapsed(started)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator
def memory_monitor(threshold_mb: float = 100.0):
    """Decorator that traces the wrapped callable's memory use via tracemalloc.

    Logs a warning when the peak allocation exceeds threshold_mb, a debug
    line otherwise. Works on both sync and async callables.
    """
    def decorator(func):
        def _report(current: int, peak: int) -> None:
            # tracemalloc reports bytes; convert to MB for the log line.
            current_mb = current / 1024 / 1024
            peak_mb = peak / 1024 / 1024
            if peak_mb > threshold_mb:
                logger.warning(
                    f"{func.__name__} 内存使用过高: "
                    f"当前 {current_mb:.2f}MB, 峰值 {peak_mb:.2f}MB"
                )
            else:
                logger.debug(
                    f"{func.__name__} 内存使用: "
                    f"当前 {current_mb:.2f}MB, 峰值 {peak_mb:.2f}MB"
                )

        @wraps(func)
        async def async_wrapper(*args, **kwargs):
            import tracemalloc
            tracemalloc.start()
            try:
                return await func(*args, **kwargs)
            finally:
                current, peak = tracemalloc.get_traced_memory()
                tracemalloc.stop()
                _report(current, peak)

        @wraps(func)
        def sync_wrapper(*args, **kwargs):
            import tracemalloc
            tracemalloc.start()
            try:
                return func(*args, **kwargs)
            finally:
                current, peak = tracemalloc.get_traced_memory()
                tracemalloc.stop()
                _report(current, peak)

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator
class PerformanceManager:
    """Facade wiring together monitoring, auto-optimization, caching,
    connection pools and cProfile-based profiling.
    """

    def __init__(self):
        self.monitor = PerformanceMonitor()
        self.optimizer = PerformanceOptimizer(self.monitor)
        self.cache = AdvancedCache()
        self.connection_pools: Dict[str, ConnectionPool] = {}
        # cProfile state (see start_profiling / stop_profiling)
        self.profiler: Optional[cProfile.Profile] = None
        self.profiling_enabled = False
        # Handle to the background auto-optimization task so shutdown() can
        # cancel it. Previously the task was fire-and-forget: it kept running
        # after shutdown and, with no strong reference, could be garbage
        # collected mid-flight.
        self._auto_optimization_task: Optional[asyncio.Task] = None

    async def initialize(self):
        """Start monitoring, the cache janitor and the auto-optimization loop."""
        await self.monitor.start_monitoring()
        await self.cache.start()
        self._auto_optimization_task = asyncio.create_task(self._auto_optimization_loop())
        logger.info("性能管理器已初始化")

    async def shutdown(self):
        """Stop all background work and release pooled resources."""
        # Cancel the optimization loop first so it cannot run against
        # components that are being torn down below.
        if self._auto_optimization_task:
            self._auto_optimization_task.cancel()
            try:
                await self._auto_optimization_task
            except asyncio.CancelledError:
                pass
            self._auto_optimization_task = None
        await self.monitor.stop_monitoring()
        await self.cache.stop()
        for pool in self.connection_pools.values():
            await pool.close_all()
        if self.profiling_enabled:
            self.stop_profiling()
        logger.info("性能管理器已关闭")

    def register_connection_pool(self, name: str, pool: ConnectionPool):
        """Register a pool so it is reported on and closed at shutdown."""
        self.connection_pools[name] = pool
        logger.info(f"已注册连接池: {name}")

    def start_profiling(self):
        """Enable cProfile collection (no-op if already enabled)."""
        if not self.profiling_enabled:
            self.profiler = cProfile.Profile()
            self.profiler.enable()
            self.profiling_enabled = True
            logger.info("性能分析已启动")

    def stop_profiling(self) -> Optional[str]:
        """Disable cProfile and return a cumulative-time report, or None."""
        if self.profiling_enabled and self.profiler:
            self.profiler.disable()
            self.profiling_enabled = False
            import io
            s = io.StringIO()
            stats = pstats.Stats(self.profiler, stream=s)
            stats.sort_stats('cumulative')
            stats.print_stats(50)  # top 50 functions by cumulative time
            report = s.getvalue()
            logger.info("性能分析已停止")
            return report
        return None

    async def _auto_optimization_loop(self):
        """Every 5 minutes, let the optimizer analyze metrics and act on them."""
        while True:
            try:
                await asyncio.sleep(300)
                optimizations = await self.optimizer.analyze_and_optimize()
                if optimizations:
                    logger.info(f"应用了 {len(optimizations)} 个性能优化")
                    for opt in optimizations:
                        logger.info(f"优化: {opt.description}")
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"自动优化错误: {e}")

    def get_performance_report(self) -> Dict[str, Any]:
        """Aggregate current metrics, cache stats, one-hour trends, recent
        optimizations and connection-pool usage into one JSON-friendly report.
        """
        current_metrics = self.monitor.get_current_metrics()
        cache_stats = self.cache.get_stats()
        cpu_history = self.monitor.get_historical_data(
            PerformanceMetric.CPU_USAGE,
            timedelta(hours=1)
        )
        memory_history = self.monitor.get_historical_data(
            PerformanceMetric.MEMORY_USAGE,
            timedelta(hours=1)
        )
        return {
            "timestamp": datetime.now().isoformat(),
            "current_metrics": current_metrics,
            "cache_stats": cache_stats,
            "optimization_history": [
                {
                    "type": opt.optimization_type,
                    "improvement": opt.improvement_percentage,
                    "applied_at": opt.applied_at.isoformat(),
                    "description": opt.description
                }
                for opt in self.optimizer.optimization_history[-10:]  # last 10 optimizations
            ],
            "trends": {
                "cpu_trend": len(cpu_history),
                "memory_trend": len(memory_history),
                "avg_cpu_1h": sum(v for _, v in cpu_history) / max(len(cpu_history), 1),
                "avg_memory_1h": sum(v for _, v in memory_history) / max(len(memory_history), 1)
            },
            "connection_pools": {
                name: {
                    "active_connections": len(pool.active_connections),
                    "total_connections": pool.connection_count,
                    "max_connections": pool.max_connections
                }
                for name, pool in self.connection_pools.items()
            }
        }
📊 性能监控仪表板
📈 实时监控界面
python
"""
monitoring_dashboard.py - 性能监控仪表板
"""
from typing import Dict, List, Any, Optional
import asyncio
import json
from datetime import datetime, timedelta
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
import plotly.graph_objs as go
import plotly.utils
from .performance import PerformanceManager, PerformanceMetric
class MonitoringDashboard:
    """Real-time performance dashboard.

    Exposes an HTML/Vue page, JSON REST endpoints and a WebSocket feed that
    pushes the PerformanceManager's report to every connected client.
    """
    def __init__(self, performance_manager: PerformanceManager):
        self.performance_manager = performance_manager
        # WebSocket clients currently subscribed to broadcast updates.
        self.active_connections: List[WebSocket] = []
        self.update_interval = 5.0  # seconds between broadcast pushes
        self.broadcast_task: Optional[asyncio.Task] = None

    def setup_routes(self, app: FastAPI):
        """Register the dashboard page, WebSocket and REST API routes on `app`."""
        @app.get("/dashboard", response_class=HTMLResponse)
        async def dashboard_page():
            """Serve the dashboard HTML page."""
            return self._generate_dashboard_html()

        @app.websocket("/ws/dashboard")
        async def dashboard_websocket(websocket: WebSocket):
            """WebSocket endpoint feeding live dashboard updates."""
            await websocket.accept()
            self.active_connections.append(websocket)
            try:
                while True:
                    # Wait for client messages (keeps the connection alive).
                    await websocket.receive_text()
            except WebSocketDisconnect:
                self.active_connections.remove(websocket)

        @app.get("/api/performance/current")
        async def get_current_metrics():
            """Return the full current performance report as JSON."""
            return self.performance_manager.get_performance_report()

        @app.get("/api/performance/history/{metric}")
        async def get_metric_history(metric: str, hours: int = 1):
            """Return historical samples for one metric over the last `hours` hours."""
            try:
                # Raises ValueError for unknown metric names.
                metric_enum = PerformanceMetric(metric)
                time_range = timedelta(hours=hours)
                history = self.performance_manager.monitor.get_historical_data(
                    metric_enum,
                    time_range
                )
                return {
                    "metric": metric,
                    "time_range_hours": hours,
                    "data": [
                        {"timestamp": ts.isoformat(), "value": value}
                        for ts, value in history
                    ]
                }
            except ValueError:
                return {"error": f"无效的指标类型: {metric}"}

        @app.post("/api/performance/optimize")
        async def trigger_optimization():
            """Manually trigger a performance-optimization pass."""
            optimizations = await self.performance_manager.optimizer.analyze_and_optimize()
            return {
                "optimizations_applied": len(optimizations),
                "results": [
                    {
                        "type": opt.optimization_type,
                        "improvement": opt.improvement_percentage,
                        "description": opt.description
                    }
                    for opt in optimizations
                ]
            }

        @app.post("/api/profiling/start")
        async def start_profiling():
            """Start cProfile collection."""
            self.performance_manager.start_profiling()
            return {"message": "性能分析已启动"}

        @app.post("/api/profiling/stop")
        async def stop_profiling():
            """Stop cProfile collection and return the text report."""
            report = self.performance_manager.stop_profiling()
            return {"report": report}

    async def start_broadcasting(self):
        """Launch the background task that pushes updates to WebSocket clients."""
        self.broadcast_task = asyncio.create_task(self._broadcast_loop())

    async def stop_broadcasting(self):
        """Cancel the broadcast task and wait for it to finish."""
        if self.broadcast_task:
            self.broadcast_task.cancel()
            try:
                await self.broadcast_task
            except asyncio.CancelledError:
                pass

    async def _broadcast_loop(self):
        """Push the latest performance report to all clients every update_interval."""
        while True:
            try:
                if self.active_connections:
                    # Fetch the latest performance data.
                    report = self.performance_manager.get_performance_report()
                    # Broadcast to every connected client.
                    disconnected = []
                    for websocket in self.active_connections:
                        try:
                            await websocket.send_text(json.dumps(report))
                        except Exception:
                            disconnected.append(websocket)
                    # Drop connections that failed to receive.
                    for websocket in disconnected:
                        self.active_connections.remove(websocket)
                await asyncio.sleep(self.update_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                # NOTE(review): `logger` is not imported in this module — this
                # branch would raise NameError if it ever runs; import loguru's
                # logger (or the stdlib logging module) here.
                logger.error(f"广播更新错误: {e}")
                await asyncio.sleep(1)

    def _generate_dashboard_html(self) -> str:
        """Return the self-contained dashboard page (Vue 3 + Plotly via CDN)."""
        return """
        <!DOCTYPE html>
        <html>
        <head>
            <title>MCP性能监控仪表板</title>
            <meta charset="utf-8">
            <script src="https://cdn.plot.ly/plotly-latest.min.js"></script>
            <script src="https://unpkg.com/vue@3/dist/vue.global.js"></script>
            <style>
                body {
                    font-family: 'Segoe UI', sans-serif;
                    margin: 0;
                    padding: 20px;
                    background-color: #f5f5f5;
                }
                .dashboard {
                    max-width: 1400px;
                    margin: 0 auto;
                }
                .header {
                    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
                    color: white;
                    padding: 20px;
                    border-radius: 10px;
                    margin-bottom: 20px;
                    box-shadow: 0 4px 6px rgba(0,0,0,0.1);
                }
                .metrics-grid {
                    display: grid;
                    grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
                    gap: 20px;
                    margin-bottom: 20px;
                }
                .metric-card {
                    background: white;
                    padding: 20px;
                    border-radius: 10px;
                    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                    border-left: 4px solid #667eea;
                }
                .metric-value {
                    font-size: 2.5em;
                    font-weight: bold;
                    color: #333;
                }
                .metric-label {
                    color: #666;
                    margin-top: 5px;
                }
                .chart-container {
                    background: white;
                    padding: 20px;
                    border-radius: 10px;
                    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                    margin-bottom: 20px;
                }
                .status-indicator {
                    display: inline-block;
                    width: 12px;
                    height: 12px;
                    border-radius: 50%;
                    margin-right: 8px;
                }
                .status-good { background-color: #4CAF50; }
                .status-warning { background-color: #FF9800; }
                .status-critical { background-color: #F44336; }
                .optimization-history {
                    background: white;
                    padding: 20px;
                    border-radius: 10px;
                    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                }
                .optimization-item {
                    padding: 10px;
                    border-left: 3px solid #4CAF50;
                    margin-bottom: 10px;
                    background-color: #f8f9fa;
                }
                .controls {
                    background: white;
                    padding: 20px;
                    border-radius: 10px;
                    box-shadow: 0 2px 4px rgba(0,0,0,0.1);
                    margin-bottom: 20px;
                }
                .btn {
                    background: #667eea;
                    color: white;
                    border: none;
                    padding: 10px 20px;
                    border-radius: 5px;
                    cursor: pointer;
                    margin-right: 10px;
                }
                .btn:hover {
                    background: #5a67d8;
                }
            </style>
        </head>
        <body>
            <div id="app">
                <div class="dashboard">
                    <div class="header">
                        <h1>🚀 MCP性能监控仪表板</h1>
                        <p>实时监控系统性能指标和优化建议</p>
                        <div>
                            <span class="status-indicator" :class="getStatusClass()"></span>
                            系统状态: {{ getSystemStatus() }}
                            <span style="float: right;">最后更新: {{ lastUpdate }}</span>
                        </div>
                    </div>
                    <div class="controls">
                        <button class="btn" @click="triggerOptimization">执行优化</button>
                        <button class="btn" @click="startProfiling">开始分析</button>
                        <button class="btn" @click="stopProfiling">停止分析</button>
                    </div>
                    <div class="metrics-grid">
                        <div class="metric-card">
                            <div class="metric-value">{{ formatValue(metrics.cpu_usage) }}%</div>
                            <div class="metric-label">CPU使用率</div>
                        </div>
                        <div class="metric-card">
                            <div class="metric-value">{{ formatValue(metrics.memory_usage) }}%</div>
                            <div class="metric-label">内存使用率</div>
                        </div>
                        <div class="metric-card">
                            <div class="metric-value">{{ formatValue(metrics.throughput) }}</div>
                            <div class="metric-label">吞吐量 (req/s)</div>
                        </div>
                        <div class="metric-card">
                            <div class="metric-value">{{ formatValue(metrics.latency) }}ms</div>
                            <div class="metric-label">平均延迟</div>
                        </div>
                        <div class="metric-card">
                            <div class="metric-value">{{ formatValue(metrics.error_rate) }}%</div>
                            <div class="metric-label">错误率</div>
                        </div>
                        <div class="metric-card">
                            <div class="metric-value">{{ formatValue(cacheStats.hit_rate) }}%</div>
                            <div class="metric-label">缓存命中率</div>
                        </div>
                    </div>
                    <div class="chart-container">
                        <h3>📊 性能趋势图</h3>
                        <div id="performance-chart" style="height: 400px;"></div>
                    </div>
                    <div class="optimization-history">
                        <h3>⚡ 最近优化记录</h3>
                        <div v-for="opt in optimizationHistory" :key="opt.applied_at" class="optimization-item">
                            <strong>{{ opt.type }}</strong> - 改进 {{ formatValue(opt.improvement) }}%
                            <div style="color: #666; font-size: 0.9em;">{{ opt.description }}</div>
                            <div style="color: #999; font-size: 0.8em;">{{ formatTime(opt.applied_at) }}</div>
                        </div>
                    </div>
                </div>
            </div>
            <script>
                const { createApp } = Vue;
                createApp({
                    data() {
                        return {
                            metrics: {},
                            cacheStats: {},
                            optimizationHistory: [],
                            lastUpdate: '',
                            websocket: null
                        }
                    },
                    mounted() {
                        this.connectWebSocket();
                        this.loadInitialData();
                    },
                    methods: {
                        connectWebSocket() {
                            const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
                            const wsUrl = `${protocol}//${window.location.host}/ws/dashboard`;
                            this.websocket = new WebSocket(wsUrl);
                            this.websocket.onmessage = (event) => {
                                const data = JSON.parse(event.data);
                                this.updateData(data);
                            };
                            this.websocket.onclose = () => {
                                setTimeout(() => this.connectWebSocket(), 5000);
                            };
                            this.websocket.onopen = () => {
                                this.websocket.send('ping');
                            };
                        },
                        async loadInitialData() {
                            try {
                                const response = await fetch('/api/performance/current');
                                const data = await response.json();
                                this.updateData(data);
                            } catch (error) {
                                console.error('加载初始数据失败:', error);
                            }
                        },
                        updateData(data) {
                            this.metrics = data.current_metrics || {};
                            this.cacheStats = data.cache_stats || {};
                            this.optimizationHistory = data.optimization_history || [];
                            this.lastUpdate = new Date().toLocaleTimeString();
                            this.updateChart(data);
                        },
                        updateChart(data) {
                            const trace1 = {
                                y: [this.metrics.cpu_usage || 0],
                                name: 'CPU使用率',
                                type: 'scatter',
                                mode: 'lines+markers'
                            };
                            const trace2 = {
                                y: [this.metrics.memory_usage || 0],
                                name: '内存使用率',
                                type: 'scatter',
                                mode: 'lines+markers'
                            };
                            const layout = {
                                title: '实时性能指标',
                                xaxis: { title: '时间' },
                                yaxis: { title: '使用率 (%)' }
                            };
                            Plotly.newPlot('performance-chart', [trace1, trace2], layout);
                        },
                        async triggerOptimization() {
                            try {
                                const response = await fetch('/api/performance/optimize', {
                                    method: 'POST'
                                });
                                const result = await response.json();
                                alert(`应用了 ${result.optimizations_applied} 个优化`);
                            } catch (error) {
                                alert('优化失败: ' + error.message);
                            }
                        },
                        async startProfiling() {
                            try {
                                await fetch('/api/profiling/start', { method: 'POST' });
                                alert('性能分析已启动');
                            } catch (error) {
                                alert('启动分析失败: ' + error.message);
                            }
                        },
                        async stopProfiling() {
                            try {
                                const response = await fetch('/api/profiling/stop', { method: 'POST' });
                                const result = await response.json();
                                if (result.report) {
                                    const blob = new Blob([result.report], { type: 'text/plain' });
                                    const url = URL.createObjectURL(blob);
                                    const a = document.createElement('a');
                                    a.href = url;
                                    a.download = 'performance_report.txt';
                                    a.click();
                                }
                                alert('性能分析已停止,报告已下载');
                            } catch (error) {
                                alert('停止分析失败: ' + error.message);
                            }
                        },
                        formatValue(value) {
                            if (typeof value === 'number') {
                                return value.toFixed(2);
                            }
                            return value || 'N/A';
                        },
                        formatTime(timestamp) {
                            return new Date(timestamp).toLocaleString();
                        },
                        getSystemStatus() {
                            const cpu = this.metrics.cpu_usage || 0;
                            const memory = this.metrics.memory_usage || 0;
                            const errorRate = this.metrics.error_rate || 0;
                            if (cpu > 80 || memory > 80 || errorRate > 5) {
                                return '警告';
                            } else if (cpu > 60 || memory > 60 || errorRate > 2) {
                                return '注意';
                            } else {
                                return '正常';
                            }
                        },
                        getStatusClass() {
                            const status = this.getSystemStatus();
                            if (status === '警告') return 'status-critical';
                            if (status === '注意') return 'status-warning';
                            return 'status-good';
                        }
                    }
                }).mount('#app');
            </script>
        </body>
        </html>
        """
🎯 本节小结
通过这一小节,你已经构建了一个全面的性能优化和监控系统:
✅ 性能监控:实时收集CPU、内存、I/O、网络等关键指标
✅ 自动优化:基于阈值的智能性能优化
✅ 高级缓存:LRU缓存系统,支持TTL和自动清理
✅ 连接池管理:高效的数据库和网络连接管理
✅ 性能分析:集成cProfile和内存分析工具
✅ 可视化仪表板:实时监控界面和性能趋势图
🚀 使用示例
python
# Initialize the performance manager (illustrative snippet: top-level
# `await` assumes this runs inside an async context).
performance_manager = PerformanceManager()
await performance_manager.initialize()

# Register a connection pool
db_pool = ConnectionPool(min_connections=10, max_connections=100)
await db_pool.initialize()
performance_manager.register_connection_pool("database", db_pool)

# Set up the monitoring dashboard
dashboard = MonitoringDashboard(performance_manager)
app = FastAPI()
dashboard.setup_routes(app)
await dashboard.start_broadcasting()

# Use the performance decorators
@performance_timer("api_request_time")
@memory_monitor(threshold_mb=50.0)
async def process_request(data):
    # Request-handling logic goes here
    return {"result": "success"}

# Record performance metrics manually
performance_manager.monitor.record_request(
    latency=150.5,  # milliseconds
    error=False
)

# Fetch a performance report
report = performance_manager.get_performance_report()
print(f"当前CPU使用率: {report['current_metrics']['cpu_usage']:.2f}%")
📊 性能优化建议
- 异步优先:使用async/await提升并发性能
- 缓存策略:合理使用缓存减少重复计算
- 连接复用:使用连接池避免频繁建立连接
- 批量处理:批量操作提升I/O效率
- 资源监控:定期监控资源使用情况
- 性能测试:定期进行压力测试和性能基准测试
现在你的MCP服务器具备了企业级的性能优化和监控能力,可以在高负载环境下稳定运行!