Skip to content

4.2 添加更多工具

🎯 学习目标:扩展MCP服务器功能,添加文件操作、网络请求、数据处理等实用工具
⏱️ 预计时间:45分钟
📊 难度等级:⭐⭐⭐

🚀 从简单到复杂的进化

还记得我们在上一节创建的简单工具吗?现在是时候让你的MCP服务器变得更加强大了!我们将添加一些真正实用的工具,让你的AI助手能够:

  • 📁 操作文件系统:读写文件、创建目录
  • 🌐 发送网络请求:获取API数据、下载内容
  • 📊 处理数据:JSON解析、CSV处理、图像分析
  • 🔍 搜索和查询:文本搜索、数据过滤

🛠️ 工具设计原则

在添加新工具之前,让我们了解一些重要的设计原则:

🎯 设计原则

📁 文件操作工具集

🗂️ 扩展工具配置

首先,让我们扩展 config.py 文件:

python
# 在 config.py 中添加新的工具配置

# 文件操作工具配置
FILE_TOOL_CONFIGS = {
    "read_file": {
        "name": "read_file",
        "description": "读取文件内容",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "文件路径"
                },
                "encoding": {
                    "type": "string", 
                    "description": "文件编码",
                    "default": "utf-8"
                },
                "max_size": {
                    "type": "integer",
                    "description": "最大文件大小(字节)",
                    "default": 1048576  # 1MB
                }
            },
            "required": ["file_path"]
        }
    },
    "write_file": {
        "name": "write_file",
        "description": "写入文件内容",
        "parameters": {
            "type": "object",
            "properties": {
                "file_path": {
                    "type": "string",
                    "description": "文件路径"
                },
                "content": {
                    "type": "string",
                    "description": "要写入的内容"
                },
                "encoding": {
                    "type": "string",
                    "description": "文件编码", 
                    "default": "utf-8"
                },
                "mode": {
                    "type": "string",
                    "enum": ["write", "append"],
                    "description": "写入模式",
                    "default": "write"
                }
            },
            "required": ["file_path", "content"]
        }
    },
    "list_directory": {
        "name": "list_directory",
        "description": "列出目录内容",
        "parameters": {
            "type": "object",
            "properties": {
                "directory_path": {
                    "type": "string",
                    "description": "目录路径"
                },
                "include_hidden": {
                    "type": "boolean",
                    "description": "是否包含隐藏文件",
                    "default": False
                },
                "recursive": {
                    "type": "boolean",
                    "description": "是否递归列出子目录",
                    "default": False
                },
                "max_depth": {
                    "type": "integer",
                    "description": "最大递归深度",
                    "default": 3
                }
            },
            "required": ["directory_path"]
        }
    }
}

# 网络请求工具配置
NETWORK_TOOL_CONFIGS = {
    "http_request": {
        "name": "http_request",
        "description": "发送HTTP请求",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "请求URL"
                },
                "method": {
                    "type": "string",
                    "enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
                    "description": "HTTP方法",
                    "default": "GET"
                },
                "headers": {
                    "type": "object",
                    "description": "请求头",
                    "default": {}
                },
                "params": {
                    "type": "object", 
                    "description": "URL参数",
                    "default": {}
                },
                "data": {
                    "type": "object",
                    "description": "请求体数据",
                    "default": {}
                },
                "timeout": {
                    "type": "integer",
                    "description": "超时时间(秒)",
                    "default": 30
                }
            },
            "required": ["url"]
        }
    },
    "download_file": {
        "name": "download_file",
        "description": "下载文件",
        "parameters": {
            "type": "object",
            "properties": {
                "url": {
                    "type": "string",
                    "description": "下载URL"
                },
                "save_path": {
                    "type": "string",
                    "description": "保存路径"
                },
                "chunk_size": {
                    "type": "integer",
                    "description": "下载块大小",
                    "default": 8192
                },
                "max_size": {
                    "type": "integer",
                    "description": "最大文件大小(字节)",
                    "default": 104857600  # 100MB
                }
            },
            "required": ["url", "save_path"]
        }
    }
}

