7.3 Monitoring and Logging Systems
📊 Core goal: build comprehensive monitoring and logging for system observability and operations automation
⏱️ Estimated time: 50 minutes
📊 Difficulty: ⭐⭐⭐⭐⭐
🎯 Learning Objectives
In this section you will learn to:
- Design a layered monitoring architecture
- Implement application performance monitoring (APM)
- Build a centralized logging system
- Set up an intelligent alerting mechanism
- Create visualization dashboards
📊 Monitoring System Architecture
Monitoring architecture diagram (figure not included)
🔧 Prometheus Monitoring
Prometheus Configuration
```yaml
# monitoring/prometheus.yml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"
  - "recording_rules.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

scrape_configs:
  # MCP server
  - job_name: 'mcp-server'
    static_configs:
      - targets: ['mcp-server:8000']
    metrics_path: '/metrics'
    scrape_interval: 10s
    scrape_timeout: 5s

  # Host metrics
  - job_name: 'node-exporter'
    static_configs:
      - targets: ['node-exporter:9100']

  # Redis
  - job_name: 'redis'
    static_configs:
      - targets: ['redis-exporter:9121']

  # PostgreSQL
  - job_name: 'postgres'
    static_configs:
      - targets: ['postgres-exporter:9187']

  # Kubernetes API servers
  - job_name: 'kubernetes-apiservers'
    kubernetes_sd_configs:
      - role: endpoints
    scheme: https
    tls_config:
      ca_file: /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    bearer_token_file: /var/run/secrets/kubernetes.io/serviceaccount/token
    relabel_configs:
      - source_labels: [__meta_kubernetes_namespace, __meta_kubernetes_service_name, __meta_kubernetes_endpoint_port_name]
        action: keep
        regex: default;kubernetes;https

  # Kubernetes nodes
  - job_name: 'kubernetes-nodes'
    kubernetes_sd_configs:
      - role: node
    relabel_configs:
      - action: labelmap
        regex: __meta_kubernetes_node_label_(.+)

  # Kubernetes pods (opt-in via prometheus.io/* annotations)
  - job_name: 'kubernetes-pods'
    kubernetes_sd_configs:
      - role: pod
    relabel_configs:
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_scrape]
        action: keep
        regex: true
      - source_labels: [__meta_kubernetes_pod_annotation_prometheus_io_path]
        action: replace
        target_label: __metrics_path__
        regex: (.+)
```
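The `mcp-server` job above scrapes `/metrics` on port 8000. If the MCP server is a FastAPI/ASGI application, one way to expose that endpoint in-process (instead of on a separate port) is prometheus_client's ASGI app. A minimal sketch, assuming FastAPI:

```python
# Expose /metrics inside the ASGI app itself, so Prometheus can
# scrape mcp-server:8000/metrics as configured in prometheus.yml.
from fastapi import FastAPI
from prometheus_client import make_asgi_app

app = FastAPI()

# Mount the prometheus_client ASGI app at the configured metrics path
app.mount("/metrics", make_asgi_app())
```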
Application Metrics Collection
```python
# src/monitoring/metrics.py
from prometheus_client import Counter, Histogram, Gauge, Info, start_http_server
import functools
import time
from typing import Callable
import psutil
import asyncio


class MCPMetrics:
    """Metrics collector for the MCP server."""

    def __init__(self):
        # HTTP request metrics
        self.http_requests_total = Counter(
            'http_requests_total',
            'Total HTTP requests',
            ['method', 'endpoint', 'status_code']
        )
        self.http_request_duration = Histogram(
            'http_request_duration_seconds',
            'HTTP request duration',
            ['method', 'endpoint'],
            buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
        )

        # MCP protocol metrics
        self.mcp_connections = Gauge(
            'mcp_connections_active',
            'Active MCP connections'
        )
        self.mcp_messages_total = Counter(
            'mcp_messages_total',
            'Total MCP messages',
            ['message_type', 'direction']
        )
        self.mcp_errors_total = Counter(
            'mcp_errors_total',
            'Total MCP errors',
            ['error_type']
        )

        # Business metrics
        self.tool_calls_total = Counter(
            'mcp_tool_calls_total',
            'Total tool calls',
            ['tool_name', 'status']
        )
        self.tool_call_duration = Histogram(
            'mcp_tool_call_duration_seconds',
            'Tool call duration',
            ['tool_name'],
            buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
        )

        # Resource usage metrics
        self.memory_usage = Gauge(
            'process_memory_usage_bytes',
            'Process memory usage'
        )
        self.cpu_usage = Gauge(
            'process_cpu_usage_percent',
            'Process CPU usage'
        )

        # Application info
        self.app_info = Info(
            'mcp_server_info',
            'MCP server information'
        )

    def track_http_request(self, func: Callable) -> Callable:
        """Decorator that records request count and latency."""
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            request = args[0] if args else kwargs.get('request')
            method = request.method if request else 'UNKNOWN'
            path = request.url.path if request else 'unknown'
            start_time = time.time()
            status_code = '200'
            try:
                response = await func(*args, **kwargs)
                if hasattr(response, 'status_code'):
                    status_code = str(response.status_code)
                return response
            except Exception:
                status_code = '500'
                raise
            finally:
                duration = time.time() - start_time
                self.http_requests_total.labels(
                    method=method,
                    endpoint=path,
                    status_code=status_code
                ).inc()
                self.http_request_duration.labels(
                    method=method,
                    endpoint=path
                ).observe(duration)
        return wrapper

    def track_tool_call(self, tool_name: str):
        """Decorator factory that records tool-call count, latency and errors."""
        def decorator(func: Callable) -> Callable:
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                start_time = time.time()
                status = 'success'
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    status = 'error'
                    self.mcp_errors_total.labels(
                        error_type=type(e).__name__
                    ).inc()
                    raise
                finally:
                    duration = time.time() - start_time
                    self.tool_calls_total.labels(
                        tool_name=tool_name,
                        status=status
                    ).inc()
                    self.tool_call_duration.labels(
                        tool_name=tool_name
                    ).observe(duration)
            return wrapper
        return decorator

    def record_mcp_message(self, message_type: str, direction: str):
        """Record an MCP message."""
        self.mcp_messages_total.labels(
            message_type=message_type,
            direction=direction
        ).inc()

    def set_active_connections(self, count: int):
        """Set the active-connection gauge."""
        self.mcp_connections.set(count)

    def set_app_info(self, version: str, environment: str, build_id: str):
        """Set static application info labels."""
        self.app_info.info({
            'version': version,
            'environment': environment,
            'build_id': build_id
        })

    def start_resource_monitoring(self):
        """Start the resource-monitoring loop.

        Call this from a running event loop (e.g. a startup hook);
        asyncio.create_task() requires a running loop, so it cannot
        run at import time inside __init__.
        """
        async def monitor_resources():
            process = psutil.Process()
            process.cpu_percent(interval=None)  # prime the counter; first call returns 0.0
            while True:
                try:
                    # Memory usage (resident set size)
                    self.memory_usage.set(process.memory_info().rss)
                    # CPU usage since the previous call; interval=None
                    # avoids blocking the event loop while sampling
                    self.cpu_usage.set(process.cpu_percent(interval=None))
                    await asyncio.sleep(30)  # update every 30 seconds
                except Exception as e:
                    print(f"Resource monitoring error: {e}")
                    await asyncio.sleep(60)

        asyncio.create_task(monitor_resources())


# Global metrics instance
metrics = MCPMetrics()


def start_metrics_server(port: int = 8001):
    """Start the Prometheus metrics HTTP server."""
    start_http_server(port)
    print(f"📊 Metrics server listening on port {port}")
```
Alert Rule Configuration
```yaml
# monitoring/alert_rules.yml
groups:
  - name: mcp-server-alerts
    rules:
      # Service availability
      - alert: MCPServerDown
        expr: up{job="mcp-server"} == 0
        for: 1m
        labels:
          severity: critical
          service: mcp-server
        annotations:
          summary: "MCP Server is down"
          description: "MCP Server instance {{ $labels.instance }} has been down for more than 1 minute"
          runbook_url: "https://runbook.example.com/mcp-server-down"

      # High error rate
      - alert: MCPServerHighErrorRate
        expr: |
          (
            rate(http_requests_total{job="mcp-server",status_code=~"5.."}[5m]) /
            rate(http_requests_total{job="mcp-server"}[5m])
          ) > 0.05
        for: 3m
        labels:
          severity: warning
          service: mcp-server
        annotations:
          summary: "High error rate detected"
          description: "Error rate is {{ $value | humanizePercentage }} for instance {{ $labels.instance }}"

      # High latency
      - alert: MCPServerHighLatency
        expr: |
          histogram_quantile(0.95,
            rate(http_request_duration_seconds_bucket{job="mcp-server"}[5m])
          ) > 2.0
        for: 5m
        labels:
          severity: warning
          service: mcp-server
        annotations:
          summary: "High latency detected"
          description: "95th percentile latency is {{ $value }}s for instance {{ $labels.instance }}"

      # Memory usage (the expression yields GB, so format the value directly)
      - alert: MCPServerHighMemoryUsage
        expr: process_memory_usage_bytes{job="mcp-server"} / 1024 / 1024 / 1024 > 1.0
        for: 5m
        labels:
          severity: warning
          service: mcp-server
        annotations:
          summary: "High memory usage"
          description: "Memory usage is {{ $value | humanize }}GB for instance {{ $labels.instance }}"

      # CPU usage
      - alert: MCPServerHighCPUUsage
        expr: process_cpu_usage_percent{job="mcp-server"} > 80
        for: 10m
        labels:
          severity: warning
          service: mcp-server
        annotations:
          summary: "High CPU usage"
          description: "CPU usage is {{ $value }}% for instance {{ $labels.instance }}"

      # Connection count
      - alert: MCPServerTooManyConnections
        expr: mcp_connections_active{job="mcp-server"} > 1000
        for: 2m
        labels:
          severity: warning
          service: mcp-server
        annotations:
          summary: "Too many active connections"
          description: "Active connections: {{ $value }} for instance {{ $labels.instance }}"

      # Tool call failure rate
      - alert: MCPToolCallFailureRate
        expr: |
          (
            rate(mcp_tool_calls_total{status="error"}[5m]) /
            rate(mcp_tool_calls_total[5m])
          ) > 0.1
        for: 3m
        labels:
          severity: warning
          service: mcp-server
        annotations:
          summary: "High tool call failure rate"
          description: "Tool call failure rate is {{ $value | humanizePercentage }}"

  - name: infrastructure-alerts
    rules:
      # Low disk space
      - alert: DiskSpaceLow
        expr: |
          (
            node_filesystem_avail_bytes{mountpoint="/",fstype!="rootfs"} /
            node_filesystem_size_bytes{mountpoint="/",fstype!="rootfs"}
          ) < 0.1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Disk space is low"
          description: "Disk space is below 10% on {{ $labels.instance }}"

      # 1-minute load average above 80% of the CPU core count
      - alert: NodeHighLoad
        expr: node_load1 > on(instance) count by (instance) (node_cpu_seconds_total{mode="idle"}) * 0.8
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Node load is high"
          description: "Load average is {{ $value }} on {{ $labels.instance }}"

  - name: database-alerts
    rules:
      # Connection saturation
      - alert: PostgreSQLTooManyConnections
        expr: sum by (instance) (pg_stat_activity_count) / on(instance) pg_settings_max_connections > 0.8
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL has too many connections"
          description: "Connection usage is {{ $value | humanizePercentage }}"

      # Lock waits
      - alert: PostgreSQLLockWait
        expr: pg_locks_count{mode="AccessShareLock"} > 100
        for: 1m
        labels:
          severity: warning
        annotations:
          summary: "PostgreSQL has many locks waiting"
          description: "Lock count is {{ $value }}"
```
📝 Logging System Architecture
ELK Stack Configuration
```yaml
# logging/elasticsearch.yml
cluster.name: "mcp-logs"
node.name: "elasticsearch"
network.host: 0.0.0.0
http.port: 9200
discovery.type: single-node

# Shard/replica defaults belong in an index template (see the sketch
# below); since Elasticsearch 5.x they are rejected in elasticsearch.yml

# Only auto-create the indices this pipeline writes to
action.auto_create_index: "+logs-*,+errors-*,-*"
```
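Per-index settings such as shard and replica counts go into an index template. A minimal sketch that creates one over the REST API; the host, security settings and template name are assumptions:

```python
# Hypothetical index template for the logs-* / errors-* indices;
# adjust host, auth and mappings for your cluster.
import requests

template = {
    "index_patterns": ["logs-*", "errors-*"],
    "template": {
        "settings": {
            "number_of_shards": 1,
            "number_of_replicas": 0
        }
    }
}

resp = requests.put(
    "http://elasticsearch:9200/_index_template/mcp-logs",
    json=template,
    timeout=10,
)
resp.raise_for_status()
print(resp.json())
```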
```conf
# logging/logstash.conf
input {
  beats {
    port => 5044
  }
  tcp {
    port => 5000
    codec => json_lines
  }
}

filter {
  if [service] == "mcp-server" {
    # Parse MCP server log lines
    grok {
      match => { "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] %{LOGLEVEL:level} %{GREEDYDATA:msg}" }
    }

    # Extract the request ID
    if [msg] =~ /request_id=([a-zA-Z0-9-]+)/ {
      grok {
        match => { "msg" => "request_id=%{UUID:request_id}" }
      }
    }

    # Extract the user ID
    if [msg] =~ /user_id=([0-9]+)/ {
      grok {
        match => { "msg" => "user_id=%{INT:user_id}" }
      }
    }

    # Extract the tool name
    if [msg] =~ /tool=([a-zA-Z_]+)/ {
      grok {
        match => { "msg" => "tool=%{WORD:tool_name}" }
      }
    }

    # Parse embedded JSON payloads
    if [msg] =~ /^\{.*\}$/ {
      json {
        source => "msg"
      }
    }
  }

  # Use the parsed timestamp as the event time
  date {
    match => [ "timestamp", "ISO8601" ]
  }

  # GeoIP enrichment
  if [client_ip] {
    geoip {
      source => "client_ip"
      target => "geoip"
    }
  }

  # Redact sensitive values
  mutate {
    gsub => [
      "message", "password=[^&\s]+", "password=***",
      "message", "token=[^&\s]+", "token=***",
      "message", "api_key=[^&\s]+", "api_key=***"
    ]
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logs-%{service}-%{+YYYY.MM.dd}"
  }

  # Additionally store error logs in a separate index
  if [level] == "ERROR" {
    elasticsearch {
      hosts => ["elasticsearch:9200"]
      index => "errors-%{service}-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}
```
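The tcp input above accepts newline-delimited JSON on port 5000, so any service can ship structured events straight to Logstash. A minimal sketch; the hostname matches the config above and the event fields are illustrative:

```python
# Send one newline-delimited JSON event to the Logstash tcp input.
import json
import socket

event = {
    "service": "mcp-server",      # routes through the mcp-server filter branch
    "message": "[2024-01-01T12:00:00Z] INFO tool=search request completed",
    "client_ip": "203.0.113.7",   # illustrative; triggers the geoip filter
}

with socket.create_connection(("logstash", 5000), timeout=5) as sock:
    sock.sendall((json.dumps(event) + "\n").encode("utf-8"))
```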
Structured Logging Implementation
```python
# src/logging/structured_logger.py
import logging
import json
import os
import time
import traceback
import functools
from datetime import datetime
from typing import Dict, Any, Optional
from contextvars import ContextVar
import uuid

# Per-request context (request_id, user_id, ...)
request_context: ContextVar[Dict[str, Any]] = ContextVar('request_context', default={})


class StructuredFormatter(logging.Formatter):
    """Formatter that renders log records as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        # Base fields
        log_entry = {
            'timestamp': datetime.utcnow().isoformat() + 'Z',
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }

        # Merge in the request context
        context = request_context.get({})
        if context:
            log_entry.update(context)

        # Exception details
        if record.exc_info:
            log_entry['exception'] = {
                'type': record.exc_info[0].__name__,
                'message': str(record.exc_info[1]),
                'traceback': traceback.format_exception(*record.exc_info)
            }

        # Custom fields attached via extra
        if hasattr(record, 'extra_fields'):
            log_entry.update(record.extra_fields)

        return json.dumps(log_entry, ensure_ascii=False)


class MCPLogger:
    """Structured logger for the MCP server."""

    def __init__(self, name: str = 'mcp-server'):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.INFO)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(StructuredFormatter())
        self.logger.addHandler(console_handler)

        # File handler (create the log directory first)
        os.makedirs('logs', exist_ok=True)
        file_handler = logging.FileHandler('logs/mcp-server.log')
        file_handler.setFormatter(StructuredFormatter())
        self.logger.addHandler(file_handler)

    def set_request_context(self, **kwargs):
        """Merge fields into the current request context."""
        # Copy before updating so the ContextVar default is never mutated
        context = dict(request_context.get({}))
        context.update(kwargs)
        request_context.set(context)

    def clear_request_context(self):
        """Clear the request context."""
        request_context.set({})

    def info(self, message: str, **kwargs):
        """Log at INFO level."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.info(message, extra=extra)

    def error(self, message: str, **kwargs):
        """Log at ERROR level."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.error(message, extra=extra)

    def warning(self, message: str, **kwargs):
        """Log at WARNING level."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.warning(message, extra=extra)

    def debug(self, message: str, **kwargs):
        """Log at DEBUG level."""
        extra = {'extra_fields': kwargs} if kwargs else {}
        self.logger.debug(message, extra=extra)

    def log_request(self, method: str, path: str, status_code: int,
                    duration: float, user_id: Optional[str] = None):
        """Log an HTTP request."""
        self.info(
            "HTTP Request",
            event_type="http_request",
            method=method,
            path=path,
            status_code=status_code,
            duration_ms=duration * 1000,
            user_id=user_id
        )

    def log_tool_call(self, tool_name: str, parameters: Dict[str, Any],
                      duration: float, success: bool, error: Optional[str] = None):
        """Log a tool call."""
        self.info(
            f"Tool call: {tool_name}",
            event_type="tool_call",
            tool_name=tool_name,
            parameters=parameters,
            duration_ms=duration * 1000,
            success=success,
            error=error
        )

    def log_mcp_message(self, message_type: str, direction: str,
                        message_id: str, size: int):
        """Log an MCP message."""
        self.debug(
            f"MCP Message: {message_type}",
            event_type="mcp_message",
            message_type=message_type,
            direction=direction,
            message_id=message_id,
            size_bytes=size
        )

    def log_business_event(self, event_name: str, **data):
        """Log a business event."""
        self.info(
            f"Business Event: {event_name}",
            event_type="business_event",
            event_name=event_name,
            **data
        )


# Global logger instance
logger = MCPLogger()


def track_request(func):
    """Decorator that assigns a request ID and logs the request outcome."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        request_id = str(uuid.uuid4())
        start_time = time.time()

        # Attach the request context
        logger.set_request_context(
            request_id=request_id,
            service="mcp-server"
        )

        try:
            result = await func(*args, **kwargs)
            # Log the successful request
            logger.log_request(
                method=kwargs.get('method', 'UNKNOWN'),
                path=kwargs.get('path', '/'),
                status_code=200,
                duration=time.time() - start_time
            )
            return result
        except Exception as e:
            # Log the failed request
            logger.log_request(
                method=kwargs.get('method', 'UNKNOWN'),
                path=kwargs.get('path', '/'),
                status_code=500,
                duration=time.time() - start_time
            )
            logger.error(
                f"Request failed: {e}",
                error_type=type(e).__name__,
                request_id=request_id
            )
            raise
        finally:
            # Clear the request context
            logger.clear_request_context()
    return wrapper
```
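A short usage sketch; the handler and field values are illustrative:

```python
# Hypothetical handler showing context-scoped structured logging.
from src.logging.structured_logger import logger, track_request

@track_request
async def handle_search(method: str = "POST", path: str = "/tools/search", query: str = ""):
    # Every line logged here carries the request_id set by @track_request
    logger.set_request_context(user_id="42")
    logger.log_business_event("search_executed", query_length=len(query))
    return {"results": []}
```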
Fluentd Configuration
```conf
# logging/fluent.conf
<source>
  @type tail
  path /var/log/mcp-server/*.log
  pos_file /var/log/fluentd/mcp-server.log.pos
  tag mcp.server
  format json
  time_key timestamp
  time_format %Y-%m-%dT%H:%M:%S.%LZ
</source>

<source>
  @type tail
  path /var/log/nginx/access.log
  pos_file /var/log/fluentd/nginx.access.log.pos
  tag nginx.access
  format nginx
</source>

<filter mcp.server>
  @type record_transformer
  <record>
    service mcp-server
    environment "#{ENV['ENVIRONMENT']}"
    hostname "#{Socket.gethostname}"
  </record>
</filter>

<filter nginx.access>
  @type record_transformer
  <record>
    service nginx
    environment "#{ENV['ENVIRONMENT']}"
    hostname "#{Socket.gethostname}"
  </record>
</filter>

# Fan events out to two pipelines: @ALL ships everything, @ERROR keeps
# only ERROR-level records (a bare <filter> cannot route into a label)
<match {mcp.server,nginx.access}>
  @type copy
  <store>
    @type relabel
    @label @ALL
  </store>
  <store>
    @type relabel
    @label @ERROR
  </store>
</match>

<label @ERROR>
  <filter **>
    @type grep
    <regexp>
      key level
      pattern /^ERROR$/
    </regexp>
  </filter>
  <match **>
    @type copy
    <store>
      @type elasticsearch
      host elasticsearch
      port 9200
      index_name errors
      include_tag_key true
      tag_key @log_name
      flush_interval 1s
    </store>
    <store>
      # Requires the fluent-plugin-slack output plugin
      @type slack
      webhook_url "#{ENV['SLACK_WEBHOOK_URL']}"
      channel "#alerts"
      username fluentd
      icon_emoji :warning:
      title "MCP Server Error"
      message "%s"
      message_keys message
    </store>
  </match>
</label>

<label @ALL>
  <match **>
    @type elasticsearch
    host elasticsearch
    port 9200
    index_name logs
    include_tag_key true
    tag_key @log_name
    flush_interval 5s
    <buffer>
      @type file
      path /var/log/fluentd/buffer/elasticsearch
      flush_mode interval
      flush_interval 5s
      flush_thread_count 2
      retry_type exponential_backoff
      retry_forever
      retry_max_interval 30
      chunk_limit_size 2M
      queue_limit_length 8
      overflow_action block
    </buffer>
  </match>
</label>
```
🔍 Distributed Tracing
Jaeger Setup
```python
# src/tracing/tracer.py
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.requests import RequestsInstrumentor
from opentelemetry.instrumentation.psycopg2 import Psycopg2Instrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
import functools
import os


def setup_tracing():
    """Configure distributed tracing."""
    # Tracer provider with an explicit service name
    resource = Resource.create({"service.name": "mcp-server"})
    trace.set_tracer_provider(TracerProvider(resource=resource))
    tracer_provider = trace.get_tracer_provider()

    # Jaeger exporter
    jaeger_exporter = JaegerExporter(
        agent_host_name=os.getenv('JAEGER_AGENT_HOST', 'localhost'),
        agent_port=int(os.getenv('JAEGER_AGENT_PORT', 6831)),
        collector_endpoint=os.getenv('JAEGER_COLLECTOR_ENDPOINT'),
    )

    # Batch span processor
    span_processor = BatchSpanProcessor(jaeger_exporter)
    tracer_provider.add_span_processor(span_processor)

    # Auto-instrument common libraries (instrument() is an instance method)
    FastAPIInstrumentor().instrument()
    RequestsInstrumentor().instrument()
    Psycopg2Instrumentor().instrument()
    RedisInstrumentor().instrument()

    return trace.get_tracer(__name__)


class TracingMixin:
    """Mixin that adds tracing helpers."""

    def __init__(self):
        self.tracer = trace.get_tracer(__name__)

    def trace_operation(self, operation_name: str, **attributes):
        """Decorator factory that wraps an async operation in a span."""
        def decorator(func):
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                with self.tracer.start_as_current_span(operation_name) as span:
                    # Static attributes
                    for key, value in attributes.items():
                        span.set_attribute(key, str(value))
                    try:
                        result = await func(*args, **kwargs)
                        span.set_status(trace.Status(trace.StatusCode.OK))
                        return result
                    except Exception as e:
                        span.set_status(
                            trace.Status(
                                trace.StatusCode.ERROR,
                                description=str(e)
                            )
                        )
                        span.record_exception(e)
                        raise
            return wrapper
        return decorator

    def add_span_attributes(self, **attributes):
        """Attach attributes to the current span."""
        current_span = trace.get_current_span()
        if current_span.is_recording():
            for key, value in attributes.items():
                current_span.set_attribute(key, str(value))

    def add_span_event(self, name: str, **attributes):
        """Attach an event to the current span."""
        current_span = trace.get_current_span()
        if current_span.is_recording():
            current_span.add_event(name, attributes)


# Global tracer
tracer = setup_tracing()
```
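A brief usage sketch; the service class, span name and attributes are illustrative:

```python
# Hypothetical service using the tracing helpers.
from src.tracing.tracer import TracingMixin

class SearchService(TracingMixin):
    def __init__(self):
        super().__init__()
        # Decorate at instance level, since trace_operation needs self.tracer
        self.search = self.trace_operation("search.query", component="search")(self._search)

    async def _search(self, query: str):
        # Runs inside the "search.query" span opened by the decorator
        self.add_span_attributes(query_length=len(query))
        self.add_span_event("cache_miss")
        return []
```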
📊 Grafana Dashboards
Main Dashboard Configuration
```json
{
  "dashboard": {
    "id": null,
    "title": "MCP Server Dashboard",
    "tags": ["mcp", "server"],
    "timezone": "browser",
    "panels": [
      {
        "id": 1,
        "title": "Service Status",
        "type": "stat",
        "targets": [
          {
            "expr": "up{job=\"mcp-server\"}",
            "legendFormat": "{{instance}}"
          }
        ],
        "fieldConfig": {
          "defaults": {
            "color": {"mode": "thresholds"},
            "thresholds": {
              "steps": [
                {"color": "red", "value": 0},
                {"color": "green", "value": 1}
              ]
            },
            "mappings": [
              {"options": {"0": {"text": "Down"}, "1": {"text": "Up"}}}
            ]
          }
        }
      },
      {
        "id": 2,
        "title": "Request Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total{job=\"mcp-server\"}[5m])",
            "legendFormat": "{{method}} {{status_code}}"
          }
        ],
        "yAxes": [
          {"label": "req/s", "min": 0}
        ]
      },
      {
        "id": 3,
        "title": "Response Time",
        "type": "graph",
        "targets": [
          {
            "expr": "histogram_quantile(0.50, rate(http_request_duration_seconds_bucket{job=\"mcp-server\"}[5m]))",
            "legendFormat": "50th percentile"
          },
          {
            "expr": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket{job=\"mcp-server\"}[5m]))",
            "legendFormat": "95th percentile"
          },
          {
            "expr": "histogram_quantile(0.99, rate(http_request_duration_seconds_bucket{job=\"mcp-server\"}[5m]))",
            "legendFormat": "99th percentile"
          }
        ],
        "yAxes": [
          {"label": "seconds", "min": 0}
        ]
      },
      {
        "id": 4,
        "title": "Error Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(http_requests_total{job=\"mcp-server\",status_code=~\"5..\"}[5m]) / rate(http_requests_total{job=\"mcp-server\"}[5m])",
            "legendFormat": "Error Rate"
          }
        ],
        "yAxes": [
          {"label": "ratio", "min": 0, "max": 1}
        ]
      },
      {
        "id": 5,
        "title": "Active Connections",
        "type": "graph",
        "targets": [
          {
            "expr": "mcp_connections_active{job=\"mcp-server\"}",
            "legendFormat": "Active Connections"
          }
        ]
      },
      {
        "id": 6,
        "title": "Tool Call Rate",
        "type": "graph",
        "targets": [
          {
            "expr": "rate(mcp_tool_calls_total{job=\"mcp-server\"}[5m])",
            "legendFormat": "{{tool_name}} ({{status}})"
          }
        ]
      },
      {
        "id": 7,
        "title": "Resource Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "process_memory_usage_bytes{job=\"mcp-server\"} / 1024 / 1024",
            "legendFormat": "Memory (MB)",
            "yAxisIndex": 0
          },
          {
            "expr": "process_cpu_usage_percent{job=\"mcp-server\"}",
            "legendFormat": "CPU (%)",
            "yAxisIndex": 1
          }
        ],
        "yAxes": [
          {"label": "MB", "min": 0},
          {"label": "%", "min": 0, "max": 100}
        ]
      },
      {
        "id": 8,
        "title": "Recent Error Logs",
        "type": "logs",
        "targets": [
          {
            "expr": "{job=\"mcp-server\"} |= \"ERROR\"",
            "refId": "A"
          }
        ]
      }
    ],
    "time": {
      "from": "now-1h",
      "to": "now"
    },
    "refresh": "5s"
  }
}
```
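Dashboards like this can be provisioned through Grafana's HTTP API rather than clicked together by hand. A minimal sketch; the Grafana URL, file path and API token are assumptions:

```python
# Hypothetical dashboard import via the Grafana HTTP API.
import json
import requests

GRAFANA_URL = "http://grafana:3000"    # assumption: in-cluster service name
API_TOKEN = "YOUR_GRAFANA_API_TOKEN"   # create an API token in Grafana first

# The {"dashboard": {...}} document shown above, saved to a file
with open("monitoring/mcp-dashboard.json", "r", encoding="utf-8") as f:
    payload = json.load(f)

payload["overwrite"] = True            # update in place if it already exists

resp = requests.post(
    f"{GRAFANA_URL}/api/dashboards/db",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json=payload,
    timeout=10,
)
resp.raise_for_status()
print(resp.json()["url"])
```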
🚨 AlertManager Configuration
```yaml
# monitoring/alertmanager.yml
global:
  smtp_smarthost: 'smtp.gmail.com:587'
  smtp_from: 'alerts@example.com'
  smtp_auth_username: 'alerts@example.com'
  smtp_auth_password: 'password'

route:
  group_by: ['alertname', 'service']
  group_wait: 10s
  group_interval: 10s
  repeat_interval: 1h
  receiver: 'default'
  routes:
    - match:
        severity: critical
      receiver: 'critical-alerts'
    - match:
        service: mcp-server
      receiver: 'mcp-team'

receivers:
  - name: 'default'
    email_configs:
      - to: 'team@example.com'
        subject: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
        body: |
          {{ range .Alerts }}
          Alert: {{ .Annotations.summary }}
          Description: {{ .Annotations.description }}
          Labels: {{ range .Labels.SortedPairs }}{{ .Name }}={{ .Value }} {{ end }}
          {{ end }}

  - name: 'critical-alerts'
    email_configs:
      - to: 'oncall@example.com'
        subject: '🚨 CRITICAL: {{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'
    slack_configs:
      - api_url: 'YOUR_SLACK_WEBHOOK_URL'
        channel: '#critical-alerts'
        title: '🚨 Critical Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}: {{ .Annotations.description }}{{ end }}'
    pagerduty_configs:
      - routing_key: 'YOUR_PAGERDUTY_KEY'
        description: '{{ range .Alerts }}{{ .Annotations.summary }}{{ end }}'

  - name: 'mcp-team'
    slack_configs:
      - api_url: 'YOUR_SLACK_WEBHOOK_URL'
        channel: '#mcp-alerts'
        title: 'MCP Server Alert'
        text: '{{ range .Alerts }}{{ .Annotations.summary }}: {{ .Annotations.description }}{{ end }}'

inhibit_rules:
  - source_match:
      severity: 'critical'
    target_match:
      severity: 'warning'
    equal: ['alertname', 'instance']
```
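Before trusting the routing tree, it helps to fire a synthetic alert and watch where it lands. A minimal sketch against Alertmanager's v2 API; the hostname and label values are illustrative:

```python
# Fire a synthetic alert at Alertmanager to exercise routing/receivers.
from datetime import datetime, timedelta, timezone
import requests

now = datetime.now(timezone.utc)
alerts = [{
    "labels": {
        "alertname": "SyntheticTest",  # illustrative name
        "severity": "warning",
        "service": "mcp-server",       # should route to the mcp-team receiver
    },
    "annotations": {
        "summary": "Synthetic routing test",
        "description": "Safe to ignore; fired manually for testing",
    },
    "startsAt": now.isoformat(),
    "endsAt": (now + timedelta(minutes=5)).isoformat(),
}]

resp = requests.post("http://alertmanager:9093/api/v2/alerts", json=alerts, timeout=10)
resp.raise_for_status()
print("Alert accepted:", resp.status_code)
```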
🛠️ Operations Scripts
Monitoring Deployment Script
```bash
#!/bin/bash
# scripts/deploy_monitoring.sh
set -e

NAMESPACE=${1:-monitoring}
ENVIRONMENT=${2:-production}

echo "🚀 Deploying the monitoring stack to namespace $NAMESPACE"

# Create the namespace
kubectl create namespace "$NAMESPACE" --dry-run=client -o yaml | kubectl apply -f -

# Deploy Prometheus
echo "📊 Deploying Prometheus..."
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo update
helm upgrade --install prometheus prometheus-community/kube-prometheus-stack \
  --namespace "$NAMESPACE" \
  --set prometheus.prometheusSpec.retention=30d \
  --set prometheus.prometheusSpec.storageSpec.volumeClaimTemplate.spec.resources.requests.storage=50Gi \
  --set grafana.adminPassword=admin123 \
  --set grafana.service.type=LoadBalancer \
  --values monitoring/prometheus-values.yaml

# Deploy the ELK stack
echo "📝 Deploying the ELK stack..."
helm repo add elastic https://helm.elastic.co
helm repo update

# Elasticsearch
helm upgrade --install elasticsearch elastic/elasticsearch \
  --namespace "$NAMESPACE" \
  --set replicas=1 \
  --set minimumMasterNodes=1 \
  --set esJavaOpts="-Xmx1g -Xms1g" \
  --set resources.requests.cpu=100m \
  --set resources.requests.memory=2Gi \
  --set volumeClaimTemplate.resources.requests.storage=30Gi

# Kibana
helm upgrade --install kibana elastic/kibana \
  --namespace "$NAMESPACE" \
  --set service.type=LoadBalancer

# Logstash
helm upgrade --install logstash elastic/logstash \
  --namespace "$NAMESPACE" \
  --set logstashConfig."logstash.yml"="http.host: 0.0.0.0" \
  --set logstashPipeline."logstash.conf"="$(cat logging/logstash.conf)"

# Deploy Jaeger
echo "🔍 Deploying Jaeger..."
helm repo add jaegertracing https://jaegertracing.github.io/helm-charts
helm repo update
helm upgrade --install jaeger jaegertracing/jaeger \
  --namespace "$NAMESPACE" \
  --set provisionDataStore.cassandra=false \
  --set provisionDataStore.elasticsearch=true \
  --set storage.type=elasticsearch \
  --set storage.elasticsearch.host=elasticsearch-master \
  --set storage.elasticsearch.port=9200

# Wait for all components to become ready
echo "⏳ Waiting for all components to become ready..."
kubectl wait --for=condition=ready pod -l app.kubernetes.io/name=prometheus -n "$NAMESPACE" --timeout=300s
kubectl wait --for=condition=ready pod -l app=elasticsearch-master -n "$NAMESPACE" --timeout=300s
kubectl wait --for=condition=ready pod -l app=kibana -n "$NAMESPACE" --timeout=300s

# Print access information
echo "✅ Monitoring stack deployed!"
echo ""
echo "📊 Grafana:"
GRAFANA_IP=$(kubectl get svc prometheus-grafana -n "$NAMESPACE" -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "  URL: http://$GRAFANA_IP"
echo "  Username: admin"
echo "  Password: admin123"
echo ""
echo "📝 Kibana:"
KIBANA_IP=$(kubectl get svc kibana-kibana -n "$NAMESPACE" -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "  URL: http://$KIBANA_IP:5601"
echo ""
echo "🔍 Jaeger:"
JAEGER_IP=$(kubectl get svc jaeger-query -n "$NAMESPACE" -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
echo "  URL: http://$JAEGER_IP:16686"
```
Log Analysis Script
```python
#!/usr/bin/env python3
# scripts/log_analyzer.py
import json
import argparse
from datetime import datetime
from collections import defaultdict, Counter
from typing import Any, Dict


class LogAnalyzer:
    """Offline analyzer for structured (JSON-lines) server logs."""

    def __init__(self, log_file: str):
        self.log_file = log_file
        self.logs = []
        self.load_logs()

    def load_logs(self):
        """Load the log file, skipping lines that are not valid JSON."""
        try:
            with open(self.log_file, 'r', encoding='utf-8') as f:
                for line in f:
                    try:
                        self.logs.append(json.loads(line.strip()))
                    except json.JSONDecodeError:
                        continue
            print(f"📄 Loaded {len(self.logs)} log records")
        except FileNotFoundError:
            print(f"❌ Log file {self.log_file} does not exist")

    def analyze_error_patterns(self) -> Dict[str, int]:
        """Group ERROR records by exception type or message pattern."""
        error_patterns = Counter()
        for log in self.logs:
            if log.get('level') != 'ERROR':
                continue
            message = log.get('message', '')
            if 'exception' in log:
                # Prefer the recorded exception type
                error_patterns[log['exception'].get('type', 'Unknown')] += 1
            elif 'timeout' in message.lower():
                error_patterns['Timeout'] += 1
            elif 'connection' in message.lower():
                error_patterns['Connection'] += 1
            elif 'database' in message.lower():
                error_patterns['Database'] += 1
            else:
                error_patterns['Other'] += 1
        return dict(error_patterns)

    def analyze_response_times(self) -> Dict[str, float]:
        """Compute overall and per-endpoint response-time statistics."""
        response_times = []
        endpoint_times = defaultdict(list)
        for log in self.logs:
            if log.get('event_type') == 'http_request':
                duration = log.get('duration_ms', 0)
                path = log.get('path', '/unknown')
                response_times.append(duration)
                endpoint_times[path].append(duration)
        if not response_times:
            return {}
        result = {
            'avg_response_time': sum(response_times) / len(response_times),
            'max_response_time': max(response_times),
            'min_response_time': min(response_times),
            'total_requests': len(response_times)
        }
        # Per-endpoint averages
        for endpoint, times in endpoint_times.items():
            result[f'{endpoint}_avg'] = sum(times) / len(times)
        return result

    def analyze_traffic_patterns(self) -> Dict[str, Any]:
        """Break traffic down by hour, status code and HTTP method."""
        hourly_requests = defaultdict(int)
        status_codes = Counter()
        methods = Counter()
        for log in self.logs:
            if log.get('event_type') != 'http_request':
                continue
            # Requests per hour of day
            timestamp = log.get('timestamp')
            if timestamp:
                try:
                    dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
                    hourly_requests[dt.hour] += 1
                except ValueError:
                    pass
            # Status code distribution
            status_code = log.get('status_code')
            if status_code:
                status_codes[status_code] += 1
            # HTTP method distribution
            method = log.get('method')
            if method:
                methods[method] += 1
        return {
            'hourly_requests': dict(hourly_requests),
            'status_codes': dict(status_codes),
            'methods': dict(methods)
        }

    def analyze_tool_usage(self) -> Dict[str, Any]:
        """Summarize tool-call counts, errors and latencies."""
        tool_calls = Counter()
        tool_errors = Counter()
        tool_response_times = defaultdict(list)
        for log in self.logs:
            if log.get('event_type') != 'tool_call':
                continue
            tool_name = log.get('tool_name')
            if not tool_name:
                continue
            tool_calls[tool_name] += 1
            tool_response_times[tool_name].append(log.get('duration_ms', 0))
            if not log.get('success', True):
                tool_errors[tool_name] += 1
        # Average latency per tool
        avg_response_times = {
            tool: sum(times) / len(times) if times else 0
            for tool, times in tool_response_times.items()
        }
        return {
            'tool_calls': dict(tool_calls),
            'tool_errors': dict(tool_errors),
            'tool_avg_response_times': avg_response_times
        }

    def generate_report(self) -> str:
        """Render the combined analysis as a plain-text report."""
        error_patterns = self.analyze_error_patterns()
        response_times = self.analyze_response_times()
        traffic_patterns = self.analyze_traffic_patterns()
        tool_usage = self.analyze_tool_usage()

        report = f"""
📊 MCP Server Log Analysis Report
{'=' * 50}
📈 Basics
- Total log records: {len(self.logs)}
- Time range analyzed: {self._get_time_range()}

🔥 Response Times
- Average: {response_times.get('avg_response_time', 0):.2f}ms
- Max: {response_times.get('max_response_time', 0):.2f}ms
- Min: {response_times.get('min_response_time', 0):.2f}ms
- Total requests: {response_times.get('total_requests', 0)}

❌ Error Patterns
"""
        for error_type, count in error_patterns.items():
            report += f"- {error_type}: {count}\n"

        report += "\n📊 Traffic Patterns\nStatus codes:\n"
        for status, count in traffic_patterns['status_codes'].items():
            report += f"- {status}: {count}\n"

        report += "\nHTTP methods:\n"
        for method, count in traffic_patterns['methods'].items():
            report += f"- {method}: {count}\n"

        report += "\n🔧 Tool Usage\nCalls per tool:\n"
        for tool, count in tool_usage['tool_calls'].items():
            avg_time = tool_usage['tool_avg_response_times'].get(tool, 0)
            errors = tool_usage['tool_errors'].get(tool, 0)
            error_rate = (errors / count * 100) if count > 0 else 0
            report += f"- {tool}: {count} calls (avg {avg_time:.2f}ms, error rate {error_rate:.1f}%)\n"

        return report

    def _get_time_range(self) -> str:
        """Return the min..max timestamp range in the loaded logs."""
        timestamps = []
        for log in self.logs:
            timestamp = log.get('timestamp')
            if timestamp:
                try:
                    timestamps.append(datetime.fromisoformat(timestamp.replace('Z', '+00:00')))
                except ValueError:
                    pass
        if not timestamps:
            return "unknown"
        return (f"{min(timestamps).strftime('%Y-%m-%d %H:%M:%S')} - "
                f"{max(timestamps).strftime('%Y-%m-%d %H:%M:%S')}")


def main():
    parser = argparse.ArgumentParser(description='MCP server log analysis tool')
    parser.add_argument('log_file', help='Path to the log file')
    parser.add_argument('--output', '-o', help='Write the report to this file')
    args = parser.parse_args()

    analyzer = LogAnalyzer(args.log_file)
    report = analyzer.generate_report()

    if args.output:
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(report)
        print(f"📄 Report written to {args.output}")
    else:
        print(report)


if __name__ == '__main__':
    main()
```
🎯 Best Practices
1. Metric design
- The four golden signals: latency, traffic, errors, saturation
- Business metrics: user conversion rate, tool success rate
- Infrastructure metrics: CPU, memory, disk, network
2. Structured logging
- Uniform format: JSON with a consistent set of required fields
- Context: request ID, user ID, session ID
- Redaction of sensitive data: passwords, tokens, API keys
3. Alerting strategy
- Severity tiers: Critical, Warning, Info
- Avoid alert fatigue: tune thresholds and silence windows sensibly
- Actionability: every alert should map to a clear response procedure
4. Distributed tracing
- Cover the critical paths: user requests, tool calls, database operations
- Identify performance bottlenecks: find the slowest operations
- Trace error propagation: locate the source of failures quickly
🚀 Next Steps
With monitoring and logging in place, you can:
- Learn performance tuning → 7.4 Performance Tuning and Scaling
- Master troubleshooting → 7.5 Troubleshooting and Maintenance
- Explore best practices → Chapter 8: Best Practices
📚 Further Reading: