4.3 高级工具开发
🎯 学习目标:掌握MCP高级工具开发技巧,创建实用的企业级工具
⏱️ 预计时间:50分钟
📊 难度等级:⭐⭐⭐⭐
🚀 进入高级工具开发
简单的计算器工具只是开始。现在让我们学习如何开发真正实用的企业级工具,这些工具能够:
- 🌐 与外部API交互:天气查询、股票数据、新闻获取
- 🗄️ 操作数据库:查询、插入、更新数据
- 📊 数据分析:统计计算、图表生成、趋势分析
- 🔧 系统集成:邮件发送、文档生成、任务自动化
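🛠️ 高级工具架构模式
📊 工具分层架构
本节的每个工具类都遵循同一套分层结构:参数校验 → (可选的)缓存查询 → 数据获取(外部API或数据库)→ 结果格式化,最终返回MCP标准的 `content` 列表(面向用户的文本 + 结构化资源或图片)。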
🌐 网络服务工具开发
🌤️ 天气查询工具
让我们开发一个真实的天气查询工具,展示如何与外部API集成:
python
# weather_tools.py
import aiohttp
import asyncio
import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)


class WeatherTool:
    """MCP tool that queries OpenWeatherMap for current weather or a 1-5 day forecast.

    Formatted results are cached in memory for ``cache_ttl`` seconds, keyed by
    city, forecast length, unit system and language.
    """

    # Display symbols (temperature, wind speed) for each supported unit system.
    _UNIT_SYMBOLS = {
        "metric": ("°C", "m/s"),
        "imperial": ("°F", "mph"),
        "kelvin": ("K", "m/s"),
    }

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"
        # cache_key -> (formatted weather dict, time it was stored)
        self.cache: Dict[str, Any] = {}
        self.cache_ttl = 300  # seconds (5 minutes)

    def get_tool_definition(self) -> Dict[str, Any]:
        """Return the tool's name, description and JSON-schema parameters."""
        return {
            "name": "get_weather",
            "description": "获取指定城市的天气信息,支持当前天气和天气预报",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称,支持中文或英文,如:北京、Beijing、New York"
                    },
                    "forecast_days": {
                        "type": "integer",
                        "description": "预报天数,0表示只获取当前天气,1-5表示未来几天预报",
                        "minimum": 0,
                        "maximum": 5,
                        "default": 0
                    },
                    "units": {
                        "type": "string",
                        "description": "温度单位",
                        "enum": ["metric", "imperial", "kelvin"],
                        "default": "metric"
                    },
                    "language": {
                        "type": "string",
                        "description": "返回语言",
                        "enum": ["zh_cn", "en", "zh_tw"],
                        "default": "zh_cn"
                    }
                },
                "required": ["city"]
            }
        }

    def _get_cache_key(self, city: str, forecast_days: int, units: str, language: str = "zh_cn") -> str:
        """Build the cache key.

        ``language`` is part of the key because the API localises descriptions;
        without it a zh_cn answer would be served for an ``en`` request.
        """
        return f"weather:{city}:{forecast_days}:{units}:{language}"

    def _is_cache_valid(self, timestamp: datetime) -> bool:
        """Return True while ``timestamp`` is younger than ``cache_ttl`` seconds."""
        return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)

    @staticmethod
    def _to_api_units(units: str) -> str:
        """Map the tool's unit name to OpenWeatherMap's ``units`` query value.

        OpenWeatherMap names the Kelvin system "standard"; metric/imperial pass through.
        """
        return "standard" if units == "kelvin" else units

    @staticmethod
    def _local_datetime(timestamp: int, utc_offset: Optional[int]) -> datetime:
        """Convert a UNIX timestamp to the city's local time.

        ``utc_offset`` is the API's ``timezone`` field (seconds east of UTC). When it
        is absent, the server's local time is used as a fallback.
        """
        if utc_offset is None:
            return datetime.fromtimestamp(timestamp)
        from datetime import timezone
        return datetime.fromtimestamp(timestamp, tz=timezone(timedelta(seconds=utc_offset)))

    async def _make_api_request(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """GET ``url`` with ``params`` plus the API key and return the decoded JSON.

        Raises:
            ValueError: with a user-facing message for 404/401/other HTTP errors,
                timeouts and network failures.
        """
        query = dict(params, appid=self.api_key)  # copy: do not mutate the caller's dict
        timeout = aiohttp.ClientTimeout(total=10)
        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, params=query) as response:
                    if response.status == 200:
                        return await response.json()
                    if response.status == 404:
                        raise ValueError(f"未找到城市:{params.get('q', '未知')}")
                    if response.status == 401:
                        raise ValueError("API密钥无效")
                    # Error bodies are not guaranteed to be JSON; fall back to a generic message.
                    try:
                        error_data = await response.json(content_type=None)
                    except ValueError:
                        error_data = None
                    message = error_data.get('message', '未知错误') if isinstance(error_data, dict) else '未知错误'
                    raise ValueError(f"API请求失败:{message}")
        # aiohttp signals timeouts with asyncio.TimeoutError subclasses; ClientTimeout is
        # only a configuration object and must not appear in an except clause.
        except asyncio.TimeoutError:
            raise ValueError("请求超时,请稍后重试") from None
        except aiohttp.ClientError as e:
            raise ValueError(f"网络请求失败:{str(e)}") from e

    async def _get_current_weather(self, city: str, units: str, lang: str) -> Dict[str, Any]:
        """Fetch the raw current-weather payload for ``city``."""
        url = f"{self.base_url}/weather"
        params = {
            'q': city,
            'units': self._to_api_units(units),
            'lang': lang
        }
        return await self._make_api_request(url, params)

    async def _get_weather_forecast(self, city: str, days: int, units: str, lang: str) -> Dict[str, Any]:
        """Fetch the raw 3-hourly forecast payload covering ``days`` days."""
        url = f"{self.base_url}/forecast"
        params = {
            'q': city,
            'units': self._to_api_units(units),
            'lang': lang,
            'cnt': days * 8  # 8 slots per day (one every 3 hours)
        }
        return await self._make_api_request(url, params)

    def _format_weather_response(self, data: Dict[str, Any], forecast_days: int,
                                 units: str = "metric") -> Dict[str, Any]:
        """Reshape a raw API payload: current weather when ``forecast_days`` is 0, else a forecast."""
        if forecast_days == 0:
            return self._format_current(data, units)
        return self._format_forecast(data, forecast_days, units)

    def _format_current(self, data: Dict[str, Any], units: str) -> Dict[str, Any]:
        """Reshape a /weather payload into the tool's current-weather dict."""
        tz_offset = data.get('timezone')
        main = data['main']
        condition = data['weather'][0]
        wind = data['wind']
        return {
            "type": "current_weather",
            "city": data['name'],
            "country": data['sys'].get('country', ''),
            "units": units,
            "temperature": {
                "current": round(main['temp'], 1),
                "feels_like": round(main['feels_like'], 1),
                "min": round(main['temp_min'], 1),
                "max": round(main['temp_max'], 1)
            },
            "weather": {
                "main": condition['main'],
                "description": condition['description'],
                "icon": condition['icon']
            },
            "details": {
                "humidity": main['humidity'],  # %
                "pressure": main['pressure'],  # hPa
                "visibility": data.get('visibility', 0) / 1000,  # metres -> km
                "wind": {
                    "speed": wind['speed'],
                    "direction": wind.get('deg', 0)  # degrees
                }
            },
            "sun": {
                "sunrise": self._local_datetime(data['sys']['sunrise'], tz_offset).strftime('%H:%M'),
                "sunset": self._local_datetime(data['sys']['sunset'], tz_offset).strftime('%H:%M')
            },
            "timestamp": datetime.now().isoformat()
        }

    def _format_forecast(self, data: Dict[str, Any], forecast_days: int, units: str) -> Dict[str, Any]:
        """Group 3-hourly forecast slots by the city's local date.

        Keeps at most ``forecast_days`` dates.
        """
        tz_offset = data['city'].get('timezone')
        days: Dict[str, Dict[str, Any]] = {}
        for item in data['list']:
            moment = self._local_datetime(item['dt'], tz_offset)
            date = moment.strftime('%Y-%m-%d')
            temp = item['main']['temp']
            day = days.get(date)
            if day is None:
                # The first slot of each date supplies that day's headline weather and details.
                day = days[date] = {
                    "date": date,
                    "day_name": moment.strftime('%A'),
                    "temperature": {"min": temp, "max": temp},
                    "weather": {
                        "main": item['weather'][0]['main'],
                        "description": item['weather'][0]['description'],
                        "icon": item['weather'][0]['icon']
                    },
                    "details": {
                        "humidity": item['main']['humidity'],
                        "pressure": item['main']['pressure'],
                        "wind_speed": item['wind']['speed']
                    },
                    "hourly": []
                }
            else:
                day['temperature']['min'] = min(day['temperature']['min'], temp)
                day['temperature']['max'] = max(day['temperature']['max'], temp)
            day['hourly'].append({
                "time": moment.strftime('%H:%M'),
                "temperature": round(temp, 1),
                "weather": item['weather'][0]['description'],
                "humidity": item['main']['humidity']
            })
        forecasts = list(days.values())[:forecast_days]
        return {
            "type": "weather_forecast",
            "city": data['city']['name'],
            "country": data['city'].get('country', ''),
            "units": units,
            # Count after truncation so the text header matches the days actually listed.
            "forecast_days": len(forecasts),
            "forecasts": forecasts,
            "timestamp": datetime.now().isoformat()
        }

    def _build_response(self, formatted_data: Dict[str, Any], city: str, forecast_days: int) -> Dict[str, Any]:
        """Wrap formatted data as MCP content: friendly text plus an embedded JSON resource."""
        from urllib.parse import quote

        if forecast_days == 0:
            response_text = self._generate_current_weather_text(formatted_data)
        else:
            response_text = self._generate_forecast_text(formatted_data)
        return {
            "content": [
                {
                    "type": "text",
                    "text": response_text
                },
                {
                    "type": "resource",
                    "resource": {
                        "uri": f"weather://{quote(city)}",
                        "name": f"{city}天气数据",
                        "mimeType": "application/json",
                        # Per the MCP spec, embedded text belongs inside the resource contents.
                        "text": json.dumps(formatted_data, ensure_ascii=False, indent=2)
                    }
                }
            ],
            "isError": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run a weather query.

        Returns an MCP tool result. Parameter or API problems become
        ``isError: True`` results instead of exceptions.
        """
        try:
            city = str(arguments.get('city') or '').strip()
            if not city:
                raise ValueError("城市名称不能为空")
            try:
                forecast_days = int(arguments.get('forecast_days') or 0)
            except (TypeError, ValueError):
                forecast_days = -1
            if not 0 <= forecast_days <= 5:
                raise ValueError("预报天数必须是0到5之间的整数")
            units = arguments.get('units') or 'metric'
            if units not in self._UNIT_SYMBOLS:
                raise ValueError(f"不支持的温度单位:{units}")
            language = arguments.get('language') or 'zh_cn'

            cache_key = self._get_cache_key(city, forecast_days, units, language)
            cached = self.cache.get(cache_key)
            if cached is not None and self._is_cache_valid(cached[1]):
                logger.info("从缓存获取天气数据: %s", city)
                return self._build_response(cached[0], city, forecast_days)

            if forecast_days == 0:
                weather_data = await self._get_current_weather(city, units, language)
            else:
                weather_data = await self._get_weather_forecast(city, forecast_days, units, language)

            formatted_data = self._format_weather_response(weather_data, forecast_days, units)
            self.cache[cache_key] = (formatted_data, datetime.now())
            return self._build_response(formatted_data, city, forecast_days)

        except ValueError as e:
            logger.warning("天气查询参数错误: %s", e)
            return {
                "content": [{"type": "text", "text": f"❌ 查询失败:{str(e)}"}],
                "isError": True
            }
        except Exception as e:
            logger.exception("天气查询异常: %s", e)
            return {
                "content": [{"type": "text", "text": f"❌ 系统错误:{str(e)}"}],
                "isError": True
            }

    def _generate_current_weather_text(self, data: Dict[str, Any]) -> str:
        """Render a current-weather dict as user-facing text in the data's unit system."""
        temp_unit, speed_unit = self._UNIT_SYMBOLS.get(data.get('units', 'metric'), ("°C", "m/s"))
        temp = data['temperature']
        weather = data['weather']
        details = data['details']
        sun = data['sun']
        updated = datetime.fromisoformat(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')
        lines = [
            f"🌤️ {data['city']}当前天气",
            "🌡️ **温度情况**",
            f"当前温度:{temp['current']}{temp_unit}(体感 {temp['feels_like']}{temp_unit})",
            f"最高/最低:{temp['max']}{temp_unit} / {temp['min']}{temp_unit}",
            "☁️ **天气状况**",
            f"天气:{weather['description']}",
            "💧 **详细信息**",
            f"湿度:{details['humidity']}%",
            f"气压:{details['pressure']} hPa",
            f"能见度:{details['visibility']} km",
            f"风速:{details['wind']['speed']} {speed_unit}",
            "🌅 **日出日落**",
            f"日出:{sun['sunrise']}",
            f"日落:{sun['sunset']}",
            f"⏰ 更新时间:{updated}",
        ]
        return "\n".join(lines)

    def _generate_forecast_text(self, data: Dict[str, Any]) -> str:
        """Render a forecast dict as user-facing text, one section per day."""
        temp_unit, _ = self._UNIT_SYMBOLS.get(data.get('units', 'metric'), ("°C", "m/s"))
        weekdays = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
        parts = [f"📅 {data['city']}未来{data['forecast_days']}天天气预报\n\n"]
        for forecast in data['forecasts']:
            date_obj = datetime.strptime(forecast['date'], '%Y-%m-%d')
            temp = forecast['temperature']
            parts.append(
                f"📆 **{date_obj.strftime('%m月%d日')} {weekdays[date_obj.weekday()]}**\n"
                f"🌡️ 温度:{round(temp['min'], 1)}{temp_unit} ~ {round(temp['max'], 1)}{temp_unit}\n"
                f"☁️ 天气:{forecast['weather']['description']}\n"
                f"💧 湿度:{forecast['details']['humidity']}%\n"
            )
        parts.append(f"⏰ 预报时间:{datetime.fromisoformat(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}")
        return "".join(parts)
# Registering the weather tool (this class lives in server.py)
class AdvancedMCPServer:
    """Minimal server wrapper that registers only the weather tool."""

    def __init__(self, api_key: Optional[str] = None):
        """Create the weather tool.

        The key comes from ``api_key``, else from the ``OPENWEATHER_API_KEY``
        environment variable, else a placeholder. This keeps real secrets out of
        source code.
        """
        import os
        self.weather_tool = WeatherTool(
            api_key=api_key or os.environ.get("OPENWEATHER_API_KEY", "your_openweather_api_key")
        )

    def register_tools(self, server):
        """Register every tool this wrapper owns on ``server``."""
        server.add_tool(
            self.weather_tool.get_tool_definition(),
            self.weather_tool.execute
        )
📈 股票数据工具
python
# stock_tools.py
import yfinance as yf
import pandas as pd
from typing import Dict, Any, List
import json
from datetime import datetime, timedelta
class StockTool:
    """MCP tool that reports quote, company profile and recent history for a ticker via yfinance.

    yfinance performs blocking HTTP calls, so fetching runs in a worker thread to
    keep the event loop responsive. Results are cached per argument combination
    for ``cache_ttl`` seconds.
    """

    def __init__(self):
        # (symbol, period, interval, info_type) -> (result dict, time fetched)
        self.cache = {}
        self.cache_ttl = 60  # seconds (1 minute)

    def get_tool_definition(self) -> Dict[str, Any]:
        """Return the tool's name, description and JSON-schema parameters."""
        return {
            "name": "get_stock_info",
            "description": "获取股票实时价格和基本信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "股票代码,如AAPL、TSLA、000001.SZ"
                    },
                    "period": {
                        "type": "string",
                        "description": "历史数据周期",
                        "enum": ["1d", "5d", "1mo", "3mo", "6mo", "1y", "2y"],
                        "default": "1d"
                    },
                    "interval": {
                        "type": "string",
                        "description": "数据间隔",
                        "enum": ["1m", "5m", "15m", "30m", "1h", "1d"],
                        "default": "1d"
                    },
                    "info_type": {
                        "type": "string",
                        "description": "信息类型",
                        "enum": ["price", "info", "history", "all"],
                        "default": "all"
                    }
                },
                "required": ["symbol"]
            }
        }

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run a stock query and return an MCP tool result (errors become ``isError: True``)."""
        import asyncio  # stock_tools.py has no module-level asyncio import

        try:
            symbol = str(arguments.get('symbol') or '').strip().upper()
            period = arguments.get('period', '1d')
            interval = arguments.get('interval', '1d')
            info_type = arguments.get('info_type', 'all')
            if not symbol:
                raise ValueError("股票代码不能为空")

            cache_key = (symbol, period, interval, info_type)
            cached = self.cache.get(cache_key)
            if cached is not None and datetime.now() - cached[1] < timedelta(seconds=self.cache_ttl):
                result = cached[0]
            else:
                result = await asyncio.to_thread(self._fetch_stock_data, symbol, period, interval, info_type)
                self.cache[cache_key] = (result, datetime.now())

            return {
                "content": [
                    {"type": "text", "text": self._generate_stock_text(result)},
                    {
                        "type": "resource",
                        "resource": {
                            "uri": f"stock://{symbol}",
                            "name": f"{symbol}股票数据",
                            "mimeType": "application/json",
                            # Per the MCP spec, embedded text belongs inside the resource contents.
                            "text": json.dumps(result, ensure_ascii=False, indent=2)
                        }
                    }
                ],
                "isError": False
            }
        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 股票查询失败:{str(e)}"}],
                "isError": True
            }

    def _fetch_stock_data(self, symbol: str, period: str, interval: str, info_type: str) -> Dict[str, Any]:
        """Blocking fetch of the requested sections.

        Each section is best-effort: a failure records an ``{"error": ...}`` entry
        instead of aborting the others.
        """
        stock = yf.Ticker(symbol)
        result: Dict[str, Any] = {"symbol": symbol}
        if info_type in ('info', 'all'):
            try:
                result['info'] = self._fetch_info(stock)
            except Exception:
                result['info'] = {"error": "无法获取股票基本信息"}
        if info_type in ('price', 'all'):
            try:
                price = self._fetch_price(stock)
                if price is not None:
                    result['price'] = price
            except Exception:
                result['price'] = {"error": "无法获取价格信息"}
        if info_type in ('history', 'all'):
            try:
                history = self._fetch_history(stock, period, interval)
                if history:
                    result['history'] = history
            except Exception:
                result['history'] = {"error": "无法获取历史数据"}
        return result

    @staticmethod
    def _fetch_info(stock) -> Dict[str, Any]:
        """Extract the company profile; missing numeric fields are None (not a misleading 0)."""
        info = stock.info
        return {
            "name": info.get('longName', info.get('shortName', 'N/A')),
            "sector": info.get('sector', 'N/A'),
            "industry": info.get('industry', 'N/A'),
            "market_cap": info.get('marketCap'),
            "pe_ratio": info.get('trailingPE'),
            "dividend_yield": info.get('dividendYield')
        }

    @staticmethod
    def _fetch_price(stock) -> Optional[Dict[str, Any]]:
        """Return the latest session's prices, or None when no data is available.

        The change is measured against the previous session's close, which is the
        usual meaning of a daily 涨跌. It falls back to today's open when only one
        session is returned.
        """
        history = stock.history(period='5d')
        if history.empty:
            return None
        latest = history.iloc[-1]
        reference = history['Close'].iloc[-2] if len(history) >= 2 else latest['Open']
        change = latest['Close'] - reference
        return {
            "current": round(latest['Close'], 2),
            "open": round(latest['Open'], 2),
            "high": round(latest['High'], 2),
            "low": round(latest['Low'], 2),
            "volume": int(latest['Volume']),
            "change": round(change, 2),
            "change_percent": round(change / reference * 100, 2) if reference else 0.0
        }

    @staticmethod
    def _fetch_history(stock, period: str, interval: str) -> List[Dict[str, Any]]:
        """Return the 10 most recent OHLCV rows for ``period``/``interval`` (empty list if none)."""
        history = stock.history(period=period, interval=interval)
        return [
            {
                "date": index.strftime('%Y-%m-%d %H:%M:%S'),
                "open": round(row['Open'], 2),
                "high": round(row['High'], 2),
                "low": round(row['Low'], 2),
                "close": round(row['Close'], 2),
                "volume": int(row['Volume'])
            }
            for index, row in history.tail(10).iterrows()
        ]

    def _generate_stock_text(self, data: Dict[str, Any]) -> str:
        """Render the fetched sections as user-facing text; sections with errors are skipped."""
        symbol = data['symbol']
        text = f"📈 {symbol} 股票信息\n\n"

        info = data.get('info')
        if isinstance(info, dict) and 'error' not in info:
            market_cap = info.get('market_cap')
            market_cap_text = f"${market_cap:,}" if isinstance(market_cap, (int, float)) else "N/A"
            pe_ratio = info.get('pe_ratio')
            text += "🏢 **公司信息**\n"
            text += f" 名称:{info['name']}\n"
            text += f" 行业:{info['sector']} - {info['industry']}\n"
            text += f" 市值:{market_cap_text}\n"
            text += f" 市盈率:{'N/A' if pe_ratio is None else pe_ratio}\n\n"

        price = data.get('price')
        if isinstance(price, dict) and 'error' not in price:
            change_icon = "📈" if price['change'] >= 0 else "📉"
            text += "💰 **实时价格**\n"
            text += f" 当前价格:${price['current']}\n"
            text += f" 开盘价格:${price['open']}\n"
            text += f" 最高/最低:${price['high']} / ${price['low']}\n"
            text += f" {change_icon} 涨跌:${price['change']} ({price['change_percent']:+.2f}%)\n"
            text += f" 成交量:{price['volume']:,}\n\n"

        history = data.get('history')
        if isinstance(history, list) and history:
            text += "📊 **最近历史数据**\n"
            for record in history[-3:]:  # only the 3 most recent data points
                text += f" {record['date']}: ${record['close']} (量: {record['volume']:,})\n"

        text += f"\n⏰ 查询时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        return text
🗄️ 数据库操作工具
📊 SQLite数据库工具
python
# database_tools.py
import sqlite3
import asyncio
import json
from typing import Dict, Any, List, Optional
from pathlib import Path
import pandas as pd
class DatabaseTool:
    """SQLite-backed MCP tools: read-only querying, schema inspection and row insertion.

    Every operation opens its own short-lived connection and always closes it.
    ``sqlite3.Connection``'s own ``with`` block only commits or rolls back; it
    never closes the connection.
    """

    def __init__(self, db_path: str = "mcp_data.db"):
        self.db_path = db_path
        self.init_database()

    def _connect(self):
        """Return a context manager yielding a new connection that is closed on exit."""
        from contextlib import closing
        return closing(sqlite3.connect(self.db_path))

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Quote an SQL identifier (table/column name), escaping embedded double quotes."""
        return '"' + name.replace('"', '""') + '"'

    @staticmethod
    def _require_table(conn: sqlite3.Connection, table_name: str) -> None:
        """Raise ValueError unless ``table_name`` is an existing table (checked with a bound parameter)."""
        row = conn.execute(
            "SELECT 1 FROM sqlite_master WHERE type='table' AND name = ?", (table_name,)
        ).fetchone()
        if row is None:
            raise ValueError(f"表 '{table_name}' 不存在")

    def init_database(self):
        """Create the example tables and seed rows.

        Idempotent: safe to run on every start.
        """
        with self._connect() as conn:
            with conn:  # commit on success, roll back on error
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS users (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        name TEXT NOT NULL,
                        email TEXT UNIQUE,
                        age INTEGER,
                        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                    )
                """)
                conn.execute("""
                    CREATE TABLE IF NOT EXISTS orders (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        user_id INTEGER,
                        product_name TEXT,
                        amount DECIMAL(10,2),
                        order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                        FOREIGN KEY (user_id) REFERENCES users (id)
                    )
                """)
                conn.execute("INSERT OR IGNORE INTO users (id, name, email, age) VALUES (1, '张三', 'zhang@example.com', 25)")
                conn.execute("INSERT OR IGNORE INTO users (id, name, email, age) VALUES (2, '李四', 'li@example.com', 30)")
                # An explicit id is what makes OR IGNORE work here; without it AUTOINCREMENT
                # assigns a fresh id and every restart added another copy of this order.
                conn.execute("INSERT OR IGNORE INTO orders (id, user_id, product_name, amount) VALUES (1, 1, 'iPhone 15', 7999.00)")

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """Return definitions for query_database, get_table_schema and insert_data."""
        return [
            {
                "name": "query_database",
                "description": "执行SQL查询语句,支持SELECT查询",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL查询语句,仅支持SELECT语句"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "结果限制行数",
                            "default": 100,
                            "maximum": 1000
                        },
                        "format": {
                            "type": "string",
                            "description": "输出格式",
                            "enum": ["table", "json", "csv"],
                            "default": "table"
                        }
                    },
                    "required": ["sql"]
                }
            },
            {
                "name": "get_table_schema",
                "description": "获取数据库表结构信息",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "表名,如果为空则返回所有表信息"
                        }
                    }
                }
            },
            {
                "name": "insert_data",
                "description": "向数据库表插入数据",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "表名"
                        },
                        "data": {
                            "type": "object",
                            "description": "要插入的数据,键值对格式"
                        }
                    },
                    "required": ["table_name", "data"]
                }
            }
        ]

    async def query_database(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run a read-only SELECT and return at most ``limit`` rows (1..1000) as table/json/csv text."""
        import hashlib

        try:
            sql = str(arguments.get('sql') or '').strip()
            format_type = arguments.get('format', 'table')
            try:
                limit = int(arguments.get('limit', 100))
            except (TypeError, ValueError):
                raise ValueError("limit必须是整数") from None
            limit = max(1, min(limit, 1000))  # enforce the schema's maximum

            # Cheap first gate; PRAGMA query_only below is the real read-only guarantee.
            if not sql.upper().startswith('SELECT'):
                raise ValueError("只允许执行SELECT查询语句")

            with self._connect() as conn:
                conn.execute("PRAGMA query_only = ON")
                conn.row_factory = sqlite3.Row  # rows addressable by column name
                # fetchmany enforces the limit without rewriting the SQL (appending
                # " LIMIT n" broke statements ending in ';' or containing "limit").
                rows = conn.execute(sql).fetchmany(limit)

            if not rows:
                return {
                    "content": [{"type": "text", "text": "查询结果为空"}],
                    "isError": False
                }

            results = [dict(row) for row in rows]
            if format_type == 'json':
                response_text = json.dumps(results, ensure_ascii=False, indent=2)
            elif format_type == 'csv':
                response_text = pd.DataFrame(results).to_csv(index=False)
            else:  # table
                response_text = self._format_table(results)

            # Stable across processes, unlike the salted built-in hash().
            digest = hashlib.sha256(sql.encode('utf-8')).hexdigest()[:16]
            return {
                "content": [
                    {"type": "text", "text": f"📊 查询结果 ({len(results)}条记录)\n\n{response_text}"},
                    {
                        "type": "resource",
                        "resource": {
                            "uri": f"database://query/{digest}",
                            "name": "查询结果数据",
                            "mimeType": "application/json",
                            "text": json.dumps(results, ensure_ascii=False, indent=2)
                        }
                    }
                ],
                "isError": False
            }
        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 数据库查询失败:{str(e)}"}],
                "isError": True
            }

    def _describe_table(self, conn: sqlite3.Connection, table_name: str) -> Dict[str, Any]:
        """Collect column metadata and the row count for an existing table."""
        quoted = self._quote_identifier(table_name)
        columns = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
        return {
            "table_name": table_name,
            "columns": [
                {
                    "name": col[1],
                    "type": col[2],
                    "not_null": bool(col[3]),
                    "default_value": col[4],
                    "primary_key": bool(col[5])
                } for col in columns
            ],
            "row_count": conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
        }

    async def get_table_schema(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Describe one table, or summarise every user table when ``table_name`` is empty."""
        try:
            table_name = str(arguments.get('table_name') or '').strip()
            with self._connect() as conn:
                if table_name:
                    # Validate the name before it is interpolated (quoted) into SQL.
                    self._require_table(conn, table_name)
                    response_text = self._format_schema(self._describe_table(conn, table_name))
                else:
                    # Skip SQLite's internal tables such as sqlite_sequence.
                    names = [row[0] for row in conn.execute(
                        "SELECT name FROM sqlite_master WHERE type='table' AND substr(name, 1, 7) <> 'sqlite_'"
                    )]
                    response_text = "📋 数据库表信息\n\n"
                    for name in names:
                        quoted = self._quote_identifier(name)
                        column_count = len(conn.execute(f"PRAGMA table_info({quoted})").fetchall())
                        row_count = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
                        response_text += f"🗂️ **{name}**\n"
                        response_text += f" 列数:{column_count}\n"
                        response_text += f" 行数:{row_count}\n\n"
            return {
                "content": [{"type": "text", "text": response_text}],
                "isError": False
            }
        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 获取表结构失败:{str(e)}"}],
                "isError": True
            }

    async def insert_data(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Insert one row into an existing table.

        The table name and every column name are checked against the schema.
        Values are passed as bound parameters.
        """
        try:
            table_name = str(arguments.get('table_name') or '').strip()
            data = arguments.get('data')
            if not table_name:
                raise ValueError("表名不能为空")
            if not isinstance(data, dict) or not data:
                raise ValueError("插入数据不能为空,且必须是键值对格式")

            with self._connect() as conn:
                self._require_table(conn, table_name)
                quoted_table = self._quote_identifier(table_name)
                known_columns = {row[1] for row in conn.execute(f"PRAGMA table_info({quoted_table})")}
                unknown = [column for column in data if column not in known_columns]
                if unknown:
                    raise ValueError(f"未知字段:{', '.join(map(str, unknown))}")

                columns = ", ".join(self._quote_identifier(column) for column in data)
                placeholders = ", ".join("?" for _ in data)
                with conn:  # commit on success, roll back on error
                    cursor = conn.execute(
                        f"INSERT INTO {quoted_table} ({columns}) VALUES ({placeholders})",
                        list(data.values()),
                    )
                row_id = cursor.lastrowid

            return {
                "content": [{"type": "text", "text": f"✅ 已向表 '{table_name}' 插入1条记录 (rowid={row_id})"}],
                "isError": False
            }
        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 插入数据失败:{str(e)}"}],
                "isError": True
            }

    def _format_table(self, results: List[Dict[str, Any]]) -> str:
        """Render rows as a plain-text table (pandas, at most 50 rows shown)."""
        if not results:
            return "无数据"
        return pd.DataFrame(results).to_string(index=False, max_rows=50)

    def _format_schema(self, schema_info: Dict[str, Any]) -> str:
        """Render a table description produced by ``_describe_table``."""
        text = f"🗂️ **{schema_info['table_name']}** 表结构\n\n"
        text += f"📊 总行数:{schema_info['row_count']}\n\n"
        text += "📋 **字段信息**:\n"
        for col in schema_info['columns']:
            text += f" • **{col['name']}** ({col['type']})"
            flags = []
            if col['primary_key']:
                flags.append("主键")
            if col['not_null']:
                flags.append("非空")
            # `is not None` so falsy defaults such as 0 or '' are still reported.
            if col['default_value'] is not None:
                flags.append(f"默认值: {col['default_value']}")
            if flags:
                text += f" - {', '.join(flags)}"
            text += "\n"
        return text
📊 数据分析工具
python
# analytics_tools.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from typing import Dict, Any, List
import json
class AnalyticsTool:
    """MCP tool for summary, correlation, distribution and trend analysis of CSV data.

    Optionally renders a PNG chart.
    """

    def __init__(self):
        # Prefer SimHei for CJK labels but keep the existing fallbacks so charts
        # still render on machines where that font is not installed.
        fallbacks = [font for font in plt.rcParams['font.sans-serif'] if font != 'SimHei']
        plt.rcParams['font.sans-serif'] = ['SimHei'] + fallbacks
        plt.rcParams['axes.unicode_minus'] = False

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """Return the definition of the analyze_data tool."""
        return [
            {
                "name": "analyze_data",
                "description": "对CSV数据进行统计分析",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "data": {
                            "type": "string",
                            "description": "CSV格式的数据或数据文件路径"
                        },
                        "analysis_type": {
                            "type": "string",
                            "description": "分析类型",
                            "enum": ["summary", "correlation", "distribution", "trend"],
                            "default": "summary"
                        },
                        "columns": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "要分析的列名,为空则分析所有数值列"
                        },
                        "chart": {
                            "type": "boolean",
                            "description": "是否生成图表",
                            "default": True
                        }
                    },
                    "required": ["data"]
                }
            }
        ]

    async def analyze_data(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Parse the CSV, analyse its numeric columns and return a text report (plus optional chart)."""
        try:
            data_input = str(arguments.get('data') or '')
            analysis_type = arguments.get('analysis_type', 'summary')
            target_columns = arguments.get('columns') or []
            generate_chart = arguments.get('chart', True)

            analyzers = {
                "summary": self._summary_analysis,
                "correlation": self._correlation_analysis,
                "distribution": self._distribution_analysis,
                "trend": self._trend_analysis,
            }
            if analysis_type not in analyzers:
                raise ValueError(f"不支持的分析类型:{analysis_type}")

            # NOTE(review): a value ending in ".csv" is read as a server-side file path;
            # if callers are untrusted this lets them read any CSV file on the host.
            if data_input.endswith('.csv'):
                df = pd.read_csv(data_input)
            else:
                df = pd.read_csv(io.StringIO(data_input))
            if df.empty:
                raise ValueError("数据为空")

            if target_columns:
                missing = [col for col in target_columns if col not in df.columns]
                if missing:
                    raise ValueError(f"列不存在:{', '.join(map(str, missing))}")
                df = df[target_columns]
            # All analyses are numeric; requested text columns are dropped as well.
            df = df.select_dtypes(include=[np.number])
            if df.shape[1] == 0:
                raise ValueError("没有可分析的数值列")

            result = {
                "data_info": self._get_data_info(df),
                "analysis": analyzers[analysis_type](df),
            }
            chart_data = self._generate_chart(df, analysis_type) if generate_chart else None

            content = [{"type": "text", "text": self._generate_analysis_report(result, analysis_type)}]
            if chart_data:
                # MCP ImageContent carries base64 `data` plus `mimeType` at the top level.
                content.append({
                    "type": "image",
                    "data": chart_data,
                    "mimeType": "image/png"
                })
            return {
                "content": content,
                "isError": False
            }
        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 数据分析失败:{str(e)}"}],
                "isError": True
            }

    def _get_data_info(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return shape, column names, dtypes, missing-value counts and memory usage."""
        return {
            "shape": df.shape,
            "columns": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "missing_values": {col: int(count) for col, count in df.isnull().sum().items()},
            "memory_usage": int(df.memory_usage(deep=True).sum())
        }

    def _summary_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return descriptive statistics, skewness and kurtosis per column."""
        return {
            "describe": df.describe().to_dict(),
            "skewness": df.skew().to_dict(),
            "kurtosis": df.kurtosis().to_dict()
        }

    def _correlation_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Return the Pearson correlation matrix and the strongly correlated pairs."""
        correlation_matrix = df.corr()
        return {
            "correlation_matrix": correlation_matrix.to_dict(),
            "strong_correlations": self._find_strong_correlations(correlation_matrix)
        }

    def _distribution_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Per column: quartiles, skewness, kurtosis and the count of 1.5×IQR outliers."""
        distribution = {}
        for col in df.columns:
            series = df[col].dropna()
            q1, median, q3 = series.quantile(0.25), series.median(), series.quantile(0.75)
            iqr = q3 - q1
            outliers = (series < q1 - 1.5 * iqr) | (series > q3 + 1.5 * iqr)
            distribution[col] = {
                "quartiles": {"25%": float(q1), "50%": float(median), "75%": float(q3)},
                "skewness": float(series.skew()),
                "kurtosis": float(series.kurt()),
                "outliers": int(outliers.sum())
            }
        return distribution

    def _trend_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Per column: least-squares slope against row position, direction, and start/end change."""
        trends = {}
        for col in df.columns:
            values = df[col].astype(float).to_numpy()
            mask = ~np.isnan(values)
            points = values[mask]
            if len(points) < 2:
                trends[col] = {"slope": None, "direction": "数据不足",
                               "start": None, "end": None, "change_percent": None}
                continue
            slope = float(np.polyfit(np.flatnonzero(mask), points, 1)[0])
            if np.isclose(slope, 0.0):
                direction = "平稳"
            elif slope > 0:
                direction = "上升"
            else:
                direction = "下降"
            start, end = float(points[0]), float(points[-1])
            trends[col] = {
                "slope": slope,
                "direction": direction,
                "start": start,
                "end": end,
                "change_percent": (end - start) / abs(start) * 100 if start != 0 else None
            }
        return trends

    def _find_strong_correlations(self, corr_matrix: pd.DataFrame,
                                  threshold: float = 0.7) -> List[Dict[str, Any]]:
        """List each column pair once whose |r| exceeds ``threshold`` (NaN correlations are ignored)."""
        strong_corr = []
        columns = list(corr_matrix.columns)
        for i, col1 in enumerate(columns):
            for col2 in columns[i + 1:]:  # upper triangle only: each pair once
                corr_value = corr_matrix.loc[col1, col2]
                if abs(corr_value) > threshold:
                    strong_corr.append({
                        "var1": col1,
                        "var2": col2,
                        "correlation": round(corr_value, 3),
                        "strength": "强正相关" if corr_value > 0 else "强负相关"
                    })
        return strong_corr

    def _generate_chart(self, df: pd.DataFrame, analysis_type: str) -> str:
        """Render the chart for ``analysis_type`` and return it as a base64-encoded PNG."""
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            if analysis_type == "summary":
                df.boxplot(ax=ax)
                ax.set_title('数据分布箱线图')
                ax.tick_params(axis='x', rotation=45)
            elif analysis_type == "correlation":
                sns.heatmap(df.corr(), annot=True, cmap='coolwarm', ax=ax)
                ax.set_title('变量相关性热力图')
            elif analysis_type == "distribution":
                # Overlay every column on the single axes; DataFrame.hist(ax=ax) would
                # clear the figure and build its own subplot grid when there are several columns.
                df.plot.hist(bins=20, alpha=0.7, ax=ax)
                ax.set_title('数据分布直方图')
            elif analysis_type == "trend":
                for column in df.columns:
                    ax.plot(df.index, df[column], label=column, marker='o')
                ax.set_title('数据趋势图')
                ax.legend()
            fig.tight_layout()
            buffer = io.BytesIO()
            fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
            return base64.b64encode(buffer.getvalue()).decode()
        finally:
            plt.close(fig)  # release the figure even when plotting fails

    def _generate_analysis_report(self, result: Dict[str, Any], analysis_type: str) -> str:
        """Assemble the text report: data overview, missing values, then the type-specific section."""
        data_info = result["data_info"]
        analysis = result["analysis"]

        report = f"📊 **数据分析报告** ({analysis_type})\n\n"
        report += "📋 **数据概要**\n"
        report += f" 数据维度:{data_info['shape'][0]}行 × {data_info['shape'][1]}列\n"
        report += f" 分析列:{', '.join(map(str, data_info['columns']))}\n"
        report += f" 内存使用:{data_info['memory_usage'] / 1024:.1f} KB\n\n"

        missing_values = data_info["missing_values"]
        if any(missing_values.values()):
            report += "⚠️ **缺失值情况**\n"
            for col, missing_count in missing_values.items():
                if missing_count > 0:
                    missing_pct = missing_count / data_info['shape'][0] * 100
                    report += f" {col}: {missing_count}个 ({missing_pct:.1f}%)\n"
            report += "\n"

        formatters = {
            "summary": self._format_summary_report,
            "correlation": self._format_correlation_report,
            "distribution": self._format_distribution_report,
            "trend": self._format_trend_report,
        }
        formatter = formatters.get(analysis_type)
        if formatter is not None:
            report += formatter(analysis)
        return report

    def _format_summary_report(self, analysis: Dict[str, Any]) -> str:
        """Render mean/std/min/max/median for each column."""
        report = "📈 **统计摘要**\n"
        describe = analysis["describe"]
        for col in describe:
            stats = describe[col]
            report += f"\n🔢 **{col}**\n"
            report += f" 均值:{stats['mean']:.2f}\n"
            report += f" 标准差:{stats['std']:.2f}\n"
            report += f" 最小值:{stats['min']:.2f}\n"
            report += f" 最大值:{stats['max']:.2f}\n"
            report += f" 中位数:{stats['50%']:.2f}\n"
        return report

    def _format_correlation_report(self, analysis: Dict[str, Any]) -> str:
        """Render the strongly correlated pairs, or a note that none were found."""
        report = "🔗 **相关性分析**\n\n"
        strong_corr = analysis["strong_correlations"]
        if strong_corr:
            report += "💪 **强相关关系**\n"
            for corr in strong_corr:
                report += f" • {corr['var1']} ↔ {corr['var2']}: {corr['correlation']} ({corr['strength']})\n"
        else:
            report += "ℹ️ 未发现强相关关系(|r| > 0.7)\n"
        return report

    def _format_distribution_report(self, analysis: Dict[str, Any]) -> str:
        """Render quartiles, shape statistics and outlier counts for each column."""
        report = "📊 **分布分析**\n"
        for col, stats in analysis.items():
            quartiles = stats["quartiles"]
            report += f"\n🔢 **{col}**\n"
            report += f" 四分位数(25%/50%/75%):{quartiles['25%']:.2f} / {quartiles['50%']:.2f} / {quartiles['75%']:.2f}\n"
            report += f" 偏度:{stats['skewness']:.2f}\n"
            report += f" 峰度:{stats['kurtosis']:.2f}\n"
            report += f" 异常值(1.5×IQR):{stats['outliers']}个\n"
        return report

    def _format_trend_report(self, analysis: Dict[str, Any]) -> str:
        """Render the trend direction, slope and start→end change for each column."""
        report = "📈 **趋势分析**\n"
        for col, stats in analysis.items():
            report += f"\n🔢 **{col}**\n"
            if stats["slope"] is None:
                report += " 数据点不足,无法判断趋势\n"
                continue
            report += f" 趋势:{stats['direction']} (斜率 {stats['slope']:.4f}/行)\n"
            report += f" 起点/终点:{stats['start']:.2f} → {stats['end']:.2f}\n"
            if stats["change_percent"] is not None:
                report += f" 变化幅度:{stats['change_percent']:+.2f}%\n"
        return report
🚀 服务器整合
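现在让我们将所有高级工具整合到服务器中(注:下面的 `FastMCPServer` 与 `add_tool(定义, 处理函数)` 是示意接口,请按所用 MCP SDK 的实际 API 调整,例如官方 Python SDK 提供的是 `mcp.server.fastmcp.FastMCP`):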
python
# advanced_server.py
import asyncio
import logging
from mcp.server.fastapi import FastMCPServer
from mcp.server.models import InitializationOptions
from weather_tools import WeatherTool
from stock_tools import StockTool
from database_tools import DatabaseTool
from analytics_tools import AnalyticsTool
# Configure root logging once, when the server module is imported.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AdvancedMCPServer:
    """Wire every advanced tool into a single MCP server instance.

    NOTE(review): the official MCP Python SDK exposes ``FastMCP`` from
    ``mcp.server.fastmcp``. Confirm that ``FastMCPServer`` and
    ``add_tool(definition, handler)`` match the SDK version in use.
    """

    def __init__(self, weather_api_key=None):
        """Create all tools and the server.

        The OpenWeatherMap key comes from ``weather_api_key``, else the
        ``OPENWEATHER_API_KEY`` environment variable, else a placeholder.
        """
        import os  # local import: this module has no top-level os import

        api_key = weather_api_key or os.environ.get("OPENWEATHER_API_KEY", "your_openweather_api_key")
        self.weather_tool = WeatherTool(api_key=api_key)
        self.stock_tool = StockTool()
        self.database_tool = DatabaseTool()
        self.analytics_tool = AnalyticsTool()
        self.server = FastMCPServer("advanced-mcp-server")

    def setup_tools(self):
        """Register every tool on the server."""
        self.server.add_tool(
            self.weather_tool.get_tool_definition(),
            self.weather_tool.execute
        )
        self.server.add_tool(
            self.stock_tool.get_tool_definition(),
            self.stock_tool.execute
        )
        # Multi-tool providers name each handler method after its tool definition
        # (query_database, get_table_schema, insert_data, analyze_data). Resolving by
        # name means every declared tool that has a handler gets registered. A
        # definition without a handler is skipped with a warning, not registered broken.
        for provider in (self.database_tool, self.analytics_tool):
            for tool_def in provider.get_tool_definitions():
                handler = getattr(provider, tool_def["name"], None)
                if not callable(handler):
                    logger.warning("工具 %s 缺少处理函数,已跳过注册", tool_def["name"])
                    continue
                self.server.add_tool(tool_def, handler)

    async def run(self, host: str = "localhost", port: int = 3000):
        """Register all tools, then serve on ``host:port``."""
        self.setup_tools()
        logger.info("🚀 启动高级MCP服务器...")
        logger.info("📡 监听地址: http://%s:%s", host, port)
        logger.info("🛠️ 已注册工具: 天气查询、股票信息、数据库操作、数据分析")
        await self.server.run(host=host, port=port)


if __name__ == "__main__":
    server = AdvancedMCPServer()
    asyncio.run(server.run())
🧪 测试高级工具
创建测试脚本来验证所有工具(脚本依赖提供 `AdvancedMCPClient` 的 `advanced_client.py` 模块,请先准备好该客户端):
python
# test_advanced_tools.py
import asyncio
import json
from advanced_client import AdvancedMCPClient
async def test_all_tools():
    """Smoke-test each advanced tool through a connected AdvancedMCPClient.

    Tools are invoked one after another in a fixed order, each with a
    representative argument set. The client is always disconnected at the end.
    If the connection cannot be established the function returns without
    running anything.
    """
    sample_csv = (
        "name,age,salary\n"
        "张三,25,8000\n"
        "李四,30,12000\n"
        "王五,28,9500\n"
        "赵六,35,15000"
    )
    # (banner printed before the call, tool name, tool arguments)
    scenarios = [
        ("1️⃣ 测试天气查询...", "get_weather",
         {"city": "北京", "forecast_days": 3}),
        ("\n2️⃣ 测试股票查询...", "get_stock_info",
         {"symbol": "AAPL", "info_type": "all"}),
        ("\n3️⃣ 测试数据库查询...", "query_database",
         {"sql": "SELECT * FROM users", "format": "table"}),
        ("\n4️⃣ 测试数据分析...", "analyze_data",
         {"data": sample_csv, "analysis_type": "summary", "chart": True}),
    ]

    client = AdvancedMCPClient()
    connected = await client.connect()
    if not connected:
        return

    try:
        print("🧪 开始高级工具测试\n")
        for banner, tool_name, tool_args in scenarios:
            print(banner)
            await client.call_tool(tool_name, tool_args)
        print("\n✅ 所有高级工具测试完成!")
    finally:
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(test_all_tools())
🎯 本节小结
通过本节学习,你已经掌握了:
✅ 高级架构模式:分层架构设计和组件职责划分
✅ 网络服务集成:天气API、股票数据等外部服务调用
✅ 数据库操作:SQLite数据库的查询、插入、结构分析
✅ 数据分析功能:统计分析、相关性分析、可视化图表
✅ 工具整合:多个复杂工具的统一管理和部署
✅ 错误处理:健壮的异常处理和用户友好的错误信息
✅ 缓存策略:提高性能的智能缓存机制
🌟 实战练习
- 扩展天气工具:添加空气质量查询功能
- 增强股票工具:加入技术指标计算(MA、RSI等)
- 优化数据库工具:支持更多数据库类型(MySQL、PostgreSQL)
- 丰富分析工具:添加机器学习预测功能
🔗 相关资源
恭喜! 🎉 你已经学会了高级MCP工具开发。下一节我们将学习测试和调试技巧。