Skip to content

4.4 客户端集成与测试

🎯 学习目标:学习如何创建MCP客户端并与服务器进行交互
⏱️ 预计时间:35分钟
📊 难度等级:⭐⭐

🎭 客户端的角色

如果MCP服务器是"服务员",那么MCP客户端就是"顾客"。客户端负责:

  • 🤝 与服务器建立连接
  • 📋 发现服务器能提供什么服务
  • 🛎️ 发起具体的服务请求
  • 📤 处理服务器的响应

🏗️ 客户端架构设计

📊 客户端架构图

🔧 核心组件

组件功能重要性
连接管理器管理与服务器的连接状态⭐⭐⭐⭐⭐
消息处理器处理JSON-RPC消息的序列化和反序列化⭐⭐⭐⭐⭐
工具调用器执行对服务器工具的调用⭐⭐⭐⭐
资源管理器管理从服务器获取的资源⭐⭐⭐

💻 多语言客户端实现

🐍 Python客户端

python
# requirements.txt
"""
mcp>=1.0.0
asyncio
websockets
"""

# client.py
import asyncio
import json
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

class MCPCalculatorClient:
    def __init__(self):
        self.session = None
        self.client = None
    
    async def connect(self):
        """连接到MCP服务器"""
        try:
            # 配置服务器参数
            server_params = StdioServerParameters(
                command="python",
                args=["server.py"]
            )
            
            # 建立连接
            self.client = stdio_client(server_params)
            self.session = await self.client.__aenter__()
            
            # 初始化会话
            await self.session.initialize()
            
            print("✅ 成功连接到MCP服务器")
            return True
            
        except Exception as e:
            print(f"❌ 连接失败: {e}")
            return False
    
    async def list_tools(self):
        """获取服务器可用工具列表"""
        try:
            result = await self.session.list_tools()
            print("🛠️  可用工具:")
            for tool in result.tools:
                print(f"  - {tool.name}: {tool.description}")
            return result.tools
        except Exception as e:
            print(f"❌ 获取工具列表失败: {e}")
            return []
    
    async def call_tool(self, name: str, arguments: dict):
        """调用指定工具"""
        try:
            print(f"🔧 调用工具: {name}")
            print(f"📝 参数: {arguments}")
            
            result = await self.session.call_tool(name, arguments)
            
            print(f"✅ 调用成功")
            print(f"📊 结果: {result.content}")
            return result
            
        except Exception as e:
            print(f"❌ 工具调用失败: {e}")
            return None
    
    async def disconnect(self):
        """断开连接"""
        if self.client:
            await self.client.__aexit__(None, None, None)
            print("🔌 已断开连接")

async def main():
    """主函数:演示客户端使用"""
    client = MCPCalculatorClient()
    
    # 连接服务器
    if not await client.connect():
        return
    
    try:
        # 获取工具列表
        tools = await client.list_tools()
        
        if not tools:
            print("⚠️  服务器没有可用工具")
            return
        
        # 测试各种工具
        test_cases = [
            {
                "name": "add",
                "arguments": {"a": 10, "b": 5}
            },
            {
                "name": "multiply", 
                "arguments": {"a": 7, "b": 8}
            },
            {
                "name": "get_random_number",
                "arguments": {"min": 1, "max": 100}
            },
            {
                "name": "get_current_time",
                "arguments": {}
            }
        ]
        
        print("\n🧪 开始测试工具调用...")
        for test in test_cases:
            print(f"\n--- 测试 {test['name']} ---")
            await client.call_tool(test["name"], test["arguments"])
            await asyncio.sleep(1)  # 避免过于频繁的调用
    
    finally:
        await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

🌐 JavaScript/TypeScript客户端

typescript
// package.json
{
  "name": "mcp-calculator-client",
  "version": "1.0.0",
  "type": "module",
  "dependencies": {
    "@modelcontextprotocol/sdk": "^1.0.0",
    "ws": "^8.0.0"
  },
  "scripts": {
    "start": "node client.js",
    "dev": "nodemon client.js"
  }
}

// client.ts
import { Client } from '@modelcontextprotocol/sdk/client/index.js';
import { StdioClientTransport } from '@modelcontextprotocol/sdk/client/stdio.js';
import { spawn } from 'child_process';

class MCPCalculatorClient {
  private client: Client | null = null;
  private transport: StdioClientTransport | null = null;

  async connect(): Promise<boolean> {
    try {
      // 启动服务器进程
      const serverProcess = spawn('node', ['server.js']);
      
      // 创建传输层
      this.transport = new StdioClientTransport({
        command: 'node',
        args: ['server.js']
      });
      
      // 创建客户端
      this.client = new Client({
        name: "calculator-client",
        version: "1.0.0"
      }, {
        capabilities: {}
      });
      
      // 连接到服务器
      await this.client.connect(this.transport);
      
      console.log("✅ 成功连接到MCP服务器");
      return true;
      
    } catch (error) {
      console.error("❌ 连接失败:", error);
      return false;
    }
  }

  async listTools(): Promise<any[]> {
    try {
      if (!this.client) {
        throw new Error("客户端未连接");
      }
      
      const result = await this.client.listTools();
      
      console.log("🛠️ 可用工具:");
      result.tools.forEach(tool => {
        console.log(`  - ${tool.name}: ${tool.description}`);
      });
      
      return result.tools;
      
    } catch (error) {
      console.error("❌ 获取工具列表失败:", error);
      return [];
    }
  }

  async callTool(name: string, arguments: Record<string, any>): Promise<any> {
    try {
      if (!this.client) {
        throw new Error("客户端未连接");
      }
      
      console.log(`🔧 调用工具: ${name}`);
      console.log(`📝 参数:`, arguments);
      
      const result = await this.client.callTool({
        name,
        arguments
      });
      
      console.log("✅ 调用成功");
      console.log("📊 结果:", result.content);
      
      return result;
      
    } catch (error) {
      console.error(`❌ 工具调用失败 (${name}):`, error);
      return null;
    }
  }

  async disconnect(): Promise<void> {
    try {
      if (this.client) {
        await this.client.close();
      }
      if (this.transport) {
        await this.transport.close();
      }
      console.log("🔌 已断开连接");
    } catch (error) {
      console.error("❌ 断开连接时出错:", error);
    }
  }
}

// 主函数
async function main() {
  const client = new MCPCalculatorClient();
  
  // 连接服务器
  if (!await client.connect()) {
    return;
  }
  
  try {
    // 获取工具列表
    const tools = await client.listTools();
    
    if (tools.length === 0) {
      console.log("⚠️ 服务器没有可用工具");
      return;
    }
    
    // 测试用例
    const testCases = [
      {
        name: "add",
        arguments: { a: 15, b: 25 }
      },
      {
        name: "multiply",
        arguments: { a: 6, b: 7 }
      },
      {
        name: "get_random_number", 
        arguments: { min: 1, max: 50 }
      },
      {
        name: "get_current_time",
        arguments: {}
      }
    ];
    
    console.log("\n🧪 开始测试工具调用...");
    
    for (const test of testCases) {
      console.log(`\n--- 测试 ${test.name} ---`);
      await client.callTool(test.name, test.arguments);
      // 添加延迟避免过于频繁调用
      await new Promise(resolve => setTimeout(resolve, 1000));
    }
    
  } finally {
    await client.disconnect();
  }
}

// 运行客户端
main().catch(console.error);

🔷 C# 客户端

csharp
// McpCalculatorClient.csproj
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net8.0</TargetFramework>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  
  <ItemGroup>
    <PackageReference Include="MCP.NET" Version="1.0.0" />
    <PackageReference Include="System.Text.Json" Version="8.0.0" />
    <PackageReference Include="Microsoft.Extensions.Logging" Version="8.0.0" />
  </ItemGroup>
