Skip to content

4.1 创建简单MCP服务器

🎯 学习目标:从零开始创建你的第一个MCP服务器,理解MCP的基本架构和实现原理
⏱️ 预计时间:40分钟
📊 难度等级:⭐⭐

🎪 从Hello World开始

还记得你第一次写"Hello World"程序时的兴奋吗?现在我们要创建MCP世界的"Hello World"——一个简单但完整的MCP服务器!

🤔 我们要创建什么?

想象一下,你有一个智能助手,你对它说:

  • "帮我生成一个随机数"
  • "帮我获取当前时间"
  • "帮我计算两个数的和"

这个助手不仅能理解你的需求,还能实际执行这些操作。这就是我们即将创建的MCP服务器的功能!

🏗️ MCP服务器架构理解

📊 基础架构图

🔧 核心组件说明

组件功能重要性
协议处理器处理JSON-RPC 2.0消息⭐⭐⭐⭐⭐
工具管理器管理可用工具列表⭐⭐⭐⭐⭐
请求路由器将请求路由到正确的处理器⭐⭐⭐⭐
错误处理器统一处理各种错误情况⭐⭐⭐⭐
日志系统记录运行状态和调试信息⭐⭐⭐

🐍 Python版本实现

📁 项目结构设计

首先,让我们创建一个清晰的项目结构:

📁 my-first-mcp-server/
├── 📄 server.py              # 主服务器文件
├── 📄 tools.py               # 工具实现
├── 📄 config.py              # 配置管理
├── 📄 requirements.txt       # 依赖列表
├── 📄 README.md              # 项目说明
├── 📁 tests/                 # 测试文件
│   ├── 📄 test_server.py
│   └── 📄 test_tools.py
└── 📁 logs/                  # 日志目录

📦 依赖管理

创建 requirements.txt

txt
# MCP核心依赖
mcp==0.1.0                    # MCP Python SDK (假设版本)
asyncio-extra==1.3.2          # 异步扩展工具
pydantic==2.5.0              # 数据验证
typing-extensions==4.8.0     # 类型提示扩展

# Web服务器依赖
aiohttp==3.9.1               # 异步HTTP服务器
websockets==12.0             # WebSocket支持

# 工具和实用库
python-dotenv==1.0.0         # 环境变量管理
loguru==0.7.2                # 增强日志库
rich==13.7.0                 # 美化终端输出

# 开发和测试依赖
pytest==7.4.3               # 测试框架
pytest-asyncio==0.21.1      # 异步测试支持
black==23.11.0               # 代码格式化
flake8==6.1.0                # 代码检查

⚙️ 配置管理系统

创建 config.py

python
"""
config.py - MCP服务器配置管理
"""

import os
from dataclasses import dataclass
from typing import Dict, Any, List
from pathlib import Path
from dotenv import load_dotenv

# 加载环境变量
load_dotenv()

@dataclass
class ServerConfig:
    """服务器配置类"""
    
    # 服务器基础配置
    name: str = "我的第一个MCP服务器"
    version: str = "1.0.0"
    description: str = "一个简单的MCP服务器示例"
    
    # 网络配置
    host: str = "localhost"
    port: int = 8080
    protocol: str = "stdio"  # stdio, http, websocket
    
    # 日志配置
    log_level: str = "INFO"
    log_file: str = "logs/server.log"
    log_rotation: str = "10 MB"
    log_retention: str = "7 days"
    
    # 工具配置
    enabled_tools: List[str] = None
    tool_timeout: int = 30  # 秒
    
    # 开发配置
    debug: bool = False
    auto_reload: bool = False
    
    def __post_init__(self):
        """初始化后处理"""
        if self.enabled_tools is None:
            self.enabled_tools = [
                "random_number",
                "current_time", 
                "calculator"
            ]
        
        # 从环境变量覆盖配置
        self.host = os.getenv("MCP_HOST", self.host)
        self.port = int(os.getenv("MCP_PORT", self.port))
        self.debug = os.getenv("MCP_DEBUG", "false").lower() == "true"
        self.log_level = os.getenv("MCP_LOG_LEVEL", self.log_level)
        
        # 确保日志目录存在
        Path(self.log_file).parent.mkdir(parents=True, exist_ok=True)
    
    def to_dict(self) -> Dict[str, Any]:
        """转换为字典"""
        return {
            "name": self.name,
            "version": self.version,
            "description": self.description,
            "host": self.host,
            "port": self.port,
            "protocol": self.protocol,
            "debug": self.debug,
            "enabled_tools": self.enabled_tools
        }

