4.4 客户端集成与测试
🎯 学习目标:学习如何创建MCP客户端并与服务器进行交互
⏱️ 预计时间:35分钟
📊 难度等级:⭐⭐
🎭 客户端的角色
如果MCP服务器是"服务员",那么MCP客户端就是"顾客"。客户端负责:
- 🤝 与服务器建立连接
- 📋 发现服务器能提供什么服务
- 🛎️ 发起具体的服务请求
- 📤 处理服务器的响应
🏗️ 客户端架构设计
📊 客户端架构图
🔧 核心组件
组件 | 功能 | 重要性 |
---|---|---|
连接管理器 | 管理与服务器的连接状态 | ⭐⭐⭐⭐⭐ |
消息处理器 | 处理JSON-RPC消息的序列化和反序列化 | ⭐⭐⭐⭐⭐ |
工具调用器 | 执行对服务器工具的调用 | ⭐⭐⭐⭐ |
资源管理器 | 管理从服务器获取的资源 | ⭐⭐⭐ |
💻 多语言客户端实现
🐍 Python客户端
python
# requirements.txt
"""
mcp>=1.0.0
asyncio
websockets
"""
# client.py
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MCPCalculatorClient:
def __init__(self):
self.session = None
self.client = None
async def connect(self):
"""连接到MCP服务器"""
try:
# 配置服务器参数
server_params = StdioServerParameters(
command="python",
args=["server.py"]
)
# 建立连接
self.client = stdio_client(server_params)
self.session = await self.client.__aenter__()
# 初始化会话
await self.session.initialize()
print("✅ 成功连接到MCP服务器")
return True
except Exception as e:
print(f"❌ 连接失败: {e}")
return False
async def list_tools(self):
"""获取服务器可用工具列表"""
try:
result = await self.session.list_tools()
print("🛠️ 可用工具:")
for tool in result.tools:
print(f" - {tool.name}: {tool.description}")
return result.tools
except Exception as e:
print(f"❌ 获取工具列表失败: {e}")
return []
async def call_tool(self, name: str, arguments: dict):
"""调用指定工具"""
try:
print(f"🔧 调用工具: {name}")
print(f"📝 参数: {arguments}")
result = await self.session.call_tool(name, arguments)
print(f"✅ 调用成功")
print(f"📊 结果: {result.content}")
return result
except Exception as e:
print(f"❌ 工具调用失败: {e}")
return None
async def disconnect(self):
"""断开连接"""
if self.client:
await self.client.__aexit__(None, None, None)
print("🔌 已断开连接")
async def main():
"""主函数:演示客户端使用"""
client = MCPCalculatorClient()
# 连接服务器
if not await client.connect():
return
try:
# 获取工具列表
tools = await client.list_tools()
if not tools:
print("⚠️ 服务器没有可用工具")
return
# 测试各种工具
test_cases = [
{
"name": "add",
"arguments": {"a": 10, "b": 5}
},
{
"name": "multiply",
"arguments": {"a": 7, "b": 8}
},
{
"name": "get_random_number",
"arguments": {"min": 1, "max": 100}
},
{
"name": "get_current_time",
"arguments": {}
}
]
print("\n🧪 开始测试工具调用...")
for test in test_cases:
print(f"\n--- 测试 {test['name']} ---")
await client.call_tool(test["name"], test["arguments"])
await asyncio.sleep(1) # 避免过于频繁的调用
finally:
await client.disconnect()
if __name__ == "__main__":
asyncio.run(main())
🌐 JavaScript/TypeScript客户端
typescript
// package.json
{
"name": "mcp-calculator-client",
"version": "1.0.0",
"type": "module",
"dependencies": {
"@modelcontextprotocol/sdk": "^1.0.0",
"ws": "^8.0.0"
},
"scripts": {
"start": "node client.js",
"dev": "nodemon client.js"
}
}
// client.ts
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { spawn } from 'child_process';
class MCPCalculatorClient {
private client: Client | null = null;
private transport: StdioClientTransport | null = null;
async connect(): Promise<boolean> {
try {
// 启动服务器进程
const serverProcess = spawn('node', ['server.js']);
// 创建传输层
this.transport = new StdioClientTransport({
command: 'node',
args: ['server.js']
});
// 创建客户端
this.client = new Client({
name: "calculator-client",
version: "1.0.0"
}, {
capabilities: {}
});
// 连接到服务器
await this.client.connect(this.transport);
console.log("✅ 成功连接到MCP服务器");
return true;
} catch (error) {
console.error("❌ 连接失败:", error);
return false;
}
}
async listTools(): Promise<any[]> {
try {
if (!this.client) {
throw new Error("客户端未连接");
}
const result = await this.client.listTools();
console.log("🛠️ 可用工具:");
result.tools.forEach(tool => {
console.log(` - ${tool.name}: ${tool.description}`);
});
return result.tools;
} catch (error) {
console.error("❌ 获取工具列表失败:", error);
return [];
}
}
async callTool(name: string, arguments: Record<string, any>): Promise<any> {
try {
if (!this.client) {
throw new Error("客户端未连接");
}
console.log(`🔧 调用工具: ${name}`);
console.log(`📝 参数:`, arguments);
const result = await this.client.callTool({
name,
arguments
});
console.log("✅ 调用成功");
console.log("📊 结果:", result.content);
return result;
} catch (error) {
console.error(`❌ 工具调用失败 (${name}):`, error);
return null;
}
}
async disconnect(): Promise<void> {
try {
if (this.client) {
await this.client.close();
}
if (this.transport) {
await this.transport.close();
}
console.log("🔌 已断开连接");
} catch (error) {
console.error("❌ 断开连接时出错:", error);
}
}
}
// 主函数
async function main() {
const client = new MCPCalculatorClient();
// 连接服务器
if (!await client.connect()) {
return;
}
try {
// 获取工具列表
const tools = await client.listTools();
if (tools.length === 0) {
console.log("⚠️ 服务器没有可用工具");
return;
}
// 测试用例
const testCases = [
{
name: "add",
arguments: { a: 15, b: 25 }
},
{
name: "multiply",
arguments: { a: 6, b: 7 }
},
{
name: "get_random_number",
arguments: { min: 1, max: 50 }
},
{
name: "get_current_time",
arguments: {}
}
];
console.log("\n🧪 开始测试工具调用...");
for (const test of testCases) {
console.log(`\n--- 测试 ${test.name} ---`);
await client.callTool(test.name, test.arguments);
// 添加延迟避免过于频繁调用
await new Promise(resolve => setTimeout(resolve, 1000));
}
} finally {
await client.disconnect();
}
}
// 运行客户端
main().catch(console.error);
🔷 C# 客户端
csharp
// McpCalculatorClient.csproj
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MCP.NET" Version="1.0.0" />
<PackageReference Include="System.Text.Json" Version="8.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
</ItemGroup>
</Project>
// Program.cs
using System;
using System.Threading.Tasks;
using System.Text.Json;
using MCP.NET;
using Microsoft.Extensions.Logging;
namespace McpCalculatorClient
{
public class MCPCalculatorClient
{
private readonly ILogger<MCPCalculatorClient> _logger;
private McpClient? _client;
public MCPCalculatorClient()
{
var loggerFactory = LoggerFactory.Create(builder =>
builder.AddConsole().SetMinimumLevel(LogLevel.Information));
_logger = loggerFactory.CreateLogger<MCPCalculatorClient>();
}
public async Task<bool> ConnectAsync()
{
try
{
// 配置服务器连接
var serverConfig = new McpServerConfig
{
Command = "dotnet",
Args = new[] { "run", "--project", "../McpCalculatorServer" },
WorkingDirectory = Environment.CurrentDirectory
};
// 创建客户端
_client = new McpClient(serverConfig);
// 建立连接
await _client.ConnectAsync();
_logger.LogInformation("✅ 成功连接到MCP服务器");
return true;
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ 连接失败");
return false;
}
}
public async Task<List<McpTool>> ListToolsAsync()
{
try
{
if (_client == null)
{
throw new InvalidOperationException("客户端未连接");
}
var tools = await _client.ListToolsAsync();
_logger.LogInformation("🛠️ 可用工具:");
foreach (var tool in tools)
{
_logger.LogInformation($" - {tool.Name}: {tool.Description}");
}
return tools;
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ 获取工具列表失败");
return new List<McpTool>();
}
}
public async Task<McpToolResult?> CallToolAsync(string name, object arguments)
{
try
{
if (_client == null)
{
throw new InvalidOperationException("客户端未连接");
}
_logger.LogInformation($"🔧 调用工具: {name}");
_logger.LogInformation($"📝 参数: {JsonSerializer.Serialize(arguments)}");
var result = await _client.CallToolAsync(name, arguments);
_logger.LogInformation("✅ 调用成功");
_logger.LogInformation($"📊 结果: {JsonSerializer.Serialize(result.Content)}");
return result;
}
catch (Exception ex)
{
_logger.LogError(ex, $"❌ 工具调用失败 ({name})");
return null;
}
}
public async Task DisconnectAsync()
{
try
{
if (_client != null)
{
await _client.DisconnectAsync();
_client.Dispose();
}
_logger.LogInformation("🔌 已断开连接");
}
catch (Exception ex)
{
_logger.LogError(ex, "❌ 断开连接时出错");
}
}
}
class Program
{
static async Task Main(string[] args)
{
var client = new MCPCalculatorClient();
// 连接服务器
if (!await client.ConnectAsync())
{
return;
}
try
{
// 获取工具列表
var tools = await client.ListToolsAsync();
if (tools.Count == 0)
{
Console.WriteLine("⚠️ 服务器没有可用工具");
return;
}
// 测试用例
var testCases = new[]
{
new { name = "add", arguments = new { a = 20, b = 30 } },
new { name = "multiply", arguments = new { a = 4, b = 9 } },
new { name = "get_random_number", arguments = new { min = 1, max = 10 } },
new { name = "get_current_time", arguments = new { } }
};
Console.WriteLine("\n🧪 开始测试工具调用...");
foreach (var test in testCases)
{
Console.WriteLine($"\n--- 测试 {test.name} ---");
await client.CallToolAsync(test.name, test.arguments);
await Task.Delay(1000); // 避免过于频繁调用
}
}
finally
{
await client.DisconnectAsync();
}
}
}
}
🔍 MCP Inspector使用指南
MCP Inspector是官方提供的调试工具,就像是MCP世界的"万能遥控器"。
🚀 启动Inspector
bash
# 安装MCP Inspector (一次性安装)
npm install -g @modelcontextprotocol/inspector
# 启动Inspector连接到Python服务器
mcp-inspector python server.py
# 启动Inspector连接到Node.js服务器
mcp-inspector node server.js
# 启动Inspector连接到.NET服务器
mcp-inspector dotnet run --project McpCalculatorServer
🖥️ Inspector界面详解
当你启动Inspector后,会看到一个网页界面:
🌐 MCP Inspector - http://localhost:5173
┌─────────────────────────────────────────┐
│ 🔌 连接状态: ✅ 已连接 │
│ 📡 服务器: Calculator Server v1.0 │
│ ⏰ 连接时间: 2025-01-20 14:30:25 │
└─────────────────────────────────────────┘
📋 服务器信息
├── 名称: MCP Calculator Server
├── 版本: 1.0.0
├── 协议版本: 2025-06-18
└── 功能: tools, resources
🛠️ 可用工具 (4个)
├── ➕ add - 两数相加
├── ✖️ multiply - 两数相乘
├── 🎲 get_random_number - 生成随机数
└── 🕐 get_current_time - 获取当前时间
📁 可用资源 (0个)
└── (暂无资源)
💬 提示模板 (0个)
└── (暂无模板)
🧪 使用Inspector测试工具
测试加法工具
json
{
"tool": "add",
"arguments": {
"a": 100,
"b": 200
}
}
预期结果:
json
{
"content": [
{
"type": "text",
"text": "300"
}
]
}
测试随机数生成
json
{
"tool": "get_random_number",
"arguments": {
"min": 1,
"max": 100
}
}
预期结果:
json
{
"content": [
{
"type": "text",
"text": "42"
}
]
}
🔧 客户端最佳实践
🛡️ 错误处理策略
python
class RobustMCPClient:
def __init__(self):
self.max_retries = 3
self.retry_delay = 1.0
self.timeout = 30.0
async def call_tool_with_retry(self, name: str, arguments: dict):
"""带重试机制的工具调用"""
for attempt in range(self.max_retries):
try:
result = await asyncio.wait_for(
self.session.call_tool(name, arguments),
timeout=self.timeout
)
return result
except asyncio.TimeoutError:
print(f"⏰ 调用超时 (尝试 {attempt + 1}/{self.max_retries})")
if attempt < self.max_retries - 1:
await asyncio.sleep(self.retry_delay)
except Exception as e:
print(f"❌ 调用失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
if attempt < self.max_retries - 1:
await asyncio.sleep(self.retry_delay)
raise Exception(f"工具调用失败,已重试{self.max_retries}次")
📊 连接状态监控
typescript
class ConnectionMonitor {
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 2000;
async monitorConnection(client: MCPCalculatorClient) {
setInterval(async () => {
try {
// 发送心跳检测
await client.callTool("ping", {});
this.reconnectAttempts = 0;
console.log("💓 连接正常");
} catch (error) {
console.log("💔 连接异常,尝试重连...");
await this.attemptReconnect(client);
}
}, 10000); // 每10秒检查一次
}
private async attemptReconnect(client: MCPCalculatorClient) {
if (this.reconnectAttempts >= this.maxReconnectAttempts) {
console.log("❌ 重连次数已达上限,放弃重连");
return;
}
this.reconnectAttempts++;
console.log(`🔄 重连尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);
try {
await client.disconnect();
await new Promise(resolve => setTimeout(resolve, this.reconnectDelay));
await client.connect();
console.log("✅ 重连成功");
} catch (error) {
console.log(`❌ 重连失败: ${error}`);
}
}
}
🎯 性能优化技巧
python
import asyncio
from asyncio import Queue
from typing import Dict, Any
class PerformantMCPClient:
def __init__(self):
self.request_queue = Queue()
self.batch_size = 10
self.batch_timeout = 1.0
self.cache = {}
self.cache_ttl = 300 # 5分钟缓存
async def batch_process_requests(self):
"""批量处理请求以提高性能"""
while True:
batch = []
deadline = asyncio.get_event_loop().time() + self.batch_timeout
# 收集批量请求
while len(batch) < self.batch_size:
try:
timeout = max(0, deadline - asyncio.get_event_loop().time())
if timeout <= 0:
break
request = await asyncio.wait_for(
self.request_queue.get(),
timeout=timeout
)
batch.append(request)
except asyncio.TimeoutError:
break
if batch:
await self.process_batch(batch)
def get_cache_key(self, tool_name: str, arguments: Dict[str, Any]) -> str:
"""生成缓存键"""
import hashlib
import json
cache_data = f"{tool_name}:{json.dumps(arguments, sort_keys=True)}"
return hashlib.md5(cache_data.encode()).hexdigest()
async def call_tool_cached(self, name: str, arguments: dict):
"""带缓存的工具调用"""
cache_key = self.get_cache_key(name, arguments)
# 检查缓存
if cache_key in self.cache:
cached_result, timestamp = self.cache[cache_key]
if time.time() - timestamp < self.cache_ttl:
print(f"🎯 缓存命中: {name}")
return cached_result
# 执行调用
result = await self.session.call_tool(name, arguments)
# 更新缓存
self.cache[cache_key] = (result, time.time())
return result
🧪 综合测试示例
最后,让我们创建一个综合测试程序,展示客户端的完整功能:
python
import asyncio
import time
import json
from datetime import datetime
class ComprehensiveClientTest:
def __init__(self):
self.client = MCPCalculatorClient()
self.test_results = []
async def run_comprehensive_test(self):
"""运行综合测试套件"""
print("🚀 开始MCP客户端综合测试")
print("=" * 50)
# 1. 连接测试
await self.test_connection()
# 2. 服务发现测试
await self.test_service_discovery()
# 3. 功能测试
await self.test_functionality()
# 4. 性能测试
await self.test_performance()
# 5. 错误处理测试
await self.test_error_handling()
# 6. 生成测试报告
self.generate_test_report()
async def test_connection(self):
"""测试连接功能"""
test_name = "连接测试"
print(f"\n🔌 {test_name}")
start_time = time.time()
success = await self.client.connect()
end_time = time.time()
self.test_results.append({
"name": test_name,
"success": success,
"duration": end_time - start_time,
"details": "服务器连接" if success else "连接失败"
})
async def test_service_discovery(self):
"""测试服务发现"""
test_name = "服务发现测试"
print(f"\n🔍 {test_name}")
start_time = time.time()
tools = await self.client.list_tools()
end_time = time.time()
success = len(tools) > 0
self.test_results.append({
"name": test_name,
"success": success,
"duration": end_time - start_time,
"details": f"发现{len(tools)}个工具"
})
async def test_functionality(self):
"""测试基础功能"""
test_cases = [
("add", {"a": 10, "b": 5}, "15"),
("multiply", {"a": 6, "b": 7}, "42"),
("get_random_number", {"min": 1, "max": 1}, "1"),
]
for tool_name, args, expected in test_cases:
test_name = f"功能测试-{tool_name}"
print(f"\n⚙️ {test_name}")
start_time = time.time()
result = await self.client.call_tool(tool_name, args)
end_time = time.time()
success = result is not None
if success and tool_name != "get_random_number":
# 对于确定性结果的验证
actual = str(result.content[0].text).strip()
success = actual == expected
self.test_results.append({
"name": test_name,
"success": success,
"duration": end_time - start_time,
"details": f"预期:{expected}, 实际:{result.content if result else 'None'}"
})
async def test_performance(self):
"""测试性能"""
test_name = "性能测试"
print(f"\n⚡ {test_name}")
# 并发调用测试
tasks = []
start_time = time.time()
for i in range(10):
task = self.client.call_tool("add", {"a": i, "b": i})
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
end_time = time.time()
success_count = sum(1 for r in results if not isinstance(r, Exception))
success = success_count == len(tasks)
self.test_results.append({
"name": test_name,
"success": success,
"duration": end_time - start_time,
"details": f"10个并发请求,{success_count}个成功"
})
async def test_error_handling(self):
"""测试错误处理"""
test_name = "错误处理测试"
print(f"\n🛡️ {test_name}")
# 测试无效工具调用
start_time = time.time()
result = await self.client.call_tool("invalid_tool", {})
end_time = time.time()
# 错误处理测试:应该返回None或错误,而不是崩溃
success = result is None
self.test_results.append({
"name": test_name,
"success": success,
"duration": end_time - start_time,
"details": "无效工具调用正确处理"
})
def generate_test_report(self):
"""生成测试报告"""
print("\n" + "=" * 50)
print("📊 测试报告")
print("=" * 50)
total_tests = len(self.test_results)
passed_tests = sum(1 for r in self.test_results if r["success"])
failed_tests = total_tests - passed_tests
print(f"总测试数: {total_tests}")
print(f"通过: {passed_tests} ✅")
print(f"失败: {failed_tests} ❌")
print(f"成功率: {passed_tests/total_tests*100:.1f}%")
print("\n详细结果:")
for result in self.test_results:
status = "✅" if result["success"] else "❌"
duration = f"{result['duration']:.3f}s"
print(f"{status} {result['name']} ({duration}) - {result['details']}")
# 保存JSON报告
report = {
"timestamp": datetime.now().isoformat(),
"summary": {
"total": total_tests,
"passed": passed_tests,
"failed": failed_tests,
"success_rate": passed_tests/total_tests*100
},
"details": self.test_results
}
with open("test_report.json", "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print(f"\n📄 详细报告已保存到: test_report.json")
# 运行综合测试
async def main():
test_suite = ComprehensiveClientTest()
await test_suite.run_comprehensive_test()
if __name__ == "__main__":
asyncio.run(main())
🎯 本节小结
通过本节学习,你已经掌握了:
✅ 客户端架构:理解客户端的职责和组件设计
✅ 多语言实现:Python、JavaScript/TypeScript、C#客户端开发
✅ MCP Inspector:使用官方调试工具测试服务器
✅ 最佳实践:错误处理、性能优化、连接监控
✅ 综合测试:完整的测试套件和报告生成
🤔 思考题
- 连接策略:如果MCP服务器不稳定,你会设计怎样的重连策略?
- 缓存机制:哪些类型的工具调用适合加缓存?为什么?
- 性能优化:除了批量处理,还有哪些优化客户端性能的方法?
🔗 相关资源
恭喜! 🎉 你已经学会了MCP客户端开发。下一节我们将学习如何为服务器添加更多高级工具。