Skip to content

7.3 监控与日志系统

📊 核心目标:构建全面的监控和日志系统,实现系统可观测性和运维自动化
⏱️ 预计时长:50分钟
📊 难度级别:⭐⭐⭐⭐⭐

🎯 学习目标

通过本节学习,你将掌握:

  • 设计分层监控架构
  • 实现应用性能监控(APM)
  • 构建集中化日志系统
  • 建立智能告警机制
  • 创建可视化仪表板

📊 监控系统架构

监控架构图

🔧 Prometheus监控系统

Prometheus配置

yaml
# monitoring/prometheus.yml
# Prometheus server configuration: global intervals, rule files,
# Alertmanager endpoint and all scrape jobs.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"
  - "recording_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

scrape_configs:
  # MCP server
  # NOTE(review): start_metrics_server() in src/monitoring/metrics.py defaults
  # to port 8001; confirm which port actually serves /metrics.
  - job_name: 'mcp-server'
    static_configs:
      - targets: ['mcp-server:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s
    scrape_timeout: 5s

  # Host-level metrics
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

  # Redis
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  # PostgreSQL
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  # Kubernetes API servers: keep only the default/kubernetes https endpoint
  - job_name: 'kubernetes-apiservers'
    kubernetes_sd_configs:
    - role: endpoints
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
    - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
      action: keep
      regex: default;kubernetes;https

  # Kubernetes nodes (kubelet). The kubelet endpoint is served over HTTPS and
  # requires authentication, so use the same service-account CA and token as
  # the API server job.
  - job_name: 'kubernetes-nodes'
    kubernetes_sd_configs:
    - role: node
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
    # Copy every Kubernetes node label onto the scraped series
    - action: labelmap
      regex: __meta_kubernetes_node_label_(.+)

  # Pods that opt in through prometheus.io/* annotations
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
    - role: pod
    relabel_configs:
    # Scrape only pods annotated with prometheus.io/scrape: "true"
    - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
      action: keep
      regex: true
    # Honour a custom metrics path from prometheus.io/path
    - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
      action: replace
      target_label: __metrics_path__
      regex: (.+)
    # Honour a custom port from prometheus.io/port; without this rule the
    # annotation is ignored and only the discovered container port is scraped
    - source_labels: [__address__, __meta_kubernetes_pod_annotation_prometheus_io_port]
      action: replace
      regex: ([^:]+)(?::\d+)?;(\d+)
      replacement: $1:$2
      target_label: __address__

应用指标收集

python
# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info, start_http_server
import functools
import time
from typing import Callable, Any
import psutil
import asyncio

class MCPMetrics:
    """Prometheus metrics collector for the MCP server.

    On construction it registers HTTP, MCP-protocol, tool-call and
    process-resource metrics. It also provides decorators and helpers that
    update those metrics, and a background asyncio task that samples the
    process's memory and CPU usage.
    """

    def __init__(self):
        # HTTP request metrics
        self.http_requests_total = Counter(
            'http_requests_total',
            'Total HTTP requests',
            ['method', 'endpoint', 'status_code']
        )

        self.http_request_duration = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration',
            ['method', 'endpoint'],
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        )

        # MCP protocol metrics
        self.mcp_connections = Gauge(
            'mcp_connections_active',
            'Active MCP connections'
        )

        self.mcp_messages_total = Counter(
            'mcp_messages_total',
            'Total MCP messages',
            ['message_type', 'direction']
        )

        self.mcp_errors_total = Counter(
            'mcp_errors_total',
            'Total MCP errors',
            ['error_type']
        )

        # Business metrics
        self.tool_calls_total = Counter(
            'mcp_tool_calls_total',
            'Total tool calls',
            ['tool_name', 'status']
        )

        self.tool_call_duration = Histogram(
            'mcp_tool_call_duration_seconds',
            'Tool call duration',
            ['tool_name'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
        )

        # Resource usage metrics
        self.memory_usage = Gauge(
            'process_memory_usage_bytes',
            'Process memory usage'
        )

        self.cpu_usage = Gauge(
            'process_cpu_usage_percent',
            'Process CPU usage'
        )

        # Application info
        self.app_info = Info(
            'mcp_server_info',
            'MCP server information'
        )

        # Handle of the background sampling task. Keeping the reference
        # prevents the task from being garbage collected and lets
        # _start_resource_monitoring() avoid starting it twice.
        self._resource_task = None

        # Start resource monitoring now if an event loop is already running;
        # otherwise it is started by the first tracked coroutine.
        self._start_resource_monitoring()

    def track_http_request(self, func: Callable) -> Callable:
        """Decorate an async HTTP handler to count requests and time them.

        The request object is taken from the first positional argument or the
        ``request`` keyword argument. Its ``method`` and ``url.path`` become
        label values, with ``'UNKNOWN'`` / ``'unknown'`` used when they cannot
        be read. An exception raised by the handler is recorded as status
        ``'500'`` and re-raised.
        """
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # We are inside a running loop here, so the sampler can start.
            self._start_resource_monitoring()

            request = args[0] if args else kwargs.get('request')
            # getattr keeps the decorator usable when the first argument is
            # not a request object (e.g. a bound ``self``).
            method = getattr(request, 'method', None) or 'UNKNOWN'
            # NOTE(review): the raw path is used as a label value; paths with
            # embedded IDs will create one time series per distinct path.
            path = getattr(getattr(request, 'url', None), 'path', None) or 'unknown'

            # perf_counter is monotonic, so a wall-clock adjustment cannot
            # produce a negative or inflated duration.
            start_time = time.perf_counter()
            status_code = '200'

            try:
                response = await func(*args, **kwargs)
                if hasattr(response, 'status_code'):
                    status_code = str(response.status_code)
                return response
            except Exception:
                status_code = '500'
                raise
            finally:
                duration = time.perf_counter() - start_time

                self.http_requests_total.labels(
                    method=method,
                    endpoint=path,
                    status_code=status_code
                ).inc()

                self.http_request_duration.labels(
                    method=method,
                    endpoint=path
                ).observe(duration)

        return wrapper

    def track_tool_call(self, tool_name: str):
        """Return a decorator that counts and times calls of an async tool.

        Each call increments ``mcp_tool_calls_total`` with status ``success``
        or ``error`` and records its duration. A failure also increments
        ``mcp_errors_total`` with the exception class name, then re-raises.
        """
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                self._start_resource_monitoring()

                start_time = time.perf_counter()
                status = 'success'

                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    status = 'error'
                    self.mcp_errors_total.labels(
                        error_type=type(e).__name__
                    ).inc()
                    raise
                finally:
                    duration = time.perf_counter() - start_time

                    self.tool_calls_total.labels(
                        tool_name=tool_name,
                        status=status
                    ).inc()

                    self.tool_call_duration.labels(
                        tool_name=tool_name
                    ).observe(duration)

            return wrapper
        return decorator

    def record_mcp_message(self, message_type: str, direction: str):
        """Increment the MCP message counter for a message type and direction."""
        self.mcp_messages_total.labels(
            message_type=message_type,
            direction=direction
        ).inc()

    def set_active_connections(self, count: int):
        """Set the gauge of currently active MCP connections."""
        self.mcp_connections.set(count)

    def set_app_info(self, version: str, environment: str, build_id: str):
        """Publish version, environment and build id on the info metric."""
        self.app_info.info({
            'version': version,
            'environment': environment,
            'build_id': build_id
        })

    async def _monitor_resources(self):
        """Sample process memory and CPU every 30 s until cancelled."""
        process = None
        while True:
            try:
                if process is None:
                    # One Process object is reused so that cpu_percent() can
                    # measure usage since the previous sample.
                    process = psutil.Process()
                    # Priming call: the first non-blocking call has no
                    # previous sample to compare with, so its value is not
                    # published.
                    process.cpu_percent(interval=None)
                else:
                    # interval=None is non-blocking; interval=1 would stall
                    # the event loop for a full second on every sample.
                    self.cpu_usage.set(process.cpu_percent(interval=None))

                # Memory usage (resident set size)
                self.memory_usage.set(process.memory_info().rss)

                await asyncio.sleep(30)  # refresh every 30 seconds
            except asyncio.CancelledError:
                # Let cancellation propagate so shutdown is not delayed.
                raise
            except Exception as e:
                print(f"资源监控错误: {e}")
                await asyncio.sleep(60)

    def _start_resource_monitoring(self):
        """Start the background resource sampler if it is not running.

        Safe to call repeatedly. When no event loop is running, for example
        while the module-level instance is created at import time, this does
        nothing. The tracking decorators call it again from inside a running
        loop.
        """
        task = getattr(self, '_resource_task', None)
        if task is not None and not task.done():
            return

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop yet: creating a task here would raise.
            return

        self._resource_task = loop.create_task(self._monitor_resources())

# Global metrics instance, created when this module is imported
metrics = MCPMetrics()

# Metrics server entry point
def start_metrics_server(port: int = 8001):
    """Serve the Prometheus metrics endpoint on ``port`` and report it."""
    start_http_server(port)
    banner = f"📊 指标服务器启动在端口 {port}"
    print(banner)

告警规则配置

yaml
# monitoring/alert_rules.yml
groups:
- name: mcp-server-alerts
  rules:
  # Availability: the scrape target has been unreachable for 1 minute
  - alert: MCPServerDown
    expr: up{job="mcp-server"} == 0
    for: 1m
    labels:
      severity: critical
      service: mcp-server
    annotations:
      summary: "MCP Server is down"
      description: "MCP Server instance {{ $labels.instance }} has been down for more than 1 minute"
      runbook_url: "https://runbook.example.com/mcp-server-down"

  # High error rate. Both sides are aggregated per instance: dividing the raw
  # series would only pair each 5xx series with itself (identical label sets)
  # and always yield 1.
  - alert: MCPServerHighErrorRate
    expr: |
      (
        sum by (instance) (rate(http_requests_total{job="mcp-server",status_code=~"5.."}[5m])) /
        sum by (instance) (rate(http_requests_total{job="mcp-server"}[5m]))
      ) > 0.05
    for: 3m
    labels:
      severity: warning
      service: mcp-server
    annotations:
      summary: "High error rate detected"
      description: "Error rate is {{ $value | humanizePercentage }} for instance {{ $labels.instance }}"

  # High latency: p95 across all endpoints of an instance. Buckets are summed
  # by (le, instance) before the quantile is taken.
  - alert: MCPServerHighLatency
    expr: |
      histogram_quantile(0.95,
        sum by (le, instance) (rate(http_request_duration_seconds_bucket{job="mcp-server"}[5m]))
      ) > 2.0
    for: 5m
    labels:
      severity: warning
      service: mcp-server
    annotations:
      summary: "High latency detected"
      description: "95th percentile latency is {{ $value }}s for instance {{ $labels.instance }}"

  # Memory usage above 1 GiB. The value is kept in bytes so the annotation can
  # format it with humanize1024.
  - alert: MCPServerHighMemoryUsage
    expr: process_memory_usage_bytes{job="mcp-server"} > 1024 * 1024 * 1024
    for: 5m
    labels:
      severity: warning
      service: mcp-server
    annotations:
      summary: "High memory usage"
      description: "Memory usage is {{ $value | humanize1024 }}B for instance {{ $labels.instance }}"

  # CPU usage
  - alert: MCPServerHighCPUUsage
    expr: process_cpu_usage_percent{job="mcp-server"} > 80
    for: 10m
    labels:
      severity: warning
      service: mcp-server
    annotations:
      summary: "High CPU usage"
      description: "CPU usage is {{ $value }}% for instance {{ $labels.instance }}"

  # Connection count
  - alert: MCPServerTooManyConnections
    expr: mcp_connections_active{job="mcp-server"} > 1000
    for: 2m
    labels:
      severity: warning
      service: mcp-server
    annotations:
      summary: "Too many active connections"
      description: "Active connections: {{ $value }} for instance {{ $labels.instance }}"

  # Tool call failure rate, per tool. Aggregation drops the `status` label so
  # the error series can be divided by the total for the same tool.
  - alert: MCPToolCallFailureRate
    expr: |
      (
        sum by (tool_name) (rate(mcp_tool_calls_total{status="error"}[5m])) /
        sum by (tool_name) (rate(mcp_tool_calls_total[5m]))
      ) > 0.1
    for: 3m
    labels:
      severity: warning
      service: mcp-server
    annotations:
      summary: "High tool call failure rate"
      description: "Tool call failure rate is {{ $value | humanizePercentage }} for tool {{ $labels.tool_name }}"

- name: infrastructure-alerts
  rules:
  # Disk space: less than 10% of the root filesystem is available
  - alert: DiskSpaceLow
    expr: |
      (
        node_filesystem_avail_bytes{mountpoint="/",fstype!="rootfs"} /
        node_filesystem_size_bytes{mountpoint="/",fstype!="rootfs"}
      ) < 0.1
    for: 5m
    labels:
      severity: critical
    annotations:
      summary: "Disk space is low"
      description: "Disk space is below 10% on {{ $labels.instance }}"

  # Node load: the 1-minute load average exceeds 80% of the CPU core count.
  # The core count comes from counting the per-CPU idle series. The idle
  # counter itself is accumulated seconds and cannot be compared with a load
  # average.
  - alert: NodeHighLoad
    expr: |
      node_load1 > on (instance) (
        count by (instance) (node_cpu_seconds_total{mode="idle"}) * 0.8
      )
    for: 10m
    labels:
      severity: warning
    annotations:
      summary: "Node load is high"
      description: "Load average is {{ $value }} on {{ $labels.instance }}"

- name: database-alerts
  rules:
  # Connection usage. pg_stat_activity_count is reported per database and
  # state, so it is summed per instance and matched to the max_connections
  # setting on the instance label only.
  - alert: PostgreSQLTooManyConnections
    expr: |
      sum by (instance) (pg_stat_activity_count)
        / on (instance) pg_settings_max_connections > 0.8
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "PostgreSQL has too many connections"
      description: "Connection usage is {{ $value | humanizePercentage }}"

  # Lock count
  # NOTE(review): this counts AccessShareLock entries, which ordinary reads
  # also take; confirm it reflects lock *waits* as the summary claims.
  - alert: PostgreSQLLockWait
    expr: pg_locks_count{mode="AccessShareLock"} > 100
    for: 1m
    labels:
      severity: warning
    annotations:
      summary: "PostgreSQL has many locks waiting"
      description: "Lock count is {{ $value }}"

📝 日志系统架构

ELK Stack配置

yaml
# logging/elasticsearch.yml
# Single-node Elasticsearch used as the log store.
cluster.name: "mcp-logs"
node.name: "elasticsearch"
network.host: 0.0.0.0
http.port: 9200
discovery.type: single-node

# Index-level settings are not accepted in the node configuration file
# (Elasticsearch 5.0 and later refuse to start when they are present).
# Apply them through an index template instead, e.g.:
#   PUT _index_template/logs
#   { "index_patterns": ["logs-*", "errors-*"],
#     "template": { "settings": { "number_of_shards": 1, "number_of_replicas": 0 } } }
# index.number_of_shards: 1
# index.number_of_replicas: 0

# Automatic index creation: allow only the patterns written by the log
# pipelines (logs-* and errors-*) and reject everything else.
# NOTE(review): other clients of this cluster (e.g. Jaeger) would need their
# own patterns added here.
action.auto_create_index: "+logs-*,+errors-*,-*"
yaml
# logging/logstash.conf
input {
  beats {
    port => 5044
  }
  
  tcp {
    port => 5000
    codec => json_lines
  }
}

filter {
  if [service] == "mcp-server" {
    # 解析MCP服务器日志
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
    }
    
    # 提取请求ID
    if [msg] =~ /request_id=([a-zA-Z0-9-]+)/ {
      grok {
        match => { "msg" => "request_id=%{UUID:request_id}" }
      }
    }
    
    # 提取用户ID
    if [msg] =~ /user_id=([0-9]+)/ {
      grok {
        match => { "msg" => "user_id=%{INT:user_id}" }
      }
    }
    
    # 提取工具名称
    if [msg] =~ /tool=([a-zA-Z_]+)/ {
      grok {
        match => { "msg" => "tool=%{WORD:tool_name}" }
      }
    }
    
    # 解析JSON数据
    if [msg] =~ /^\{.*\}$/ {
      json {
        source => "msg"
      }
    }
  }
  
  # 添加时间戳
  date {
    match => [ "timestamp", "ISO8601" ]
  }
  
  # 地理位置解析
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geoip"
    }
  }
  
  # 敏感信息脱敏
  mutate {
    gsub => [
      "message", "password=[^&\s]+", "password=***",
      "message", "token=[^&\s]+", "token=***",
      "message", "api_key=[^&\s]+", "api_key=***"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{service}-%{+YYYY.MM.dd}"
  }
  
  # 错误日志单独存储
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "errors-%{service}-%{+YYYY.MM.dd}"
    }
  }
  
  stdout { codec => rubydebug }
}

