4.2 添加更多工具
🎯 学习目标:扩展MCP服务器功能,添加文件操作、网络请求、数据处理等实用工具
⏱️ 预计时间:45分钟
📊 难度等级:⭐⭐⭐
🚀 从简单到复杂的进化
还记得我们在上一节创建的简单工具吗?现在是时候让你的MCP服务器变得更加强大了!我们将添加一些真正实用的工具,让你的AI助手能够:
- 📁 操作文件系统:读写文件、创建目录
- 🌐 发送网络请求:获取API数据、下载内容
- 📊 处理数据:JSON解析、CSV处理、图像分析
- 🔍 搜索和查询:文本搜索、数据过滤
🛠️ 工具设计原则
在添加新工具之前,让我们了解一些重要的设计原则:
🎯 设计原则
📁 文件操作工具集
🗂️ 扩展工具配置
首先,让我们扩展 `config.py` 文件:
python
# 在 config.py 中添加新的工具配置
# 文件操作工具配置
FILE_TOOL_CONFIGS = {
"read_file": {
"name": "read_file",
"description": "读取文件内容",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "文件路径"
},
"encoding": {
"type": "string",
"description": "文件编码",
"default": "utf-8"
},
"max_size": {
"type": "integer",
"description": "最大文件大小(字节)",
"default": 1048576 # 1MB
}
},
"required": ["file_path"]
}
},
"write_file": {
"name": "write_file",
"description": "写入文件内容",
"parameters": {
"type": "object",
"properties": {
"file_path": {
"type": "string",
"description": "文件路径"
},
"content": {
"type": "string",
"description": "要写入的内容"
},
"encoding": {
"type": "string",
"description": "文件编码",
"default": "utf-8"
},
"mode": {
"type": "string",
"enum": ["write", "append"],
"description": "写入模式",
"default": "write"
}
},
"required": ["file_path", "content"]
}
},
"list_directory": {
"name": "list_directory",
"description": "列出目录内容",
"parameters": {
"type": "object",
"properties": {
"directory_path": {
"type": "string",
"description": "目录路径"
},
"include_hidden": {
"type": "boolean",
"description": "是否包含隐藏文件",
"default": False
},
"recursive": {
"type": "boolean",
"description": "是否递归列出子目录",
"default": False
},
"max_depth": {
"type": "integer",
"description": "最大递归深度",
"default": 3
}
},
"required": ["directory_path"]
}
}
}
# 网络请求工具配置
NETWORK_TOOL_CONFIGS = {
"http_request": {
"name": "http_request",
"description": "发送HTTP请求",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "请求URL"
},
"method": {
"type": "string",
"enum": ["GET", "POST", "PUT", "DELETE", "PATCH"],
"description": "HTTP方法",
"default": "GET"
},
"headers": {
"type": "object",
"description": "请求头",
"default": {}
},
"params": {
"type": "object",
"description": "URL参数",
"default": {}
},
"data": {
"type": "object",
"description": "请求体数据",
"default": {}
},
"timeout": {
"type": "integer",
"description": "超时时间(秒)",
"default": 30
}
},
"required": ["url"]
}
},
"download_file": {
"name": "download_file",
"description": "下载文件",
"parameters": {
"type": "object",
"properties": {
"url": {
"type": "string",
"description": "下载URL"
},
"save_path": {
"type": "string",
"description": "保存路径"
},
"chunk_size": {
"type": "integer",
"description": "下载块大小",
"default": 8192
},
"max_size": {
"type": "integer",
"description": "最大文件大小(字节)",
"default": 104857600 # 100MB
}
},
"required": ["url", "save_path"]
}
}
}
# 数据处理工具配置
DATA_TOOL_CONFIGS = {
"json_processor": {
"name": "json_processor",
"description": "处理JSON数据",
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["parse", "format", "validate", "extract", "merge"],
"description": "操作类型"
},
"data": {
"type": "string",
"description": "JSON字符串或文件路径"
},
"path": {
"type": "string",
"description": "JSON路径(用于extract操作)",
"default": ""
},
"indent": {
"type": "integer",
"description": "格式化缩进",
"default": 2
}
},
"required": ["operation", "data"]
}
},
"text_processor": {
"name": "text_processor",
"description": "处理文本数据",
"parameters": {
"type": "object",
"properties": {
"operation": {
"type": "string",
"enum": ["count", "search", "replace", "split", "join", "clean"],
"description": "操作类型"
},
"text": {
"type": "string",
"description": "要处理的文本"
},
"pattern": {
"type": "string",
"description": "搜索/替换模式",
"default": ""
},
"replacement": {
"type": "string",
"description": "替换文本",
"default": ""
},
"separator": {
"type": "string",
"description": "分隔符",
"default": " "
}
},
"required": ["operation", "text"]
}
}
}
# 更新主配置
TOOL_CONFIGS.update(FILE_TOOL_CONFIGS)
TOOL_CONFIGS.update(NETWORK_TOOL_CONFIGS)
TOOL_CONFIGS.update(DATA_TOOL_CONFIGS)
# 更新默认启用工具列表
config.enabled_tools.extend([
"read_file", "write_file", "list_directory",
"http_request", "download_file",
"json_processor", "text_processor"
])
📁 文件操作工具实现
创建 `file_tools.py`:
python
"""
file_tools.py - 文件操作工具实现
"""
import os
import json
from pathlib import Path
from typing import Dict, Any, List
from loguru import logger
import aiofiles
import aiofiles.os
from tools import MCPTool
class ReadFileTool(MCPTool):
"""文件读取工具"""
def __init__(self):
super().__init__(
name="read_file",
description="读取文件内容"
)
self.allowed_extensions = {'.txt', '.json', '.csv', '.xml', '.yaml', '.yml', '.md', '.py', '.js', '.html', '.css'}
self.max_file_size = 10 * 1024 * 1024 # 10MB
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""读取文件"""
try:
file_path = params.get("file_path")
encoding = params.get("encoding", "utf-8")
max_size = params.get("max_size", self.max_file_size)
# 安全性检查
path_obj = Path(file_path).resolve()
# 检查文件是否存在
if not await aiofiles.os.path.exists(path_obj):
raise FileNotFoundError(f"文件不存在: {file_path}")
# 检查是否为文件
if not await aiofiles.os.path.isfile(path_obj):
raise ValueError(f"路径不是文件: {file_path}")
# 检查文件扩展名
if path_obj.suffix.lower() not in self.allowed_extensions:
raise ValueError(f"不支持的文件类型: {path_obj.suffix}")
# 检查文件大小
file_stat = await aiofiles.os.stat(path_obj)
if file_stat.st_size > max_size:
raise ValueError(f"文件过大: {file_stat.st_size} bytes (最大: {max_size} bytes)")
# 读取文件内容
async with aiofiles.open(path_obj, 'r', encoding=encoding) as f:
content = await f.read()
logger.info(f"成功读取文件: {file_path} ({len(content)} 字符)")
return {
"success": True,
"result": content,
"message": f"成功读取文件: {file_path}",
"metadata": {
"file_path": str(path_obj),
"file_size": file_stat.st_size,
"encoding": encoding,
"char_count": len(content),
"line_count": content.count('\n') + 1,
"file_extension": path_obj.suffix
}
}
except Exception as e:
logger.error(f"文件读取失败 {file_path}: {e}")
return {
"success": False,
"error": str(e),
"message": f"文件读取失败: {e}"
}
class WriteFileTool(MCPTool):
"""文件写入工具"""
def __init__(self):
super().__init__(
name="write_file",
description="写入文件内容"
)
self.safe_directories = {'./data', './output', './temp'}
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""写入文件"""
try:
file_path = params.get("file_path")
content = params.get("content")
encoding = params.get("encoding", "utf-8")
mode = params.get("mode", "write")
path_obj = Path(file_path).resolve()
# 安全性检查:确保写入到允许的目录
is_safe = any(
str(path_obj).startswith(str(Path(safe_dir).resolve()))
for safe_dir in self.safe_directories
)
if not is_safe:
# 创建安全目录如果不存在
safe_path = Path('./output') / Path(file_path).name
path_obj = safe_path.resolve()
logger.warning(f"重定向到安全路径: {path_obj}")
# 确保目录存在
path_obj.parent.mkdir(parents=True, exist_ok=True)
# 确定写入模式
file_mode = 'a' if mode == 'append' else 'w'
# 写入文件
async with aiofiles.open(path_obj, file_mode, encoding=encoding) as f:
await f.write(content)
# 获取文件信息
file_stat = await aiofiles.os.stat(path_obj)
logger.info(f"成功写入文件: {path_obj} ({len(content)} 字符, 模式: {mode})")
return {
"success": True,
"result": str(path_obj),
"message": f"成功写入文件: {path_obj.name}",
"metadata": {
"file_path": str(path_obj),
"file_size": file_stat.st_size,
"encoding": encoding,
"mode": mode,
"content_length": len(content),
"lines_written": content.count('\n') + 1
}
}
except Exception as e:
logger.error(f"文件写入失败 {file_path}: {e}")
return {
"success": False,
"error": str(e),
"message": f"文件写入失败: {e}"
}
class ListDirectoryTool(MCPTool):
"""目录列表工具"""
def __init__(self):
super().__init__(
name="list_directory",
description="列出目录内容"
)
async def _list_directory_recursive(self, path: Path, max_depth: int, current_depth: int = 0, include_hidden: bool = False) -> List[Dict[str, Any]]:
"""递归列出目录内容"""
items = []
if current_depth >= max_depth:
return items
try:
for item in path.iterdir():
# 跳过隐藏文件
if not include_hidden and item.name.startswith('.'):
continue
try:
stat_info = await aiofiles.os.stat(item)
is_dir = await aiofiles.os.path.isdir(item)
item_info = {
"name": item.name,
"path": str(item),
"type": "directory" if is_dir else "file",
"size": stat_info.st_size if not is_dir else None,
"modified_time": stat_info.st_mtime,
"depth": current_depth
}
if not is_dir:
item_info["extension"] = item.suffix.lower()
items.append(item_info)
# 递归处理子目录
if is_dir and current_depth < max_depth - 1:
sub_items = await self._list_directory_recursive(
item, max_depth, current_depth + 1, include_hidden
)
items.extend(sub_items)
except (PermissionError, OSError) as e:
logger.warning(f"无法访问 {item}: {e}")
continue
except (PermissionError, OSError) as e:
logger.error(f"无法列出目录 {path}: {e}")
return items
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""列出目录内容"""
try:
directory_path = params.get("directory_path")
include_hidden = params.get("include_hidden", False)
recursive = params.get("recursive", False)
max_depth = params.get("max_depth", 3)
path_obj = Path(directory_path).resolve()
# 检查目录是否存在
if not await aiofiles.os.path.exists(path_obj):
raise FileNotFoundError(f"目录不存在: {directory_path}")
# 检查是否为目录
if not await aiofiles.os.path.isdir(path_obj):
raise ValueError(f"路径不是目录: {directory_path}")
# 列出目录内容
if recursive:
items = await self._list_directory_recursive(
path_obj, max_depth, 0, include_hidden
)
else:
items = []
for item in path_obj.iterdir():
if not include_hidden and item.name.startswith('.'):
continue
try:
stat_info = await aiofiles.os.stat(item)
is_dir = await aiofiles.os.path.isdir(item)
item_info = {
"name": item.name,
"path": str(item),
"type": "directory" if is_dir else "file",
"size": stat_info.st_size if not is_dir else None,
"modified_time": stat_info.st_mtime,
"depth": 0
}
if not is_dir:
item_info["extension"] = item.suffix.lower()
items.append(item_info)
except (PermissionError, OSError) as e:
logger.warning(f"无法访问 {item}: {e}")
continue
# 统计信息
file_count = sum(1 for item in items if item["type"] == "file")
dir_count = sum(1 for item in items if item["type"] == "directory")
total_size = sum(item["size"] or 0 for item in items if item["type"] == "file")
logger.info(f"列出目录: {directory_path} ({len(items)} 项)")
return {
"success": True,
"result": items,
"message": f"成功列出目录: {path_obj.name}",
"metadata": {
"directory_path": str(path_obj),
"total_items": len(items),
"file_count": file_count,
"directory_count": dir_count,
"total_size": total_size,
"recursive": recursive,
"max_depth": max_depth if recursive else 1,
"include_hidden": include_hidden
}
}
except Exception as e:
logger.error(f"目录列表失败 {directory_path}: {e}")
return {
"success": False,
"error": str(e),
"message": f"目录列表失败: {e}"
}
🌐 网络请求工具实现
创建 `network_tools.py`:
python
"""
network_tools.py - 网络请求工具实现
"""
import aiohttp
import aiofiles
from pathlib import Path
from typing import Dict, Any
from loguru import logger
from urllib.parse import urlparse
from tools import MCPTool
class HttpRequestTool(MCPTool):
"""HTTP请求工具"""
def __init__(self):
super().__init__(
name="http_request",
description="发送HTTP请求"
)
self.allowed_schemes = {'http', 'https'}
self.max_response_size = 10 * 1024 * 1024 # 10MB
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""发送HTTP请求"""
try:
url = params.get("url")
method = params.get("method", "GET").upper()
headers = params.get("headers", {})
url_params = params.get("params", {})
data = params.get("data", {})
timeout = params.get("timeout", 30)
# URL验证
parsed_url = urlparse(url)
if parsed_url.scheme not in self.allowed_schemes:
raise ValueError(f"不支持的协议: {parsed_url.scheme}")
# 设置默认headers
default_headers = {
'User-Agent': 'MCP-Server/1.0.0',
'Accept': '*/*'
}
request_headers = {**default_headers, **headers}
# 创建会话
timeout_obj = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(timeout=timeout_obj) as session:
# 发送请求
async with session.request(
method=method,
url=url,
headers=request_headers,
params=url_params,
json=data if data else None
) as response:
# 检查响应大小
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > self.max_response_size:
raise ValueError(f"响应过大: {content_length} bytes")
# 读取响应
response_text = await response.text()
# 尝试解析JSON
response_data = None
try:
if response.content_type == 'application/json':
response_data = await response.json()
except:
pass
logger.info(f"HTTP请求成功: {method} {url} -> {response.status}")
return {
"success": True,
"result": {
"status_code": response.status,
"headers": dict(response.headers),
"text": response_text,
"json": response_data,
"url": str(response.url)
},
"message": f"HTTP请求成功: {method} {url}",
"metadata": {
"method": method,
"url": url,
"status_code": response.status,
"content_type": response.content_type,
"content_length": len(response_text),
"response_time": "N/A" # aiohttp不直接提供响应时间
}
}
except Exception as e:
logger.error(f"HTTP请求失败 {method} {url}: {e}")
return {
"success": False,
"error": str(e),
"message": f"HTTP请求失败: {e}"
}
class DownloadFileTool(MCPTool):
"""文件下载工具"""
def __init__(self):
super().__init__(
name="download_file",
description="下载文件"
)
self.allowed_schemes = {'http', 'https'}
self.safe_directories = {'./downloads', './temp', './output'}
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""下载文件"""
try:
url = params.get("url")
save_path = params.get("save_path")
chunk_size = params.get("chunk_size", 8192)
max_size = params.get("max_size", 100 * 1024 * 1024) # 100MB
# URL验证
parsed_url = urlparse(url)
if parsed_url.scheme not in self.allowed_schemes:
raise ValueError(f"不支持的协议: {parsed_url.scheme}")
# 路径安全检查
path_obj = Path(save_path).resolve()
is_safe = any(
str(path_obj).startswith(str(Path(safe_dir).resolve()))
for safe_dir in self.safe_directories
)
if not is_safe:
# 重定向到安全目录
safe_path = Path('./downloads') / Path(save_path).name
path_obj = safe_path.resolve()
logger.warning(f"重定向到安全路径: {path_obj}")
# 确保目录存在
path_obj.parent.mkdir(parents=True, exist_ok=True)
# 开始下载
downloaded_size = 0
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
response.raise_for_status()
# 检查文件大小
content_length = response.headers.get('Content-Length')
if content_length and int(content_length) > max_size:
raise ValueError(f"文件过大: {content_length} bytes")
# 保存文件
async with aiofiles.open(path_obj, 'wb') as f:
async for chunk in response.content.iter_chunked(chunk_size):
await f.write(chunk)
downloaded_size += len(chunk)
# 检查下载大小
if downloaded_size > max_size:
raise ValueError(f"下载大小超限: {downloaded_size} bytes")
# 获取文件信息
file_stat = await aiofiles.os.stat(path_obj)
logger.info(f"文件下载成功: {url} -> {path_obj} ({downloaded_size} bytes)")
return {
"success": True,
"result": str(path_obj),
"message": f"文件下载成功: {path_obj.name}",
"metadata": {
"url": url,
"save_path": str(path_obj),
"file_size": file_stat.st_size,
"downloaded_size": downloaded_size,
"content_type": response.headers.get('Content-Type', 'unknown'),
"status_code": response.status
}
}
except Exception as e:
logger.error(f"文件下载失败 {url}: {e}")
return {
"success": False,
"error": str(e),
"message": f"文件下载失败: {e}"
}
📊 数据处理工具实现
创建 `data_tools.py`:
python
"""
data_tools.py - 数据处理工具实现
"""
import json
import re
from typing import Dict, Any, List, Union
from loguru import logger
from pathlib import Path
import aiofiles
from tools import MCPTool
class JsonProcessorTool(MCPTool):
"""JSON处理工具"""
def __init__(self):
super().__init__(
name="json_processor",
description="处理JSON数据"
)
async def _extract_json_path(self, data: Any, path: str) -> Any:
"""提取JSON路径对应的值"""
if not path:
return data
parts = path.split('.')
current = data
for part in parts:
if isinstance(current, dict):
if part in current:
current = current[part]
else:
raise KeyError(f"路径不存在: {part}")
elif isinstance(current, list):
try:
index = int(part)
current = current[index]
except (ValueError, IndexError):
raise KeyError(f"无效的数组索引: {part}")
else:
raise TypeError(f"无法访问路径: {part}")
return current
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理JSON数据"""
try:
operation = params.get("operation")
data_input = params.get("data")
path = params.get("path", "")
indent = params.get("indent", 2)
# 读取数据
if data_input.startswith('{') or data_input.startswith('['):
# 直接JSON字符串
json_data = json.loads(data_input)
else:
# 文件路径
file_path = Path(data_input)
if not file_path.exists():
raise FileNotFoundError(f"文件不存在: {data_input}")
async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
content = await f.read()
json_data = json.loads(content)
result = None
message = ""
if operation == "parse":
result = json_data
message = "JSON解析成功"
elif operation == "format":
result = json.dumps(json_data, indent=indent, ensure_ascii=False)
message = f"JSON格式化成功 (缩进: {indent})"
elif operation == "validate":
# 数据已经解析成功,说明JSON有效
result = {"valid": True, "type": type(json_data).__name__}
message = "JSON验证通过"
elif operation == "extract":
if not path:
raise ValueError("extract操作需要指定path参数")
extracted = await self._extract_json_path(json_data, path)
result = extracted
message = f"成功提取路径: {path}"
elif operation == "merge":
# 如果数据是数组,则合并所有对象
if isinstance(json_data, list):
merged = {}
for item in json_data:
if isinstance(item, dict):
merged.update(item)
result = merged
message = f"合并了 {len(json_data)} 个对象"
else:
result = json_data
message = "数据不是数组,无需合并"
else:
raise ValueError(f"不支持的操作: {operation}")
logger.info(f"JSON处理成功: {operation}")
return {
"success": True,
"result": result,
"message": message,
"metadata": {
"operation": operation,
"data_type": type(json_data).__name__,
"data_size": len(str(json_data)),
"path": path if path else None,
"indent": indent if operation == "format" else None
}
}
except json.JSONDecodeError as e:
logger.error(f"JSON解析错误: {e}")
return {
"success": False,
"error": f"JSON格式错误: {e}",
"message": "JSON数据格式无效"
}
except Exception as e:
logger.error(f"JSON处理失败: {e}")
return {
"success": False,
"error": str(e),
"message": f"JSON处理失败: {e}"
}
class TextProcessorTool(MCPTool):
"""文本处理工具"""
def __init__(self):
super().__init__(
name="text_processor",
description="处理文本数据"
)
async def execute(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""处理文本数据"""
try:
operation = params.get("operation")
text = params.get("text")
pattern = params.get("pattern", "")
replacement = params.get("replacement", "")
separator = params.get("separator", " ")
result = None
message = ""
if operation == "count":
result = {
"characters": len(text),
"words": len(text.split()),
"lines": text.count('\n') + 1,
"paragraphs": len([p for p in text.split('\n\n') if p.strip()])
}
message = "文本统计完成"
elif operation == "search":
if not pattern:
raise ValueError("search操作需要指定pattern参数")
# 支持正则表达式搜索
matches = re.findall(pattern, text, re.IGNORECASE)
positions = []
for match in re.finditer(pattern, text, re.IGNORECASE):
positions.append({
"match": match.group(),
"start": match.start(),
"end": match.end()
})
result = {
"matches": matches,
"positions": positions,
"count": len(matches)
}
message = f"找到 {len(matches)} 个匹配项"
elif operation == "replace":
if not pattern:
raise ValueError("replace操作需要指定pattern参数")
# 支持正则表达式替换
new_text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
replaced_count = len(re.findall(pattern, text, re.IGNORECASE))
result = {
"text": new_text,
"replaced_count": replaced_count
}
message = f"替换了 {replaced_count} 处匹配项"
elif operation == "split":
if separator:
parts = text.split(separator)
else:
parts = text.split()
result = {
"parts": parts,
"count": len(parts)
}
message = f"分割为 {len(parts)} 部分"
elif operation == "join":
# 假设text是以换行符分隔的多行文本
lines = text.split('\n')
joined_text = separator.join(lines)
result = {
"text": joined_text,
"line_count": len(lines)
}
message = f"连接了 {len(lines)} 行文本"
elif operation == "clean":
# 清理文本:去除多余空白、标准化换行等
cleaned = re.sub(r'\s+', ' ', text.strip())
cleaned = re.sub(r'\n\s*\n', '\n\n', cleaned) # 标准化段落分隔
result = {
"text": cleaned,
"removed_chars": len(text) - len(cleaned)
}
message = f"清理文本完成,移除了 {len(text) - len(cleaned)} 个多余字符"
else:
raise ValueError(f"不支持的操作: {operation}")
logger.info(f"文本处理成功: {operation}")
return {
"success": True,
"result": result,
"message": message,
"metadata": {
"operation": operation,
"original_length": len(text),
"pattern": pattern if pattern else None,
"separator": separator if separator != " " else None
}
}
except Exception as e:
logger.error(f"文本处理失败: {e}")
return {
"success": False,
"error": str(e),
"message": f"文本处理失败: {e}"
}
🔄 更新工具管理器
现在我们需要更新 `tools.py` 中的工具管理器来注册新工具:
python
# 在 tools.py 文件末尾添加
from file_tools import ReadFileTool, WriteFileTool, ListDirectoryTool
from network_tools import HttpRequestTool, DownloadFileTool
from data_tools import JsonProcessorTool, TextProcessorTool
class ToolManager:
"""工具管理器 - 更新版本"""
def __init__(self):
self.tools: Dict[str, MCPTool] = {}
self._register_all_tools()
def _register_all_tools(self):
"""注册所有工具"""
# 基础工具
basic_tools = [
RandomNumberTool(),
CurrentTimeTool(),
CalculatorTool()
]
# 文件操作工具
file_tools = [
ReadFileTool(),
WriteFileTool(),
ListDirectoryTool()
]
# 网络工具
network_tools = [
HttpRequestTool(),
DownloadFileTool()
]
# 数据处理工具
data_tools = [
JsonProcessorTool(),
TextProcessorTool()
]
# 注册所有工具
all_tools = basic_tools + file_tools + network_tools + data_tools
for tool in all_tools:
if tool.name in config.enabled_tools:
self.register_tool(tool)
logger.info(f"注册工具: {tool.name}")
# ... 其他方法保持不变
🧪 测试新工具
创建 `test_advanced_tools.py`:
python
"""
test_advanced_tools.py - 测试高级工具
"""
import asyncio
import json
import tempfile
from pathlib import Path
from file_tools import ReadFileTool, WriteFileTool, ListDirectoryTool
from network_tools import HttpRequestTool
from data_tools import JsonProcessorTool, TextProcessorTool
async def test_file_operations():
"""测试文件操作工具"""
print("🧪 测试文件操作工具")
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
temp_path = Path(temp_dir)
# 测试写入文件
write_tool = WriteFileTool()
write_result = await write_tool.execute({
"file_path": str(temp_path / "test.txt"),
"content": "Hello, MCP World!\n这是一个测试文件。",
"encoding": "utf-8"
})
print(f"写入文件结果: {write_result['success']}")
# 测试读取文件
read_tool = ReadFileTool()
read_result = await read_tool.execute({
"file_path": str(temp_path / "test.txt"),
"encoding": "utf-8"
})
print(f"读取文件结果: {read_result['success']}")
print(f"文件内容: {read_result.get('result', '')[:50]}...")
# 测试目录列表
list_tool = ListDirectoryTool()
list_result = await list_tool.execute({
"directory_path": str(temp_path),
"include_hidden": False
})
print(f"目录列表结果: {list_result['success']}")
print(f"文件数量: {list_result.get('metadata', {}).get('file_count', 0)}")
async def test_network_tools():
"""测试网络工具"""
print("\n🧪 测试网络工具")
# 测试HTTP请求
http_tool = HttpRequestTool()
http_result = await http_tool.execute({
"url": "https://httpbin.org/json",
"method": "GET"
})
print(f"HTTP请求结果: {http_result['success']}")
if http_result['success']:
status_code = http_result['result']['status_code']
print(f"响应状态码: {status_code}")
async def test_data_tools():
"""测试数据处理工具"""
print("\n🧪 测试数据处理工具")
# 测试JSON处理
json_tool = JsonProcessorTool()
test_json = '{"name": "张三", "age": 30, "skills": ["Python", "JavaScript"]}'
# 解析JSON
parse_result = await json_tool.execute({
"operation": "parse",
"data": test_json
})
print(f"JSON解析结果: {parse_result['success']}")
# 格式化JSON
format_result = await json_tool.execute({
"operation": "format",
"data": test_json,
"indent": 4
})
if format_result['success']:
print("格式化后的JSON:")
print(format_result['result'])
# 测试文本处理
text_tool = TextProcessorTool()
test_text = "Hello World! This is a test text. Hello again!"
# 文本统计
count_result = await text_tool.execute({
"operation": "count",
"text": test_text
})
print(f"\n文本统计结果: {count_result['success']}")
if count_result['success']:
stats = count_result['result']
print(f"字符数: {stats['characters']}, 单词数: {stats['words']}")
# 文本搜索
search_result = await text_tool.execute({
"operation": "search",
"text": test_text,
"pattern": "Hello"
})
print(f"文本搜索结果: {search_result['success']}")
if search_result['success']:
matches = search_result['result']['count']
print(f"找到 {matches} 个匹配项")
async def main():
"""主测试函数"""
print("🚀 开始测试高级MCP工具")
print("=" * 50)
await test_file_operations()
await test_network_tools()
await test_data_tools()
print("\n" + "=" * 50)
print("🎉 所有工具测试完成!")
if __name__ == "__main__":
asyncio.run(main())
🎯 本节小结
通过这一小节,你已经成功地:
✅ 扩展了工具类型:文件操作、网络请求、数据处理
✅ 实现了安全机制:路径验证、大小限制、权限控制
✅ 增强了错误处理:详细的错误信息和异常安全
✅ 优化了代码结构:模块化设计,易于维护和扩展
✅ 提供了丰富功能:从简单工具到复杂的数据处理
你的MCP服务器现在具备了以下强大功能:
🛠️ 基础工具:随机数、时间、计算器
📁 文件操作:读写文件、目录列表
🌐 网络功能:HTTP请求、文件下载
📊 数据处理:JSON处理、文本分析
🤔 思考题
- 如何为工具添加权限控制系统?
- 怎样实现工具的热插拔功能?
- 如何优化大文件处理的性能?
- 怎样添加工具执行的监控和统计?
准备好学习如何测试和调试你的MCP服务器了吗?