# 数据处理工具配置
DATA_TOOL_CONFIGS = {
    "json_processor": {
        "name": "json_processor",
        "description": "处理JSON数据",
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["parse", "format", "validate", "extract", "merge"],
                    "description": "操作类型"
                },
                "data": {
                    "type": "string",
                    "description": "JSON字符串或文件路径"
                },
                "path": {
                    "type": "string",
                    "description": "JSON路径(用于extract操作)",
                    "default": ""
                },
                "indent": {
                    "type": "integer",
                    "description": "格式化缩进",
                    "default": 2
                }
            },
            "required": ["operation", "data"]
        }
    },
    "text_processor": {
        "name": "text_processor", 
        "description": "处理文本数据",
        "parameters": {
            "type": "object",
            "properties": {
                "operation": {
                    "type": "string",
                    "enum": ["count", "search", "replace", "split", "join", "clean"],
                    "description": "操作类型"
                },
                "text": {
                    "type": "string",
                    "description": "要处理的文本"
                },
                "pattern": {
                    "type": "string",
                    "description": "搜索/替换模式",
                    "default": ""
                },
                "replacement": {
                    "type": "string", 
                    "description": "替换文本",
                    "default": ""
                },
                "separator": {
                    "type": "string",
                    "description": "分隔符",
                    "default": " "
                }
            },
            "required": ["operation", "text"]
        }
    }
}

# 更新主配置
TOOL_CONFIGS.update(FILE_TOOL_CONFIGS)
TOOL_CONFIGS.update(NETWORK_TOOL_CONFIGS) 
TOOL_CONFIGS.update(DATA_TOOL_CONFIGS)

# 更新默认启用工具列表
config.enabled_tools.extend([
    "read_file", "write_file", "list_directory",
    "http_request", "download_file",
    "json_processor", "text_processor"
])

📁 文件操作工具实现

创建 file_tools.py

python
"""
file_tools.py - 文件操作工具实现
"""

import os
import json
from pathlib import Path
from typing import Dict, Any, List
from loguru import logger
import aiofiles
import aiofiles.os

from tools import MCPTool

class ReadFileTool(MCPTool):
    """文件读取工具"""
    
    def __init__(self):
        super().__init__(
            name="read_file",
            description="读取文件内容"
        )
        self.allowed_extensions = {'.txt', '.json', '.csv', '.xml', '.yaml', '.yml', '.md', '.py', '.js', '.html', '.css'}
        self.max_file_size = 10 * 1024 * 1024  # 10MB
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """读取文件"""
        try:
            file_path = params.get("file_path")
            encoding = params.get("encoding", "utf-8")
            max_size = params.get("max_size", self.max_file_size)
            
            # 安全性检查
            path_obj = Path(file_path).resolve()
            
            # 检查文件是否存在
            if not await aiofiles.os.path.exists(path_obj):
                raise FileNotFoundError(f"文件不存在: {file_path}")
            
            # 检查是否为文件
            if not await aiofiles.os.path.isfile(path_obj):
                raise ValueError(f"路径不是文件: {file_path}")
            
            # 检查文件扩展名
            if path_obj.suffix.lower() not in self.allowed_extensions:
                raise ValueError(f"不支持的文件类型: {path_obj.suffix}")
            
            # 检查文件大小
            file_stat = await aiofiles.os.stat(path_obj)
            if file_stat.st_size > max_size:
                raise ValueError(f"文件过大: {file_stat.st_size} bytes (最大: {max_size} bytes)")
            
            # 读取文件内容
            async with aiofiles.open(path_obj, 'r', encoding=encoding) as f:
                content = await f.read()
            
            logger.info(f"成功读取文件: {file_path} ({len(content)} 字符)")
            
            return {
                "success": True,
                "result": content,
                "message": f"成功读取文件: {file_path}",
                "metadata": {
                    "file_path": str(path_obj),
                    "file_size": file_stat.st_size,
                    "encoding": encoding,
                    "char_count": len(content),
                    "line_count": content.count('\n') + 1,
                    "file_extension": path_obj.suffix
                }
            }
            
        except Exception as e:
            logger.error(f"文件读取失败 {file_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"文件读取失败: {e}"
            }