</Project>

// Program.cs
using System;
using System.Threading.Tasks;
using System.Text.Json;
using MCP.NET;
using Microsoft.Extensions.Logging;

namespace McpCalculatorClient
{
    public class MCPCalculatorClient
    {
        private readonly ILogger<MCPCalculatorClient> _logger;
        private McpClient? _client;

        public MCPCalculatorClient()
        {
            var loggerFactory = LoggerFactory.Create(builder => 
                builder.AddConsole().SetMinimumLevel(LogLevel.Information));
            _logger = loggerFactory.CreateLogger<MCPCalculatorClient>();
        }

        public async Task<bool> ConnectAsync()
        {
            try
            {
                // 配置服务器连接
                var serverConfig = new McpServerConfig
                {
                    Command = "dotnet",
                    Args = new[] { "run", "--project", "../McpCalculatorServer" },
                    WorkingDirectory = Environment.CurrentDirectory
                };

                // 创建客户端
                _client = new McpClient(serverConfig);
                
                // 建立连接
                await _client.ConnectAsync();
                
                _logger.LogInformation("✅ 成功连接到MCP服务器");
                return true;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "❌ 连接失败");
                return false;
            }
        }

        public async Task<List<McpTool>> ListToolsAsync()
        {
            try
            {
                if (_client == null)
                {
                    throw new InvalidOperationException("客户端未连接");
                }

                var tools = await _client.ListToolsAsync();
                
                _logger.LogInformation("🛠️ 可用工具:");
                foreach (var tool in tools)
                {
                    _logger.LogInformation($"  - {tool.Name}: {tool.Description}");
                }

                return tools;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "❌ 获取工具列表失败");
                return new List<McpTool>();
            }
        }

        public async Task<McpToolResult?> CallToolAsync(string name, object arguments)
        {
            try
            {
                if (_client == null)
                {
                    throw new InvalidOperationException("客户端未连接");
                }

                _logger.LogInformation($"🔧 调用工具: {name}");
                _logger.LogInformation($"📝 参数: {JsonSerializer.Serialize(arguments)}");

                var result = await _client.CallToolAsync(name, arguments);

                _logger.LogInformation("✅ 调用成功");
                _logger.LogInformation($"📊 结果: {JsonSerializer.Serialize(result.Content)}");

                return result;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, $"❌ 工具调用失败 ({name})");
                return null;
            }
        }

        public async Task DisconnectAsync()
        {
            try
            {
                if (_client != null)
                {
                    await _client.DisconnectAsync();
                    _client.Dispose();
                }
                _logger.LogInformation("🔌 已断开连接");
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "❌ 断开连接时出错");
            }
        }
    }

    class Program
    {
        static async Task Main(string[] args)
        {
            var client = new MCPCalculatorClient();

            // 连接服务器
            if (!await client.ConnectAsync())
            {
                return;
            }

            try
            {
                // 获取工具列表
                var tools = await client.ListToolsAsync();

                if (tools.Count == 0)
                {
                    Console.WriteLine("⚠️ 服务器没有可用工具");
                    return;
                }

                // 测试用例
                var testCases = new[]
                {
                    new { name = "add", arguments = new { a = 20, b = 30 } },
                    new { name = "multiply", arguments = new { a = 4, b = 9 } },
                    new { name = "get_random_number", arguments = new { min = 1, max = 10 } },
                    new { name = "get_current_time", arguments = new { } }
                };

                Console.WriteLine("\n🧪 开始测试工具调用...");

                foreach (var test in testCases)
                {
                    Console.WriteLine($"\n--- 测试 {test.name} ---");
                    await client.CallToolAsync(test.name, test.arguments);
                    await Task.Delay(1000); // 避免过于频繁调用
                }
            }
            finally
            {
                await client.DisconnectAsync();
            }
        }
    }
}

🔍 MCP Inspector使用指南

