4.5 测试和调试
🎯 学习目标:掌握MCP服务器的测试方法和调试技巧,构建可靠的开发工作流
⏱️ 预计时间:40分钟
📊 难度等级:⭐⭐⭐
🔍 为什么测试和调试如此重要?
想象一下,你的AI助手在关键时刻告诉用户"抱歉,出了点问题",这会是多么尴尬!在MCP开发中,测试和调试不仅仅是找bug,更是确保:
- 🛡️ 可靠性:工具在各种情况下都能正常工作
- 🚀 性能:响应快速,不会让用户等待太久
- 🔒 安全性:防止恶意输入导致系统问题
- 📊 用户体验:提供清晰、有用的错误信息
🧪 测试策略全景图
🧩 单元测试实现
📋 测试环境配置
首先,让我们创建一个完整的测试配置文件 pytest.ini:
ini
[pytest]
# 测试目录和文件模式
testpaths = tests
python_files = test_*.py *_test.py
python_classes = Test*
python_functions = test_*
# 输出配置
addopts =
-v # 详细输出
--tb=short # 简短的错误追踪
--strict-markers # 严格标记模式
--strict-config # 严格配置检查
--color=yes # 彩色输出
--durations=10 # 显示最慢的10个测试
--cov=. # 代码覆盖率
--cov-report=html # HTML覆盖率报告
--cov-report=term # 终端覆盖率报告
--cov-fail-under=80 # 覆盖率低于80%时失败
# 测试标记
markers =
unit: 单元测试
integration: 集成测试
e2e: 端到端测试
slow: 慢速测试
network: 需要网络连接的测试
file_io: 文件I/O测试
security: 安全相关测试
# 过滤警告
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning
# 异步测试配置
asyncio_mode = auto
🛠️ 工具单元测试
创建 tests/test_tools.py:
python
"""
test_tools.py - 工具单元测试
"""
import pytest
import tempfile
import json
from pathlib import Path
from unittest.mock import AsyncMock, patch, mock_open
# 导入要测试的工具
from tools import RandomNumberTool, CurrentTimeTool, CalculatorTool
from file_tools import ReadFileTool, WriteFileTool, ListDirectoryTool
from data_tools import JsonProcessorTool, TextProcessorTool
class TestBasicTools:
    """Unit tests for the basic tools: random number, calculator and current time."""

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_random_number_tool_basic(self):
        """Default call yields a value in [1, 100] and echoes the range metadata."""
        tool = RandomNumberTool()
        # An empty params dict exercises the tool's defaults (min=1, max=100).
        result = await tool.execute({})
        assert result["success"] is True
        assert "result" in result
        assert 1 <= result["result"] <= 100
        assert "metadata" in result
        assert result["metadata"]["min"] == 1
        assert result["metadata"]["max"] == 100

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_random_number_tool_custom_range(self):
        """A custom [min, max] range is honored; range_size is inclusive (11 for 50..60)."""
        tool = RandomNumberTool()
        result = await tool.execute({"min": 50, "max": 60})
        assert result["success"] is True
        assert 50 <= result["result"] <= 60
        assert result["metadata"]["range_size"] == 11

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_random_number_tool_invalid_params(self):
        """Invalid parameters produce a structured error rather than an exception."""
        tool = RandomNumberTool()
        # min greater than max must fail validation.
        result = await tool.execute({"min": 100, "max": 50})
        assert result["success"] is False
        assert "error" in result
        assert "min必须小于max" in result["error"]

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_calculator_tool_operations(self):
        """All four arithmetic operations return the expected results."""
        tool = CalculatorTool()
        # Addition
        result = await tool.execute({
            "operation": "add",
            "a": 10,
            "b": 5
        })
        assert result["success"] is True
        assert result["result"] == 15
        # Subtraction
        result = await tool.execute({
            "operation": "subtract",
            "a": 10,
            "b": 3
        })
        assert result["success"] is True
        assert result["result"] == 7
        # Multiplication
        result = await tool.execute({
            "operation": "multiply",
            "a": 4,
            "b": 6
        })
        assert result["success"] is True
        assert result["result"] == 24
        # Division
        result = await tool.execute({
            "operation": "divide",
            "a": 15,
            "b": 3
        })
        assert result["success"] is True
        assert result["result"] == 5

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_calculator_tool_division_by_zero(self):
        """Dividing by zero yields a structured error, not a raised exception."""
        tool = CalculatorTool()
        result = await tool.execute({
            "operation": "divide",
            "a": 10,
            "b": 0
        })
        assert result["success"] is False
        assert "除数不能为零" in result["error"]

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_current_time_tool(self):
        """The time tool echoes the requested format and timezone in its metadata."""
        tool = CurrentTimeTool()
        result = await tool.execute({
            "format": "%Y-%m-%d",
            "timezone": "UTC"
        })
        assert result["success"] is True
        assert "result" in result
        assert "metadata" in result
        assert result["metadata"]["format"] == "%Y-%m-%d"
        assert result["metadata"]["timezone"] == "UTC"
