Skip to content

4.3 高级工具开发

🎯 学习目标:掌握 MCP 高级工具开发技巧,创建实用的企业级工具
⏱️ 预计时间:50 分钟
📊 难度等级:⭐⭐⭐⭐

🚀 进入高级工具开发

简单的计算器工具只是开始。现在让我们学习如何开发真正实用的企业级工具,这些工具能够:

  • 🌐 与外部API交互:天气查询、股票数据、新闻获取
  • 🗄️ 操作数据库:查询、插入、更新数据
  • 📊 数据分析:统计计算、图表生成、趋势分析
  • 🔧 系统集成:邮件发送、文档生成、任务自动化

🛠️ 高级工具架构模式

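📊 工具分层架构

本节的每个工具类都遵循相同的分层结构:工具定义层(`get_tool_definition(s)` 返回 JSON Schema 形式的参数描述)→ 参数校验层(在执行入口中检查必填项与取值)→ 数据获取层(调用外部 API、数据库或第三方库,必要时使用缓存)→ 结果格式化层(生成用户友好的文本,并附带 JSON 格式的 resource 数据)。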

🌐 网络服务工具开发

🌤️ 天气查询工具

让我们开发一个真实的天气查询工具,演示如何与外部 API 集成:

python
# weather_tools.py
import aiohttp
import asyncio
import json
from typing import Dict, Any, Optional
from datetime import datetime, timedelta
import logging

logger = logging.getLogger(__name__)

class WeatherTool:
    """MCP tool that looks up current weather or a multi-day forecast.

    Data comes from the OpenWeatherMap REST API (``/weather`` and
    ``/forecast`` endpoints).  Formatted results are cached in memory for
    ``cache_ttl`` seconds, keyed by city, forecast length, units and language.
    """

    # The tool schema exposes "kelvin", but OpenWeatherMap names Kelvin
    # "standard"; map schema values to API values.
    _API_UNITS = {"metric": "metric", "imperial": "imperial", "kelvin": "standard"}
    # Display symbols per unit system: (temperature, wind speed).
    _UNIT_SYMBOLS = {
        "metric": ("°C", "m/s"),
        "imperial": ("°F", "mph"),
        "kelvin": ("K", "m/s"),
    }

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.openweathermap.org/data/2.5"
        # cache_key -> (formatted_data, stored_at)
        self.cache = {}
        self.cache_ttl = 300  # seconds (5 minutes)

    def get_tool_definition(self) -> Dict[str, Any]:
        """Return the MCP tool definition (name, description, JSON-schema parameters)."""
        return {
            "name": "get_weather",
            "description": "获取指定城市的天气信息,支持当前天气和天气预报",
            "parameters": {
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "城市名称,支持中文或英文,如:北京、Beijing、New York"
                    },
                    "forecast_days": {
                        "type": "integer",
                        "description": "预报天数,0表示只获取当前天气,1-5表示未来几天预报",
                        "minimum": 0,
                        "maximum": 5,
                        "default": 0
                    },
                    "units": {
                        "type": "string",
                        "description": "温度单位",
                        "enum": ["metric", "imperial", "kelvin"],
                        "default": "metric"
                    },
                    "language": {
                        "type": "string",
                        "description": "返回语言",
                        "enum": ["zh_cn", "en", "zh_tw"],
                        "default": "zh_cn"
                    }
                },
                "required": ["city"]
            }
        }

    def _get_cache_key(self, city: str, forecast_days: int, units: str, language: str = "") -> str:
        """Build the cache key.

        The language is part of the key because the API localizes the
        weather description text.
        """
        return f"weather:{city}:{forecast_days}:{units}:{language}"

    def _is_cache_valid(self, timestamp: datetime) -> bool:
        """Return True if an entry stored at ``timestamp`` is younger than ``cache_ttl``."""
        return datetime.now() - timestamp < timedelta(seconds=self.cache_ttl)

    @staticmethod
    def _to_local(ts: int, offset: Optional[int]) -> datetime:
        """Convert a Unix timestamp to a naive datetime in the city's local time.

        ``offset`` is the UTC offset in seconds that OpenWeatherMap reports in
        its ``timezone`` field.  When it is missing, fall back to the server's
        local time.
        """
        if offset is None:
            return datetime.fromtimestamp(ts)
        return datetime(1970, 1, 1) + timedelta(seconds=ts + offset)

    async def _make_api_request(self, url: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Send a GET request and return the decoded JSON body.

        Raises:
            ValueError: on HTTP error statuses, timeouts or network failures,
                carrying a user-facing message.
        """
        # Copy so the caller's dict is not mutated with the API key.
        query = {**params, 'appid': self.api_key}
        timeout = aiohttp.ClientTimeout(total=10)

        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, params=query) as response:
                    if response.status == 200:
                        return await response.json()
                    if response.status == 404:
                        raise ValueError(f"未找到城市:{params.get('q', '未知')}")
                    if response.status == 401:
                        raise ValueError("API密钥无效")

                    # Error bodies are not guaranteed to be JSON (e.g. proxy pages).
                    message = '未知错误'
                    try:
                        error_data = await response.json(content_type=None)
                    except ValueError:
                        error_data = None
                    if isinstance(error_data, dict):
                        message = error_data.get('message', message)
                    raise ValueError(f"API请求失败:{message}")
        # aiohttp signals timeouts with asyncio.TimeoutError (ClientTimeout is
        # only a configuration object, not an exception class).
        except asyncio.TimeoutError as e:
            raise ValueError("请求超时,请稍后重试") from e
        except aiohttp.ClientError as e:
            raise ValueError(f"网络请求失败:{str(e)}") from e

    async def _get_current_weather(self, city: str, units: str, lang: str) -> Dict[str, Any]:
        """Fetch the raw current-weather payload for ``city``."""
        url = f"{self.base_url}/weather"
        params = {
            'q': city,
            'units': self._API_UNITS.get(units, units),
            'lang': lang
        }
        return await self._make_api_request(url, params)

    async def _get_weather_forecast(self, city: str, days: int, units: str, lang: str) -> Dict[str, Any]:
        """Fetch the raw 3-hourly forecast payload covering ``days`` days."""
        url = f"{self.base_url}/forecast"
        params = {
            'q': city,
            'units': self._API_UNITS.get(units, units),
            'lang': lang,
            'cnt': days * 8  # 8 slots per day (one every 3 hours)
        }
        return await self._make_api_request(url, params)

    def _format_weather_response(self, data: Dict[str, Any], forecast_days: int) -> Dict[str, Any]:
        """Reshape a raw API payload into the tool's response structure.

        ``forecast_days == 0`` expects a current-weather payload; otherwise a
        forecast payload, grouped by local calendar date and truncated to
        ``forecast_days`` days.
        """
        if forecast_days == 0:
            offset = data.get('timezone')
            main = data['main']
            condition = data['weather'][0]
            return {
                "type": "current_weather",
                "city": data['name'],
                "country": data['sys'].get('country', ''),
                "temperature": {
                    "current": round(main['temp'], 1),
                    "feels_like": round(main['feels_like'], 1),
                    "min": round(main['temp_min'], 1),
                    "max": round(main['temp_max'], 1)
                },
                "weather": {
                    "main": condition['main'],
                    "description": condition['description'],
                    "icon": condition['icon']
                },
                "details": {
                    "humidity": main['humidity'],  # percent
                    "pressure": main['pressure'],  # hPa
                    "visibility": data.get('visibility', 0) / 1000,  # metres -> km
                    "wind": {
                        "speed": data['wind']['speed'],
                        "direction": data['wind'].get('deg', 0)  # degrees
                    }
                },
                "sun": {
                    "sunrise": self._to_local(data['sys']['sunrise'], offset).strftime('%H:%M'),
                    "sunset": self._to_local(data['sys']['sunset'], offset).strftime('%H:%M')
                },
                "timestamp": datetime.now().isoformat()
            }

        offset = data['city'].get('timezone')
        days: Dict[str, Dict[str, Any]] = {}  # date -> daily summary, in arrival order
        for item in data['list']:
            moment = self._to_local(item['dt'], offset)
            date = moment.strftime('%Y-%m-%d')
            temp = item['main']['temp']
            condition = item['weather'][0]

            day = days.get(date)
            if day is None:
                # The first 3-hour slot of each day supplies the representative
                # condition, humidity, pressure and wind values.
                day = days[date] = {
                    "date": date,
                    "day_name": moment.strftime('%A'),
                    "temperature": {"min": temp, "max": temp},
                    "weather": {
                        "main": condition['main'],
                        "description": condition['description'],
                        "icon": condition['icon']
                    },
                    "details": {
                        "humidity": item['main']['humidity'],
                        "pressure": item['main']['pressure'],
                        "wind_speed": item['wind']['speed']
                    },
                    "hourly": []
                }
            else:
                day['temperature']['min'] = min(day['temperature']['min'], temp)
                day['temperature']['max'] = max(day['temperature']['max'], temp)

            day['hourly'].append({
                "time": moment.strftime('%H:%M'),
                "temperature": round(temp, 1),
                "weather": condition['description'],
                "humidity": item['main']['humidity']
            })

        # Truncate before counting so "forecast_days" matches the returned list.
        forecasts = list(days.values())[:forecast_days]
        return {
            "type": "weather_forecast",
            "city": data['city']['name'],
            "country": data['city'].get('country', ''),
            "forecast_days": len(forecasts),
            "forecasts": forecasts,
            "timestamp": datetime.now().isoformat()
        }

    @staticmethod
    def _parse_forecast_days(value: Any) -> int:
        """Coerce ``value`` to an int in 0..5.

        Raises:
            ValueError: if ``value`` is not an integer in that range.
        """
        try:
            days = int(value)
        except (TypeError, ValueError):
            raise ValueError("预报天数必须是0-5之间的整数") from None
        if not 0 <= days <= 5:
            raise ValueError("预报天数必须是0-5之间的整数")
        return days

    def _build_response(self, city: str, formatted_data: Dict[str, Any],
                        forecast_days: int, units: str) -> Dict[str, Any]:
        """Wrap formatted data as an MCP result: friendly text plus a JSON resource."""
        if forecast_days == 0:
            response_text = self._generate_current_weather_text(formatted_data, units)
        else:
            response_text = self._generate_forecast_text(formatted_data, units)

        return {
            "content": [
                {
                    "type": "text",
                    "text": response_text
                },
                {
                    "type": "resource",
                    "resource": {
                        "uri": f"weather://{city}",
                        "name": f"{city}天气数据",
                        "mimeType": "application/json"
                    },
                    "text": json.dumps(formatted_data, ensure_ascii=False, indent=2)
                }
            ],
            "isError": False
        }

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the weather tool.

        Args:
            arguments: tool arguments matching ``get_tool_definition()``.

        Returns:
            An MCP result dict.  Cache hits and fresh lookups return the same
            shape; ``isError`` is True (with a message) on failure.
        """
        try:
            city = str(arguments.get('city') or '').strip()
            if not city:
                raise ValueError("城市名称不能为空")

            forecast_days = self._parse_forecast_days(arguments.get('forecast_days') or 0)
            units = arguments.get('units') or 'metric'
            if units not in self._API_UNITS:
                raise ValueError(f"不支持的温度单位:{units}")
            language = arguments.get('language') or 'zh_cn'

            cache_key = self._get_cache_key(city, forecast_days, units, language)
            cached = self.cache.get(cache_key)
            if cached is not None and self._is_cache_valid(cached[1]):
                logger.info(f"从缓存获取天气数据: {city}")
                formatted_data = cached[0]
            else:
                if forecast_days == 0:
                    weather_data = await self._get_current_weather(city, units, language)
                else:
                    weather_data = await self._get_weather_forecast(city, forecast_days, units, language)

                formatted_data = self._format_weather_response(weather_data, forecast_days)

                # Drop expired entries so the cache cannot grow without bound.
                self.cache = {
                    key: entry for key, entry in self.cache.items()
                    if self._is_cache_valid(entry[1])
                }
                self.cache[cache_key] = (formatted_data, datetime.now())

            return self._build_response(city, formatted_data, forecast_days, units)

        except ValueError as e:
            logger.error(f"天气查询参数错误: {e}")
            return {
                "content": [{"type": "text", "text": f"❌ 查询失败:{str(e)}"}],
                "isError": True
            }
        except Exception as e:
            logger.exception(f"天气查询异常: {e}")
            return {
                "content": [{"type": "text", "text": f"❌ 系统错误:{str(e)}"}],
                "isError": True
            }

    def _generate_current_weather_text(self, data: Dict[str, Any], units: str = 'metric') -> str:
        """Render formatted current-weather data as user-facing text in ``units``."""
        temp_unit, wind_unit = self._UNIT_SYMBOLS.get(units, self._UNIT_SYMBOLS['metric'])
        temp = data['temperature']
        weather = data['weather']
        details = data['details']
        sun = data['sun']

        text = f"""🌤️ {data['city']}当前天气

🌡️ **温度情况**
  当前温度:{temp['current']}{temp_unit}(体感 {temp['feels_like']}{temp_unit})
  最高/最低:{temp['max']}{temp_unit} / {temp['min']}{temp_unit}

☁️ **天气状况**
  天气:{weather['description']}
  
💧 **详细信息**
  湿度:{details['humidity']}%
  气压:{details['pressure']} hPa
  能见度:{details['visibility']} km
  风速:{details['wind']['speed']} {wind_unit}
  
🌅 **日出日落**
  日出:{sun['sunrise']}
  日落:{sun['sunset']}
  
⏰ 更新时间:{datetime.fromisoformat(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}"""

        return text

    def _generate_forecast_text(self, data: Dict[str, Any], units: str = 'metric') -> str:
        """Render formatted forecast data as user-facing text in ``units``."""
        temp_unit, _ = self._UNIT_SYMBOLS.get(units, self._UNIT_SYMBOLS['metric'])
        weekdays = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
        parts = [f"📅 {data['city']}未来{data['forecast_days']}天天气预报\n\n"]

        for forecast in data['forecasts']:
            date_obj = datetime.strptime(forecast['date'], '%Y-%m-%d')
            temp = forecast['temperature']
            parts.append(
                f"📆 **{date_obj.strftime('%m月%d日')} {weekdays[date_obj.weekday()]}**\n"
                f"🌡️ 温度:{round(temp['min'], 1)}{temp_unit} ~ {round(temp['max'], 1)}{temp_unit}\n"
                f"☁️ 天气:{forecast['weather']['description']}\n"
                f"💧 湿度:{forecast['details']['humidity']}%\n\n"
            )

        parts.append(f"⏰ 预报时间:{datetime.fromisoformat(data['timestamp']).strftime('%Y-%m-%d %H:%M:%S')}")
        return "".join(parts)

# 在server.py中注册天气工具
class AdvancedMCPServer:
    """Creates the advanced tools and registers them with an MCP server."""

    def __init__(self, api_key: Optional[str] = None):
        """Create the tool instances.

        Args:
            api_key: OpenWeatherMap API key.  When omitted, the
                ``OPENWEATHER_API_KEY`` environment variable is used, and the
                placeholder string is only a last resort, so real secrets do
                not need to be committed to source.
        """
        import os

        key = api_key or os.environ.get("OPENWEATHER_API_KEY") or "your_openweather_api_key"
        self.weather_tool = WeatherTool(api_key=key)

    def register_tools(self, server):
        """Register every tool with ``server``.

        ``server`` must provide ``add_tool(definition, handler)``.
        NOTE(review): confirm this signature against the MCP server library in use.
        """
        server.add_tool(
            self.weather_tool.get_tool_definition(),
            self.weather_tool.execute
        )

📈 股票数据工具

python
# stock_tools.py
import yfinance as yf
import pandas as pd
from typing import Dict, Any, List
import json
from datetime import datetime, timedelta

class StockTool:
    """MCP tool that reports quotes, company info and recent price history.

    Market data is fetched through the ``yfinance`` package.
    """

    def __init__(self):
        # NOTE(review): the cache is declared but execute() never consults it;
        # kept so existing attribute users keep working.
        self.cache = {}
        self.cache_ttl = 60  # seconds (1 minute)

    def get_tool_definition(self) -> Dict[str, Any]:
        """Return the MCP tool definition (name, description, JSON-schema parameters)."""
        return {
            "name": "get_stock_info",
            "description": "获取股票实时价格和基本信息",
            "parameters": {
                "type": "object",
                "properties": {
                    "symbol": {
                        "type": "string",
                        "description": "股票代码,如AAPL、TSLA、000001.SZ"
                    },
                    "period": {
                        "type": "string",
                        "description": "历史数据周期",
                        "enum": ["1d", "5d", "1mo", "3mo", "6mo", "1y", "2y"],
                        "default": "1d"
                    },
                    "interval": {
                        "type": "string",
                        "description": "数据间隔",
                        "enum": ["1m", "5m", "15m", "30m", "1h", "1d"],
                        "default": "1d"
                    },
                    "info_type": {
                        "type": "string",
                        "description": "信息类型",
                        "enum": ["price", "info", "history", "all"],
                        "default": "all"
                    }
                },
                "required": ["symbol"]
            }
        }

    async def execute(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the stock tool.

        Args:
            arguments: ``symbol`` (required), ``period``, ``interval`` and
                ``info_type`` as described in ``get_tool_definition()``.

        Returns:
            An MCP result dict with a text summary and a JSON resource;
            ``isError`` is True with a message on failure.
        """
        import asyncio

        try:
            symbol = str(arguments.get('symbol') or '').strip().upper()
            period = arguments.get('period', '1d')
            interval = arguments.get('interval', '1d')
            info_type = arguments.get('info_type', 'all')

            if not symbol:
                raise ValueError("股票代码不能为空")

            # yfinance makes blocking HTTP calls; run them in the default
            # thread-pool executor so the event loop is not stalled.
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(
                None, self._collect, symbol, period, interval, info_type
            )

            response_text = self._generate_stock_text(result)

            return {
                "content": [
                    {"type": "text", "text": response_text},
                    {
                        "type": "resource",
                        "resource": {
                            "uri": f"stock://{symbol}",
                            "name": f"{symbol}股票数据",
                            "mimeType": "application/json"
                        },
                        "text": json.dumps(result, ensure_ascii=False, indent=2)
                    }
                ],
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 股票查询失败:{str(e)}"}],
                "isError": True
            }

    def _collect(self, symbol: str, period: str, interval: str, info_type: str) -> Dict[str, Any]:
        """Synchronously fetch the requested sections for ``symbol``.

        Each section is fetched independently.  A failing section is replaced
        by ``{"error": ...}`` so that the other sections are still reported.
        """
        stock = yf.Ticker(symbol)
        result: Dict[str, Any] = {"symbol": symbol}

        if info_type in ('info', 'all'):
            try:
                info = stock.info
                result['info'] = {
                    "name": info.get('longName') or info.get('shortName') or 'N/A',
                    "sector": info.get('sector') or 'N/A',
                    "industry": info.get('industry') or 'N/A',
                    "market_cap": info.get('marketCap') or 0,
                    "pe_ratio": info.get('trailingPE') or 0,
                    "dividend_yield": info.get('dividendYield') or 0
                }
            except Exception:
                result['info'] = {"error": "无法获取股票基本信息"}

        if info_type in ('price', 'all'):
            try:
                # Several sessions are fetched so the day change can be measured
                # against the previous close.
                price = self._summarize_price(stock.history(period='5d'))
                if price is not None:
                    result['price'] = price
            except Exception:
                result['price'] = {"error": "无法获取价格信息"}

        if info_type in ('history', 'all'):
            try:
                history = stock.history(period=period, interval=interval)
                if not history.empty:
                    # Keep only the 10 most recent data points.
                    result['history'] = [
                        self._history_row(index, row)
                        for index, row in history.tail(10).iterrows()
                    ]
            except Exception:
                result['history'] = {"error": "无法获取历史数据"}

        return result

    @staticmethod
    def _summarize_price(history: pd.DataFrame) -> Optional[Dict[str, Any]]:
        """Summarize the latest row of an OHLCV frame, or return None if it is empty.

        ``change`` is measured against the previous row's close when at least
        two rows exist, otherwise against the latest open.  ``change_percent``
        is 0 when the reference price is 0.
        """
        if history.empty:
            return None

        latest = history.iloc[-1]
        open_price = float(latest['Open'])
        close_price = float(latest['Close'])
        reference = float(history['Close'].iloc[-2]) if len(history) >= 2 else open_price
        change = close_price - reference
        change_percent = change / reference * 100 if reference else 0.0

        return {
            "current": round(close_price, 2),
            "open": round(open_price, 2),
            "high": round(float(latest['High']), 2),
            "low": round(float(latest['Low']), 2),
            "volume": int(latest['Volume']),
            "change": round(change, 2),
            "change_percent": round(change_percent, 2)
        }

    @staticmethod
    def _history_row(index: Any, row: Any) -> Dict[str, Any]:
        """Convert one (timestamp index, OHLCV row) pair into a JSON-friendly dict."""
        return {
            "date": index.strftime('%Y-%m-%d %H:%M:%S'),
            "open": round(float(row['Open']), 2),
            "high": round(float(row['High']), 2),
            "low": round(float(row['Low']), 2),
            "close": round(float(row['Close']), 2),
            "volume": int(row['Volume'])
        }

    def _generate_stock_text(self, data: Dict[str, Any]) -> str:
        """Render collected stock data as user-facing text."""
        symbol = data['symbol']
        parts = [f"📈 {symbol} 股票信息\n\n"]

        info = data.get('info')
        if isinstance(info, dict) and 'error' not in info:
            market_cap = info.get('market_cap')
            # A missing (None) market cap previously crashed the ":," format.
            cap_text = f"${market_cap:,}" if market_cap is not None else "N/A"
            parts.append("🏢 **公司信息**\n")
            parts.append(f"  名称:{info.get('name')}\n")
            parts.append(f"  行业:{info.get('sector')} - {info.get('industry')}\n")
            parts.append(f"  市值:{cap_text}\n")
            parts.append(f"  市盈率:{info.get('pe_ratio')}\n\n")

        price = data.get('price')
        if isinstance(price, dict) and 'error' not in price:
            change_icon = "📈" if price['change'] >= 0 else "📉"
            parts.append("💰 **实时价格**\n")
            parts.append(f"  当前价格:${price['current']}\n")
            parts.append(f"  开盘价格:${price['open']}\n")
            parts.append(f"  最高/最低:${price['high']} / ${price['low']}\n")
            parts.append(f"  {change_icon} 涨跌:${price['change']} ({price['change_percent']:+.2f}%)\n")
            parts.append(f"  成交量:{price['volume']:,}\n\n")

        history = data.get('history')
        if isinstance(history, list):
            parts.append("📊 **最近历史数据**\n")
            for record in history[-3:]:  # show the 3 most recent points
                parts.append(f"  {record['date']}: ${record['close']} (量: {record['volume']:,})\n")

        parts.append(f"\n⏰ 查询时间:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        return "".join(parts)

🗄️ 数据库操作工具

📊 SQLite数据库工具

python
# database_tools.py
import sqlite3
import asyncio
import json
from typing import Dict, Any, List, Optional
from pathlib import Path
import pandas as pd

class DatabaseTool:
    """MCP tools for querying, inspecting and inserting into a SQLite database.

    Every operation opens its own short-lived connection and closes it
    afterwards.  The query and schema tools use read-only connections
    (``PRAGMA query_only``), so they cannot modify data.
    """

    # Upper bound on rows returned by query_database (matches the schema's "maximum").
    MAX_LIMIT = 1000

    def __init__(self, db_path: str = "mcp_data.db"):
        self.db_path = db_path
        self.init_database()

    def _connect(self, read_only: bool = False) -> sqlite3.Connection:
        """Open a new connection to ``self.db_path``; the caller must close it.

        With ``read_only=True`` the connection enables ``PRAGMA query_only``,
        so any statement that would write to the database fails.
        """
        conn = sqlite3.connect(self.db_path)
        if read_only:
            conn.execute("PRAGMA query_only = ON")
        return conn

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return ``name`` as a double-quoted SQL identifier (embedded quotes doubled)."""
        return '"' + name.replace('"', '""') + '"'

    @staticmethod
    def _object_type(conn: sqlite3.Connection, name: str) -> Optional[str]:
        """Return 'table' or 'view' if ``name`` exists in the schema, else None."""
        row = conn.execute(
            "SELECT type FROM sqlite_master WHERE name = ? AND type IN ('table', 'view')",
            (name,),
        ).fetchone()
        return row[0] if row else None

    @classmethod
    def _clamp_limit(cls, value: Any) -> int:
        """Coerce ``value`` to an int row cap within 1..MAX_LIMIT."""
        try:
            limit = int(value)
        except (TypeError, ValueError):
            raise ValueError(f"limit必须是整数:{value}") from None
        return max(1, min(limit, cls.MAX_LIMIT))

    def init_database(self):
        """Create the sample tables and seed rows; safe to run repeatedly."""
        from contextlib import closing

        # sqlite3's connection context manager only commits/rolls back, so
        # closing() is what actually releases the connection.
        with closing(self._connect()) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT NOT NULL,
                    email TEXT UNIQUE,
                    age INTEGER,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            conn.execute("""
                CREATE TABLE IF NOT EXISTS orders (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    user_id INTEGER,
                    product_name TEXT,
                    amount DECIMAL(10,2),
                    order_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    FOREIGN KEY (user_id) REFERENCES users (id)
                )
            """)

            conn.execute("INSERT OR IGNORE INTO users (id, name, email, age) VALUES (1, '张三', 'zhang@example.com', 25)")
            conn.execute("INSERT OR IGNORE INTO users (id, name, email, age) VALUES (2, '李四', 'li@example.com', 30)")
            # The explicit id makes this seed idempotent; without it every
            # start-up inserted another copy of the order.
            conn.execute("INSERT OR IGNORE INTO orders (id, user_id, product_name, amount) VALUES (1, 1, 'iPhone 15', 7999.00)")
            conn.commit()

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """Return the MCP definitions of the database tools."""
        return [
            {
                "name": "query_database",
                "description": "执行SQL查询语句,支持SELECT查询",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "sql": {
                            "type": "string",
                            "description": "SQL查询语句,仅支持SELECT语句"
                        },
                        "limit": {
                            "type": "integer",
                            "description": "结果限制行数",
                            "default": 100,
                            "maximum": 1000
                        },
                        "format": {
                            "type": "string",
                            "description": "输出格式",
                            "enum": ["table", "json", "csv"],
                            "default": "table"
                        }
                    },
                    "required": ["sql"]
                }
            },
            {
                "name": "get_table_schema",
                "description": "获取数据库表结构信息",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "表名,如果为空则返回所有表信息"
                        }
                    }
                }
            },
            {
                "name": "insert_data",
                "description": "向数据库表插入数据",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "table_name": {
                            "type": "string",
                            "description": "表名"
                        },
                        "data": {
                            "type": "object",
                            "description": "要插入的数据,键值对格式"
                        }
                    },
                    "required": ["table_name", "data"]
                }
            }
        ]

    async def query_database(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run a read-only SELECT (or WITH ... SELECT) query.

        Args:
            arguments: ``sql`` (required), ``limit`` (row cap clamped to
                1..MAX_LIMIT, default 100) and ``format`` ("table", "json"
                or "csv", default "table").

        Returns:
            An MCP result dict with a text rendering and a JSON resource of the
            rows; ``isError`` is True with a message on failure.
        """
        from contextlib import closing
        import hashlib

        try:
            sql = str(arguments.get('sql') or '').strip()
            limit_arg = arguments.get('limit')
            limit = self._clamp_limit(100 if limit_arg is None else limit_arg)
            format_type = arguments.get('format', 'table')

            # Quick front-door check; the read-only connection below is what
            # actually guarantees nothing is modified.
            if not sql.upper().startswith(('SELECT', 'WITH')):
                raise ValueError("只允许执行SELECT查询语句")

            with closing(self._connect(read_only=True)) as conn:
                conn.row_factory = sqlite3.Row  # rows addressable by column name
                cursor = conn.execute(sql)
                # Cap rows with fetchmany instead of appending " LIMIT n": the
                # rewrite broke statements ending in ";" and was skipped whenever
                # "LIMIT" appeared anywhere in the text (e.g. in a subquery).
                results = [dict(row) for row in cursor.fetchmany(limit)]

            if not results:
                return {
                    "content": [{"type": "text", "text": "查询结果为空"}],
                    "isError": False
                }

            if format_type == 'json':
                response_text = json.dumps(results, ensure_ascii=False, indent=2)
            elif format_type == 'csv':
                response_text = pd.DataFrame(results).to_csv(index=False)
            else:  # "table"
                response_text = self._format_table(results)

            # Stable digest: built-in hash() of str is randomized per process.
            query_id = hashlib.sha256(sql.encode('utf-8')).hexdigest()[:16]
            return {
                "content": [
                    {"type": "text", "text": f"📊 查询结果 ({len(results)}条记录)\n\n{response_text}"},
                    {
                        "type": "resource",
                        "resource": {
                            "uri": f"database://query/{query_id}",
                            "name": "查询结果数据",
                            "mimeType": "application/json"
                        },
                        "text": json.dumps(results, ensure_ascii=False, indent=2)
                    }
                ],
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 数据库查询失败:{str(e)}"}],
                "isError": True
            }

    async def get_table_schema(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Describe one table (columns and row count), or summarize all tables.

        Table names are checked against ``sqlite_master`` and quoted before
        they are interpolated into PRAGMA/COUNT statements.
        """
        from contextlib import closing

        try:
            table_name = str(arguments.get('table_name') or '').strip()

            with closing(self._connect(read_only=True)) as conn:
                if table_name:
                    if self._object_type(conn, table_name) is None:
                        raise ValueError(f"表 '{table_name}' 不存在")
                    quoted = self._quote_identifier(table_name)

                    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk)
                    columns = conn.execute(f"PRAGMA table_info({quoted})").fetchall()
                    schema_info = {
                        "table_name": table_name,
                        "columns": [
                            {
                                "name": col[1],
                                "type": col[2],
                                "not_null": bool(col[3]),
                                "default_value": col[4],
                                "primary_key": bool(col[5])
                            } for col in columns
                        ]
                    }
                    schema_info["row_count"] = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
                    response_text = self._format_schema(schema_info)

                else:
                    names = [row[0] for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")]
                    parts = ["📋 数据库表信息\n\n"]
                    for name in names:
                        quoted = self._quote_identifier(name)
                        column_count = len(conn.execute(f"PRAGMA table_info({quoted})").fetchall())
                        row_count = conn.execute(f"SELECT COUNT(*) FROM {quoted}").fetchone()[0]
                        parts.append(f"🗂️ **{name}**\n")
                        parts.append(f"  列数:{column_count}\n")
                        parts.append(f"  行数:{row_count}\n\n")
                    response_text = "".join(parts)

            return {
                "content": [{"type": "text", "text": response_text}],
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 获取表结构失败:{str(e)}"}],
                "isError": True
            }

    async def insert_data(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Insert one row into an existing table (handler for the "insert_data" tool).

        Table and column names are validated against the live schema and then
        quoted.  Values are bound as SQL parameters, never interpolated.
        """
        from contextlib import closing

        try:
            table_name = str(arguments.get('table_name') or '').strip()
            data = arguments.get('data')
            if not table_name:
                raise ValueError("表名不能为空")
            if not isinstance(data, dict) or not data:
                raise ValueError("插入数据必须是非空的键值对")

            with closing(self._connect()) as conn:
                if self._object_type(conn, table_name) != 'table':
                    raise ValueError(f"表 '{table_name}' 不存在")
                quoted = self._quote_identifier(table_name)

                known_columns = {row[1] for row in conn.execute(f"PRAGMA table_info({quoted})")}
                unknown = [str(column) for column in data if column not in known_columns]
                if unknown:
                    raise ValueError(f"未知字段:{', '.join(unknown)}")

                columns = list(data)
                column_sql = ", ".join(self._quote_identifier(column) for column in columns)
                placeholders = ", ".join("?" for _ in columns)
                with conn:  # commit on success, roll back on error
                    cursor = conn.execute(
                        f"INSERT INTO {quoted} ({column_sql}) VALUES ({placeholders})",
                        [data[column] for column in columns],
                    )
                row_id = cursor.lastrowid

            return {
                "content": [{"type": "text", "text": f"✅ 已向表 '{table_name}' 插入1条记录 (rowid={row_id})"}],
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 插入数据失败:{str(e)}"}],
                "isError": True
            }

    def _format_table(self, results: List[Dict[str, Any]]) -> str:
        """Render rows as a plain-text table via pandas (at most 50 rows shown)."""
        if not results:
            return "无数据"

        df = pd.DataFrame(results)
        return df.to_string(index=False, max_rows=50)

    def _format_schema(self, schema_info: Dict[str, Any]) -> str:
        """Render a single table's schema description as user-facing text."""
        text = f"🗂️ **{schema_info['table_name']}** 表结构\n\n"
        text += f"📊 总行数:{schema_info['row_count']}\n\n"
        text += "📋 **字段信息**:\n"

        for col in schema_info['columns']:
            text += f"  • **{col['name']}** ({col['type']})"

            flags = []
            if col['primary_key']:
                flags.append("主键")
            if col['not_null']:
                flags.append("非空")
            if col['default_value']:
                flags.append(f"默认值: {col['default_value']}")

            if flags:
                text += f" - {', '.join(flags)}"
            text += "\n"

        return text

📊 数据分析工具

python
# analytics_tools.py
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import io
import base64
from typing import Dict, Any, List
import json

class AnalyticsTool:
    """MCP tool that runs statistical analyses on CSV data.

    A single tool, ``analyze_data``, is exposed. It accepts inline CSV text or a
    path to a ``.csv`` file, restricts the frame to numeric columns, runs one of
    the supported analyses and returns an MCP ``content`` list holding a text
    report and, optionally, a base64-encoded PNG chart.
    """

    # Analysis types accepted by analyze_data; also used as the schema enum.
    ANALYSIS_TYPES = ("summary", "correlation", "distribution", "trend")
    # |r| strictly above this value is reported as a strong correlation.
    STRONG_CORRELATION_THRESHOLD = 0.7

    def __init__(self):
        # Use a CJK-capable font so the Chinese chart titles render. SimHei is
        # tried first; the remaining entries are fallbacks for machines that
        # do not have it installed.
        # NOTE: this mutates matplotlib's process-wide rcParams.
        plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'Noto Sans CJK SC', 'DejaVu Sans']
        plt.rcParams['axes.unicode_minus'] = False

    def get_tool_definitions(self) -> List[Dict[str, Any]]:
        """Return the MCP tool definitions exposed by this class."""
        return [
            {
                "name": "analyze_data",
                "description": "对CSV数据进行统计分析",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "data": {
                            "type": "string",
                            "description": "CSV格式的数据或数据文件路径"
                        },
                        "analysis_type": {
                            "type": "string",
                            "description": "分析类型",
                            "enum": list(self.ANALYSIS_TYPES),
                            "default": "summary"
                        },
                        "columns": {
                            "type": "array",
                            "items": {"type": "string"},
                            "description": "要分析的列名,为空则分析所有数值列"
                        },
                        "chart": {
                            "type": "boolean",
                            "description": "是否生成图表",
                            "default": True
                        }
                    },
                    "required": ["data"]
                }
            }
        ]

    async def analyze_data(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Run the requested analysis and return an MCP tool result.

        Args:
            arguments: Tool arguments matching the ``analyze_data`` schema:
                ``data`` (CSV text or ``.csv`` path), ``analysis_type``,
                ``columns`` (optional subset) and ``chart`` (bool).

        Returns:
            ``{"content": [...], "isError": bool}``. Every failure (bad CSV,
            unknown column, unsupported analysis type, no numeric data, ...)
            is reported as a single text item with ``isError`` set; nothing
            is raised to the caller.
        """
        try:
            data_input = arguments.get('data', '')
            analysis_type = arguments.get('analysis_type', 'summary')
            target_columns = arguments.get('columns') or []
            generate_chart = arguments.get('chart', True)

            # Reject unknown types up front instead of failing later with a
            # confusing KeyError when the report looks up result["analysis"].
            if analysis_type not in self.ANALYSIS_TYPES:
                raise ValueError(
                    f"不支持的分析类型:{analysis_type}(可选:{', '.join(self.ANALYSIS_TYPES)})"
                )

            df = self._load_data(data_input)
            if df.empty:
                raise ValueError("数据为空")

            df = self._select_columns(df, target_columns)
            # Selection can leave the rows but zero columns (e.g. all-text CSV).
            if df.empty:
                raise ValueError("没有可分析的数值列")

            analyzers = {
                "summary": self._summary_analysis,
                "correlation": self._correlation_analysis,
                "distribution": self._distribution_analysis,
                "trend": self._trend_analysis,
            }
            result = {
                "data_info": self._get_data_info(df),
                "analysis": analyzers[analysis_type](df),
            }

            report_text = self._generate_analysis_report(result, analysis_type)
            content = [{"type": "text", "text": report_text}]

            chart_data = self._generate_chart(df, analysis_type) if generate_chart else None
            if chart_data:
                content.append({
                    "type": "image",
                    # MCP ImageContent fields (type / data / mimeType).
                    "data": chart_data,
                    "mimeType": "image/png",
                    # Legacy Anthropic-style image block, kept for existing consumers.
                    "source": {
                        "type": "base64",
                        "media_type": "image/png",
                        "data": chart_data
                    }
                })

            return {
                "content": content,
                "isError": False
            }

        except Exception as e:
            return {
                "content": [{"type": "text", "text": f"❌ 数据分析失败:{str(e)}"}],
                "isError": True
            }

    def _load_data(self, data_input: str) -> pd.DataFrame:
        """Parse ``data_input`` as a CSV file path or as inline CSV text.

        A single-line value ending in ``.csv`` is treated as a path; anything
        else is parsed as CSV content.

        Raises:
            ValueError: If ``data_input`` is empty or not a string.
        """
        if not isinstance(data_input, str) or not data_input.strip():
            raise ValueError("数据为空")
        # Inline CSV that has data rows always contains a newline, so requiring
        # a single line keeps CSV text whose last cell ends in ".csv" from being
        # mistaken for a path.
        # SECURITY NOTE(review): the path is read from the server's filesystem
        # unchecked; confine it to an allowed directory before exposing this
        # tool to untrusted clients.
        candidate = data_input.strip()
        if '\n' not in candidate and candidate.endswith('.csv'):
            return pd.read_csv(candidate)
        return pd.read_csv(io.StringIO(data_input))

    def _select_columns(self, df: pd.DataFrame, target_columns: Any) -> pd.DataFrame:
        """Restrict ``df`` to the requested columns (default: all), keeping only numeric ones.

        Raises:
            ValueError: If a requested column does not exist.
        """
        if isinstance(target_columns, str):
            # Tolerate a bare column name; df["x"] would return a Series.
            target_columns = [target_columns]
        if target_columns:
            missing = [col for col in target_columns if col not in df.columns]
            if missing:
                raise ValueError(f"列不存在:{', '.join(map(str, missing))}")
            df = df[list(target_columns)]
        # The statistics and charts below only make sense for numeric data.
        return df.select_dtypes(include=[np.number])

    def _get_data_info(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Collect shape, column names, dtypes, per-column missing counts and memory use (bytes)."""
        return {
            "shape": df.shape,
            "columns": list(df.columns),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "missing_values": {col: int(count) for col, count in df.isnull().sum().items()},
            "memory_usage": int(df.memory_usage(deep=True).sum())
        }

    def _summary_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Descriptive statistics plus skewness and kurtosis per column."""
        return {
            "describe": df.describe().to_dict(),
            "skewness": df.skew().to_dict(),
            "kurtosis": df.kurtosis().to_dict()
        }

    def _correlation_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Pearson correlation matrix plus the list of strongly correlated pairs."""
        correlation_matrix = df.corr()
        return {
            "correlation_matrix": correlation_matrix.to_dict(),
            "strong_correlations": self._find_strong_correlations(correlation_matrix)
        }

    def _find_strong_correlations(self, corr_matrix: pd.DataFrame,
                                  threshold: float = STRONG_CORRELATION_THRESHOLD) -> List[Dict[str, Any]]:
        """Return each column pair whose |correlation| exceeds ``threshold``.

        Only the upper triangle is scanned, so self-pairs and mirrored
        duplicates are skipped. NaN correlations never qualify.
        """
        columns = list(corr_matrix.columns)
        strong_corr = []
        for i, col1 in enumerate(columns):
            for col2 in columns[i + 1:]:
                corr_value = corr_matrix.loc[col1, col2]
                if abs(corr_value) > threshold:
                    strong_corr.append({
                        "var1": col1,
                        "var2": col2,
                        "correlation": round(float(corr_value), 3),
                        "strength": "强正相关" if corr_value > 0 else "强负相关"
                    })
        return strong_corr

    def _distribution_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Per-column quartiles, IQR, skewness and 1.5×IQR outlier counts."""
        q1 = df.quantile(0.25)
        q3 = df.quantile(0.75)
        iqr = q3 - q1
        # Tukey fences; comparisons with NaN are False, so missing values never count as outliers.
        outliers = ((df < q1 - 1.5 * iqr) | (df > q3 + 1.5 * iqr)).sum()
        return {
            "quartiles": {
                "q1": q1.to_dict(),
                "median": df.median().to_dict(),
                "q3": q3.to_dict(),
            },
            "iqr": iqr.to_dict(),
            "skewness": df.skew().to_dict(),
            "outliers": {col: int(count) for col, count in outliers.items()},
        }

    def _trend_analysis(self, df: pd.DataFrame) -> Dict[str, Any]:
        """Fit a least-squares line to each column against row order.

        Row position (0..n-1) is the x axis; NaN values are skipped. Columns
        with fewer than two valid values are marked as having too little data.
        """
        positions = np.arange(len(df))
        trends = {}
        for column in df.columns:
            values = df[column].to_numpy(dtype=float, na_value=np.nan)
            mask = ~np.isnan(values)
            x, y = positions[mask], values[mask]
            if len(y) < 2:
                trends[column] = {"slope": None, "direction": "数据不足",
                                  "start": None, "end": None, "change_pct": None}
                continue

            slope = float(np.polyfit(x, y, 1)[0])
            # Tolerance absorbs floating-point noise on perfectly flat series.
            if np.isclose(slope, 0.0, atol=1e-12):
                direction = "平稳"
            elif slope > 0:
                direction = "上升"
            else:
                direction = "下降"

            start, end = float(y[0]), float(y[-1])
            change_pct = (end - start) / abs(start) * 100 if start != 0 else None
            trends[column] = {"slope": slope, "direction": direction,
                              "start": start, "end": end, "change_pct": change_pct}
        return {"trends": trends}

    def _generate_chart(self, df: pd.DataFrame, analysis_type: str) -> str:
        """Render the chart for ``analysis_type`` and return it as a base64-encoded PNG."""
        fig, ax = plt.subplots(figsize=(10, 6))
        try:
            if analysis_type == "summary":
                # Box plot
                df.boxplot(ax=ax)
                ax.set_title('数据分布箱线图')
                ax.tick_params(axis='x', rotation=45)

            elif analysis_type == "correlation":
                # Correlation heatmap
                sns.heatmap(df.corr(), annot=True, cmap='coolwarm', ax=ax)
                ax.set_title('变量相关性热力图')

            elif analysis_type == "distribution":
                # Overlay one histogram per column on the shared axes;
                # DataFrame.hist(ax=...) clears the figure when there are
                # several columns, leaving `ax` stale.
                for column in df.columns:
                    ax.hist(df[column].dropna(), bins=20, alpha=0.5, label=str(column))
                ax.set_title('数据分布直方图')
                ax.legend()

            elif analysis_type == "trend":
                # Line chart over row order
                for column in df.columns:
                    ax.plot(df.index, df[column], label=column, marker='o')
                ax.set_title('数据趋势图')
                ax.legend()

            fig.tight_layout()

            buffer = io.BytesIO()
            fig.savefig(buffer, format='png', dpi=150, bbox_inches='tight')
            return base64.b64encode(buffer.getvalue()).decode()
        finally:
            # Always release this specific figure, even when plotting fails,
            # so a long-running server does not accumulate open figures.
            plt.close(fig)

    def _generate_analysis_report(self, result: Dict[str, Any], analysis_type: str) -> str:
        """Build the text report: data overview, missing values, then the per-type section."""
        data_info = result["data_info"]
        analysis = result["analysis"]
        n_rows, n_cols = data_info['shape']

        report = f"📊 **数据分析报告** ({analysis_type})\n\n"

        # Data overview
        report += "📋 **数据概要**\n"
        report += f"  数据维度:{n_rows}行 × {n_cols}列\n"
        report += f"  分析列:{', '.join(map(str, data_info['columns']))}\n"
        report += f"  内存使用:{data_info['memory_usage'] / 1024:.1f} KB\n\n"

        # Missing values (only listed when at least one column has any)
        missing_values = data_info["missing_values"]
        if any(missing_values.values()):
            report += "⚠️ **缺失值情况**\n"
            for col, missing_count in missing_values.items():
                if missing_count > 0:
                    missing_pct = missing_count / n_rows * 100
                    report += f"  {col}: {missing_count}个 ({missing_pct:.1f}%)\n"
            report += "\n"

        # Analysis-specific section
        formatters = {
            "summary": self._format_summary_report,
            "correlation": self._format_correlation_report,
            "distribution": self._format_distribution_report,
            "trend": self._format_trend_report,
        }
        formatter = formatters.get(analysis_type)
        if formatter is not None:
            report += formatter(analysis)

        return report

    def _format_summary_report(self, analysis: Dict[str, Any]) -> str:
        """Format mean / std / min / max / median for each column."""
        report = "📈 **统计摘要**\n"

        describe = analysis["describe"]
        for col in describe:
            stats = describe[col]
            report += f"\n🔢 **{col}**\n"
            report += f"  均值:{stats['mean']:.2f}\n"
            report += f"  标准差:{stats['std']:.2f}\n"
            report += f"  最小值:{stats['min']:.2f}\n"
            report += f"  最大值:{stats['max']:.2f}\n"
            report += f"  中位数:{stats['50%']:.2f}\n"

        return report

    def _format_correlation_report(self, analysis: Dict[str, Any]) -> str:
        """Format the strongly correlated pairs, or a note that there are none."""
        report = "🔗 **相关性分析**\n\n"

        strong_corr = analysis["strong_correlations"]
        if strong_corr:
            report += "💪 **强相关关系**\n"
            for corr in strong_corr:
                report += f"  • {corr['var1']} ↔ {corr['var2']}: {corr['correlation']} ({corr['strength']})\n"
        else:
            report += f"ℹ️ 未发现强相关关系(|r| > {self.STRONG_CORRELATION_THRESHOLD})\n"

        return report

    def _format_distribution_report(self, analysis: Dict[str, Any]) -> str:
        """Format quartiles, IQR, skewness and outlier counts for each column."""
        report = "📊 **分布分析**\n"
        quartiles = analysis["quartiles"]
        for col in analysis["iqr"]:
            report += f"\n🔢 **{col}**\n"
            report += f"  Q1:{quartiles['q1'][col]:.2f}\n"
            report += f"  中位数:{quartiles['median'][col]:.2f}\n"
            report += f"  Q3:{quartiles['q3'][col]:.2f}\n"
            report += f"  四分位距(IQR):{analysis['iqr'][col]:.2f}\n"
            report += f"  偏度:{analysis['skewness'][col]:.2f}\n"
            report += f"  异常值(1.5×IQR规则):{analysis['outliers'][col]}个\n"
        return report

    def _format_trend_report(self, analysis: Dict[str, Any]) -> str:
        """Format direction, slope and start/end change for each column."""
        report = "📈 **趋势分析**\n"
        for col, info in analysis["trends"].items():
            report += f"\n🔢 **{col}**\n"
            if info["slope"] is None:
                report += f"  {info['direction']}\n"
                continue
            report += f"  趋势:{info['direction']}\n"
            report += f"  斜率(每行):{info['slope']:.4f}\n"
            report += f"  起始值:{info['start']:.2f} → 结束值:{info['end']:.2f}\n"
            if info["change_pct"] is not None:
                report += f"  变化幅度:{info['change_pct']:+.1f}%\n"
        return report

🚀 服务器整合

现在让我们将所有高级工具整合到服务器中:

python
# advanced_server.py
import asyncio
import logging
from mcp.server.fastapi import FastMCPServer
from mcp.server.models import InitializationOptions
from weather_tools import WeatherTool
from stock_tools import StockTool
from database_tools import DatabaseTool
from analytics_tools import AnalyticsTool

# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level="INFO")
logger = logging.getLogger(name=__name__)

class AdvancedMCPServer:
    """MCP server bundling the weather, stock, database and analytics tools.

    Tools are attached to ``self.server`` by ``setup_tools``, which ``run``
    invokes before starting the server.
    """

    def __init__(self, weather_api_key=None):
        """Create the tool instances and the underlying server.

        Args:
            weather_api_key: OpenWeather API key. When omitted, the
                ``OPENWEATHER_API_KEY`` environment variable is used. If that
                is unset too, the placeholder key is kept and a warning is
                logged (weather requests will presumably be rejected upstream).
        """
        import os  # local import keeps this snippet's import block unchanged

        # Never hard-code secrets: take the key from the caller or the environment.
        api_key = weather_api_key or os.environ.get("OPENWEATHER_API_KEY")
        if not api_key:
            logger.warning("未设置 OPENWEATHER_API_KEY,天气工具将使用占位 API Key")
            api_key = "your_openweather_api_key"

        # Instantiate every tool
        self.weather_tool = WeatherTool(api_key=api_key)
        self.stock_tool = StockTool()
        self.database_tool = DatabaseTool()
        self.analytics_tool = AnalyticsTool()

        # Create the server
        self.server = FastMCPServer("advanced-mcp-server")
        # Guards setup_tools against registering every tool twice.
        self._tools_registered = False

    def setup_tools(self):
        """Register every tool definition with its handler on ``self.server``.

        Safe to call more than once: registration only happens on the first
        call (``run`` calls this too).
        """
        if getattr(self, "_tools_registered", False):
            return

        # Weather tool
        self.server.add_tool(
            self.weather_tool.get_tool_definition(),
            self.weather_tool.execute
        )

        # Stock tool
        self.server.add_tool(
            self.stock_tool.get_tool_definition(),
            self.stock_tool.execute
        )

        # Database tools: each definition name matches the handler method name.
        db_handler_names = {"query_database", "get_table_schema"}
        for tool_def in self.database_tool.get_tool_definitions():
            name = tool_def["name"]
            if name in db_handler_names:
                self.server.add_tool(tool_def, getattr(self.database_tool, name))

        # Data analysis tool
        for tool_def in self.analytics_tool.get_tool_definitions():
            if tool_def["name"] == "analyze_data":
                self.server.add_tool(tool_def, self.analytics_tool.analyze_data)

        self._tools_registered = True

    async def run(self, host: str = "localhost", port: int = 3000):
        """Register the tools and start serving on ``host``:``port``."""
        self.setup_tools()

        logger.info("🚀 启动高级MCP服务器...")
        logger.info("📡 监听地址: http://%s:%s", host, port)
        logger.info("🛠️ 已注册工具: 天气查询、股票信息、数据库操作、数据分析")

        await self.server.run(host=host, port=port)

if __name__ == "__main__":
    # Script entry point: build the server and drive its async run() loop.
    app = AdvancedMCPServer()
    asyncio.run(app.run())

🧪 测试高级工具

创建测试脚本来验证所有工具:

python
# test_advanced_tools.py
import asyncio
import json
from advanced_client import AdvancedMCPClient

async def test_all_tools():
    """Smoke-test every advanced tool against a running server.

    Each tool is called once, in order, with representative arguments via
    ``AdvancedMCPClient.call_tool``. The client is always disconnected, even
    if one of the calls raises.
    """
    client = AdvancedMCPClient()

    if not await client.connect():
        # Previously this returned silently, leaving no hint why nothing ran.
        print("❌ 无法连接到MCP服务器,测试中止")
        return

    sample_data = """name,age,salary
张三,25,8000
李四,30,12000
王五,28,9500
赵六,35,15000"""

    # (banner, tool name, arguments) for each smoke test, run in order.
    cases = [
        ("1️⃣ 测试天气查询...", "get_weather", {
            "city": "北京",
            "forecast_days": 3
        }),
        ("\n2️⃣ 测试股票查询...", "get_stock_info", {
            "symbol": "AAPL",
            "info_type": "all"
        }),
        ("\n3️⃣ 测试数据库查询...", "query_database", {
            "sql": "SELECT * FROM users",
            "format": "table"
        }),
        ("\n4️⃣ 测试数据分析...", "analyze_data", {
            "data": sample_data,
            "analysis_type": "summary",
            "chart": True
        }),
    ]

    try:
        print("🧪 开始高级工具测试\n")

        for banner, tool_name, tool_args in cases:
            print(banner)
            await client.call_tool(tool_name, tool_args)

        print("\n✅ 所有高级工具测试完成!")

    finally:
        await client.disconnect()

if __name__ == "__main__":
    # Command-line entry point: run the async smoke-test suite once.
    suite = test_all_tools()
    asyncio.run(suite)

🎯 本节小结

通过本节学习,你已经掌握了:

  • 高级架构模式:分层架构设计和组件职责划分
  • 网络服务集成:天气API、股票数据等外部服务调用
  • 数据库操作:SQLite数据库的查询、插入、结构分析
  • 数据分析功能:统计分析、相关性分析、可视化图表
  • 工具整合:多个复杂工具的统一管理和部署
  • 错误处理:健壮的异常处理和用户友好的错误信息
  • 缓存策略:提高性能的智能缓存机制

🌟 实战练习

  1. 扩展天气工具:添加空气质量查询功能
  2. 增强股票工具:加入技术指标计算(MA、RSI等)
  3. 优化数据库工具:支持更多数据库类型(MySQL、PostgreSQL)
  4. 丰富分析工具:添加机器学习预测功能

🔗 相关资源


恭喜! 🎉 你已经学会了高级MCP工具开发。下一节我们将学习测试和调试技巧。

👉 下一节:4.4 测试与调试