class WriteFileTool(MCPTool):
    """文件写入工具"""
    
    def __init__(self):
        super().__init__(
            name="write_file",
            description="写入文件内容"
        )
        self.safe_directories = {'./data', './output', './temp'}
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """写入文件"""
        try:
            file_path = params.get("file_path")
            content = params.get("content")
            encoding = params.get("encoding", "utf-8")
            mode = params.get("mode", "write")
            
            path_obj = Path(file_path).resolve()
            
            # 安全性检查:确保写入到允许的目录
            is_safe = any(
                str(path_obj).startswith(str(Path(safe_dir).resolve()))
                for safe_dir in self.safe_directories
            )
            
            if not is_safe:
                # 创建安全目录如果不存在
                safe_path = Path('./output') / Path(file_path).name
                path_obj = safe_path.resolve()
                logger.warning(f"重定向到安全路径: {path_obj}")
            
            # 确保目录存在
            path_obj.parent.mkdir(parents=True, exist_ok=True)
            
            # 确定写入模式
            file_mode = 'a' if mode == 'append' else 'w'
            
            # 写入文件
            async with aiofiles.open(path_obj, file_mode, encoding=encoding) as f:
                await f.write(content)
            
            # 获取文件信息
            file_stat = await aiofiles.os.stat(path_obj)
            
            logger.info(f"成功写入文件: {path_obj} ({len(content)} 字符, 模式: {mode})")
            
            return {
                "success": True,
                "result": str(path_obj),
                "message": f"成功写入文件: {path_obj.name}",
                "metadata": {
                    "file_path": str(path_obj),
                    "file_size": file_stat.st_size,
                    "encoding": encoding,
                    "mode": mode,
                    "content_length": len(content),
                    "lines_written": content.count('\n') + 1
                }
            }
            
        except Exception as e:
            logger.error(f"文件写入失败 {file_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"文件写入失败: {e}"
            }