class TestFileTools:
    """Unit tests for the file-system tools (write, read, list directory)."""

    @pytest.mark.unit
    @pytest.mark.file_io
    @pytest.mark.asyncio
    async def test_write_and_read_file(self):
        """Round-trip: content written by WriteFileTool is read back verbatim."""
        # Work inside a throwaway temporary directory.
        with tempfile.TemporaryDirectory() as temp_dir:
            test_file = Path(temp_dir) / "test.txt"
            test_content = "Hello, MCP Test World!\n这是测试内容。"
            # Write phase.
            write_tool = WriteFileTool()
            # Whitelist the temp dir so the tool's safe-directory check passes.
            write_tool.safe_directories = {temp_dir}
            write_result = await write_tool.execute({
                "file_path": str(test_file),
                "content": test_content,
                "encoding": "utf-8"
            })
            assert write_result["success"] is True
            assert Path(write_result["result"]).exists()
            # Read phase.
            read_tool = ReadFileTool()
            read_result = await read_tool.execute({
                "file_path": str(test_file),
                "encoding": "utf-8"
            })
            assert read_result["success"] is True
            assert read_result["result"] == test_content
            assert read_result["metadata"]["char_count"] == len(test_content)

    @pytest.mark.unit
    @pytest.mark.file_io
    @pytest.mark.asyncio
    async def test_read_nonexistent_file(self):
        """Reading a missing path reports a structured '文件不存在' error."""
        tool = ReadFileTool()
        result = await tool.execute({
            "file_path": "/nonexistent/file.txt"
        })
        assert result["success"] is False
        assert "文件不存在" in result["error"]

    @pytest.mark.unit
    @pytest.mark.file_io
    @pytest.mark.asyncio
    async def test_list_directory(self):
        """Non-recursive and recursive listings count files/directories correctly."""
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir)
            # Fixture layout: two files plus a subdirectory with one file.
            (temp_path / "file1.txt").write_text("test1")
            (temp_path / "file2.json").write_text('{"test": true}')
            (temp_path / "subdir").mkdir()
            (temp_path / "subdir" / "file3.txt").write_text("test3")
            # Non-recursive listing sees only the top level.
            tool = ListDirectoryTool()
            result = await tool.execute({
                "directory_path": str(temp_path),
                "recursive": False
            })
            assert result["success"] is True
            assert result["metadata"]["total_items"] == 3  # 2 files + 1 directory
            assert result["metadata"]["file_count"] == 2
            assert result["metadata"]["directory_count"] == 1
            # Recursive listing also includes the nested file.
            recursive_result = await tool.execute({
                "directory_path": str(temp_path),
                "recursive": True,
                "max_depth": 2
            })
            assert recursive_result["success"] is True
            assert recursive_result["metadata"]["total_items"] == 4  # includes subdir contents