MCP Inspector是官方提供的调试工具,就像是MCP世界的"万能遥控器"。

🚀 启动Inspector

bash
# 安装MCP Inspector (一次性安装)
npm install -g @modelcontextprotocol/inspector

# 启动Inspector连接到Python服务器
mcp-inspector python server.py

# 启动Inspector连接到Node.js服务器  
mcp-inspector node server.js

# 启动Inspector连接到.NET服务器
mcp-inspector dotnet run --project McpCalculatorServer

🖥️ Inspector界面详解

当你启动Inspector后,会看到一个网页界面:

🌐 MCP Inspector - http://localhost:5173
┌─────────────────────────────────────────┐
│  🔌 连接状态: ✅ 已连接                   │
│  📡 服务器: Calculator Server v1.0      │
│  ⏰ 连接时间: 2025-01-20 14:30:25       │
└─────────────────────────────────────────┘

📋 服务器信息
├── 名称: MCP Calculator Server  
├── 版本: 1.0.0
├── 协议版本: 2025-06-18
└── 功能: tools, resources

🛠️ 可用工具 (4个)
├── ➕ add - 两数相加
├── ✖️ multiply - 两数相乘  
├── 🎲 get_random_number - 生成随机数
└── 🕐 get_current_time - 获取当前时间

📁 可用资源 (0个)
└── (暂无资源)

💬 提示模板 (0个) 
└── (暂无模板)

🧪 使用Inspector测试工具

测试加法工具

json
{
  "tool": "add",
  "arguments": {
    "a": 100,
    "b": 200
  }
}

预期结果:

json
{
  "content": [
    {
      "type": "text",
      "text": "300"
    }
  ]
}

测试随机数生成

json
{
  "tool": "get_random_number",
  "arguments": {
    "min": 1,
    "max": 100
  }
}

预期结果:

json
{
  "content": [
    {
      "type": "text", 
      "text": "42"
    }
  ]
}

🔧 客户端最佳实践

🛡️ 错误处理策略

python
class RobustMCPClient:
    def __init__(self):
        self.max_retries = 3
        self.retry_delay = 1.0
        self.timeout = 30.0
    
    async def call_tool_with_retry(self, name: str, arguments: dict):
        """带重试机制的工具调用"""
        for attempt in range(self.max_retries):
            try:
                result = await asyncio.wait_for(
                    self.session.call_tool(name, arguments),
                    timeout=self.timeout
                )
                return result
                
            except asyncio.TimeoutError:
                print(f"⏰ 调用超时 (尝试 {attempt + 1}/{self.max_retries})")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(self.retry_delay)
                    
            except Exception as e:
                print(f"❌ 调用失败 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(self.retry_delay)
        
        raise Exception(f"工具调用失败,已重试{self.max_retries}次")

📊 连接状态监控

typescript
class ConnectionMonitor {
  private reconnectAttempts = 0;
  private maxReconnectAttempts = 5;
  private reconnectDelay = 2000;

  async monitorConnection(client: MCPCalculatorClient) {
    setInterval(async () => {
      try {
        // 发送心跳检测
        await client.callTool("ping", {});
        this.reconnectAttempts = 0;
        console.log("💓 连接正常");
      } catch (error) {
        console.log("💔 连接异常,尝试重连...");
        await this.attemptReconnect(client);
      }
    }, 10000); // 每10秒检查一次
  }

  private async attemptReconnect(client: MCPCalculatorClient) {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      console.log("❌ 重连次数已达上限,放弃重连");
      return;
    }

    this.reconnectAttempts++;
    console.log(`🔄 重连尝试 ${this.reconnectAttempts}/${this.maxReconnectAttempts}`);

    try {
      await client.disconnect();
      await new Promise(resolve => setTimeout(resolve, this.reconnectDelay));
      await client.connect();
      console.log("✅ 重连成功");
    } catch (error) {
      console.log(`❌ 重连失败: ${error}`);
    }
  }
}

