
4.6 Deployment and Optimization

🎯 Learning objectives: Master deployment strategies and performance-tuning techniques for MCP servers, and build a production-ready solution
⏱️ Estimated time: 45 minutes
📊 Difficulty: ⭐⭐⭐⭐

🚀 Deployment Architecture Overview

When your MCP server moves from development into production, functionality alone is no longer enough: stability, scalability, and security matter just as much. Let's build a complete deployment solution!

🐳 Containerized Deployment with Docker

📦 Creating the Dockerfile

First, let's create an optimized Dockerfile:

dockerfile
# Dockerfile
# Use the official Python base image
FROM python:3.11-slim

# Maintainer metadata
LABEL maintainer="your-email@example.com"
LABEL description="MCP Server - production-ready build"
LABEL version="1.0.0"

# Set the working directory
WORKDIR /app

# Environment variables
ENV PYTHONUNBUFFERED=1 \
    PYTHONDONTWRITEBYTECODE=1 \
    PIP_NO_CACHE_DIR=1 \
    PIP_DISABLE_PIP_VERSION_CHECK=1 \
    PYTHONPATH=/app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Create a non-root user
RUN groupadd -r mcpuser && useradd -r -g mcpuser mcpuser

# Copy dependency manifests first to leverage the layer cache
COPY requirements.txt requirements-prod.txt ./

# Install Python dependencies
RUN pip install --upgrade pip && \
    pip install -r requirements.txt && \
    pip install -r requirements-prod.txt

# Copy the application code
COPY . .

# Set file ownership and permissions
RUN chown -R mcpuser:mcpuser /app && \
    chmod +x /app/scripts/*.sh

# Create the log directory
RUN mkdir -p /app/logs && chown mcpuser:mcpuser /app/logs

# Switch to the non-root user
USER mcpuser

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=60s --retries=3 \
  CMD python scripts/health_check.py || exit 1

# Expose the service port
EXPOSE 8000

# Startup command
CMD ["python", "server.py"]

📋 Production Dependencies

Create requirements-prod.txt:

txt
# requirements-prod.txt - additional production dependencies

# Monitoring and metrics
prometheus-client==0.19.0
statsd==4.0.1

# Caching
redis==5.0.1
python-memcached==1.62

# Performance
uvloop==0.19.0          # high-performance event loop
orjson==3.9.10          # fast JSON serialization
lz4==4.3.2              # fast compression

# Security hardening
cryptography==41.0.8
python-jose[cryptography]==3.3.0

# Process management
gunicorn==21.2.0
supervisor==4.2.5

# System monitoring
psutil==5.9.6

# Configuration management
pydantic-settings==2.1.0
python-dotenv==1.0.0

# Database drivers with connection pooling (if you use a database)
asyncpg==0.29.0         # PostgreSQL
motor==3.3.2            # MongoDB

# Optimized HTTP client
httpx[http2]==0.25.2
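
The server code in this section imports a shared config object (from config import config). For reference, here is a minimal sketch of such a module built on the pydantic-settings dependency listed above; the exact fields and defaults are assumptions matching the attributes used later in this section:

python
# config.py - minimal sketch; field names and defaults are assumptions
# matching the attributes referenced by the server code in this section
from typing import Optional

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    # Reads MCP_-prefixed environment variables, e.g. MCP_REDIS_URL
    model_config = SettingsConfigDict(env_prefix="MCP_", env_file=".env")

    name: str = "mcp-server"
    version: str = "1.0.0"
    protocol_version: str = "2024-11-05"   # assumption: MCP protocol revision
    protocol: str = "stdio"                # transport used in this chapter
    debug: bool = False

    redis_url: Optional[str] = None        # e.g. redis://redis:6379/0
    prometheus_enabled: bool = True
    prometheus_port: int = 9090

config = Settings()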

🔧 Docker Compose Configuration

Create docker-compose.yml:

yaml
# docker-compose.yml
version: '3.8'

services:
  mcp-server:
    build: 
      context: .
      dockerfile: Dockerfile
    container_name: mcp-server
    restart: unless-stopped
    ports:
      - "8000:8000"
    environment:
      - MCP_ENV=production
      - MCP_LOG_LEVEL=INFO
      - MCP_REDIS_URL=redis://redis:6379/0
      - MCP_PROMETHEUS_PORT=9090
    volumes:
      - ./logs:/app/logs
      - ./config:/app/config:ro
    depends_on:
      - redis
      - prometheus
    networks:
      - mcp-network
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
        reservations:
          memory: 256M
          cpus: '0.25'

  redis:
    image: redis:7-alpine
    container_name: mcp-redis
    restart: unless-stopped
    command: redis-server --appendonly yes --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    networks:
      - mcp-network
    deploy:
      resources:
        limits:
          memory: 256M
          cpus: '0.25'

  prometheus:
    image: prom/prometheus:latest
    container_name: mcp-prometheus
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=200h'
      - '--web.enable-lifecycle'
    networks:
      - mcp-network

  grafana:
    image: grafana/grafana:latest
    container_name: mcp-grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin123
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana-data:/var/lib/grafana
      - ./monitoring/grafana/dashboards:/etc/grafana/provisioning/dashboards
      - ./monitoring/grafana/datasources:/etc/grafana/provisioning/datasources
    networks:
      - mcp-network

  nginx:
    image: nginx:alpine
    container_name: mcp-nginx
    restart: unless-stopped
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/nginx.conf:ro
      - ./nginx/ssl:/etc/nginx/ssl:ro
      - ./logs/nginx:/var/log/nginx
    depends_on:
      - mcp-server
    networks:
      - mcp-network

volumes:
  redis-data:
  prometheus-data:
  grafana-data:

networks:
  mcp-network:
    driver: bridge

🔒 Nginx Reverse Proxy Configuration

Create nginx/nginx.conf:

nginx
# nginx/nginx.conf
events {
    worker_connections 1024;
    use epoll;
    multi_accept on;
}

http {
    # Basic settings
    include /etc/nginx/mime.types;
    default_type application/octet-stream;
    
    # Log format
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                   '$status $body_bytes_sent "$http_referer" '
                   '"$http_user_agent" "$http_x_forwarded_for" '
                   'rt=$request_time uct="$upstream_connect_time" '
                   'uht="$upstream_header_time" urt="$upstream_response_time"';
    
    access_log /var/log/nginx/access.log main;
    error_log /var/log/nginx/error.log warn;
    
    # Performance tuning
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    types_hash_max_size 2048;
    client_max_body_size 100M;
    
    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_proxied any;
    gzip_comp_level 6;
    gzip_types
        application/json
        application/javascript
        application/xml+rss
        application/atom+xml
        image/svg+xml
        text/plain
        text/css
        text/xml
        text/javascript;
    
    # Upstream backend
    upstream mcp_backend {
        least_conn;
        server mcp-server:8000 max_fails=3 fail_timeout=30s;
        # add more instances here for load balancing
        # server mcp-server-2:8000 max_fails=3 fail_timeout=30s;
        keepalive 32;
    }
    
    # Rate limiting
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_conn_zone $binary_remote_addr zone=conn_limit_per_ip:10m;
    
    # HTTPS server
    server {
        listen 443 ssl http2;
        server_name your-domain.com;
        
        # SSL settings
        ssl_certificate /etc/nginx/ssl/server.crt;
        ssl_certificate_key /etc/nginx/ssl/server.key;
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-CHACHA20-POLY1305;
        ssl_prefer_server_ciphers off;
        ssl_session_cache shared:SSL:10m;
        ssl_session_timeout 1d;
        
        # Security headers
        add_header Strict-Transport-Security "max-age=63072000" always;
        add_header X-Frame-Options "SAMEORIGIN" always;
        add_header X-Content-Type-Options "nosniff" always;
        add_header Referrer-Policy "no-referrer-when-downgrade" always;
        add_header Content-Security-Policy "default-src 'self' http: https: data: blob: 'unsafe-inline'" always;
        
        # API routes
        location /api/ {
            # rate limiting
            limit_req zone=api burst=20 nodelay;
            limit_conn conn_limit_per_ip 10;
            
            # proxy settings
            proxy_pass http://mcp_backend;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection 'upgrade';
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
            proxy_cache_bypass $http_upgrade;
            
            # timeouts
            proxy_connect_timeout 60s;
            proxy_send_timeout 60s;
            proxy_read_timeout 60s;
        }
        
        # Health check endpoint
        location /health {
            proxy_pass http://mcp_backend/health;
            access_log off;
        }
        
        # Metrics endpoint (internal networks only)
        location /metrics {
            allow 10.0.0.0/8;
            allow 172.16.0.0/12;
            allow 192.168.0.0/16;
            deny all;
            
            proxy_pass http://mcp_backend/metrics;
        }
    }
    
    # Redirect HTTP to HTTPS
    server {
        listen 80;
        server_name your-domain.com;
        return 301 https://$server_name$request_uri;
    }
}
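
Before relying on this configuration, it's worth smoke-testing the proxy and the rate limit. A quick sketch using httpx; the hostname is the placeholder from the config above, and verify=False is only for a self-signed development certificate:

python
import httpx

BASE = "https://your-domain.com"  # placeholder domain from the nginx config

with httpx.Client(verify=False, timeout=5) as client:
    # /health should be proxied straight through to the backend
    print("health:", client.get(f"{BASE}/health").status_code)

    # Bursting past the limit (10 r/s with burst=20) should start
    # returning 503, the default limit_req rejection status
    codes = [client.get(f"{BASE}/api/").status_code for _ in range(40)]
    print("api burst:", {code: codes.count(code) for code in set(codes)})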

⚡ Performance Optimization Strategies

🚀 Server Performance Tuning

Create the optimized server, server_optimized.py:

python
"""
server_optimized.py - An optimized version of the MCP server.
"""

import asyncio
import hashlib
import logging
import time
import weakref
from collections import defaultdict
from typing import Dict, Any, Optional, List

import uvloop      # high-performance event loop
import orjson      # fast JSON serialization
import lz4.frame   # data compression

# Caching and performance monitoring
import redis.asyncio as redis
from prometheus_client import Counter, Histogram, Gauge, start_http_server
import psutil

from config import config
from tools import tool_manager
from debug_middleware import debug_middleware

logger = logging.getLogger("mcp.server")

# Prometheus metrics
REQUEST_COUNT = Counter('mcp_requests_total', 'Total requests', ['method', 'status'])
REQUEST_DURATION = Histogram('mcp_request_duration_seconds', 'Request processing time', ['method'])
ACTIVE_CONNECTIONS = Gauge('mcp_active_connections', 'Active connections')
MEMORY_USAGE = Gauge('mcp_memory_usage_bytes', 'Memory usage in bytes')
TOOL_EXECUTION_TIME = Histogram('mcp_tool_execution_seconds', 'Tool execution time', ['tool_name'])

class OptimizedMCPServer:
    """优化版本的MCP服务器"""
    
    def __init__(self):
        self.name = config.name
        self.version = config.version
        self.protocol_version = config.protocol_version
        
        # Performance tuning knobs
        self.connection_pool_size = 100
        self.request_timeout = 30
        self.max_request_size = 10 * 1024 * 1024  # 10MB
        
        # Cache layers
        self.redis_client: Optional[redis.Redis] = None
        self.local_cache: Dict[str, Any] = {}
        self.cache_stats = defaultdict(int)
        
        # Connection tracking
        self.active_connections = weakref.WeakSet()
        
        # Request statistics
        self.request_stats = {
            'total': 0,
            'success': 0,
            'error': 0,
            'avg_response_time': 0.0
        }
        
        # Start the Prometheus metrics endpoint
        if config.prometheus_enabled:
            start_http_server(config.prometheus_port)
    
    async def initialize_cache(self):
        """初始化缓存系统"""
        if config.redis_url:
            try:
                self.redis_client = redis.from_url(
                    config.redis_url,
                    encoding="utf-8",
                    decode_responses=True,
                    max_connections=20
                )
                await self.redis_client.ping()
                logger.info("Redis缓存已连接")
            except Exception as e:
                logger.warning(f"Redis连接失败,使用内存缓存: {e}")
    
    async def get_from_cache(self, key: str) -> Optional[Any]:
        """从缓存获取数据"""
        try:
            # Try the local cache first
            if key in self.local_cache:
                self.cache_stats['local_hit'] += 1
                return self.local_cache[key]
            
            # Then try Redis
            if self.redis_client:
                data = await self.redis_client.get(key)
                if data:
                    # Decompress; encode with latin1 to recover the exact
                    # bytes stored by set_cache below (a plain utf-8 encode
                    # would corrupt the compressed payload)
                    decompressed = lz4.frame.decompress(data.encode('latin1'))
                    result = orjson.loads(decompressed)
                    
                    # Populate the local cache
                    self.local_cache[key] = result
                    self.cache_stats['redis_hit'] += 1
                    return result
            
            self.cache_stats['miss'] += 1
            return None
            
        except Exception as e:
            logger.warning(f"缓存读取错误: {e}")
            self.cache_stats['error'] += 1
            return None
    
    async def set_cache(self, key: str, value: Any, expire: int = 3600):
        """设置缓存数据"""
        try:
            # Update the local cache
            self.local_cache[key] = value
            
            # Mirror to Redis if available
            if self.redis_client:
                # Compress; the latin1 decode below turns the binary
                # payload into a str that survives the decode_responses client
                json_data = orjson.dumps(value)
                compressed = lz4.frame.compress(json_data)
                
                await self.redis_client.setex(
                    key, expire, compressed.decode('latin1')
                )
            
            self.cache_stats['set'] += 1
            
        except Exception as e:
            logger.warning(f"缓存写入错误: {e}")
            self.cache_stats['error'] += 1
    
    async def optimize_tool_execution(self, tool_name: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """优化的工具执行"""
        # 生成缓存键
        cache_key = f"tool:{tool_name}:{hash(str(sorted(params.items())))}"
        
        # Check the cache
        cached_result = await self.get_from_cache(cache_key)
        if cached_result and not self.is_cache_expired(cached_result):
            logger.debug(f"工具 {tool_name} 命中缓存")
            return cached_result
        
        # Execute the tool
        start_time = time.time()
        
        try:
            result = await tool_manager.execute_tool(tool_name, params)
            
            # Record execution time
            duration = time.time() - start_time
            TOOL_EXECUTION_TIME.labels(tool_name=tool_name).observe(duration)
            
            # Cache successful, cacheable results
            if result.get("success") and self.is_cacheable_result(result):
                result["_cached_at"] = time.time()
                await self.set_cache(cache_key, result, expire=1800)  # cache for 30 minutes
            
            return result
            
        except Exception as e:
            logger.error(f"工具执行失败 {tool_name}: {e}")
            return {
                "success": False,
                "error": f"工具执行错误: {str(e)}"
            }
    
    def is_cache_expired(self, cached_result: Dict[str, Any]) -> bool:
        """检查缓存是否过期"""
        if "_cached_at" not in cached_result:
            return True
        
        cache_time = cached_result["_cached_at"]
        return time.time() - cache_time > 1800  # expire after 30 minutes
    
    def is_cacheable_result(self, result: Dict[str, Any]) -> bool:
        """判断结果是否可缓存"""
        # 不缓存包含敏感信息或时间相关的结果
        if not result.get("success"):
            return False
        
        # Skip time-dependent data
        if "current_time" in str(result).lower():
            return False
        
        # Skip oversized results
        if len(str(result)) > 100000:  # 100 KB
            return False
        
        return True
    
    async def handle_request_optimized(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """优化的请求处理"""
        start_time = time.time()
        method = request.get("method", "unknown")
        
        try:
            # Update metrics
            ACTIVE_CONNECTIONS.inc()
            
            # Dispatch the request
            if method == "initialize":
                response = await self.handle_initialize(request)
            elif method == "tools/list":
                response = await self.handle_tools_list(request)
            elif method == "tools/call":
                response = await self.handle_tool_call_optimized(request)
            else:
                response = {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "error": {
                        "code": -32601,
                        "message": f"方法未找到: {method}"
                    }
                }
            
            # Update statistics
            duration = time.time() - start_time
            REQUEST_DURATION.labels(method=method).observe(duration)
            REQUEST_COUNT.labels(method=method, status="success").inc()
            
            self.request_stats['total'] += 1
            self.request_stats['success'] += 1
            
            return response
            
        except Exception as e:
            duration = time.time() - start_time
            REQUEST_COUNT.labels(method=method, status="error").inc()
            
            self.request_stats['total'] += 1
            self.request_stats['error'] += 1
            
            logger.error(f"请求处理错误: {e}")
            
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32603,
                    "message": "内部服务器错误",
                    "data": str(e) if config.debug else None
                }
            }
        finally:
            ACTIVE_CONNECTIONS.dec()
    
    async def handle_tool_call_optimized(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """优化的工具调用处理"""
        try:
            params = request.get("params", {})
            tool_name = params.get("name")
            arguments = params.get("arguments", {})
            
            if not tool_name:
                raise ValueError("工具名称不能为空")
            
            # Use the cached execution path
            result = await self.optimize_tool_execution(tool_name, arguments)
            
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "result": {
                    "content": [
                        {
                            "type": "text",
                            "text": str(result.get("result", result.get("error", "未知错误")))
                        }
                    ],
                    "isError": not result.get("success", False)
                }
            }
            
        except Exception as e:
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32603,
                    "message": f"工具调用失败: {str(e)}"
                }
            }
    
    async def handle_tools_list(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理工具列表请求 - 带缓存"""
        cache_key = "tools_list"
        
        # Try the cache first
        cached_tools = await self.get_from_cache(cache_key)
        if cached_tools:
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "result": cached_tools
            }
        
        # Fall back to the tool manager
        tools_data = tool_manager.list_tools()
        
        # Cache the result
        await self.set_cache(cache_key, tools_data, expire=3600)  # cache for 1 hour
        
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": tools_data
        }
    
    async def handle_initialize(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理初始化请求"""
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": {
                "protocolVersion": self.protocol_version,
                "capabilities": {
                    "tools": {},
                    "resources": {},
                    "prompts": {},
                    "logging": {}
                },
                "serverInfo": {
                    "name": self.name,
                    "version": self.version
                }
            }
        }
    
    async def update_system_metrics(self):
        """更新系统指标"""
        try:
            # Memory usage
            memory_info = psutil.virtual_memory()
            MEMORY_USAGE.set(memory_info.used)
            
            # Bound local cache growth
            if len(self.local_cache) > 1000:
                # Naive flush-all eviction; see the LRU sketch after this listing
                self.local_cache.clear()
                logger.info("Local cache cleared")
                
        except Exception as e:
            logger.warning(f"系统指标更新失败: {e}")
    
    async def periodic_tasks(self):
        """定期任务"""
        while True:
            try:
                await self.update_system_metrics()
                await asyncio.sleep(60)  # refresh once per minute
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"定期任务错误: {e}")
                await asyncio.sleep(60)
    
    async def start_optimized(self):
        """启动优化版服务器"""
        # 设置uvloop作为事件循环
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        
        # Initialize the cache layers
        await self.initialize_cache()
        
        # Start background maintenance
        periodic_task = asyncio.create_task(self.periodic_tasks())
        
        logger.info(f"🚀 优化版MCP服务器启动 - {self.name} v{self.version}")
        logger.info(f"📊 Prometheus指标端口: {config.prometheus_port}")
        
        try:
            if config.protocol == "stdio":
                await self.run_stdio_optimized()
        except KeyboardInterrupt:
            logger.info("服务器正在关闭...")
        finally:
            periodic_task.cancel()
            if self.redis_client:
                await self.redis_client.close()
    
    async def run_stdio_optimized(self):
        """优化的STDIO协议处理"""
        import sys
        
        reader = asyncio.StreamReader()
        protocol = asyncio.StreamReaderProtocol(reader)
        await asyncio.get_event_loop().connect_read_pipe(lambda: protocol, sys.stdin)
        
        while True:
            try:
                line = await reader.readline()
                if not line:
                    break
                
                # Fast JSON parsing with orjson
                request_data = orjson.loads(line.decode().strip())
                
                # Handle the request
                response = await self.handle_request_optimized(request_data)
                
                # Fast JSON serialization with orjson
                response_json = orjson.dumps(response).decode()
                print(response_json, flush=True)
                
            except orjson.JSONDecodeError as e:
                logger.error(f"JSON解析错误: {e}")
                error_response = {
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32700,
                        "message": "解析错误"
                    }
                }
                print(orjson.dumps(error_response).decode(), flush=True)
            except Exception as e:
                logger.error(f"STDIO处理错误: {e}")
                break

# Run the optimized server
if __name__ == "__main__":
    server = OptimizedMCPServer()
    asyncio.run(server.start_optimized())
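
One caveat in update_system_metrics: clearing the whole dictionary throws away hot entries along with cold ones. A sketch of a proper LRU bound for the local cache; this LRUCache class is an illustration, not part of the server above:

python
from collections import OrderedDict
from typing import Any, Optional

class LRUCache:
    """A bounded map that evicts the least recently used entry first."""

    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        self._data: "OrderedDict[str, Any]" = OrderedDict()

    def get(self, key: str) -> Optional[Any]:
        if key not in self._data:
            return None
        self._data.move_to_end(key)         # mark as most recently used
        return self._data[key]

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value
        self._data.move_to_end(key)
        while len(self._data) > self.max_size:
            self._data.popitem(last=False)  # drop the least recently used

Swapping self.local_cache = {} for LRUCache() (and the dict lookups for .get()/.set() calls) keeps memory bounded without discarding the entire working set.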

📊 Monitoring Configuration

Create monitoring/prometheus.yml:

yaml
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

scrape_configs:
  - job_name: 'mcp-server'
    static_configs:
      - targets: ['mcp-server:9090']
    scrape_interval: 5s
    metrics_path: /metrics
    
  # Requires a node-exporter service added alongside the stack
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

  # Redis does not expose Prometheus metrics natively; point this job
  # at a redis_exporter sidecar if you add one
  - job_name: 'redis'
    static_configs:
      - targets: ['redis:6379']

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093
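
Once the stack is running, you can confirm that every scrape target is actually being collected. A sketch that queries the Prometheus HTTP API through the port mapping from the compose file; up is a built-in metric where 1 means the last scrape succeeded:

python
import httpx

# Ask Prometheus for the scrape health of all configured targets
resp = httpx.get(
    "http://localhost:9090/api/v1/query",
    params={"query": "up"},
    timeout=10,
)
resp.raise_for_status()

for result in resp.json()["data"]["result"]:
    job = result["metric"].get("job", "?")
    instance = result["metric"].get("instance", "?")
    value = result["value"][1]  # value is a [timestamp, value] pair
    print(f"{job:15s} {instance:25s} up={value}")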

🚨 Health Checks and Monitoring

💊 Health Check Script

Create scripts/health_check.py:

python
#!/usr/bin/env python3
"""
health_check.py - health check script
"""

import asyncio
import sys
import time
import psutil
import orjson
from pathlib import Path

# Add the project root to the import path
sys.path.append(str(Path(__file__).parent.parent))

from config import config

class HealthChecker:
    """健康检查器"""
    
    def __init__(self):
        self.checks = []
        self.results = {}
    
    async def check_memory_usage(self) -> bool:
        """检查内存使用率"""
        try:
            memory = psutil.virtual_memory()
            usage_percent = memory.percent
            
            self.results['memory'] = {
                'usage_percent': usage_percent,
                'available_mb': memory.available // 1024 // 1024,
                'threshold': 90
            }
            
            return usage_percent < 90  # memory usage must stay below 90%
            
        except Exception as e:
            self.results['memory'] = {'error': str(e)}
            return False
    
    async def check_disk_space(self) -> bool:
        """检查磁盘空间"""
        try:
            disk = psutil.disk_usage('/')
            usage_percent = (disk.used / disk.total) * 100
            
            self.results['disk'] = {
                'usage_percent': round(usage_percent, 2),
                'free_gb': round(disk.free / 1024 / 1024 / 1024, 2),
                'threshold': 85
            }
            
            return usage_percent < 85  # disk usage must stay below 85%
            
        except Exception as e:
            self.results['disk'] = {'error': str(e)}
            return False
    
    async def check_cpu_usage(self) -> bool:
        """检查CPU使用率"""
        try:
            # Sample CPU usage over one second
            cpu_percent = psutil.cpu_percent(interval=1)
            
            self.results['cpu'] = {
                'usage_percent': cpu_percent,
                'threshold': 80
            }
            
            return cpu_percent < 80  # CPU usage must stay below 80%
            
        except Exception as e:
            self.results['cpu'] = {'error': str(e)}
            return False
    
    async def check_redis_connection(self) -> bool:
        """检查Redis连接"""
        if not config.redis_url:
            self.results['redis'] = {'status': 'not_configured'}
            return True
        
        try:
            import redis.asyncio as redis
            
            client = redis.from_url(config.redis_url)
            await client.ping()
            await client.close()
            
            self.results['redis'] = {'status': 'connected'}
            return True
            
        except Exception as e:
            self.results['redis'] = {'status': 'failed', 'error': str(e)}
            return False
    
    async def check_tool_manager(self) -> bool:
        """检查工具管理器"""
        try:
            from tools import tool_manager
            
            # Try listing the registered tools
            tools = tool_manager.list_tools()
            tool_count = len(tools.get('tools', []))
            
            self.results['tools'] = {
                'count': tool_count,
                'status': 'healthy' if tool_count > 0 else 'empty'
            }
            
            return tool_count > 0
            
        except Exception as e:
            self.results['tools'] = {'status': 'failed', 'error': str(e)}
            return False
    
    async def check_log_files(self) -> bool:
        """检查日志文件"""
        try:
            log_dir = Path('logs')
            if not log_dir.exists():
                self.results['logs'] = {'status': 'no_log_dir'}
                return True
            
            log_files = list(log_dir.glob('*.log'))
            total_size = sum(f.stat().st_size for f in log_files)
            total_size_mb = total_size / 1024 / 1024
            
            self.results['logs'] = {
                'file_count': len(log_files),
                'total_size_mb': round(total_size_mb, 2),
                'threshold_mb': 1000
            }
            
            return total_size_mb < 1000  # total log size must stay below 1 GB
            
        except Exception as e:
            self.results['logs'] = {'status': 'failed', 'error': str(e)}
            return False
    
    async def run_all_checks(self) -> bool:
        """运行所有健康检查"""
        checks = [
            ('memory', self.check_memory_usage()),
            ('disk', self.check_disk_space()),
            ('cpu', self.check_cpu_usage()),
            ('redis', self.check_redis_connection()),
            ('tools', self.check_tool_manager()),
            ('logs', self.check_log_files())
        ]
        
        results = await asyncio.gather(*[check[1] for check in checks], return_exceptions=True)
        
        all_passed = True
        for i, (name, _) in enumerate(checks):
            if isinstance(results[i], Exception):
                self.results[name] = {'status': 'error', 'error': str(results[i])}
                all_passed = False
            elif not results[i]:
                all_passed = False
        
        return all_passed
    
    def get_health_report(self) -> dict:
        """获取健康检查报告"""
        return {
            'timestamp': time.time(),
            'status': 'healthy' if all(
                'error' not in result and result.get('status') != 'failed'
                for result in self.results.values()
            ) else 'unhealthy',
            'checks': self.results
        }

async def main():
    """主函数"""
    checker = HealthChecker()
    
    try:
        # Run the checks
        all_healthy = await checker.run_all_checks()
        
        # Build the report
        report = checker.get_health_report()
        
        # Print the report as JSON
        print(orjson.dumps(report, option=orjson.OPT_INDENT_2).decode())
        
        # Exit code reflects health status
        sys.exit(0 if all_healthy else 1)
        
    except Exception as e:
        error_report = {
            'timestamp': time.time(),
            'status': 'error',
            'error': str(e)
        }
        print(orjson.dumps(error_report, option=orjson.OPT_INDENT_2).decode())
        sys.exit(1)

if __name__ == "__main__":
    asyncio.run(main())
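
One gap worth noting: the nginx config proxies /health to the backend over HTTP, while the optimized server above speaks STDIO. A minimal sketch of exposing the HealthChecker over HTTP with plain asyncio; the port and the import path are assumptions:

python
import asyncio

import orjson

from health_check import HealthChecker  # assumed import path for the script above

async def handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    """Answer any HTTP request with the JSON health report (a sketch:
    it consumes only the request line and ignores path and headers)."""
    await reader.readline()
    checker = HealthChecker()
    healthy = await checker.run_all_checks()
    body = orjson.dumps(checker.get_health_report())
    status = b"200 OK" if healthy else b"503 Service Unavailable"
    writer.write(
        b"HTTP/1.1 " + status + b"\r\n"
        + b"Content-Type: application/json\r\n"
        + b"Content-Length: " + str(len(body)).encode() + b"\r\n"
        + b"Connection: close\r\n\r\n"
        + body
    )
    await writer.drain()
    writer.close()

async def main() -> None:
    server = await asyncio.start_server(handle, "0.0.0.0", 8000)
    async with server:
        await server.serve_forever()

if __name__ == "__main__":
    asyncio.run(main())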

📈 Grafana Dashboard Configuration

Create monitoring/grafana/dashboards/mcp-dashboard.json:

json
{
  "dashboard": {
    "id": null,
    "title": "MCP Server Dashboard",
    "tags": ["mcp", "performance"],
    "timezone": "browser",
    "panels": [
      {
        "id": 1,
        "title": "请求速率",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(mcp_requests_total[5m])",
            "legendFormat": "{{method}} - {{status}}"
          }
        ],
        "yAxes": [
          {
            "label": "Requests/sec"
          }
        ],
        "gridPos": {
          "h": 8,
          "w": 12,
          "x": 0,
          "y": 0
        }
      },
      {
        "id": 2,
        "title": "响应时间",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.95, rate(mcp_request_duration_seconds_bucket[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.50, rate(mcp_request_duration_seconds_bucket[5m]))",
            "legendFormat": "50th percentile"
          }
        ],
        "yAxes": [
          {
            "label": "Seconds"
          }
        ],
        "gridPos": {
          "h": 8,
          "w": 12,
          "x": 12,
          "y": 0
        }
      },
      {
        "id": 3,
        "title": "活跃连接数",
        "type": "singlestat",
        "targets": [
          {
            "expr": "mcp_active_connections"
          }
        ],
        "gridPos": {
          "h": 4,
          "w": 6,
          "x": 0,
          "y": 8
        }
      },
      {
        "id": 4,
        "title": "内存使用量",
        "type": "singlestat",
        "targets": [
          {
            "expr": "mcp_memory_usage_bytes / 1024 / 1024"
          }
        ],
        "postfix": " MB",
        "gridPos": {
          "h": 4,
          "w": 6,
          "x": 6,
          "y": 8
        }
      },
      {
        "id": 5,
        "title": "工具执行时间",
        "type": "heatmap",
        "targets": [
          {
            "expr": "rate(mcp_tool_execution_seconds_bucket[5m])",
            "legendFormat": "{{tool_name}}"
          }
        ],
        "gridPos": {
          "h": 8,
          "w": 24,
          "x": 0,
          "y": 12
        }
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "5s"
  }
}
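
The volume mounts in the compose file provision this dashboard from disk, but while iterating it can be quicker to push it through Grafana's HTTP API. A sketch, assuming the admin credentials from docker-compose.yml:

python
import json

import httpx

# Load the dashboard JSON defined above; the file already wraps the
# panels under the "dashboard" key that the import API expects
with open("monitoring/grafana/dashboards/mcp-dashboard.json") as f:
    payload = json.load(f)
payload["overwrite"] = True  # replace an existing dashboard with the same title

resp = httpx.post(
    "http://localhost:3000/api/dashboards/db",
    json=payload,
    auth=("admin", "admin123"),  # GF_SECURITY_ADMIN_PASSWORD from the compose file
    timeout=10,
)
resp.raise_for_status()
print(resp.json())  # e.g. {"status": "success", "slug": ..., "url": ...}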

🔧 Deployment Scripts

🚀 Automated Deployment Script

Create scripts/deploy.sh:

bash
#!/bin/bash
# scripts/deploy.sh - automated deployment script

set -e  # exit on the first error

# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Logging helpers
log_info() {
    echo -e "${BLUE}[INFO]${NC} $1"
}

log_success() {
    echo -e "${GREEN}[SUCCESS]${NC} $1"
}

log_warning() {
    echo -e "${YELLOW}[WARNING]${NC} $1"
}

log_error() {
    echo -e "${RED}[ERROR]${NC} $1"
}

# Configuration
DEPLOY_ENV=${1:-production}
PROJECT_NAME="mcp-server"
DOCKER_REGISTRY=${DOCKER_REGISTRY:-"your-registry.com"}
VERSION=${VERSION:-$(git rev-parse --short HEAD)}

log_info "开始部署 MCP Server..."
log_info "环境: $DEPLOY_ENV"
log_info "版本: $VERSION"

# Check dependencies
check_dependencies() {
    log_info "检查部署依赖..."
    
    for cmd in docker docker-compose git; do
        if ! command -v $cmd &> /dev/null; then
            log_error "$cmd 未安装"
            exit 1
        fi
    done
    
    log_success "依赖检查通过"
}

# Build the Docker image
build_image() {
    log_info "构建Docker镜像..."
    
    docker build \
        --build-arg VERSION=$VERSION \
        --build-arg BUILD_DATE=$(date -u +'%Y-%m-%dT%H:%M:%SZ') \
        -t $PROJECT_NAME:$VERSION \
        -t $PROJECT_NAME:latest \
        .
    
    if [ $? -eq 0 ]; then
        log_success "Docker镜像构建成功"
    else
        log_error "Docker镜像构建失败"
        exit 1
    fi
}

# Run tests
run_tests() {
    log_info "运行测试套件..."
    
    # Run the tests in a throwaway container
    docker run --rm \
        -v $(pwd):/app \
        -w /app \
        $PROJECT_NAME:$VERSION \
        python -m pytest tests/ -v --tb=short
    
    if [ $? -eq 0 ]; then
        log_success "所有测试通过"
    else
        log_error "测试失败"
        exit 1
    fi
}

# Push the image to the registry
push_image() {
    if [ -n "$DOCKER_REGISTRY" ]; then
        log_info "推送镜像到仓库..."
        
        docker tag $PROJECT_NAME:$VERSION $DOCKER_REGISTRY/$PROJECT_NAME:$VERSION
        docker tag $PROJECT_NAME:latest $DOCKER_REGISTRY/$PROJECT_NAME:latest
        
        docker push $DOCKER_REGISTRY/$PROJECT_NAME:$VERSION
        docker push $DOCKER_REGISTRY/$PROJECT_NAME:latest
        
        log_success "镜像推送完成"
    fi
}

# Deploy to the target environment
deploy_to_env() {
    log_info "部署到 $DEPLOY_ENV 环境..."
    
    # Copy the environment configuration
    if [ -f "config/$DEPLOY_ENV.env" ]; then
        cp config/$DEPLOY_ENV.env .env
        log_info "已加载环境配置: config/$DEPLOY_ENV.env"
    else
        log_warning "环境配置文件不存在: config/$DEPLOY_ENV.env"
    fi
    
    # Update the image tag in the compose file
    sed -i.bak "s|image: .*$PROJECT_NAME.*|image: $DOCKER_REGISTRY/$PROJECT_NAME:$VERSION|g" docker-compose.yml
    
    # Start the services
    docker-compose down
    docker-compose up -d
    
    # Wait for the services to come up
    log_info "Waiting for services to start..."
    sleep 10
    
    # Health check
    for i in {1..30}; do
        if docker-compose exec -T mcp-server python scripts/health_check.py > /dev/null 2>&1; then
            log_success "服务启动成功,健康检查通过"
            return 0
        fi
        
        log_info "等待服务就绪... ($i/30)"
        sleep 2
    done
    
    log_error "健康检查失败,服务可能启动异常"
    docker-compose logs mcp-server
    exit 1
}

# Roll back the deployment
rollback() {
    log_warning "执行回滚..."
    
    # Find the previous version
    PREVIOUS_VERSION=$(docker images $DOCKER_REGISTRY/$PROJECT_NAME --format "table {{.Tag}}" | grep -v latest | grep -v TAG | head -2 | tail -1)
    
    if [ -n "$PREVIOUS_VERSION" ]; then
        log_info "回滚到版本: $PREVIOUS_VERSION"
        
        sed -i.bak "s|image: .*$PROJECT_NAME.*|image: $DOCKER_REGISTRY/$PROJECT_NAME:$PREVIOUS_VERSION|g" docker-compose.yml
        
        docker-compose down
        docker-compose up -d
        
        log_success "回滚完成"
    else
        log_error "找不到可回滚的版本"
        exit 1
    fi
}

# Clean up old images
cleanup() {
    log_info "清理旧Docker镜像..."
    
    # Keep only the five most recent versions
    docker images $PROJECT_NAME --format "table {{.ID}}\t{{.Tag}}" | \
        grep -v latest | grep -v TAG | tail -n +6 | \
        awk '{print $1}' | xargs -r docker rmi
    
    log_success "清理完成"
}

# Main
main() {
    case "${2:-deploy}" in
        "build")
            check_dependencies
            build_image
            ;;
        "test")
            run_tests
            ;;
        "deploy")
            check_dependencies
            build_image
            run_tests
            push_image
            deploy_to_env
            cleanup
            ;;
        "rollback")
            rollback
            ;;
        "cleanup")
            cleanup
            ;;
        *)
            echo "用法: $0 <environment> [build|test|deploy|rollback|cleanup]"
            echo "环境: development, staging, production"
            echo "操作: build(构建), test(测试), deploy(部署), rollback(回滚), cleanup(清理)"
            exit 1
            ;;
    esac
}

# Run the main function
main "$@"

log_success "部署脚本执行完成!"

🎯 Section Summary

In this section you built a complete, production-grade deployment solution for your MCP server:

Containerized deployment: complete Docker + Docker Compose configuration
Load balancing: Nginx reverse proxy with SSL
Performance optimization: caching, connection pooling, async processing
Monitoring and alerting: a Prometheus + Grafana stack
Health checks: comprehensive checks with automatic recovery
Automated deployment: one-command deploy, test, and rollback scripts

🚀 Deployment Commands

bash
# Build and test
./scripts/deploy.sh production build
./scripts/deploy.sh production test

# Full deployment
./scripts/deploy.sh production deploy

# Health check
python scripts/health_check.py

# View monitoring
# Open http://localhost:3000 (Grafana)
# Open http://localhost:9090 (Prometheus)

📊 Expected Optimization Gains

With these optimizations in place, gains in the following ranges are realistic targets, though the exact numbers depend heavily on your workload (see the measurement sketch below to verify them yourself):

  • Response time reduced by up to ~60% (caching)
  • Concurrent throughput improved by up to ~300% (connection pooling + async processing)
  • Memory usage reduced by up to ~40% (compression + LRU caching)
  • Availability approaching 99.9% (health checks + automatic recovery)
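
Numbers like these should be measured rather than assumed. A small latency sketch using httpx; the URL and JSON-RPC body are assumptions based on the nginx routing earlier in this section:

python
import statistics
import time

import httpx

URL = "https://your-domain.com/api/"  # assumption: matches the nginx /api/ location
REQUEST = {"jsonrpc": "2.0", "id": 1, "method": "tools/list"}

latencies = []
with httpx.Client(verify=False, timeout=10) as client:  # self-signed dev cert
    for _ in range(100):
        start = time.perf_counter()
        client.post(URL, json=REQUEST)
        latencies.append(time.perf_counter() - start)
        time.sleep(0.1)  # stay under the 10 r/s nginx rate limit

latencies.sort()
print(f"p50: {statistics.median(latencies) * 1000:.1f} ms")
print(f"p95: {latencies[int(len(latencies) * 0.95)] * 1000:.1f} ms")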

Your MCP server now has enterprise-grade deployment and operations capabilities!


👉 Next chapter: Chapter 5 - Advanced Feature Development