class TestDataTools:
    """Unit tests for the JSON and text processing tools."""

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_json_processor_parse(self):
        """'parse' turns a JSON string into the equivalent Python structure."""
        tool = JsonProcessorTool()
        test_json = '{"name": "张三", "age": 30, "hobbies": ["reading", "coding"]}'
        result = await tool.execute({
            "operation": "parse",
            "data": test_json
        })
        assert result["success"] is True
        assert result["result"]["name"] == "张三"
        assert result["result"]["age"] == 30
        assert len(result["result"]["hobbies"]) == 2

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_json_processor_format(self):
        """'format' pretty-prints compact JSON using the requested indent."""
        tool = JsonProcessorTool()
        test_json = '{"name":"张三","age":30}'
        result = await tool.execute({
            "operation": "format",
            "data": test_json,
            "indent": 4
        })
        assert result["success"] is True
        assert " " in result["result"]  # indentation was applied
        assert '"name": "张三"' in result["result"]

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_json_processor_extract_path(self):
        """'extract' resolves a dotted path through nested JSON objects."""
        tool = JsonProcessorTool()
        test_json = '{"user": {"profile": {"name": "张三", "settings": {"theme": "dark"}}}}'
        result = await tool.execute({
            "operation": "extract",
            "data": test_json,
            "path": "user.profile.name"
        })
        assert result["success"] is True
        assert result["result"] == "张三"

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_json_processor_invalid_json(self):
        """Malformed JSON input is reported as a structured error."""
        tool = JsonProcessorTool()
        result = await tool.execute({
            "operation": "parse",
            "data": '{"invalid": json,}'
        })
        assert result["success"] is False
        assert "JSON格式错误" in result["error"]

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_text_processor_count(self):
        """'count' reports word, line and paragraph totals for the input text."""
        tool = TextProcessorTool()
        test_text = "Hello world!\nThis is a test.\n\nAnother paragraph."
        result = await tool.execute({
            "operation": "count",
            "text": test_text
        })
        assert result["success"] is True
        stats = result["result"]
        assert stats["words"] == 8
        assert stats["lines"] == 4
        assert stats["paragraphs"] == 2

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_text_processor_search(self):
        """'search' returns the match count and the positions of each match."""
        tool = TextProcessorTool()
        test_text = "Hello world! Hello MCP! Hello everyone!"
        result = await tool.execute({
            "operation": "search",
            "text": test_text,
            "pattern": "Hello"
        })
        assert result["success"] is True
        assert result["result"]["count"] == 3
        assert len(result["result"]["positions"]) == 3

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_text_processor_replace(self):
        """'replace' substitutes every occurrence and reports how many changed."""
        tool = TextProcessorTool()
        test_text = "Hello world! Hello MCP!"
        result = await tool.execute({
            "operation": "replace",
            "text": test_text,
            "pattern": "Hello",
            "replacement": "Hi"
        })
        assert result["success"] is True
        assert result["result"]["text"] == "Hi world! Hi MCP!"
        assert result["result"]["replaced_count"] == 2
class TestToolManager:
    """Tests covering ToolManager registration and dispatch."""

    @pytest.mark.unit
    def test_tool_registration(self):
        """The built-in tools are registered and exposed by list_tools()."""
        from tools import ToolManager
        mgr = ToolManager()
        # The default tool set must be present right after construction.
        assert "random_number" in mgr.tools
        assert "current_time" in mgr.tools
        assert "calculator" in mgr.tools
        # list_tools() must advertise at least one tool entry.
        listing = mgr.list_tools()
        assert "tools" in listing
        assert len(listing["tools"]) > 0

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_tool_execution(self):
        """Dispatching to the calculator returns the computed sum."""
        from tools import ToolManager
        mgr = ToolManager()
        outcome = await mgr.execute_tool("calculator", {
            "operation": "add",
            "a": 5,
            "b": 3
        })
        assert outcome["success"] is True
        assert outcome["result"] == 8

    @pytest.mark.unit
    @pytest.mark.asyncio
    async def test_nonexistent_tool(self):
        """Dispatching to an unknown tool name yields a structured error."""
        from tools import ToolManager
        mgr = ToolManager()
        outcome = await mgr.execute_tool("nonexistent_tool", {})
        assert outcome["success"] is False
        assert "未找到工具" in outcome["error"]