class ListDirectoryTool(MCPTool):
    """目录列表工具"""
    
    def __init__(self):
        super().__init__(
            name="list_directory",
            description="列出目录内容"
        )
    
    async def _list_directory_recursive(self, path: Path, max_depth: int, current_depth: int = 0, include_hidden: bool = False) -> List[Dict[str, Any]]:
        """递归列出目录内容"""
        items = []
        
        if current_depth >= max_depth:
            return items
        
        try:
            for item in path.iterdir():
                # 跳过隐藏文件
                if not include_hidden and item.name.startswith('.'):
                    continue
                
                try:
                    stat_info = await aiofiles.os.stat(item)
                    is_dir = await aiofiles.os.path.isdir(item)
                    
                    item_info = {
                        "name": item.name,
                        "path": str(item),
                        "type": "directory" if is_dir else "file",
                        "size": stat_info.st_size if not is_dir else None,
                        "modified_time": stat_info.st_mtime,
                        "depth": current_depth
                    }
                    
                    if not is_dir:
                        item_info["extension"] = item.suffix.lower()
                    
                    items.append(item_info)
                    
                    # 递归处理子目录
                    if is_dir and current_depth < max_depth - 1:
                        sub_items = await self._list_directory_recursive(
                            item, max_depth, current_depth + 1, include_hidden
                        )
                        items.extend(sub_items)
                        
                except (PermissionError, OSError) as e:
                    logger.warning(f"无法访问 {item}: {e}")
                    continue
                    
        except (PermissionError, OSError) as e:
            logger.error(f"无法列出目录 {path}: {e}")
            
        return items
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """列出目录内容"""
        try:
            directory_path = params.get("directory_path")
            include_hidden = params.get("include_hidden", False)
            recursive = params.get("recursive", False)
            max_depth = params.get("max_depth", 3)
            
            path_obj = Path(directory_path).resolve()
            
            # 检查目录是否存在
            if not await aiofiles.os.path.exists(path_obj):
                raise FileNotFoundError(f"目录不存在: {directory_path}")
            
            # 检查是否为目录
            if not await aiofiles.os.path.isdir(path_obj):
                raise ValueError(f"路径不是目录: {directory_path}")
            
            # 列出目录内容
            if recursive:
                items = await self._list_directory_recursive(
                    path_obj, max_depth, 0, include_hidden
                )
            else:
                items = []
                for item in path_obj.iterdir():
                    if not include_hidden and item.name.startswith('.'):
                        continue
                    
                    try:
                        stat_info = await aiofiles.os.stat(item)
                        is_dir = await aiofiles.os.path.isdir(item)
                        
                        item_info = {
                            "name": item.name,
                            "path": str(item),
                            "type": "directory" if is_dir else "file",
                            "size": stat_info.st_size if not is_dir else None,
                            "modified_time": stat_info.st_mtime,
                            "depth": 0
                        }
                        
                        if not is_dir:
                            item_info["extension"] = item.suffix.lower()
                        
                        items.append(item_info)
                        
                    except (PermissionError, OSError) as e:
                        logger.warning(f"无法访问 {item}: {e}")
                        continue
            
            # 统计信息
            file_count = sum(1 for item in items if item["type"] == "file")
            dir_count = sum(1 for item in items if item["type"] == "directory")
            total_size = sum(item["size"] or 0 for item in items if item["type"] == "file")
            
            logger.info(f"列出目录: {directory_path} ({len(items)} 项)")
            
            return {
                "success": True,
                "result": items,
                "message": f"成功列出目录: {path_obj.name}",
                "metadata": {
                    "directory_path": str(path_obj),
                    "total_items": len(items),
                    "file_count": file_count,
                    "directory_count": dir_count,
                    "total_size": total_size,
                    "recursive": recursive,
                    "max_depth": max_depth if recursive else 1,
                    "include_hidden": include_hidden
                }
            }
            
        except Exception as e:
            logger.error(f"目录列表失败 {directory_path}: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"目录列表失败: {e}"
            }

🌐 网络请求工具实现

创建 network_tools.py

python
"""
network_tools.py - 网络请求工具实现
"""

import aiohttp
import aiofiles
from pathlib import Path
from typing import Dict, Any
from loguru import logger
from urllib.parse import urlparse

from tools import MCPTool

class HttpRequestTool(MCPTool):
    """HTTP请求工具"""
    
    def __init__(self):
        super().__init__(
            name="http_request",
            description="发送HTTP请求"
        )
        self.allowed_schemes = {'http', 'https'}
        self.max_response_size = 10 * 1024 * 1024  # 10MB
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """发送HTTP请求"""
        try:
            url = params.get("url")
            method = params.get("method", "GET").upper()
            headers = params.get("headers", {})
            url_params = params.get("params", {})
            data = params.get("data", {})
            timeout = params.get("timeout", 30)
            
            # URL验证
            parsed_url = urlparse(url)
            if parsed_url.scheme not in self.allowed_schemes:
                raise ValueError(f"不支持的协议: {parsed_url.scheme}")
            
            # 设置默认headers
            default_headers = {
                'User-Agent': 'MCP-Server/1.0.0',
                'Accept': '*/*'
            }
            request_headers = {**default_headers, **headers}
            
            # 创建会话
            timeout_obj = aiohttp.ClientTimeout(total=timeout)
            
            async with aiohttp.ClientSession(timeout=timeout_obj) as session:
                # 发送请求
                async with session.request(
                    method=method,
                    url=url,
                    headers=request_headers,
                    params=url_params,
                    json=data if data else None
                ) as response:
                    
                    # 检查响应大小
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > self.max_response_size:
                        raise ValueError(f"响应过大: {content_length} bytes")
                    
                    # 读取响应
                    response_text = await response.text()
                    
                    # 尝试解析JSON
                    response_data = None
                    try:
                        if response.content_type == 'application/json':
                            response_data = await response.json()
                    except:
                        pass
                    
                    logger.info(f"HTTP请求成功: {method} {url} -> {response.status}")
                    
                    return {
                        "success": True,
                        "result": {
                            "status_code": response.status,
                            "headers": dict(response.headers),
                            "text": response_text,
                            "json": response_data,
                            "url": str(response.url)
                        },
                        "message": f"HTTP请求成功: {method} {url}",
                        "metadata": {
                            "method": method,
                            "url": url,
                            "status_code": response.status,
                            "content_type": response.content_type,
                            "content_length": len(response_text),
                            "response_time": "N/A"  # aiohttp不直接提供响应时间
                        }
                    }
                    
        except Exception as e:
            logger.error(f"HTTP请求失败 {method} {url}: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"HTTP请求失败: {e}"
            }