# 全局配置实例
config = ServerConfig()

# MCP协议配置
MCP_PROTOCOL_VERSION = "2024-11-05"
MCP_CAPABILITIES = {
    "tools": {},
    "resources": {},
    "prompts": {},
    "logging": {}
}

# 工具配置
TOOL_CONFIGS = {
    "random_number": {
        "name": "random_number",
        "description": "生成指定范围内的随机数",
        "parameters": {
            "type": "object",
            "properties": {
                "min": {
                    "type": "integer",
                    "description": "最小值",
                    "default": 1
                },
                "max": {
                    "type": "integer", 
                    "description": "最大值",
                    "default": 100
                }
            }
        }
    },
    "current_time": {
        "name": "current_time",
        "description": "获取当前时间信息",
        "parameters": {
            "type": "object",
            "properties": {
                "format": {
                    "type": "string",
                    "description": "时间格式化字符串",
                    "default": "%Y-%m-%d %H:%M:%S"
                },
                "timezone": {
                    "type": "string",
                    "description": "时区",
                    "default": "Asia/Shanghai"
                }
            }
        }
    },
    "calculator": {
        "name": "calculator",
        "description": "执行基础数学计算",
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["add", "subtract", "multiply", "divide"],
                    "description": "计算操作类型"
                },
                "a": {
                    "type": "number",
                    "description": "第一个数字"
                },
                "b": {
                    "type": "number", 
                    "description": "第二个数字"
                }
            },
            "required": ["operation", "a", "b"]
        }
    }
}

🛠️ 工具实现

创建 tools.py

python
"""
tools.py - MCP工具实现
"""

import random
import datetime
import math
import pytz
from typing import Dict, Any, Union
from loguru import logger
from config import config

class MCPTool:
    """MCP工具基类"""
    
    def __init__(self, name: str, description: str):
        self.name = name
        self.description = description
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """执行工具,子类需要实现此方法"""
        raise NotImplementedError("子类必须实现execute方法")
    
    def validate_params(self, params: Dict[str, Any], schema: Dict[str, Any]) -> bool:
        """验证参数(简化版本)"""
        required = schema.get("required", [])
        for field in required:
            if field not in params:
                raise ValueError(f"缺少必需参数: {field}")
        return True

class RandomNumberTool(MCPTool):
    """随机数生成工具"""
    
    def __init__(self):
        super().__init__(
            name="random_number",
            description="生成指定范围内的随机数"
        )
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """生成随机数"""
        try:
            min_val = params.get("min", 1)
            max_val = params.get("max", 100)
            
            # 参数验证
            if not isinstance(min_val, int) or not isinstance(max_val, int):
                raise ValueError("min和max必须是整数")
            
            if min_val >= max_val:
                raise ValueError("min必须小于max")
            
            # 生成随机数
            result = random.randint(min_val, max_val)
            
            logger.info(f"生成随机数: {result} (范围: {min_val}-{max_val})")
            
            return {
                "success": True,
                "result": result,
                "message": f"在范围 {min_val}-{max_val} 内生成随机数: {result}",
                "metadata": {
                    "min": min_val,
                    "max": max_val,
                    "range_size": max_val - min_val + 1
                }
            }
            
        except Exception as e:
            logger.error(f"随机数生成失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": "随机数生成失败"
            }