🔗 集成测试实现
创建 tests/test_integration.py:
python
"""
test_integration.py - 集成测试
"""
import pytest
import asyncio
import json
import tempfile
from pathlib import Path
from unittest.mock import patch, AsyncMock
from server import MCPServer
from config import config
class TestMCPServerIntegration:
    """Integration tests driving MCPServer through complete JSON-RPC requests."""

    @pytest.fixture
    async def mcp_server(self):
        """Provide a fresh MCPServer instance for each test."""
        server = MCPServer()
        yield server

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_server_initialization(self, mcp_server):
        """The initialize handshake echoes the protocol version and server info."""
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "clientInfo": {
                    "name": "测试客户端",
                    "version": "1.0.0"
                }
            }
        }
        response = await mcp_server.handle_request(request)
        assert response["jsonrpc"] == "2.0"
        assert response["id"] == 1
        assert "result" in response
        result = response["result"]
        assert result["protocolVersion"] == "2024-11-05"
        assert "capabilities" in result
        assert "serverInfo" in result
        assert result["serverInfo"]["name"] == config.name

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_tools_list_integration(self, mcp_server):
        """tools/list returns a non-empty list of well-formed tool entries."""
        request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
            "params": {}
        }
        response = await mcp_server.handle_request(request)
        assert response["jsonrpc"] == "2.0"
        assert "result" in response
        tools = response["result"]["tools"]
        assert len(tools) > 0
        # Every advertised tool must carry the MCP-required fields.
        for tool in tools:
            assert "name" in tool
            assert "description" in tool
            assert "inputSchema" in tool

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_tool_call_chain(self, mcp_server):
        """Full flow: initialize, list tools, then invoke the calculator tool."""
        # 1. Initialize the session.
        init_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "clientInfo": {"name": "test", "version": "1.0"}
            }
        }
        init_response = await mcp_server.handle_request(init_request)
        assert "result" in init_response
        # 2. Fetch the tool list.
        list_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
            "params": {}
        }
        list_response = await mcp_server.handle_request(list_request)
        tools = list_response["result"]["tools"]
        # 3. Invoke a specific tool from the advertised list.
        calculator_found = any(tool["name"] == "calculator" for tool in tools)
        assert calculator_found, "计算器工具应该在工具列表中"
        call_request = {
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {
                "name": "calculator",
                "arguments": {
                    "operation": "multiply",
                    "a": 6,
                    "b": 7
                }
            }
        }
        call_response = await mcp_server.handle_request(call_request)
        assert "result" in call_response
        assert call_response["result"]["isError"] is False

    @pytest.mark.integration
    @pytest.mark.asyncio
    async def test_error_handling_integration(self, mcp_server):
        """Unknown methods and unknown tools map to the expected JSON-RPC errors."""
        # An unknown method must yield -32601 (method not found).
        invalid_method_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "nonexistent/method",
            "params": {}
        }
        response = await mcp_server.handle_request(invalid_method_request)
        assert "error" in response
        assert response["error"]["code"] == -32601
        assert "方法未找到" in response["error"]["message"]
        # A tools/call against an unknown tool must yield -32603 (internal error).
        invalid_tool_request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/call",
            "params": {
                "name": "nonexistent_tool",
                "arguments": {}
            }
        }
        response = await mcp_server.handle_request(invalid_tool_request)
        assert "error" in response
        assert response["error"]["code"] == -32603
class TestFileOperationsIntegration:
    """End-to-end file workflow through the shared tool_manager."""

    @pytest.mark.integration
    @pytest.mark.file_io
    @pytest.mark.asyncio
    async def test_file_workflow(self):
        """Write a file, read it back, then list its containing directory."""
        from tools import tool_manager
        with tempfile.TemporaryDirectory() as temp_dir:
            test_file = Path(temp_dir) / "integration_test.txt"
            test_content = "Integration test content\n集成测试内容"
            # 1. Write the file.
            write_result = await tool_manager.execute_tool("write_file", {
                "file_path": str(test_file),
                "content": test_content
            })
            # The tool may redirect the write into its safe directory instead.
            assert write_result["success"] is True or "重定向到安全路径" in write_result.get("message", "")
            # 2. Read the file back using the path that was actually written.
            if write_result["success"]:
                actual_path = write_result["result"]
                read_result = await tool_manager.execute_tool("read_file", {
                    "file_path": actual_path
                })
                assert read_result["success"] is True
                # The content must round-trip.
                assert test_content in read_result["result"]
            # 3. List the directory containing the written file.
            dir_to_list = str(Path(write_result["result"]).parent) if write_result["success"] else temp_dir
            list_result = await tool_manager.execute_tool("list_directory", {
                "directory_path": dir_to_list
            })
            assert list_result["success"] is True
            assert list_result["metadata"]["file_count"] >= 1