class DownloadFileTool(MCPTool):
    """文件下载工具"""
    
    def __init__(self):
        super().__init__(
            name="download_file",
            description="下载文件"
        )
        self.allowed_schemes = {'http', 'https'}
        self.safe_directories = {'./downloads', './temp', './output'}
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """下载文件"""
        try:
            url = params.get("url")
            save_path = params.get("save_path")
            chunk_size = params.get("chunk_size", 8192)
            max_size = params.get("max_size", 100 * 1024 * 1024)  # 100MB
            
            # URL验证
            parsed_url = urlparse(url)
            if parsed_url.scheme not in self.allowed_schemes:
                raise ValueError(f"不支持的协议: {parsed_url.scheme}")
            
            # 路径安全检查
            path_obj = Path(save_path).resolve()
            is_safe = any(
                str(path_obj).startswith(str(Path(safe_dir).resolve()))
                for safe_dir in self.safe_directories
            )
            
            if not is_safe:
                # 重定向到安全目录
                safe_path = Path('./downloads') / Path(save_path).name
                path_obj = safe_path.resolve()
                logger.warning(f"重定向到安全路径: {path_obj}")
            
            # 确保目录存在
            path_obj.parent.mkdir(parents=True, exist_ok=True)
            
            # 开始下载
            downloaded_size = 0
            
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    response.raise_for_status()
                    
                    # 检查文件大小
                    content_length = response.headers.get('Content-Length')
                    if content_length and int(content_length) > max_size:
                        raise ValueError(f"文件过大: {content_length} bytes")
                    
                    # 保存文件
                    async with aiofiles.open(path_obj, 'wb') as f:
                        async for chunk in response.content.iter_chunked(chunk_size):
                            await f.write(chunk)
                            downloaded_size += len(chunk)
                            
                            # 检查下载大小
                            if downloaded_size > max_size:
                                raise ValueError(f"下载大小超限: {downloaded_size} bytes")
            
            # 获取文件信息
            file_stat = await aiofiles.os.stat(path_obj)
            
            logger.info(f"文件下载成功: {url} -> {path_obj} ({downloaded_size} bytes)")
            
            return {
                "success": True,
                "result": str(path_obj),
                "message": f"文件下载成功: {path_obj.name}",
                "metadata": {
                    "url": url,
                    "save_path": str(path_obj),
                    "file_size": file_stat.st_size,
                    "downloaded_size": downloaded_size,
                    "content_type": response.headers.get('Content-Type', 'unknown'),
                    "status_code": response.status
                }
            }
            
        except Exception as e:
            logger.error(f"文件下载失败 {url}: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"文件下载失败: {e}"
            }

📊 数据处理工具实现

创建 data_tools.py

python
"""
data_tools.py - 数据处理工具实现
"""

import json
import re
from typing import Dict, Any, List, Union
from loguru import logger
from pathlib import Path
import aiofiles

from tools import MCPTool