class CurrentTimeTool(MCPTool):
    """当前时间获取工具"""
    
    def __init__(self):
        super().__init__(
            name="current_time",
            description="获取当前时间信息"
        )
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """获取当前时间"""
        try:
            format_str = params.get("format", "%Y-%m-%d %H:%M:%S")
            timezone_str = params.get("timezone", "Asia/Shanghai")
            
            # 获取时区
            try:
                tz = pytz.timezone(timezone_str)
            except pytz.exceptions.UnknownTimeZoneError:
                tz = pytz.timezone("Asia/Shanghai")
                logger.warning(f"未知时区 {timezone_str},使用默认时区 Asia/Shanghai")
            
            # 获取当前时间
            now = datetime.datetime.now(tz)
            formatted_time = now.strftime(format_str)
            
            logger.info(f"获取当前时间: {formatted_time} ({timezone_str})")
            
            return {
                "success": True,
                "result": formatted_time,
                "message": f"当前时间: {formatted_time}",
                "metadata": {
                    "timezone": timezone_str,
                    "format": format_str,
                    "timestamp": now.timestamp(),
                    "iso_format": now.isoformat(),
                    "weekday": now.strftime("%A"),
                    "day_of_year": now.timetuple().tm_yday
                }
            }
            
        except Exception as e:
            logger.error(f"时间获取失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": "时间获取失败"
            }

class CalculatorTool(MCPTool):
    """计算器工具"""
    
    def __init__(self):
        super().__init__(
            name="calculator",
            description="执行基础数学计算"
        )
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """执行计算"""
        try:
            operation = params.get("operation")
            a = params.get("a")
            b = params.get("b")
            
            # 参数验证
            if operation not in ["add", "subtract", "multiply", "divide"]:
                raise ValueError("不支持的操作类型")
            
            if not isinstance(a, (int, float)) or not isinstance(b, (int, float)):
                raise ValueError("a和b必须是数字")
            
            # 执行计算
            result = None
            operation_symbol = ""
            
            if operation == "add":
                result = a + b
                operation_symbol = "+"
            elif operation == "subtract":
                result = a - b
                operation_symbol = "-"
            elif operation == "multiply":
                result = a * b
                operation_symbol = "×"
            elif operation == "divide":
                if b == 0:
                    raise ValueError("除数不能为零")
                result = a / b
                operation_symbol = "÷"
            
            logger.info(f"计算: {a} {operation_symbol} {b} = {result}")
            
            return {
                "success": True,
                "result": result,
                "message": f"{a} {operation_symbol} {b} = {result}",
                "metadata": {
                    "operation": operation,
                    "operand_a": a,
                    "operand_b": b,
                    "operation_symbol": operation_symbol,
                    "result_type": type(result).__name__
                }
            }
            
        except Exception as e:
            logger.error(f"计算失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"计算失败: {e}"
            }