class TestNetworkIntegration:
    """Integration tests that exercise real network access."""

    @pytest.mark.integration
    @pytest.mark.network
    @pytest.mark.asyncio
    async def test_http_request_integration(self):
        """An HTTP GET against httpbin returns 200 with the expected payload."""
        from tools import tool_manager
        # httpbin.org serves a stable, well-known JSON fixture.
        outcome = await tool_manager.execute_tool("http_request", {
            "url": "https://httpbin.org/json",
            "method": "GET"
        })
        if not outcome["success"]:
            # Don't fail the suite when the network is unreachable.
            pytest.skip("网络连接不可用")
        assert outcome["result"]["status_code"] == 200
        assert "slideshow" in outcome["result"]["json"]
🐛 调试工具和技巧
🔧 创建调试中间件
创建 debug_middleware.py:
python
"""
debug_middleware.py - 调试中间件
"""
import asyncio
import json
import time
import functools
from typing import Dict, Any, Callable
from loguru import logger
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.syntax import Syntax
console = Console()
class DebugMiddleware:
    """Console middleware that pretty-prints MCP traffic and tool executions.

    All logging methods are no-ops when ``enabled`` is False, so the middleware
    can stay permanently wired into the server and be toggled via configuration.
    """

    def __init__(self, enabled: bool = True):
        self.enabled = enabled        # master switch for all logging
        self.request_count = 0        # requests seen via log_request()
        self.request_times = []       # response durations in seconds
        self.error_count = 0          # error responses seen via log_response()

    def log_request(self, request: Dict[str, Any]):
        """Render an incoming JSON-RPC request as a table."""
        if not self.enabled:
            return
        self.request_count += 1
        table = Table(title=f"📨 MCP Request #{self.request_count}")
        table.add_column("字段", style="cyan")
        table.add_column("值", style="white")
        table.add_row("JSON-RPC", request.get("jsonrpc", "N/A"))
        table.add_row("ID", str(request.get("id", "N/A")))
        table.add_row("方法", request.get("method", "N/A"))
        # Pretty-print the params, truncated so huge payloads stay readable.
        params = request.get("params", {})
        if params:
            params_json = json.dumps(params, indent=2, ensure_ascii=False)
            table.add_row("参数", (params_json[:200] + "...") if len(params_json) > 200 else params_json)
        console.print(table)

    def log_response(self, response: Dict[str, Any], duration: float):
        """Render an outgoing JSON-RPC response and record its duration."""
        if not self.enabled:
            return
        self.request_times.append(duration)
        table = Table(title=f"📤 MCP Response (耗时: {duration:.3f}s)")
        table.add_column("字段", style="green")
        table.add_column("值", style="white")
        table.add_row("JSON-RPC", response.get("jsonrpc", "N/A"))
        table.add_row("ID", str(response.get("id", "N/A")))
        if "result" in response:
            table.add_row("状态", "✅ 成功")
            result = response["result"]
            # Show small dict results in full; summarize everything else.
            if isinstance(result, dict) and len(str(result)) < 300:
                result_json = json.dumps(result, indent=2, ensure_ascii=False)
                table.add_row("结果", result_json)
            else:
                table.add_row("结果", f"[{type(result).__name__}] {str(result)[:100]}...")
        if "error" in response:
            self.error_count += 1
            table.add_row("状态", "❌ 错误")
            error = response["error"]
            table.add_row("错误码", str(error.get("code", "N/A")))
            table.add_row("错误信息", error.get("message", "N/A"))
        console.print(table)

    def log_tool_execution(self, tool_name: str, params: Dict[str, Any], result: Dict[str, Any], duration: float):
        """Render a single tool invocation as a bordered panel."""
        if not self.enabled:
            return
        status = "✅ 成功" if result.get("success") else "❌ 失败"
        panel_content = f"""
🛠️ 工具名称: {tool_name}
⏱️ 执行时间: {duration:.3f}s
📊 状态: {status}
📥 参数: {json.dumps(params, ensure_ascii=False, indent=2)[:200]}...
📤 结果: {str(result.get('result', result.get('error', 'N/A')))[:200]}...
""".strip()
        panel = Panel(
            panel_content,
            title=f"🔧 工具执行: {tool_name}",
            border_style="green" if result.get("success") else "red"
        )
        console.print(panel)

    def print_statistics(self):
        """Print aggregate request statistics; no-op when disabled or empty."""
        if not self.enabled or not self.request_times:
            return
        avg_time = sum(self.request_times) / len(self.request_times)
        max_time = max(self.request_times)
        min_time = min(self.request_times)
        stats_table = Table(title="📊 调试统计")
        stats_table.add_column("指标", style="cyan")
        stats_table.add_column("值", style="white")
        stats_table.add_row("总请求数", str(self.request_count))
        stats_table.add_row("错误请求数", str(self.error_count))
        # Guard against ZeroDivisionError: responses may have been logged
        # without a matching log_request(), leaving request_count at zero.
        if self.request_count:
            success_rate = (self.request_count - self.error_count) / self.request_count * 100
        else:
            success_rate = 0.0
        stats_table.add_row("成功率", f"{success_rate:.1f}%")
        stats_table.add_row("平均响应时间", f"{avg_time:.3f}s")
        stats_table.add_row("最快响应时间", f"{min_time:.3f}s")
        stats_table.add_row("最慢响应时间", f"{max_time:.3f}s")
        console.print(stats_table)