class JsonProcessorTool(MCPTool):
    """JSON处理工具"""
    
    def __init__(self):
        super().__init__(
            name="json_processor",
            description="处理JSON数据"
        )
    
    async def _extract_json_path(self, data: Any, path: str) -> Any:
        """提取JSON路径对应的值"""
        if not path:
            return data
        
        parts = path.split('.')
        current = data
        
        for part in parts:
            if isinstance(current, dict):
                if part in current:
                    current = current[part]
                else:
                    raise KeyError(f"路径不存在: {part}")
            elif isinstance(current, list):
                try:
                    index = int(part)
                    current = current[index]
                except (ValueError, IndexError):
                    raise KeyError(f"无效的数组索引: {part}")
            else:
                raise TypeError(f"无法访问路径: {part}")
        
        return current
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """处理JSON数据"""
        try:
            operation = params.get("operation")
            data_input = params.get("data")
            path = params.get("path", "")
            indent = params.get("indent", 2)
            
            # 读取数据
            if data_input.startswith('{') or data_input.startswith('['):
                # 直接JSON字符串
                json_data = json.loads(data_input)
            else:
                # 文件路径
                file_path = Path(data_input)
                if not file_path.exists():
                    raise FileNotFoundError(f"文件不存在: {data_input}")
                
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    content = await f.read()
                    json_data = json.loads(content)
            
            result = None
            message = ""
            
            if operation == "parse":
                result = json_data
                message = "JSON解析成功"
                
            elif operation == "format":
                result = json.dumps(json_data, indent=indent, ensure_ascii=False)
                message = f"JSON格式化成功 (缩进: {indent})"
                
            elif operation == "validate":
                # 数据已经解析成功,说明JSON有效
                result = {"valid": True, "type": type(json_data).__name__}
                message = "JSON验证通过"
                
            elif operation == "extract":
                if not path:
                    raise ValueError("extract操作需要指定path参数")
                
                extracted = await self._extract_json_path(json_data, path)
                result = extracted
                message = f"成功提取路径: {path}"
                
            elif operation == "merge":
                # 如果数据是数组,则合并所有对象
                if isinstance(json_data, list):
                    merged = {}
                    for item in json_data:
                        if isinstance(item, dict):
                            merged.update(item)
                    result = merged
                    message = f"合并了 {len(json_data)} 个对象"
                else:
                    result = json_data
                    message = "数据不是数组,无需合并"
            
            else:
                raise ValueError(f"不支持的操作: {operation}")
            
            logger.info(f"JSON处理成功: {operation}")
            
            return {
                "success": True,
                "result": result,
                "message": message,
                "metadata": {
                    "operation": operation,
                    "data_type": type(json_data).__name__,
                    "data_size": len(str(json_data)),
                    "path": path if path else None,
                    "indent": indent if operation == "format" else None
                }
            }
            
        except json.JSONDecodeError as e:
            logger.error(f"JSON解析错误: {e}")
            return {
                "success": False,
                "error": f"JSON格式错误: {e}",
                "message": "JSON数据格式无效"
            }
        except Exception as e:
            logger.error(f"JSON处理失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"JSON处理失败: {e}"
            }