结构化日志实现

python
# src/logging/structured_logger.py
import logging
import json
import time
import traceback
from datetime import datetime
from typing import Dict, Any, Optional
from contextvars import ContextVar
import uuid

# Per-request context (fields merged into every log entry by the formatter)
request_context: ContextVar[Dict[str, Any]] = ContextVar('request_context', default={})

class StructuredFormatter(logging.Formatter):
    """Format log records as single-line JSON objects.

    Each entry carries the record's timestamp, level, logger name, message
    and source location. It is then extended with the current request context,
    exception details (when present) and any ``extra_fields`` attached to
    the record.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Return ``record`` serialized as a JSON string."""
        # Local import: the file-level import only brings in ``datetime``.
        from datetime import timezone

        # Use the time the record was created, not the time it is formatted,
        # and always emit millisecond precision with a trailing 'Z'.
        event_time = datetime.fromtimestamp(record.created, tz=timezone.utc)

        # Base log fields
        log_entry = {
            'timestamp': event_time.isoformat(timespec='milliseconds').replace('+00:00', 'Z'),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Merge the request context
        context = request_context.get({})
        if context:
            log_entry.update(context)

        # Attach exception details. exc_info can be (None, None, None) when
        # logging with exc_info=True outside an ``except`` block.
        exc_info = record.exc_info
        if exc_info and exc_info[0] is not None:
            log_entry['exception'] = {
                'type': exc_info[0].__name__,
                'message': str(exc_info[1]),
                'traceback': traceback.format_exception(*exc_info)
            }

        # Merge caller-supplied fields
        if hasattr(record, 'extra_fields'):
            log_entry.update(record.extra_fields)

        # default=str: a value that is not JSON-serializable is written as
        # its string form, so one odd field cannot make the log call fail.
        return json.dumps(log_entry, ensure_ascii=False, default=str)

class MCPLogger:
    """Structured logger for the MCP server.

    Wraps a standard ``logging.Logger`` whose console and file handlers both
    use ``StructuredFormatter``. Keyword arguments passed to the logging
    methods are attached to the record as ``extra_fields`` and appear as
    top-level fields in the JSON output.
    """

    def __init__(self, name: str = 'mcp-server'):
        """Configure the logger called ``name`` with console and file output."""
        import os  # local import: ``os`` is not imported at file level

        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # logging.getLogger() returns the same object for the same name, so
        # creating a second MCPLogger must not attach a second set of
        # handlers (every line would then be written twice).
        already_configured = any(
            isinstance(handler.formatter, StructuredFormatter)
            for handler in self.logger.handlers
        )
        if already_configured:
            return

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(StructuredFormatter())
        self.logger.addHandler(console_handler)

        # File handler. The directory is created first because FileHandler
        # does not create missing parent directories. UTF-8 is forced because
        # the formatter writes non-ASCII text verbatim.
        os.makedirs('logs', exist_ok=True)
        file_handler = logging.FileHandler('logs/mcp-server.log', encoding='utf-8')
        file_handler.setFormatter(StructuredFormatter())
        self.logger.addHandler(file_handler)

    def set_request_context(self, **kwargs):
        """Add ``kwargs`` to the request context of the current execution context."""
        # Copy before updating: the stored dict may be shared with other
        # tasks that inherited this context, and mutating it in place would
        # leak these fields into their log entries.
        context = dict(request_context.get({}))
        context.update(kwargs)
        request_context.set(context)

    def clear_request_context(self):
        """Reset the request context to an empty dict."""
        request_context.set({})

    def info(self, message: str, **kwargs):
        """Log ``message`` at INFO level with ``kwargs`` as extra fields."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.info(message, extra=extra)

    def error(self, message: str, **kwargs):
        """Log ``message`` at ERROR level with ``kwargs`` as extra fields."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.error(message, extra=extra)

    def warning(self, message: str, **kwargs):
        """Log ``message`` at WARNING level with ``kwargs`` as extra fields."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.warning(message, extra=extra)

    def debug(self, message: str, **kwargs):
        """Log ``message`` at DEBUG level with ``kwargs`` as extra fields."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.debug(message, extra=extra)

    def log_request(self, method: str, path: str, status_code: int, 
                   duration: float, user_id: Optional[str] = None):
        """Log one HTTP request; ``duration`` is in seconds and is written in ms."""
        self.info(
            "HTTP Request",
            event_type="http_request",
            method=method,
            path=path,
            status_code=status_code,
            duration_ms=duration * 1000,
            user_id=user_id
        )

    def log_tool_call(self, tool_name: str, parameters: Dict[str, Any], 
                     duration: float, success: bool, error: Optional[str] = None):
        """Log one tool call; ``duration`` is in seconds and is written in ms."""
        self.info(
            f"Tool call: {tool_name}",
            event_type="tool_call",
            tool_name=tool_name,
            parameters=parameters,
            duration_ms=duration * 1000,
            success=success,
            error=error
        )

    def log_mcp_message(self, message_type: str, direction: str, 
                       message_id: str, size: int):
        """Log one MCP message at DEBUG level."""
        self.debug(
            f"MCP Message: {message_type}",
            event_type="mcp_message",
            message_type=message_type,
            direction=direction,
            message_id=message_id,
            size_bytes=size
        )

    def log_business_event(self, event_name: str, **data):
        """Log a business event; ``data`` is written as additional fields."""
        self.info(
            f"Business Event: {event_name}",
            event_type="business_event",
            event_name=event_name,
            **data
        )

# Global logger instance, created when this module is imported
logger = MCPLogger()

# Request tracking decorator
def track_request(func):
    """Decorate an async handler with request-scoped logging.

    For every call a fresh request id is generated and stored in the logging
    context together with ``service="mcp-server"``. The call is logged with
    its duration and status 200 on success or 500 on failure, and the
    context is cleared afterwards. ``method`` and ``path`` are read from the
    handler's keyword arguments and default to ``'UNKNOWN'`` and ``'/'``.
    Exceptions are logged and re-raised.
    """
    import functools  # local import: not imported at file level

    # functools.wraps keeps the handler's name, docstring and signature
    # metadata visible to anything that introspects the decorated function.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        request_id = str(uuid.uuid4())
        # perf_counter is monotonic, so the duration is unaffected by
        # wall-clock adjustments.
        start_time = time.perf_counter()

        # Set the request context
        logger.set_request_context(
            request_id=request_id,
            service="mcp-server"
        )

        try:
            # Run the handler
            result = await func(*args, **kwargs)

            # Log the successful request
            duration = time.perf_counter() - start_time
            logger.log_request(
                method=kwargs.get('method', 'UNKNOWN'),
                path=kwargs.get('path', '/'),
                status_code=200,
                duration=duration
            )

            return result

        except Exception as e:
            # Log the failed request
            duration = time.perf_counter() - start_time
            logger.log_request(
                method=kwargs.get('method', 'UNKNOWN'),
                path=kwargs.get('path', '/'),
                status_code=500,
                duration=duration
            )

            logger.error(
                f"Request failed: {str(e)}",
                error_type=type(e).__name__,
                request_id=request_id
            )
            raise

        finally:
            # Clear the request context
            logger.clear_request_context()

    return wrapper

Fluentd配置

ruby
# logging/fluent.conf
# Input 1: tail the MCP server's JSON log files and tag events "mcp.server".
# The event time is read from each record's "timestamp" field.
# NOTE(review): confirm the application writes its log files under this
# directory and that time_format matches the timestamp precision it emits.
<source>
  @type tail
  path /var/log/mcp-server/*.log
  pos_file /var/log/fluentd/mcp-server.log.pos
  tag mcp.server
  format json
  time_key timestamp
  time_format %Y-%m-%dT%H:%M:%S.%LZ
</source>

# Input 2: tail the nginx access log and tag events "nginx.access".
<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx.access.log.pos
  tag nginx.access
  format nginx
</source>

# Enrich MCP server events with service name, environment and hostname.
# The environment value is read from the ENVIRONMENT environment variable.
<filter mcp.server>
  @type record_transformer
  <record>
    service mcp-server
    environment "#{ENV['ENVIRONMENT']}"
    hostname "#{Socket.gethostname}"
  </record>
</filter>

# Enrich nginx events with the same three fields.
<filter nginx.access>
  @type record_transformer
  <record>
    service nginx
    environment "#{ENV['ENVIRONMENT']}"
    hostname "#{Socket.gethostname}"
  </record>
</filter>

# 错误日志单独处理
<filter mcp.server>
  @type grep
  <regexp>
    key level
    pattern ^ERROR$
  </regexp>
  @label @ERROR
</filter>

<label @ERROR>
  <match mcp.server>
    @type copy
    <store>
      @type elasticsearch
      host elasticsearch
      port 9200
      index_name errors
      type_name _doc
      include_tag_key true
      tag_key @log_name
      flush_interval 1s
    </store>
    <store>
      @type slack
      webhook_url "#{ENV['SLACK_WEBHOOK_URL']}"
      channel "#alerts"
      username fluentd
      icon_emoji :warning:
      title "MCP Server Error"
      message "%s"
      message_keys message
    </store>
  </match>
</label>

<match mcp.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  index_name logs
  type_name _doc
  include_tag_key true
  tag_key @log_name
  flush_interval 5s
  
  <buffer>
    @type file
    path /var/log/fluentd/buffer/elasticsearch
    flush_mode interval
    retry_type exponential_backoff
    flush_thread_count 2
    flush_interval 5s
    retry_forever
    retry_max_interval 30
    chunk_limit_size 2M
    queue_limit_length 8
    overflow_action block
  </buffer>
</match>

🔍 链路追踪系统

Jaeger配置

python
# src/tracing/tracer.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter  
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
import os

def setup_tracing():
    """Configure distributed tracing and return a tracer for this module.

    Installs a ``TracerProvider`` that exports spans to Jaeger in batches and
    enables automatic instrumentation for FastAPI, requests, psycopg2 and
    Redis. The Jaeger agent host and port and the collector endpoint are read
    from ``JAEGER_AGENT_HOST`` (default ``localhost``), ``JAEGER_AGENT_PORT``
    (default ``6831``) and ``JAEGER_COLLECTOR_ENDPOINT``.
    """
    # Configure the tracer provider
    trace.set_tracer_provider(TracerProvider())
    tracer_provider = trace.get_tracer_provider()

    # Configure the Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name=os.getenv('JAEGER_AGENT_HOST', 'localhost'),
        agent_port=int(os.getenv('JAEGER_AGENT_PORT', 6831)),
        collector_endpoint=os.getenv('JAEGER_COLLECTOR_ENDPOINT'),
    )

    # Export spans in batches
    span_processor = BatchSpanProcessor(jaeger_exporter)
    tracer_provider.add_span_processor(span_processor)

    # Automatic instrumentation. instrument() is an instance method, so each
    # instrumentor has to be instantiated; calling it on the class raises
    # TypeError because ``self`` is missing.
    FastAPIInstrumentor().instrument()
    RequestsInstrumentor().instrument()
    Psycopg2Instrumentor().instrument()
    RedisInstrumentor().instrument()

    return trace.get_tracer(__name__)

class TracingMixin:
    """Mixin that adds span-creation helpers to a class."""

    def __init__(self):
        self.tracer = trace.get_tracer(__name__)

    def trace_operation(self, operation_name: str, **attributes):
        """Return a decorator that runs an async function inside a span.

        The span is named ``operation_name`` and receives ``attributes``
        (values converted to strings). Its status is set to OK on success.
        On failure the status is set to ERROR, the exception is recorded on
        the span, and the exception is re-raised.
        """
        import functools  # local import: not imported at file level

        def decorator(func):
            # functools.wraps keeps the wrapped function's name and docstring
            # visible to anything that introspects the decorated function.
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                with self.tracer.start_as_current_span(operation_name) as span:
                    # Attach the static attributes
                    for key, value in attributes.items():
                        span.set_attribute(key, str(value))

                    try:
                        result = await func(*args, **kwargs)
                        span.set_status(trace.Status(trace.StatusCode.OK))
                        return result
                    except Exception as e:
                        span.set_status(
                            trace.Status(
                                trace.StatusCode.ERROR,
                                description=str(e)
                            )
                        )
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator

    def add_span_attributes(self, **attributes):
        """Set ``attributes`` (as strings) on the current span, if any."""
        current_span = trace.get_current_span()
        if current_span:
            for key, value in attributes.items():
                current_span.set_attribute(key, str(value))

    def add_span_event(self, name: str, **attributes):
        """Add an event called ``name`` to the current span, if any."""
        current_span = trace.get_current_span()
        if current_span:
            current_span.add_event(name, attributes)

# Global tracer; tracing is configured when this module is imported
tracer = setup_tracing()

📊 Grafana仪表板

主要仪表板配置

json
{
  "dashboard": {
    "id": null,
    "title": "MCP Server Dashboard",
    "tags": ["mcp", "server"],
    "timezone": "browser",
    "panels": [
      {
        "id": 1,
        "title": "服务状态概览",
        "type": "stat",
        "targets": [
          {
            "expr": "up{job=\"mcp-server\"}",
            "legendFormat": "{{instance}}"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {
              "mode": "thresholds"
            },
            "thresholds": {
              "steps": [
                {"color": "red", "value": 0},
                {"color": "green", "value": 1}
              ]
            },
            "mappings": [
              {"options": {"0": {"text": "Down"}, "1": {"text": "Up"}}}
            ]
          }
        }
      },
      {
        "id": 2,
        "title": "请求率",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total{job=\"mcp-server\"}[5m])",
            "legendFormat": "{{method}} {{status_code}}"
          }
        ],
        "yAxes": [
          {
            "label": "请求/秒",
            "min": 0
          }
        ]
      },
      {
        "id": 3,
        "title": "响应时间",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket{job=\"mcp-server\"}[5m]))",
            "legendFormat": "50th percentile"
          },
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{job=\"mcp-server\"}[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket{job=\"mcp-server\"}[5m]))",
            "legendFormat": "99th percentile"
          }
        ],
        "yAxes": [
          {
            "label": "秒",
            "min": 0
          }
        ]
      },
      {
        "id": 4,
        "title": "错误率",
        "type": "graph",
        "targets": [
          {
            "expr": "sum(rate(http_requests_total{job=\"mcp-server\",status_code=~\"5..\"}[5m])) / sum(rate(http_requests_total{job=\"mcp-server\"}[5m]))",
            "legendFormat": "Error Rate"
          }
        ],
        "yAxes": [
          {
            "label": "比例",
            "min": 0,
            "max": 1
          }
        ]
      },
      {
        "id": 5,
        "title": "活跃连接",
        "type": "graph",
        "targets": [
          {
            "expr": "mcp_connections_active{job=\"mcp-server\"}",
            "legendFormat": "Active Connections"
          }
        ]
      },
      {
        "id": 6,
        "title": "工具调用统计",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(mcp_tool_calls_total{job=\"mcp-server\"}[5m])",
            "legendFormat": "{{tool_name}} ({{status}})"
          }
        ]
      },
      {
        "id": 7,
        "title": "资源使用",
        "type": "graph",
        "targets": [
          {
            "expr": "process_memory_usage_bytes{job=\"mcp-server\"} / 1024 / 1024",
            "legendFormat": "Memory (MB)",
            "yAxisIndex": 0
          },
          {
            "expr": "process_cpu_usage_percent{job=\"mcp-server\"}",
            "legendFormat": "CPU (%)",
            "yAxisIndex": 1
          }
        ],
        "yAxes": [
          {"label": "MB", "min": 0},
          {"label": "%", "min": 0, "max": 100}
        ]
      },
      {
        "id": 8,
        "title": "最近错误日志",
        "type": "logs",
        "targets": [
          {
            "expr": "{job=\"mcp-server\"} |= \"ERROR\"",
            "refId": "A"
          }
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "5s"
  }
}

🚨 AlertManager配置

yaml
# monitoring/alertmanager.yml
# Alertmanager configuration: SMTP settings, routing tree, receivers and
# inhibition rules.
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@example.com'
  smtp_auth_username: 'alerts@example.com'
  # Placeholder value: supply the real credential from a secret, never from
  # version control.
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
  # Routes are evaluated in order and the first match wins, so a critical
  # mcp-server alert goes to 'critical-alerts' only.
  - match:
      severity: critical
    receiver: 'critical-alerts'
  - match:
      service: mcp-server
    receiver: 'mcp-team'

receivers:
- name: 'default'
  email_configs:
  # email_config has no 'subject' or 'body' field: the subject is set through
  # the Subject header and the plain-text body through 'text'.
  - to: 'team@example.com'
    headers:
      Subject: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
    text: |
      {{ range .Alerts }}
      Alert: {{ .Annotations.summary }}
      Description: {{ .Annotations.description }}
      Labels: {{ range .Labels.SortedPairs }}{{ .Name }}={{ .Value }} {{ end }}
      {{ end }}

- name: 'critical-alerts'
  email_configs:
  - to: 'oncall@example.com'
    headers:
      Subject: '🚨 CRITICAL: {{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
  slack_configs:
  - api_url: 'YOUR_SLACK_WEBHOOK_URL'
    channel: '#critical-alerts'
    title: '🚨 Critical Alert'
    text: '{{ range .Alerts }}{{ .Annotations.summary }}: {{ .Annotations.description }}{{ end }}'
  pagerduty_configs:
  - routing_key: 'YOUR_PAGERDUTY_KEY'
    description: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

- name: 'mcp-team'
  slack_configs:
  - api_url: 'YOUR_SLACK_WEBHOOK_URL'
    channel: '#mcp-alerts'
    title: 'MCP Server Alert'
    text: '{{ range .Alerts }}{{ .Annotations.summary }}: {{ .Annotations.description }}{{ end }}'

# Suppress a warning while a critical alert with the same alertname and
# instance is firing.
inhibit_rules:
- source_match:
    severity: 'critical'
  target_match:
    severity: 'warning'
  equal: ['alertname', 'instance']

🛠️ 运维脚本

监控部署脚本

bash
#!/bin/bash
# scripts/deploy_monitoring.sh
#
# Deploys the monitoring stack (Prometheus/Grafana, Elasticsearch, Kibana,
# Logstash, Jaeger) with Helm.
#
# Usage: deploy_monitoring.sh [namespace] [environment]
#   namespace    target Kubernetes namespace (default: monitoring)
#   environment  accepted for compatibility; not used by this script yet
#
# Environment:
#   GRAFANA_ADMIN_PASSWORD  Grafana admin password (default: admin123)

# -u: fail on unset variables; pipefail: fail when any stage of a pipeline
# fails (e.g. the "kubectl create ... | kubectl apply" pipeline below).
set -euo pipefail

NAMESPACE="${1:-monitoring}"
ENVIRONMENT="${2:-production}"
GRAFANA_ADMIN_PASSWORD="${GRAFANA_ADMIN_PASSWORD:-admin123}"

# Print the external IP of a LoadBalancer service, or nothing if it is not
# available yet. Never fails, so a pending IP cannot abort the script after
# everything has been deployed.
get_service_ip() {
  kubectl get svc "$1" -n "$NAMESPACE" \
    -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || true
}

echo "🚀 部署监控系统到 $NAMESPACE 命名空间"

# Create the namespace (idempotent)
kubectl create namespace "$NAMESPACE" --dry-run=client -o yaml | kubectl apply -f -

# Deploy Prometheus
echo "📊 部署Prometheus..."
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update

helm upgrade --install prometheus prometheus-community/kube-prometheus-stack \
  --namespace "$NAMESPACE" \
  --set prometheus.prometheusSpec.retention=30d \
  --set prometheus.prometheusSpec.storageSpec.volumeClaimTemplate.spec.resources.requests.storage=50Gi \
  --set-string grafana.adminPassword="$GRAFANA_ADMIN_PASSWORD" \
  --set grafana.service.type=LoadBalancer \
  --values monitoring/prometheus-values.yaml

# Deploy the ELK stack
echo "📝 部署ELK Stack..."
helm repo add elastic https://helm.elastic.co
helm repo update

# Elasticsearch
helm upgrade --install elasticsearch elastic/elasticsearch \
  --namespace "$NAMESPACE" \
  --set replicas=1 \
  --set minimumMasterNodes=1 \
  --set esJavaOpts="-Xmx1g -Xms1g" \
  --set resources.requests.cpu=100m \
  --set resources.requests.memory=2Gi \
  --set volumeClaimTemplate.resources.requests.storage=30Gi

# Kibana
helm upgrade --install kibana elastic/kibana \
  --namespace "$NAMESPACE" \
  --set service.type=LoadBalancer

# Logstash
# The dots inside the file-name keys are escaped so Helm treats
# "logstash.yml" / "logstash.conf" as one key instead of nested keys. The
# pipeline is passed with --set-file because --set would try to parse the
# commas and braces in the file content.
helm upgrade --install logstash elastic/logstash \
  --namespace "$NAMESPACE" \
  --set-string 'logstashConfig.logstash\.yml=http.host: 0.0.0.0' \
  --set-file 'logstashPipeline.logstash\.conf=logging/logstash.conf'

# Deploy Jaeger
echo "🔍 部署Jaeger..."
helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
helm repo update

helm upgrade --install jaeger jaegertracing/jaeger \
  --namespace "$NAMESPACE" \
  --set provisionDataStore.cassandra=false \
  --set provisionDataStore.elasticsearch=true \
  --set storage.type=elasticsearch \
  --set storage.elasticsearch.host=elasticsearch-master \
  --set storage.elasticsearch.port=9200

# Wait for the components to become ready
echo "⏳ 等待所有组件就绪..."
kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=prometheus -n "$NAMESPACE" --timeout=300s
kubectl wait --for=condition=ready pod -l app=elasticsearch-master -n "$NAMESPACE" --timeout=300s
kubectl wait --for=condition=ready pod -l app=kibana -n "$NAMESPACE" --timeout=300s

# Print access information
echo "✅ 监控系统部署完成!"
echo ""
echo "📊 Grafana访问信息:"
GRAFANA_IP="$(get_service_ip prometheus-grafana)"
echo "   URL: http://$GRAFANA_IP"
echo "   用户名: admin"
echo "   密码: $GRAFANA_ADMIN_PASSWORD"
echo ""
echo "📝 Kibana访问信息:"
KIBANA_IP="$(get_service_ip kibana-kibana)"
echo "   URL: http://$KIBANA_IP:5601"
echo ""
echo "🔍 Jaeger访问信息:"
JAEGER_IP="$(get_service_ip jaeger-query)"
echo "   URL: http://$JAEGER_IP:16686"

日志分析脚本

python
#!/usr/bin/env python3
# scripts/log_analyzer.py

import argparse
import json
import re
from collections import Counter, defaultdict
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional, Tuple

class LogAnalyzer:
    """Summarize a JSON-lines log file: errors, latency, traffic and tool usage.

    Each line of the file is expected to hold one JSON object. Lines that are
    blank, are not valid JSON, or decode to something other than an object
    are skipped silently.

    The analyses read these fields from each entry: ``level``, ``message``,
    ``exception``, ``event_type``, ``duration_ms``, ``path``, ``timestamp``,
    ``status_code``, ``method``, ``tool_name`` and ``success``.
    """

    # Keyword -> label pairs, checked in order against the lower-cased message
    # of ERROR entries that carry no ``exception`` field.
    _MESSAGE_PATTERNS = (
        ('timeout', 'Timeout'),
        ('connection', 'Connection'),
        ('database', 'Database'),
    )

    def __init__(self, log_file: str):
        """Remember *log_file* and load its entries immediately."""
        self.log_file = log_file
        self.logs: List[Dict[str, Any]] = []
        self.load_logs()

    def load_logs(self):
        """Append every JSON object found in ``self.log_file`` to ``self.logs``.

        A missing file is reported on stdout and leaves ``self.logs``
        untouched; no exception is raised in that case.
        """
        try:
            with open(self.log_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        log_entry = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    # Scalars and arrays are valid JSON but have no fields;
                    # keeping them would break every ``log.get`` call below.
                    if isinstance(log_entry, dict):
                        self.logs.append(log_entry)
            print(f"📄 加载了 {len(self.logs)} 条日志记录")
        except FileNotFoundError:
            print(f"❌ 日志文件 {self.log_file} 不存在")

    @staticmethod
    def _parse_timestamp(raw: Any) -> Optional[datetime]:
        """Return *raw* parsed as an ISO-8601 datetime, or ``None`` if unusable.

        A trailing ``Z`` is rewritten to ``+00:00`` before parsing.
        """
        if not raw or not isinstance(raw, str):
            return None
        try:
            return datetime.fromisoformat(raw.replace('Z', '+00:00'))
        except ValueError:
            return None

    @staticmethod
    def _duration_ms(log: Dict[str, Any]):
        """Return the entry's ``duration_ms``, or 0 when missing or non-numeric.

        A missing key already counted as 0; ``null`` and non-numeric values
        are treated the same way so that they cannot break ``sum``/``max``.
        """
        value = log.get('duration_ms', 0)
        return value if isinstance(value, (int, float)) else 0

    def analyze_error_patterns(self) -> Dict[str, int]:
        """Count ERROR-level entries per error category.

        Entries with an ``exception`` field are keyed by its ``type``
        (``'Unknown'`` when the type is absent or the field is not an
        object). Other entries are classified by the first keyword found in
        their message, falling back to ``'Other'``.
        """
        error_patterns: Counter = Counter()

        for log in self.logs:
            if log.get('level') != 'ERROR':
                continue

            if 'exception' in log:
                details = log['exception']
                if isinstance(details, dict):
                    error_type = details.get('type', 'Unknown')
                else:
                    error_type = 'Unknown'
                error_patterns[error_type] += 1
                continue

            # ``message`` may be present but null; normalise before matching.
            message = str(log.get('message') or '').lower()
            for keyword, label in self._MESSAGE_PATTERNS:
                if keyword in message:
                    error_patterns[label] += 1
                    break
            else:
                error_patterns['Other'] += 1

        return dict(error_patterns)

    def analyze_response_times(self) -> Dict[str, float]:
        """Return latency statistics over ``http_request`` entries.

        The result holds ``avg_response_time``, ``max_response_time``,
        ``min_response_time`` and ``total_requests``, plus one
        ``'<path>_avg'`` key per distinct request path. An empty dict is
        returned when there are no ``http_request`` entries.
        """
        response_times = []
        endpoint_times = defaultdict(list)

        for log in self.logs:
            if log.get('event_type') != 'http_request':
                continue
            duration = self._duration_ms(log)
            response_times.append(duration)
            endpoint_times[log.get('path', '/unknown')].append(duration)

        if not response_times:
            return {}

        result = {
            'avg_response_time': sum(response_times) / len(response_times),
            'max_response_time': max(response_times),
            'min_response_time': min(response_times),
            'total_requests': len(response_times)
        }

        # Average response time per endpoint.
        for endpoint, times in endpoint_times.items():
            result[f'{endpoint}_avg'] = sum(times) / len(times)

        return result

    def analyze_traffic_patterns(self) -> Dict[str, Any]:
        """Return request counts per hour of day, status code and HTTP method.

        Only ``http_request`` entries are considered. Entries whose timestamp
        cannot be parsed are left out of the hourly histogram but still
        contribute to the status-code and method counts.
        """
        hourly_requests = defaultdict(int)
        status_codes: Counter = Counter()
        methods: Counter = Counter()

        for log in self.logs:
            if log.get('event_type') != 'http_request':
                continue

            # The hour is taken as written in the timestamp (no conversion).
            moment = self._parse_timestamp(log.get('timestamp'))
            if moment is not None:
                hourly_requests[moment.hour] += 1

            status_code = log.get('status_code')
            if status_code:
                status_codes[status_code] += 1

            method = log.get('method')
            if method:
                methods[method] += 1

        return {
            'hourly_requests': dict(hourly_requests),
            'status_codes': dict(status_codes),
            'methods': dict(methods)
        }

    def analyze_tool_usage(self) -> Dict[str, Any]:
        """Return call counts, failure counts and mean latency per tool.

        Only ``tool_call`` entries with a truthy ``tool_name`` are counted.
        A call is a failure when its ``success`` field is present and falsy.
        """
        tool_calls: Counter = Counter()
        tool_errors: Counter = Counter()
        tool_response_times = defaultdict(list)

        for log in self.logs:
            if log.get('event_type') != 'tool_call':
                continue
            tool_name = log.get('tool_name')
            if not tool_name:
                continue

            tool_calls[tool_name] += 1
            tool_response_times[tool_name].append(self._duration_ms(log))
            if not log.get('success', True):
                tool_errors[tool_name] += 1

        avg_response_times = {
            tool: (sum(times) / len(times) if times else 0)
            for tool, times in tool_response_times.items()
        }

        return {
            'tool_calls': dict(tool_calls),
            'tool_errors': dict(tool_errors),
            'tool_avg_response_times': avg_response_times
        }

    def generate_report(self) -> str:
        """Run every analysis and return the combined plain-text report."""
        error_patterns = self.analyze_error_patterns()
        response_times = self.analyze_response_times()
        traffic_patterns = self.analyze_traffic_patterns()
        tool_usage = self.analyze_tool_usage()

        # Collect the pieces and join once instead of repeated ``+=``.
        parts = [f"""
📊 MCP服务器日志分析报告
{'='*50}

📈 基础统计
- 总日志条数: {len(self.logs)}
- 分析时间范围: {self._get_time_range()}

🔥 响应时间分析
- 平均响应时间: {response_times.get('avg_response_time', 0):.2f}ms
- 最大响应时间: {response_times.get('max_response_time', 0):.2f}ms
- 最小响应时间: {response_times.get('min_response_time', 0):.2f}ms
- 总请求数: {response_times.get('total_requests', 0)}

❌ 错误模式分析
"""]

        parts.extend(
            f"- {error_type}: {count}\n"
            for error_type, count in error_patterns.items()
        )

        parts.append("""
📊 流量模式分析
状态码分布:
""")
        parts.extend(
            f"- {status}: {count}\n"
            for status, count in traffic_patterns['status_codes'].items()
        )

        parts.append("""
请求方法分布:
""")
        parts.extend(
            f"- {method}: {count}\n"
            for method, count in traffic_patterns['methods'].items()
        )

        parts.append("""
🔧 工具使用分析
工具调用次数:
""")
        for tool, count in tool_usage['tool_calls'].items():
            avg_time = tool_usage['tool_avg_response_times'].get(tool, 0)
            errors = tool_usage['tool_errors'].get(tool, 0)
            error_rate = (errors / count * 100) if count > 0 else 0
            parts.append(
                f"- {tool}: {count}次 (平均{avg_time:.2f}ms, 错误率{error_rate:.1f}%)\n"
            )

        return "".join(parts)

    def _get_time_range(self) -> str:
        """Return ``'<earliest> - <latest>'`` over all parseable timestamps.

        Returns ``"未知"`` when no entry has a parseable timestamp.
        """
        timestamps = []
        for log in self.logs:
            moment = self._parse_timestamp(log.get('timestamp'))
            if moment is None:
                continue
            # Naive and aware datetimes cannot be compared with each other.
            # NOTE(review): naive timestamps are assumed to be UTC here so a
            # mixed file can still be ordered - confirm against the log
            # producer. The wall-clock value that gets printed is unchanged.
            if moment.tzinfo is None:
                moment = moment.replace(tzinfo=timezone.utc)
            timestamps.append(moment)

        if not timestamps:
            return "未知"

        fmt = '%Y-%m-%d %H:%M:%S'
        return f"{min(timestamps).strftime(fmt)} - {max(timestamps).strftime(fmt)}"

def main():
    """Command-line entry point: analyze one log file and emit the report.

    Takes the log file path as a positional argument. With ``--output``/``-o``
    the report is written to that file (UTF-8) and a confirmation is printed;
    otherwise the report itself is printed to stdout.
    """
    cli = argparse.ArgumentParser(description='MCP服务器日志分析工具')
    cli.add_argument('log_file', help='日志文件路径')
    cli.add_argument('--output', '-o', help='报告输出文件')
    options = cli.parse_args()

    report_text = LogAnalyzer(options.log_file).generate_report()

    # No output path given: the report goes straight to stdout.
    if not options.output:
        print(report_text)
        return

    with open(options.output, 'w', encoding='utf-8') as out_file:
        out_file.write(report_text)
    print(f"📄 报告已保存到 {options.output}")

# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()

🎯 最佳实践

1. 监控指标设计

  • 四个黄金信号:延迟、流量、错误、饱和度
  • 业务指标:用户转化率、工具成功率
  • 基础设施指标:CPU、内存、磁盘、网络

2. 日志结构化

  • 统一格式:JSON格式,包含必要字段
  • 上下文信息:请求ID、用户ID、会话ID
  • 敏感信息脱敏:密码、token等

3. 告警策略

  • 分级告警:Critical、Warning、Info
  • 避免告警疲劳:合理设置阈值和静默时间
  • 可操作性:每个告警都应该有明确的处理流程

4. 链路追踪

  • 关键路径覆盖:用户请求、工具调用、数据库操作
  • 性能瓶颈识别:找出耗时最长的操作
  • 错误传播追踪:快速定位错误源头

🚀 下一步

完成监控与日志系统搭建后,你可以:

  1. 学习性能调优技巧 — 7.4 性能调优与扩容
  2. 掌握故障排除方法 — 7.5 故障排除与维护
  3. 了解最佳实践 — 第8章:最佳实践

📚 扩展阅读

🏠 返回教程首页 | 📖 查看完整目录 | ▶️ 下一节: 性能调优