# Global debug middleware instance; the flag could be driven by configuration.
debug_middleware = DebugMiddleware(enabled=True)
def debug_tool_execution(func: Callable):
    """Decorator that reports every tool call to the debug middleware.

    Logs the tool name, parameters, result and elapsed time on success;
    on failure it logs a synthetic error payload and re-raises.
    """
    @functools.wraps(func)
    async def wrapper(self, params: Dict[str, Any]):
        started = time.time()
        try:
            outcome = await func(self, params)
        except Exception as exc:
            # Log the failure as a structured error result, then propagate.
            debug_middleware.log_tool_execution(
                self.name, params, {"success": False, "error": str(exc)}, time.time() - started
            )
            raise
        else:
            debug_middleware.log_tool_execution(
                self.name, params, outcome, time.time() - started
            )
            return outcome
    return wrapper
def debug_server_request(func: Callable):
    """Decorator that logs each MCP request/response pair around *func*.

    On exception it logs a synthesized JSON-RPC internal-error response
    (-32603) for visibility and then re-raises to the caller.
    """
    @functools.wraps(func)
    async def wrapper(self, request: Dict[str, Any]):
        started = time.time()
        debug_middleware.log_request(request)
        try:
            response = await func(self, request)
        except Exception as exc:
            # Build an error response purely for logging; the exception
            # itself still propagates unchanged.
            synthetic = {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32603,
                    "message": "内部错误",
                    "data": str(exc)
                }
            }
            debug_middleware.log_response(synthetic, time.time() - started)
            raise
        else:
            debug_middleware.log_response(response, time.time() - started)
            return response
    return wrapper
🎯 性能分析工具
创建 performance_profiler.py:
python
"""
performance_profiler.py - 性能分析工具
"""
import asyncio
import time
import psutil
import functools
from typing import Dict, Any, List, Callable
from dataclasses import dataclass
from collections import defaultdict
import json
@dataclass
class PerformanceMetric:
    """A single timing/memory sample recorded for one call."""
    name: str             # metric key, e.g. "tool:calculator" or "method:handle_request"
    duration: float       # wall-clock duration in seconds
    memory_before: float  # process RSS before the call, in MB
    memory_after: float   # process RSS after the call, in MB
    cpu_percent: float    # process CPU usage reported by psutil at record time
    timestamp: float      # time.time() when the sample was recorded