class TextProcessorTool(MCPTool):
    """文本处理工具"""
    
    def __init__(self):
        super().__init__(
            name="text_processor",
            description="处理文本数据"
        )
    
    async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """处理文本数据"""
        try:
            operation = params.get("operation")
            text = params.get("text")
            pattern = params.get("pattern", "")
            replacement = params.get("replacement", "")
            separator = params.get("separator", " ")
            
            result = None
            message = ""
            
            if operation == "count":
                result = {
                    "characters": len(text),
                    "words": len(text.split()),
                    "lines": text.count('\n') + 1,
                    "paragraphs": len([p for p in text.split('\n\n') if p.strip()])
                }
                message = "文本统计完成"
                
            elif operation == "search":
                if not pattern:
                    raise ValueError("search操作需要指定pattern参数")
                
                # 支持正则表达式搜索
                matches = re.findall(pattern, text, re.IGNORECASE)
                positions = []
                for match in re.finditer(pattern, text, re.IGNORECASE):
                    positions.append({
                        "match": match.group(),
                        "start": match.start(),
                        "end": match.end()
                    })
                
                result = {
                    "matches": matches,
                    "positions": positions,
                    "count": len(matches)
                }
                message = f"找到 {len(matches)} 个匹配项"
                
            elif operation == "replace":
                if not pattern:
                    raise ValueError("replace操作需要指定pattern参数")
                
                # 支持正则表达式替换
                new_text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
                replaced_count = len(re.findall(pattern, text, re.IGNORECASE))
                
                result = {
                    "text": new_text,
                    "replaced_count": replaced_count
                }
                message = f"替换了 {replaced_count} 处匹配项"
                
            elif operation == "split":
                if separator:
                    parts = text.split(separator)
                else:
                    parts = text.split()
                
                result = {
                    "parts": parts,
                    "count": len(parts)
                }
                message = f"分割为 {len(parts)} 部分"
                
            elif operation == "join":
                # 假设text是以换行符分隔的多行文本
                lines = text.split('\n')
                joined_text = separator.join(lines)
                
                result = {
                    "text": joined_text,
                    "line_count": len(lines)
                }
                message = f"连接了 {len(lines)} 行文本"
                
            elif operation == "clean":
                # 清理文本:去除多余空白、标准化换行等
                cleaned = re.sub(r'\s+', ' ', text.strip())
                cleaned = re.sub(r'\n\s*\n', '\n\n', cleaned)  # 标准化段落分隔
                
                result = {
                    "text": cleaned,
                    "removed_chars": len(text) - len(cleaned)
                }
                message = f"清理文本完成,移除了 {len(text) - len(cleaned)} 个多余字符"
                
            else:
                raise ValueError(f"不支持的操作: {operation}")
            
            logger.info(f"文本处理成功: {operation}")
            
            return {
                "success": True,
                "result": result,
                "message": message,
                "metadata": {
                    "operation": operation,
                    "original_length": len(text),
                    "pattern": pattern if pattern else None,
                    "separator": separator if separator != " " else None
                }
            }
            
        except Exception as e:
            logger.error(f"文本处理失败: {e}")
            return {
                "success": False,
                "error": str(e),
                "message": f"文本处理失败: {e}"
            }

🔄 更新工具管理器

现在我们需要更新 tools.py 中的工具管理器来注册新工具:

python
# 在 tools.py 文件末尾添加

from file_tools import ReadFileTool, WriteFileTool, ListDirectoryTool
from network_tools import HttpRequestTool, DownloadFileTool
from data_tools import JsonProcessorTool, TextProcessorTool

class ToolManager:
    """工具管理器 - 更新版本"""
    
    def __init__(self):
        self.tools: Dict[str, MCPTool] = {}
        self._register_all_tools()
    
    def _register_all_tools(self):
        """注册所有工具"""
        # 基础工具
        basic_tools = [
            RandomNumberTool(),
            CurrentTimeTool(),
            CalculatorTool()
        ]
        
        # 文件操作工具
        file_tools = [
            ReadFileTool(),
            WriteFileTool(),
            ListDirectoryTool()
        ]
        
        # 网络工具
        network_tools = [
            HttpRequestTool(),
            DownloadFileTool()
        ]
        
        # 数据处理工具
        data_tools = [
            JsonProcessorTool(),
            TextProcessorTool()
        ]
        
        # 注册所有工具
        all_tools = basic_tools + file_tools + network_tools + data_tools
        
        for tool in all_tools:
            if tool.name in config.enabled_tools:
                self.register_tool(tool)
                logger.info(f"注册工具: {tool.name}")
    
    # ... 其他方法保持不变

🧪 测试新工具

创建 test_advanced_tools.py

python
"""
test_advanced_tools.py - 测试高级工具
"""

import asyncio
import json
import tempfile
from pathlib import Path
from file_tools import ReadFileTool, WriteFileTool, ListDirectoryTool
from network_tools import HttpRequestTool
from data_tools import JsonProcessorTool, TextProcessorTool