🎯 性能优化技巧

python
import asyncio
from asyncio import Queue
from typing import Dict, Any

class PerformantMCPClient:
    def __init__(self):
        self.request_queue = Queue()
        self.batch_size = 10
        self.batch_timeout = 1.0
        self.cache = {}
        self.cache_ttl = 300  # 5分钟缓存
    
    async def batch_process_requests(self):
        """批量处理请求以提高性能"""
        while True:
            batch = []
            deadline = asyncio.get_event_loop().time() + self.batch_timeout
            
            # 收集批量请求
            while len(batch) < self.batch_size:
                try:
                    timeout = max(0, deadline - asyncio.get_event_loop().time())
                    if timeout <= 0:
                        break
                    
                    request = await asyncio.wait_for(
                        self.request_queue.get(), 
                        timeout=timeout
                    )
                    batch.append(request)
                    
                except asyncio.TimeoutError:
                    break
            
            if batch:
                await self.process_batch(batch)
    
    def get_cache_key(self, tool_name: str, arguments: Dict[str, Any]) -> str:
        """生成缓存键"""
        import hashlib
        import json
        
        cache_data = f"{tool_name}:{json.dumps(arguments, sort_keys=True)}"
        return hashlib.md5(cache_data.encode()).hexdigest()
    
    async def call_tool_cached(self, name: str, arguments: dict):
        """带缓存的工具调用"""
        cache_key = self.get_cache_key(name, arguments)
        
        # 检查缓存
        if cache_key in self.cache:
            cached_result, timestamp = self.cache[cache_key]
            if time.time() - timestamp < self.cache_ttl:
                print(f"🎯 缓存命中: {name}")
                return cached_result
        
        # 执行调用
        result = await self.session.call_tool(name, arguments)
        
        # 更新缓存
        self.cache[cache_key] = (result, time.time())
        
        return result

🧪 综合测试示例

最后,让我们创建一个综合测试程序,展示客户端的完整功能:

python
import asyncio
import time
import json
from datetime import datetime