class PerformanceProfiler:
    """Collects timing/memory samples and aggregates per-tool/method statistics."""

    def __init__(self):
        # All raw samples in recording order.
        self.metrics: List[PerformanceMetric] = []
        # Durations bucketed by tool name (from "tool:*" metrics).
        self.tool_metrics: Dict[str, List[float]] = defaultdict(list)
        # Durations bucketed by method name (from "method:*" metrics).
        self.method_metrics: Dict[str, List[float]] = defaultdict(list)
        # Handle to the current process for CPU sampling.
        self.process = psutil.Process()

    def record_metric(self, name: str, duration: float, memory_before: float, memory_after: float):
        """Record one sample.

        Names prefixed with "tool:" / "method:" are additionally bucketed
        into the per-tool / per-method duration maps.
        """
        metric = PerformanceMetric(
            name=name,
            duration=duration,
            memory_before=memory_before,
            memory_after=memory_after,
            cpu_percent=self.process.cpu_percent(),
            timestamp=time.time()
        )
        self.metrics.append(metric)
        # Strip only the LEADING prefix; str.replace would also mangle any
        # later occurrence of the prefix inside the name itself.
        if name.startswith("tool:"):
            self.tool_metrics[name[len("tool:"):]].append(duration)
        elif name.startswith("method:"):
            self.method_metrics[name[len("method:"):]].append(duration)

    def get_tool_stats(self, tool_name: str) -> Dict[str, Any]:
        """Return call count and min/avg/max/total durations for one tool."""
        durations = self.tool_metrics.get(tool_name, [])
        if not durations:
            return {"error": "没有找到该工具的性能数据"}
        return {
            "tool_name": tool_name,
            "call_count": len(durations),
            "avg_duration": sum(durations) / len(durations),
            "min_duration": min(durations),
            "max_duration": max(durations),
            "total_duration": sum(durations)
        }

    def get_overall_stats(self) -> Dict[str, Any]:
        """Aggregate duration and memory-delta statistics over all samples."""
        if not self.metrics:
            return {"error": "没有性能数据"}
        durations = [m.duration for m in self.metrics]
        memory_usage = [m.memory_after - m.memory_before for m in self.metrics]
        return {
            "total_calls": len(self.metrics),
            "avg_duration": sum(durations) / len(durations),
            "total_duration": sum(durations),
            "avg_memory_change": sum(memory_usage) / len(memory_usage),
            "max_memory_change": max(memory_usage),
            "tool_count": len(self.tool_metrics),
            "method_count": len(self.method_metrics)
        }

    def export_report(self, filename: str = "performance_report.json"):
        """Write a JSON report (overall, per-tool, per-method, raw samples).

        Returns the filename it wrote to.
        """
        report = {
            "timestamp": time.time(),
            "overall_stats": self.get_overall_stats(),
            "tool_stats": {
                tool: self.get_tool_stats(tool)
                for tool in self.tool_metrics
            },
            "method_stats": {
                method: {
                    "call_count": len(durations),
                    "avg_duration": sum(durations) / len(durations),
                    "total_duration": sum(durations)
                }
                for method, durations in self.method_metrics.items()
            },
            "detailed_metrics": [
                {
                    "name": m.name,
                    "duration": m.duration,
                    "memory_change": m.memory_after - m.memory_before,
                    "cpu_percent": m.cpu_percent,
                    "timestamp": m.timestamp
                }
                for m in self.metrics
            ]
        }
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        return filename
# Global profiler instance shared by the decorators below.
profiler = PerformanceProfiler()
def profile_performance(name_prefix: str):
    """Decorator factory that records duration and memory delta for a callable.

    Works on both sync and async functions. The metric name is
    "<name_prefix>:<args[0].name>" when the first positional argument exposes
    a ``name`` attribute (tool objects), otherwise "<name_prefix>:<func name>".
    Metrics are recorded even when the wrapped call raises.
    """
    def decorator(func: Callable):
        def _metric_name(args) -> str:
            # Guard against zero-positional-arg calls: the original code read
            # args[0] unconditionally, which raised IndexError for plain
            # functions called only with keyword arguments (or none at all).
            if args and hasattr(args[0], 'name'):  # tool object
                return f"{name_prefix}:{args[0].name}"
            return f"{name_prefix}:{func.__name__}"

        def _record(start_time: float, memory_before: float, args) -> None:
            # Shared bookkeeping for both wrappers (runs in their finally).
            duration = time.time() - start_time
            memory_after = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            profiler.record_metric(_metric_name(args), duration, memory_before, memory_after)

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            memory_before = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            start_time = time.time()
            try:
                return await func(*args, **kwargs)
            finally:
                _record(start_time, memory_before, args)

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            memory_before = psutil.Process().memory_info().rss / 1024 / 1024  # MB
            start_time = time.time()
            try:
                return func(*args, **kwargs)
            finally:
                _record(start_time, memory_before, args)

        # Pick the wrapper that matches the wrapped callable's flavor.
        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator
🚀 集成调试工具到服务器
更新 server.py 来集成调试功能:
python
# 在server.py中添加调试支持
from debug_middleware import debug_server_request, debug_middleware
from performance_profiler import profile_performance, profiler
class MCPServer:
    # ... existing code ...

    @debug_server_request
    @profile_performance("method")
    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """Handle an MCP request with debug logging and profiling attached."""
        # The existing dispatch logic remains here unchanged.
        # ...

    def display_debug_info(self):
        """Print middleware statistics and profiler summaries in debug mode."""
        if config.debug:
            console.print("\n🐛 [bold yellow]调试模式已启用[/bold yellow]")
            debug_middleware.print_statistics()
            # Show aggregate performance numbers when any samples exist.
            stats = profiler.get_overall_stats()
            if "error" not in stats:
                console.print(f"📊 平均响应时间: {stats['avg_duration']:.3f}s")
                console.print(f"📊 总调用次数: {stats['total_calls']}")

    async def start(self):
        """Start the server; in debug mode also export a performance report on shutdown."""
        self.display_welcome()
        if config.debug:
            self.display_debug_info()
        try:
            if config.protocol == "stdio":
                await self.run_stdio()
        except KeyboardInterrupt:
            logger.info("服务器正在关闭...")
            # Persist the collected performance metrics before exiting.
            if config.debug:
                report_file = profiler.export_report()
                console.print(f"📄 性能报告已保存: {report_file}")
        finally:
            if config.debug:
                debug_middleware.print_statistics()
🧪 运行完整测试套件
创建测试运行脚本 run_tests.py:
python
#!/usr/bin/env python3
"""
run_tests.py - 测试运行脚本
"""
import subprocess
import sys
import os
from pathlib import Path
def run_command(cmd, description):
    """Run *cmd* in a shell, echo its output, and return True on exit code 0."""
    print(f"\n🔍 {description}")
    print("=" * 50)
    proc = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    succeeded = proc.returncode == 0
    if succeeded:
        print("✅ 通过")
        if proc.stdout:
            print(proc.stdout)
    else:
        # On failure, show stderr first, then any captured stdout.
        print("❌ 失败")
        if proc.stderr:
            print(proc.stderr)
        if proc.stdout:
            print(proc.stdout)
    return succeeded
def main():
    """Run every test command in sequence and print a pass/fail summary.

    Returns 0 when all commands succeed, 1 otherwise (shell exit status).
    """
    print("🧪 开始运行MCP服务器测试套件")
    # Always execute relative to this script's directory.
    os.chdir(Path(__file__).parent)
    # (command, human-readable description) pairs, run in order.
    tests = [
        ("python -m pytest tests/test_tools.py -v", "单元测试"),
        ("python -m pytest tests/test_integration.py -v", "集成测试"),
        ("python -m pytest tests/ -v --cov=. --cov-report=term", "覆盖率测试"),
        ("python -m pytest tests/ -m 'not network' -v", "离线测试"),
        ("python test_advanced_tools.py", "高级工具测试"),
        ("python -m flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics", "代码质量检查"),
    ]
    results = [(description, run_command(cmd, description)) for cmd, description in tests]
    # Summary section.
    print("\n" + "=" * 60)
    print("📊 测试结果总结")
    print("=" * 60)
    passed = 0
    for description, success in results:
        print(f"{description}: {'✅ 通过' if success else '❌ 失败'}")
        passed += 1 if success else 0
    print(f"\n通过率: {passed}/{len(results)} ({passed/len(results)*100:.1f}%)")
    if passed == len(results):
        print("🎉 所有测试都通过了!")
        return 0
    print("⚠️ 存在测试失败,请检查上述输出")
    return 1
# Script entry point: propagate main()'s status code to the shell.
if __name__ == "__main__":
    sys.exit(main())
🎯 本节小结
通过这一小节,你已经构建了一个完整的测试和调试体系:
✅ 单元测试:测试个别工具和组件的功能
✅ 集成测试:测试组件间的协作和数据流
✅ 调试中间件:实时监控请求和响应
✅ 性能分析:监控响应时间和资源使用
✅ 自动化测试:一键运行所有测试
🛠️ 调试最佳实践
- 日志分级:使用不同级别的日志记录不同重要性的信息
- 异常捕获:在关键位置捕获和记录异常
- 性能监控:监控关键操作的性能指标
- 测试驱动:先写测试,再实现功能
- 持续集成:定期运行测试套件
🚀 运行测试
bash
# 运行所有测试
python run_tests.py
# 运行特定类型的测试
pytest tests/test_tools.py -v -m unit
pytest tests/test_integration.py -v -m integration
# 启用调试模式运行服务器
MCP_DEBUG=true python server.py
现在你的MCP服务器不仅功能强大,还具备了专业级的测试和调试能力!