class ToolManager:
    """工具管理器"""
    
    def __init__(self):
        self.tools: Dict[str, MCPTool] = {}
        self._register_default_tools()
    
    def _register_default_tools(self):
        """注册默认工具"""
        default_tools = [
            RandomNumberTool(),
            CurrentTimeTool(),
            CalculatorTool()
        ]
        
        for tool in default_tools:
            if tool.name in config.enabled_tools:
                self.register_tool(tool)
                logger.info(f"注册工具: {tool.name}")
    
    def register_tool(self, tool: MCPTool):
        """注册工具"""
        self.tools[tool.name] = tool
    
    def get_tool(self, name: str) -> MCPTool:
        """获取工具"""
        if name not in self.tools:
            raise ValueError(f"未找到工具: {name}")
        return self.tools[name]
    
    def list_tools(self) -> Dict[str, Any]:
        """列出所有工具"""
        from config import TOOL_CONFIGS
        
        tools = []
        for name, tool in self.tools.items():
            tool_config = TOOL_CONFIGS.get(name, {})
            tools.append({
                "name": name,
                "description": tool.description,
                "inputSchema": tool_config.get("parameters", {})
            })
        
        return {
            "tools": tools
        }
    
    async def execute_tool(self, name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """执行工具"""
        try:
            tool = self.get_tool(name)
            return await tool.execute(arguments)
        except Exception as e:
            logger.error(f"工具执行失败 {name}: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"工具 {name} 执行失败"
            }

# 全局工具管理器实例
tool_manager = ToolManager()

🚀 主服务器实现

创建 server.py

python
"""
server.py - MCP服务器主程序
"""

import asyncio
import json
import sys
from typing import Dict, Any, Optional
from loguru import logger
from rich.console import Console
from rich.panel import Panel
from rich.text import Text

from config import config, MCP_PROTOCOL_VERSION, MCP_CAPABILITIES
from tools import tool_manager

# 初始化控制台
console = Console()

class MCPServer:
    """MCP服务器主类"""
    
    def __init__(self):
        self.running = False
        self.initialize_logging()
        
    def initialize_logging(self):
        """初始化日志系统"""
        # 移除默认处理器
        logger.remove()
        
        # 添加控制台输出
        logger.add(
            sys.stderr,
            level=config.log_level,
            format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"
        )
        
        # 添加文件输出
        logger.add(
            config.log_file,
            level=config.log_level,
            format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}",
            rotation=config.log_rotation,
            retention=config.log_retention,
            compression="zip"
        )
        
        logger.info("MCP服务器日志系统初始化完成")
    
    async def handle_initialize(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理初始化请求"""
        logger.info("收到初始化请求")
        
        params = request.get("params", {})
        client_info = params.get("clientInfo", {})
        
        logger.info(f"客户端信息: {client_info}")
        
        return {
            "jsonrpc": "2.0",
            "id": request.get("id"),
            "result": {
                "protocolVersion": MCP_PROTOCOL_VERSION,
                "capabilities": MCP_CAPABILITIES,
                "serverInfo": {
                    "name": config.name,
                    "version": config.version,
                    "description": config.description
                }
            }
        }
    
    async def handle_tools_list(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理工具列表请求"""
        logger.info("收到工具列表请求")
        
        tools_data = tool_manager.list_tools()
        
        return {
            "jsonrpc": "2.0", 
            "id": request.get("id"),
            "result": tools_data
        }
    
    async def handle_tools_call(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理工具调用请求"""
        params = request.get("params", {})
        tool_name = params.get("name")
        arguments = params.get("arguments", {})
        
        logger.info(f"收到工具调用请求: {tool_name} 参数: {arguments}")
        
        try:
            # 执行工具
            result = await tool_manager.execute_tool(tool_name, arguments)
            
            # 构造响应
            if result.get("success", False):
                response = {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "result": {
                        "content": [
                            {
                                "type": "text",
                                "text": result.get("message", "")
                            }
                        ],
                        "isError": False
                    }
                }
            else:
                response = {
                    "jsonrpc": "2.0",
                    "id": request.get("id"),
                    "error": {
                        "code": -32603,
                        "message": result.get("message", "工具执行失败"),
                        "data": result.get("error", "")
                    }
                }
            
            return response
            
        except Exception as e:
            logger.error(f"工具调用异常: {e}")
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32603,
                    "message": "内部服务器错误",
                    "data": str(e)
                }
            }
    
    async def handle_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """处理MCP请求"""
        method = request.get("method")
        
        # 路由请求到对应的处理器
        if method == "initialize":
            return await self.handle_initialize(request)
        elif method == "tools/list":
            return await self.handle_tools_list(request)
        elif method == "tools/call":
            return await self.handle_tools_call(request)
        else:
            logger.warning(f"不支持的方法: {method}")
            return {
                "jsonrpc": "2.0",
                "id": request.get("id"),
                "error": {
                    "code": -32601,
                    "message": f"方法未找到: {method}"
                }
            }
    
    async def run_stdio(self):
        """运行STDIO模式"""
        logger.info("启动STDIO模式MCP服务器")
        
        self.running = True
        
        try:
            while self.running:
                # 读取标准输入
                line = await asyncio.get_event_loop().run_in_executor(
                    None, sys.stdin.readline
                )
                
                if not line:
                    break
                
                line = line.strip()
                if not line:
                    continue
                
                try:
                    # 解析JSON请求
                    request = json.loads(line)
                    logger.debug(f"收到请求: {request}")
                    
                    # 处理请求
                    response = await self.handle_request(request)
                    
                    # 发送响应
                    response_json = json.dumps(response, ensure_ascii=False)
                    print(response_json)
                    sys.stdout.flush()
                    
                    logger.debug(f"发送响应: {response}")
                    
                except json.JSONDecodeError as e:
                    logger.error(f"JSON解析错误: {e}")
                    error_response = {
                        "jsonrpc": "2.0",
                        "id": None,
                        "error": {
                            "code": -32700,
                            "message": "解析错误"
                        }
                    }
                    print(json.dumps(error_response))
                    
                except Exception as e:
                    logger.error(f"请求处理异常: {e}")
                    error_response = {
                        "jsonrpc": "2.0", 
                        "id": request.get("id") if 'request' in locals() else None,
                        "error": {
                            "code": -32603,
                            "message": "内部错误"
                        }
                    }
                    print(json.dumps(error_response))
                    
        except KeyboardInterrupt:
            logger.info("收到中断信号,正在关闭服务器...")
        finally:
            self.running = False
    
    def display_welcome(self):
        """显示欢迎信息"""
        welcome_text = Text()
        welcome_text.append("🎉 ", style="bold green")
        welcome_text.append(f"{config.name} ", style="bold blue")
        welcome_text.append("启动成功!", style="bold green")
        
        info_text = f"""
🚀 版本: {config.version}
📝 描述: {config.description}
🔧 协议: {config.protocol.upper()}
🛠️  可用工具: {len(tool_manager.tools)}
📊 日志级别: {config.log_level}
"""
        
        panel = Panel(
            info_text.strip(),
            title=welcome_text,
            border_style="bright_blue",
            padding=(1, 2)
        )
        
        console.print(panel)
        
        # 显示工具列表
        tools_info = tool_manager.list_tools()
        if tools_info["tools"]:
            console.print("\n🛠️  [bold cyan]可用工具:[/bold cyan]")
            for i, tool in enumerate(tools_info["tools"], 1):
                console.print(f"  {i}. [yellow]{tool['name']}[/yellow] - {tool['description']}")
        
        console.print(f"\n💡 [dim]使用 Ctrl+C 停止服务器[/dim]")
        console.print()
    
    async def start(self):
        """启动服务器"""
        self.display_welcome()
        
        if config.protocol == "stdio":
            await self.run_stdio()
        else:
            logger.error(f"不支持的协议: {config.protocol}")
            raise ValueError(f"不支持的协议: {config.protocol}")

async def main():
    """主函数"""
    try:
        server = MCPServer()
        await server.start()
    except KeyboardInterrupt:
        logger.info("服务器已停止")
    except Exception as e:
        logger.error(f"服务器启动失败: {e}")
        raise

if __name__ == "__main__":
    asyncio.run(main())

🧪 测试我们的服务器

📝 创建测试脚本

创建 test_server_manual.py

python
"""
test_server_manual.py - 手动测试MCP服务器
"""

import asyncio
import json
import subprocess
import sys
from typing import Dict, Any

class MCPTester:
    """MCP服务器测试器"""
    
    def __init__(self):
        self.server_process = None
    
    async def send_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
        """发送请求到MCP服务器"""
        if not self.server_process:
            raise RuntimeError("服务器进程未启动")
        
        # 发送请求
        request_json = json.dumps(request) + "\n"
        self.server_process.stdin.write(request_json.encode())
        await self.server_process.stdin.drain()
        
        # 读取响应
        response_line = await self.server_process.stdout.readline()
        response = json.loads(response_line.decode().strip())
        
        return response
    
    async def start_server(self):
        """启动服务器进程"""
        self.server_process = await asyncio.create_subprocess_exec(
            sys.executable, "server.py",
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        
        print("🚀 MCP服务器已启动")
    
    async def test_initialize(self):
        """测试初始化"""
        print("\n🔍 测试初始化...")
        
        request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": "initialize",
            "params": {
                "protocolVersion": "2024-11-05",
                "capabilities": {"tools": {}},
                "clientInfo": {
                    "name": "MCP测试客户端",
                    "version": "1.0.0"
                }
            }
        }
        
        response = await self.send_request(request)
        print(f"响应: {json.dumps(response, indent=2, ensure_ascii=False)}")
        
        assert response["jsonrpc"] == "2.0"
        assert "result" in response
        print("✅ 初始化测试通过")
    
    async def test_list_tools(self):
        """测试工具列表"""
        print("\n🔍 测试工具列表...")
        
        request = {
            "jsonrpc": "2.0",
            "id": 2,
            "method": "tools/list",
            "params": {}
        }
        
        response = await self.send_request(request)
        print(f"响应: {json.dumps(response, indent=2, ensure_ascii=False)}")
        
        assert response["jsonrpc"] == "2.0"
        assert "result" in response
        assert "tools" in response["result"]
        print("✅ 工具列表测试通过")
    
    async def test_random_number(self):
        """测试随机数生成"""
        print("\n🔍 测试随机数生成...")
        
        request = {
            "jsonrpc": "2.0",
            "id": 3,
            "method": "tools/call",
            "params": {
                "name": "random_number",
                "arguments": {
                    "min": 1,
                    "max": 10
                }
            }
        }
        
        response = await self.send_request(request)
        print(f"响应: {json.dumps(response, indent=2, ensure_ascii=False)}")
        
        assert response["jsonrpc"] == "2.0"
        assert "result" in response
        print("✅ 随机数生成测试通过")
    
    async def test_current_time(self):
        """测试时间获取"""
        print("\n🔍 测试时间获取...")
        
        request = {
            "jsonrpc": "2.0",
            "id": 4,
            "method": "tools/call",
            "params": {
                "name": "current_time",
                "arguments": {
                    "format": "%Y年%m月%d日 %H:%M:%S"
                }
            }
        }
        
        response = await self.send_request(request)
        print(f"响应: {json.dumps(response, indent=2, ensure_ascii=False)}")
        
        assert response["jsonrpc"] == "2.0"
        assert "result" in response
        print("✅ 时间获取测试通过")
    
    async def test_calculator(self):
        """测试计算器"""
        print("\n🔍 测试计算器...")
        
        # 测试加法
        request = {
            "jsonrpc": "2.0",
            "id": 5,
            "method": "tools/call",
            "params": {
                "name": "calculator",
                "arguments": {
                    "operation": "add",
                    "a": 15,
                    "b": 25
                }
            }
        }
        
        response = await self.send_request(request)
        print(f"加法响应: {json.dumps(response, indent=2, ensure_ascii=False)}")
        
        assert response["jsonrpc"] == "2.0"
        assert "result" in response
        print("✅ 计算器测试通过")
    
    async def run_all_tests(self):
        """运行所有测试"""
        try:
            await self.start_server()
            
            # 等待服务器启动
            await asyncio.sleep(1)
            
            await self.test_initialize()
            await self.test_list_tools()
            await self.test_random_number()
            await self.test_current_time()
            await self.test_calculator()
            
            print("\n🎉 所有测试都通过了!")
            
        except Exception as e:
            print(f"\n❌ 测试失败: {e}")
            raise
        finally:
            if self.server_process:
                self.server_process.terminate()
                await self.server_process.wait()
                print("🛑 服务器已停止")

async def main():
    """主函数"""
    tester = MCPTester()
    await tester.run_all_tests()

if __name__ == "__main__":
    asyncio.run(main())

🎯 运行你的第一个MCP服务器

📋 安装依赖

bash
# 创建项目目录
mkdir my-first-mcp-server
cd my-first-mcp-server

# 创建虚拟环境
python -m venv venv
source venv/bin/activate  # Linux/macOS
# 或者 venv\Scripts\activate  # Windows

# 安装依赖
pip install -r requirements.txt

🚀 启动服务器

bash
# 直接运行
python server.py

# 或者使用环境变量配置
MCP_DEBUG=true MCP_LOG_LEVEL=DEBUG python server.py

🧪 运行测试

bash
# 在另一个终端中运行测试
python test_server_manual.py

🎊 恭喜你!

如果你看到了 "🎉 所有测试都通过了!" 的消息,那么恭喜你!你已经成功创建了你的第一个MCP服务器!

这个服务器虽然简单,但它包含了MCP的所有核心概念:

  • 协议处理:正确处理JSON-RPC 2.0消息
  • 工具管理:管理和执行多个工具
  • 错误处理:优雅地处理各种错误情况
  • 日志系统:完整的日志记录和调试信息
  • 配置管理:灵活的配置系统

🤔 接下来学什么?

在下一小节中,我们将学习如何:

  • 🔧 添加更多复杂的工具
  • 🌐 支持HTTP和WebSocket协议
  • 📊 添加监控和性能指标
  • 🛡️ 实现安全认证机制

👉 下一小节:4.2 添加更多工具