async def test_file_operations():
    """测试文件操作工具"""
    print("🧪 测试文件操作工具")
    
    # 创建临时目录
    with tempfile.TemporaryDirectory() as temp_dir:
        temp_path = Path(temp_dir)
        
        # 测试写入文件
        write_tool = WriteFileTool()
        write_result = await write_tool.execute({
            "file_path": str(temp_path / "test.txt"),
            "content": "Hello, MCP World!\n这是一个测试文件。",
            "encoding": "utf-8"
        })
        
        print(f"写入文件结果: {write_result['success']}")
        
        # 测试读取文件
        read_tool = ReadFileTool()
        read_result = await read_tool.execute({
            "file_path": str(temp_path / "test.txt"),
            "encoding": "utf-8"
        })
        
        print(f"读取文件结果: {read_result['success']}")
        print(f"文件内容: {read_result.get('result', '')[:50]}...")
        
        # 测试目录列表
        list_tool = ListDirectoryTool()
        list_result = await list_tool.execute({
            "directory_path": str(temp_path),
            "include_hidden": False
        })
        
        print(f"目录列表结果: {list_result['success']}")
        print(f"文件数量: {list_result.get('metadata', {}).get('file_count', 0)}")

async def test_network_tools():
    """测试网络工具"""
    print("\n🧪 测试网络工具")
    
    # 测试HTTP请求
    http_tool = HttpRequestTool()
    http_result = await http_tool.execute({
        "url": "https://httpbin.org/json",
        "method": "GET"
    })
    
    print(f"HTTP请求结果: {http_result['success']}")
    if http_result['success']:
        status_code = http_result['result']['status_code']
        print(f"响应状态码: {status_code}")

async def test_data_tools():
    """测试数据处理工具"""
    print("\n🧪 测试数据处理工具")
    
    # 测试JSON处理
    json_tool = JsonProcessorTool()
    
    test_json = '{"name": "张三", "age": 30, "skills": ["Python", "JavaScript"]}'
    
    # 解析JSON
    parse_result = await json_tool.execute({
        "operation": "parse",
        "data": test_json
    })
    
    print(f"JSON解析结果: {parse_result['success']}")
    
    # 格式化JSON
    format_result = await json_tool.execute({
        "operation": "format",
        "data": test_json,
        "indent": 4
    })
    
    if format_result['success']:
        print("格式化后的JSON:")
        print(format_result['result'])
    
    # 测试文本处理
    text_tool = TextProcessorTool()
    
    test_text = "Hello World! This is a test text. Hello again!"
    
    # 文本统计
    count_result = await text_tool.execute({
        "operation": "count",
        "text": test_text
    })
    
    print(f"\n文本统计结果: {count_result['success']}")
    if count_result['success']:
        stats = count_result['result']
        print(f"字符数: {stats['characters']}, 单词数: {stats['words']}")
    
    # 文本搜索
    search_result = await text_tool.execute({
        "operation": "search",
        "text": test_text,
        "pattern": "Hello"
    })
    
    print(f"文本搜索结果: {search_result['success']}")
    if search_result['success']:
        matches = search_result['result']['count']
        print(f"找到 {matches} 个匹配项")

async def main():
    """主测试函数"""
    print("🚀 开始测试高级MCP工具")
    print("=" * 50)
    
    await test_file_operations()
    await test_network_tools()
    await test_data_tools()
    
    print("\n" + "=" * 50)
    print("🎉 所有工具测试完成!")

if __name__ == "__main__":
    asyncio.run(main())

🎯 本节小结

通过这一小节,你已经成功地:

扩展了工具类型:文件操作、网络请求、数据处理
实现了安全机制:路径验证、大小限制、权限控制
增强了错误处理:详细的错误信息和异常安全
优化了代码结构:模块化设计,易于维护和扩展
提供了丰富功能:从简单工具到复杂的数据处理

你的MCP服务器现在具备了以下强大功能:

🛠️ 基础工具:随机数、时间、计算器
📁 文件操作:读写文件、目录列表
🌐 网络功能:HTTP请求、文件下载
📊 数据处理:JSON处理、文本分析

🤔 思考题

  1. 如何为工具添加权限控制系统?
  2. 怎样实现工具的热插拔功能?
  3. 如何优化大文件处理的性能?
  4. 怎样添加工具执行的监控和统计?

准备好学习如何测试和调试你的MCP服务器了吗?


👉 下一小节:4.3 测试和调试