class ComprehensiveClientTest:
    def __init__(self):
        self.client = MCPCalculatorClient()
        self.test_results = []
    
    async def run_comprehensive_test(self):
        """运行综合测试套件"""
        print("🚀 开始MCP客户端综合测试")
        print("=" * 50)
        
        # 1. 连接测试
        await self.test_connection()
        
        # 2. 服务发现测试
        await self.test_service_discovery()
        
        # 3. 功能测试
        await self.test_functionality()
        
        # 4. 性能测试
        await self.test_performance()
        
        # 5. 错误处理测试
        await self.test_error_handling()
        
        # 6. 生成测试报告
        self.generate_test_report()
    
    async def test_connection(self):
        """测试连接功能"""
        test_name = "连接测试"
        print(f"\n🔌 {test_name}")
        
        start_time = time.time()
        success = await self.client.connect()
        end_time = time.time()
        
        self.test_results.append({
            "name": test_name,
            "success": success,
            "duration": end_time - start_time,
            "details": "服务器连接" if success else "连接失败"
        })
    
    async def test_service_discovery(self):
        """测试服务发现"""
        test_name = "服务发现测试"
        print(f"\n🔍 {test_name}")
        
        start_time = time.time()
        tools = await self.client.list_tools()
        end_time = time.time()
        
        success = len(tools) > 0
        
        self.test_results.append({
            "name": test_name,
            "success": success,
            "duration": end_time - start_time,
            "details": f"发现{len(tools)}个工具"
        })
    
    async def test_functionality(self):
        """测试基础功能"""
        test_cases = [
            ("add", {"a": 10, "b": 5}, "15"),
            ("multiply", {"a": 6, "b": 7}, "42"),
            ("get_random_number", {"min": 1, "max": 1}, "1"),
        ]
        
        for tool_name, args, expected in test_cases:
            test_name = f"功能测试-{tool_name}"
            print(f"\n⚙️ {test_name}")
            
            start_time = time.time()
            result = await self.client.call_tool(tool_name, args)
            end_time = time.time()
            
            success = result is not None
            if success and tool_name != "get_random_number":
                # 对于确定性结果的验证
                actual = str(result.content[0].text).strip()
                success = actual == expected
            
            self.test_results.append({
                "name": test_name,
                "success": success,
                "duration": end_time - start_time,
                "details": f"预期:{expected}, 实际:{result.content if result else 'None'}"
            })
    
    async def test_performance(self):
        """测试性能"""
        test_name = "性能测试"
        print(f"\n{test_name}")
        
        # 并发调用测试
        tasks = []
        start_time = time.time()
        
        for i in range(10):
            task = self.client.call_tool("add", {"a": i, "b": i})
            tasks.append(task)
        
        results = await asyncio.gather(*tasks, return_exceptions=True)
        end_time = time.time()
        
        success_count = sum(1 for r in results if not isinstance(r, Exception))
        success = success_count == len(tasks)
        
        self.test_results.append({
            "name": test_name,
            "success": success,
            "duration": end_time - start_time,
            "details": f"10个并发请求,{success_count}个成功"
        })
    
    async def test_error_handling(self):
        """测试错误处理"""
        test_name = "错误处理测试"
        print(f"\n🛡️ {test_name}")
        
        # 测试无效工具调用
        start_time = time.time()
        result = await self.client.call_tool("invalid_tool", {})
        end_time = time.time()
        
        # 错误处理测试:应该返回None或错误,而不是崩溃
        success = result is None
        
        self.test_results.append({
            "name": test_name,
            "success": success,
            "duration": end_time - start_time,
            "details": "无效工具调用正确处理"
        })
    
    def generate_test_report(self):
        """生成测试报告"""
        print("\n" + "=" * 50)
        print("📊 测试报告")
        print("=" * 50)
        
        total_tests = len(self.test_results)
        passed_tests = sum(1 for r in self.test_results if r["success"])
        failed_tests = total_tests - passed_tests
        
        print(f"总测试数: {total_tests}")
        print(f"通过: {passed_tests} ✅")
        print(f"失败: {failed_tests} ❌")
        print(f"成功率: {passed_tests/total_tests*100:.1f}%")
        
        print("\n详细结果:")
        for result in self.test_results:
            status = "✅" if result["success"] else "❌"
            duration = f"{result['duration']:.3f}s"
            print(f"{status} {result['name']} ({duration}) - {result['details']}")
        
        # 保存JSON报告
        report = {
            "timestamp": datetime.now().isoformat(),
            "summary": {
                "total": total_tests,
                "passed": passed_tests,
                "failed": failed_tests,
                "success_rate": passed_tests/total_tests*100
            },
            "details": self.test_results
        }
        
        with open("test_report.json", "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2, ensure_ascii=False)
        
        print(f"\n📄 详细报告已保存到: test_report.json")

# 运行综合测试
async def main():
    test_suite = ComprehensiveClientTest()
    await test_suite.run_comprehensive_test()

if __name__ == "__main__":
    asyncio.run(main())

🎯 本节小结

通过本节学习,你已经掌握了:

客户端架构:理解客户端的职责和组件设计
多语言实现:Python、JavaScript/TypeScript、C#客户端开发
MCP Inspector:使用官方调试工具测试服务器
最佳实践:错误处理、性能优化、连接监控
综合测试:完整的测试套件和报告生成

🤔 思考题

  1. 连接策略:如果MCP服务器不稳定,你会设计怎样的重连策略?
  2. 缓存机制:哪些类型的工具调用适合加缓存?为什么?
  3. 性能优化:除了批量处理,还有哪些优化客户端性能的方法?

🔗 相关资源


恭喜! 🎉 你已经学会了MCP客户端开发。下一节我们将学习如何为服务器添加更多高级工具。

👉 下一节:4.3 添加工具和功能