Skip to content

8.2 官方最佳实践 📚

"听君一席话,胜读十年书" - 官方文档就是MCP世界的"武功秘籍",这里汇集了协议设计者的智慧结晶。

前言:为什么要听官方的?

想象一下,你要学做正宗的北京烤鸭,是听胡同里老师傅的,还是听网红博主的?当然是老师傅!MCP官方最佳实践就是这位"老师傅",它来自协议的设计者和维护者,代表着最权威、最前沿的实践经验。

官方安全最佳实践 🔐

1. 用户同意与控制 👤

核心要求:在访问数据或执行操作之前,始终需要明确的用户同意。

typescript
// 🌟 最佳实践示例
class UserConsentManager {
    private userPermissions = new Map<string, Set<string>>();
    
    async requestPermission(
        userId: string, 
        action: string, 
        resource: string
    ): Promise<boolean> {
        // 检查是否已有权限
        const userPerms = this.userPermissions.get(userId) || new Set();
        const permissionKey = `${action}:${resource}`;
        
        if (userPerms.has(permissionKey)) {
            return true; // 已授权
        }
        
        // 向用户显示权限请求
        const consent = await this.showConsentDialog({
            user: userId,
            message: `应用程序想要${action}您的${resource},是否同意?`,
            details: `这将允许应用程序访问${resource}并执行${action}操作。`,
            allowRemember: true // 让用户选择是否记住选择
        });
        
        if (consent.granted) {
            userPerms.add(permissionKey);
            this.userPermissions.set(userId, userPerms);
            
            // 记录授权日志
            this.logPermissionGrant(userId, action, resource);
        }
        
        return consent.granted;
    }
    
    private async showConsentDialog(request: ConsentRequest): Promise<ConsentResponse> {
        // 实际实现中,这里会显示用户界面
        return {
            granted: true, // 用户选择
            remember: false // 是否记住选择
        };
    }
}

实际应用场景

python
# 文件访问工具的权限检查
class FileAccessTool:
    def __init__(self, consent_manager: UserConsentManager):
        self.consent_manager = consent_manager
    
    async def read_file(self, file_path: str, user_id: str) -> str:
        # 🚨 关键:先获取用户同意
        has_permission = await self.consent_manager.request_permission(
            user_id, "读取", f"文件:{file_path}"
        )
        
        if not has_permission:
            raise PermissionDeniedError("用户拒绝了文件访问请求")
        
        # 权限验证通过,执行文件读取
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        
        # 记录访问日志(用于审计)
        self.log_file_access(user_id, file_path, "read")
        
        return content

💡 用户体验提示:权限请求要清晰明了,告诉用户为什么需要这个权限,以及会用来做什么。

2. 数据隐私保护 🔒

核心要求:只有在明确同意的情况下才暴露用户数据,并使用适当的访问控制进行保护。

csharp
// 🌟 数据脱敏处理示例
public class DataPrivacyManager
{
    private readonly Dictionary<DataSensitivityLevel, IDataMasker> _maskers;
    
    public DataPrivacyManager()
    {
        _maskers = new Dictionary<DataSensitivityLevel, IDataMasker>
        {
            [DataSensitivityLevel.Public] = new NoMaskDataMasker(),
            [DataSensitivityLevel.Internal] = new PartialMaskDataMasker(),
            [DataSensitivityLevel.Confidential] = new FullMaskDataMasker(),
            [DataSensitivityLevel.Restricted] = new RestrictedDataMasker()
        };
    }
    
    public async Task<UserData> GetUserDataAsync(
        string userId, 
        UserAccessLevel accessLevel,
        bool hasExplicitConsent = false)
    {
        var rawData = await _dataRepository.GetUserDataAsync(userId);
        
        if (!hasExplicitConsent)
        {
            // 没有明确同意,进行数据脱敏
            return ApplyDataMasking(rawData, accessLevel);
        }
        
        // 有明确同意,但仍要根据访问级别过滤
        return FilterDataByAccessLevel(rawData, accessLevel);
    }
    
    private UserData ApplyDataMasking(UserData data, UserAccessLevel accessLevel)
    {
        var maskedData = data.DeepClone();
        
        // 根据访问级别应用不同的脱敏策略
        switch (accessLevel)
        {
            case UserAccessLevel.Basic:
                maskedData.Email = MaskEmail(data.Email);
                maskedData.PhoneNumber = MaskPhoneNumber(data.PhoneNumber);
                maskedData.CreditCardInfo = null; // 完全隐藏
                break;
                
            case UserAccessLevel.Advanced:
                maskedData.CreditCardInfo = MaskCreditCard(data.CreditCardInfo);
                break;
                
            case UserAccessLevel.Admin:
                // 管理员级别,但敏感信息仍需脱敏
                maskedData.CreditCardInfo = PartialMaskCreditCard(data.CreditCardInfo);
                break;
        }
        
        return maskedData;
    }
    
    private string MaskEmail(string email)
    {
        if (string.IsNullOrEmpty(email)) return email;
        
        var parts = email.Split('@');
        if (parts.Length != 2) return "***@***.***";
        
        var username = parts[0];
        var domain = parts[1];
        
        // 保留首末字符,中间用*替代
        var maskedUsername = username.Length > 2 
            ? $"{username[0]}***{username[username.Length - 1]}"
            : "***";
            
        return $"{maskedUsername}@{domain}";
    }
}

3. 工具安全性 🛠️

核心要求:在调用任何工具之前需要明确的用户同意,确保用户理解工具功能。

python
# 🌟 工具安全执行框架
class SecureToolExecutor:
    def __init__(self):
        self.tool_registry = {}
        self.execution_logs = []
        self.risk_assessor = ToolRiskAssessor()
    
    def register_tool(self, tool: BaseTool):
        """注册工具时进行安全评估"""
        # 评估工具风险等级
        risk_level = self.risk_assessor.assess_tool_risk(tool)
        
        self.tool_registry[tool.name] = {
            'tool': tool,
            'risk_level': risk_level,
            'registration_time': datetime.now(),
            'usage_count': 0
        }
        
        print(f"✅ 工具 '{tool.name}' 已注册,风险等级:{risk_level}")
    
    async def execute_tool(
        self, 
        tool_name: str, 
        parameters: dict, 
        user_context: UserContext
    ) -> ToolResult:
        if tool_name not in self.tool_registry:
            raise ToolNotFoundError(f"工具 '{tool_name}' 未找到")
        
        tool_info = self.tool_registry[tool_name]
        tool = tool_info['tool']
        risk_level = tool_info['risk_level']
        
        # 🚨 关键步骤:用户同意检查
        consent_granted = await self._request_execution_consent(
            tool, parameters, user_context, risk_level
        )
        
        if not consent_granted:
            raise UserDeniedError("用户拒绝执行工具")
        
        # 执行前的最后安全检查
        self._perform_pre_execution_security_check(tool, parameters, user_context)
        
        try:
            # 记录执行开始
            execution_id = self._log_execution_start(tool_name, parameters, user_context)
            
            # 执行工具
            result = await tool.execute(parameters, user_context)
            
            # 记录执行完成
            self._log_execution_complete(execution_id, result)
            
            # 更新使用统计
            tool_info['usage_count'] += 1
            
            return result
            
        except Exception as e:
            # 记录执行错误
            self._log_execution_error(execution_id, e)
            raise ToolExecutionError(f"工具执行失败:{str(e)}")
    
    async def _request_execution_consent(
        self, 
        tool: BaseTool, 
        parameters: dict, 
        user_context: UserContext,
        risk_level: RiskLevel
    ) -> bool:
        """请求用户执行同意"""
        
        # 根据风险等级显示不同的警告信息
        warning_message = self._get_risk_warning(risk_level)
        
        consent_request = {
            'tool_name': tool.name,
            'tool_description': tool.description,
            'parameters': self._sanitize_parameters_for_display(parameters),
            'risk_level': risk_level.value,
            'warning_message': warning_message,
            'estimated_execution_time': tool.estimated_execution_time,
            'required_permissions': tool.required_permissions
        }
        
        # 向用户显示同意对话框
        return await self._show_consent_dialog(consent_request, user_context)
    
    def _get_risk_warning(self, risk_level: RiskLevel) -> str:
        warnings = {
            RiskLevel.LOW: "ℹ️ 这是一个低风险操作。",
            RiskLevel.MEDIUM: "⚠️ 这个操作可能会修改数据,请确认。",
            RiskLevel.HIGH: "🚨 警告:这是一个高风险操作,可能会对系统造成重大影响!",
            RiskLevel.CRITICAL: "💀 危险:这个操作具有系统级影响,请三思而后行!"
        }
        return warnings.get(risk_level, "未知风险等级")

4. 参数验证 ✅

核心要求:对所有工具调用强制执行验证,防止恶意或格式错误的输入。

java
// 🌟 全面的参数验证框架
public class ParameterValidator {
    private static final Logger logger = LoggerFactory.getLogger(ParameterValidator.class);
    
    public static class ValidationResult {
        private final boolean isValid;
        private final List<String> errors;
        private final Map<String, Object> sanitizedParams;
        
        // 构造函数和getter方法...
    }
    
    public ValidationResult validateToolParameters(
            String toolName, 
            Map<String, Object> parameters, 
            ToolSchema schema) {
        
        List<String> errors = new ArrayList<>();
        Map<String, Object> sanitizedParams = new HashMap<>();
        
        try {
            // 1. 检查必需参数
            validateRequiredParameters(parameters, schema.getRequiredFields(), errors);
            
            // 2. 验证参数类型
            validateParameterTypes(parameters, schema.getFieldDefinitions(), errors, sanitizedParams);
            
            // 3. 验证参数值范围
            validateParameterRanges(sanitizedParams, schema.getFieldDefinitions(), errors);
            
            // 4. 检查安全风险
            validateSecurityConstraints(sanitizedParams, schema.getSecurityConstraints(), errors);
            
            // 5. 执行自定义验证规则
            executeCustomValidationRules(toolName, sanitizedParams, schema.getCustomRules(), errors);
            
            boolean isValid = errors.isEmpty();
            
            if (!isValid) {
                logger.warn("参数验证失败 - 工具: {}, 错误: {}", toolName, errors);
            }
            
            return new ValidationResult(isValid, errors, sanitizedParams);
            
        } catch (Exception e) {
            logger.error("参数验证过程中发生异常", e);
            errors.add("验证过程异常:" + e.getMessage());
            return new ValidationResult(false, errors, new HashMap<>());
        }
    }
    
    private void validateRequiredParameters(
            Map<String, Object> parameters, 
            Set<String> requiredFields, 
            List<String> errors) {
        
        for (String required : requiredFields) {
            if (!parameters.containsKey(required) || parameters.get(required) == null) {
                errors.add(String.format("缺少必需参数:%s", required));
            }
        }
    }
    
    private void validateParameterTypes(
            Map<String, Object> parameters,
            Map<String, FieldDefinition> fieldDefs,
            List<String> errors,
            Map<String, Object> sanitizedParams) {
        
        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
            String fieldName = entry.getKey();
            Object value = entry.getValue();
            
            FieldDefinition fieldDef = fieldDefs.get(fieldName);
            if (fieldDef == null) {
                errors.add(String.format("未知参数:%s", fieldName));
                continue;
            }
            
            try {
                // 类型转换和验证
                Object sanitizedValue = convertAndValidateType(value, fieldDef);
                sanitizedParams.put(fieldName, sanitizedValue);
                
            } catch (TypeValidationException e) {
                errors.add(String.format("参数 %s 类型错误:%s", fieldName, e.getMessage()));
            }
        }
    }
    
    private void validateSecurityConstraints(
            Map<String, Object> parameters,
            List<SecurityConstraint> constraints,
            List<String> errors) {
        
        for (SecurityConstraint constraint : constraints) {
            switch (constraint.getType()) {
                case SQL_INJECTION_CHECK:
                    checkSqlInjection(parameters, constraint, errors);
                    break;
                case XSS_CHECK:
                    checkXssAttack(parameters, constraint, errors);
                    break;
                case PATH_TRAVERSAL_CHECK:
                    checkPathTraversal(parameters, constraint, errors);
                    break;
                case FILE_EXTENSION_CHECK:
                    checkFileExtension(parameters, constraint, errors);
                    break;
            }
        }
    }
    
    private void checkSqlInjection(
            Map<String, Object> parameters,
            SecurityConstraint constraint,
            List<String> errors) {
        
        // SQL注入模式检测
        List<String> sqlPatterns = Arrays.asList(
            "(?i)(union|select|insert|update|delete|drop|create|alter)\\s",
            "(?i)(or|and)\\s+\\d+\\s*=\\s*\\d+",
            "(?i)(script|javascript|vbscript):",
            "(?i)(onload|onerror|onclick)\\s*="
        );
        
        for (String paramName : constraint.getApplicableFields()) {
            Object value = parameters.get(paramName);
            if (value instanceof String) {
                String strValue = (String) value;
                
                for (String pattern : sqlPatterns) {
                    if (strValue.matches(".*" + pattern + ".*")) {
                        errors.add(String.format(
                            "参数 %s 包含潜在的SQL注入内容", paramName));
                        break;
                    }
                }
            }
        }
    }
}

5. 速率限制 ⏱️

核心要求:实施速率限制以防止滥用并确保服务器资源的公平使用。

python
# 🌟 智能速率限制器
import time
import asyncio
from collections import defaultdict, deque
from typing import Dict, Optional, Tuple

class IntelligentRateLimiter:
    """智能速率限制器 - 根据用户行为动态调整限制"""
    
    def __init__(self):
        # 用户请求历史记录
        self.user_request_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000))
        
        # 用户信誉分数
        self.user_reputation: Dict[str, float] = defaultdict(lambda: 1.0)
        
        # 基础限制配置
        self.base_limits = {
            'requests_per_minute': 60,
            'requests_per_hour': 1000,
            'requests_per_day': 10000
        }
        
        # 用户等级配置
        self.user_tier_multipliers = {
            'free': 1.0,
            'basic': 2.0,
            'premium': 5.0,
            'enterprise': 10.0
        }
    
    async def check_rate_limit(
        self, 
        user_id: str, 
        user_tier: str = 'free',
        endpoint: str = 'default'
    ) -> Tuple[bool, Optional[dict]]:
        """
        检查速率限制
        返回:(是否允许, 限制信息)
        """
        current_time = time.time()
        
        # 获取用户请求历史
        user_history = self.user_request_history[user_id]
        
        # 计算用户的动态限制
        dynamic_limits = self._calculate_dynamic_limits(
            user_id, user_tier, endpoint
        )
        
        # 检查各个时间窗口的限制
        rate_check_results = {
            'minute': self._check_time_window(user_history, current_time, 60, dynamic_limits['per_minute']),
            'hour': self._check_time_window(user_history, current_time, 3600, dynamic_limits['per_hour']),
            'day': self._check_time_window(user_history, current_time, 86400, dynamic_limits['per_day'])
        }
        
        # 判断是否超限
        is_rate_limited = any(not result['allowed'] for result in rate_check_results.values())
        
        if not is_rate_limited:
            # 记录这次请求
            user_history.append(current_time)
            
            # 更新用户信誉(正常使用增加信誉)
            self._update_user_reputation(user_id, 'normal_usage')
            
            return True, None
        else:
            # 更新用户信誉(超限使用降低信誉)
            self._update_user_reputation(user_id, 'rate_limit_hit')
            
            # 计算重试时间
            retry_after = max(
                result['retry_after'] for result in rate_check_results.values() 
                if not result['allowed']
            )
            
            limit_info = {
                'retry_after_seconds': retry_after,
                'current_limits': dynamic_limits,
                'usage_details': rate_check_results,
                'reputation_score': self.user_reputation[user_id]
            }
            
            return False, limit_info
    
    def _calculate_dynamic_limits(
        self, 
        user_id: str, 
        user_tier: str, 
        endpoint: str
    ) -> dict:
        """根据用户等级和信誉计算动态限制"""
        
        # 基础倍数
        tier_multiplier = self.user_tier_multipliers.get(user_tier, 1.0)
        
        # 信誉倍数
        reputation_score = self.user_reputation[user_id]
        reputation_multiplier = min(reputation_score, 2.0)  # 最大2倍
        
        # 端点特定倍数
        endpoint_multiplier = self._get_endpoint_multiplier(endpoint)
        
        # 综合倍数
        total_multiplier = tier_multiplier * reputation_multiplier * endpoint_multiplier
        
        return {
            'per_minute': int(self.base_limits['requests_per_minute'] * total_multiplier),
            'per_hour': int(self.base_limits['requests_per_hour'] * total_multiplier),
            'per_day': int(self.base_limits['requests_per_day'] * total_multiplier)
        }
    
    def _check_time_window(
        self, 
        user_history: deque, 
        current_time: float, 
        window_seconds: int, 
        limit: int
    ) -> dict:
        """检查指定时间窗口内的请求数量"""
        
        # 清理过期记录
        cutoff_time = current_time - window_seconds
        while user_history and user_history[0] < cutoff_time:
            user_history.popleft()
        
        current_count = len(user_history)
        allowed = current_count < limit
        
        retry_after = 0
        if not allowed and user_history:
            # 计算需要等待的时间
            oldest_request = user_history[0]
            retry_after = int(oldest_request + window_seconds - current_time) + 1
        
        return {
            'allowed': allowed,
            'current_count': current_count,
            'limit': limit,
            'retry_after': retry_after,
            'window_seconds': window_seconds
        }
    
    def _update_user_reputation(self, user_id: str, action: str):
        """更新用户信誉分数"""
        current_reputation = self.user_reputation[user_id]
        
        if action == 'normal_usage':
            # 正常使用,缓慢增加信誉
            self.user_reputation[user_id] = min(current_reputation + 0.001, 2.0)
        elif action == 'rate_limit_hit':
            # 触发限制,降低信誉
            self.user_reputation[user_id] = max(current_reputation - 0.1, 0.1)
        elif action == 'suspicious_activity':
            # 可疑活动,大幅降低信誉
            self.user_reputation[user_id] = max(current_reputation - 0.5, 0.1)
    
    def _get_endpoint_multiplier(self, endpoint: str) -> float:
        """获取端点特定的限制倍数"""
        endpoint_multipliers = {
            'chat': 1.0,        # 聊天端点,标准限制
            'tools': 0.5,       # 工具执行,更严格的限制
            'upload': 0.2,      # 文件上传,严格限制
            'admin': 2.0,       # 管理端点,宽松限制
            'default': 1.0
        }
        
        return endpoint_multipliers.get(endpoint, 1.0)

# 使用示例
rate_limiter = IntelligentRateLimiter()

async def handle_api_request(request):
    user_id = request.user_id
    user_tier = request.user_tier
    endpoint = request.endpoint
    
    # 检查速率限制
    allowed, limit_info = await rate_limiter.check_rate_limit(
        user_id, user_tier, endpoint
    )
    
    if not allowed:
        return {
            'error': 'Rate limit exceeded',
            'message': f'请求过于频繁,请在 {limit_info["retry_after_seconds"]} 秒后重试',
            'retry_after': limit_info['retry_after_seconds'],
            'current_limits': limit_info['current_limits']
        }, 429
    
    # 继续处理请求
    return await process_request(request)

官方实现最佳实践 ⚙️

1. 能力协商 🤝

核心要求:在连接建立期间,交换支持功能、协议版本、可用工具和资源的信息。

typescript
// 🌟 能力协商实现
interface ServerCapabilities {
    protocolVersion: string;
    supportedFeatures: string[];
    availableTools: ToolInfo[];
    availableResources: ResourceInfo[];
    authentication: AuthenticationInfo;
    rateLimits: RateLimitInfo;
}

class CapabilityNegotiator {
    private serverCapabilities: ServerCapabilities;
    
    constructor() {
        this.serverCapabilities = {
            protocolVersion: "2024-11-05",
            supportedFeatures: [
                "tools",
                "resources", 
                "prompts",
                "logging",
                "progress_notifications",
                "request_cancellation"
            ],
            availableTools: this.getAvailableTools(),
            availableResources: this.getAvailableResources(),
            authentication: {
                type: "api_key",
                required: true,
                scopes: ["read", "write", "admin"]
            },
            rateLimits: {
                requestsPerMinute: 100,
                requestsPerHour: 1000,
                concurrentRequests: 10
            }
        };
    }
    
    async handleCapabilityExchange(clientCapabilities: any): Promise<NegotiationResult> {
        // 1. 验证协议版本兼容性
        const protocolCompatible = this.checkProtocolCompatibility(
            clientCapabilities.protocolVersion
        );
        
        if (!protocolCompatible) {
            throw new ProtocolIncompatibilityError(
                `不支持的协议版本:${clientCapabilities.protocolVersion}`
            );
        }
        
        // 2. 协商功能集合
        const negotiatedFeatures = this.negotiateFeatures(
            clientCapabilities.requestedFeatures
        );
        
        // 3. 过滤可用工具(基于客户端能力和权限)
        const availableTools = this.filterToolsByClientCapabilities(
            clientCapabilities
        );
        
        // 4. 过滤可用资源
        const availableResources = this.filterResourcesByClientCapabilities(
            clientCapabilities
        );
        
        // 5. 生成会话配置
        const sessionConfig = {
            sessionId: this.generateSessionId(),
            negotiatedFeatures,
            availableTools,
            availableResources,
            authentication: this.serverCapabilities.authentication,
            rateLimits: this.calculateUserRateLimits(clientCapabilities.userTier)
        };
        
        return {
            success: true,
            sessionConfig,
            serverCapabilities: this.serverCapabilities,
            message: "能力协商成功"
        };
    }
    
    private negotiateFeatures(requestedFeatures: string[]): string[] {
        const supportedFeatures = this.serverCapabilities.supportedFeatures;
        
        // 返回客户端请求且服务器支持的功能
        return requestedFeatures.filter(feature => 
            supportedFeatures.includes(feature)
        );
    }
    
    private filterToolsByClientCapabilities(clientCaps: any): ToolInfo[] {
        return this.serverCapabilities.availableTools.filter(tool => {
            // 检查客户端是否支持工具的要求
            if (tool.requiresFileAccess && !clientCaps.supportsFileAccess) {
                return false;
            }
            
            if (tool.requiresInternetAccess && !clientCaps.supportsInternetAccess) {
                return false;
            }
            
            // 检查权限要求
            if (tool.requiredPermissions) {
                const hasPermissions = tool.requiredPermissions.every(
                    permission => clientCaps.permissions?.includes(permission)
                );
                if (!hasPermissions) return false;
            }
            
            return true;
        });
    }
}

2. 工具设计原则 🔧

核心要求:创建专注的工具,做好一件事,而不是处理多个关注点的单体工具。

csharp
// ❌ 不好的设计:瑞士军刀式工具
public class SuperTool : ITool
{
    public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
    {
        var operation = request.Parameters["operation"].ToString();
        
        switch (operation)
        {
            case "weather":
                return await GetWeather(request.Parameters);
            case "calculator":
                return await Calculate(request.Parameters);
            case "file_operations":
                return await HandleFileOperations(request.Parameters);
            case "database_query":
                return await QueryDatabase(request.Parameters);
            case "send_email":
                return await SendEmail(request.Parameters);
            // ... 更多功能
            default:
                throw new InvalidOperationException("未知操作");
        }
    }
}

// ✅ 好的设计:专注的单一职责工具
public class WeatherForecastTool : ITool
{
    private readonly IWeatherService _weatherService;
    private readonly ILogger<WeatherForecastTool> _logger;
    
    public WeatherForecastTool(IWeatherService weatherService, ILogger<WeatherForecastTool> logger)
    {
        _weatherService = weatherService;
        _logger = logger;
    }
    
    public string Name => "weather_forecast";
    public string Description => "获取指定地区的详细天气预报";
    
    public ToolDefinition GetDefinition()
    {
        return new ToolDefinition
        {
            Name = Name,
            Description = Description,
            Parameters = new Dictionary<string, ParameterDefinition>
            {
                ["location"] = new ParameterDefinition
                {
                    Type = ParameterType.String,
                    Description = "城市或地区名称(支持中英文)",
                    Examples = new[] { "北京", "Shanghai", "New York" }
                },
                ["days"] = new ParameterDefinition
                {
                    Type = ParameterType.Integer,
                    Description = "预报天数(1-7天)",
                    Default = 3,
                    Minimum = 1,
                    Maximum = 7
                },
                ["units"] = new ParameterDefinition
                {
                    Type = ParameterType.String,
                    Description = "温度单位",
                    Enum = new[] { "celsius", "fahrenheit" },
                    Default = "celsius"
                }
            },
            Required = new[] { "location" }
        };
    }
    
    public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
    {
        try
        {
            // 参数提取和验证
            var location = request.Parameters["location"].ToString();
            var days = request.Parameters.ContainsKey("days") 
                ? Convert.ToInt32(request.Parameters["days"]) 
                : 3;
            var units = request.Parameters.GetValueOrDefault("units", "celsius").ToString();
            
            _logger.LogInformation("获取天气预报:{Location}, {Days}天, 单位:{Units}", 
                location, days, units);
            
            // 调用天气服务
            var forecast = await _weatherService.GetForecastAsync(location, days, units);
            
            // 格式化返回结果
            var formattedResult = FormatWeatherResponse(forecast);
            
            return new ToolResponse
            {
                Content = new List<ContentItem>
                {
                    new TextContent(JsonSerializer.Serialize(formattedResult, new JsonSerializerOptions
                    {
                        WriteIndented = true,
                        Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping // 支持中文
                    }))
                }
            };
        }
        catch (LocationNotFoundException ex)
        {
            _logger.LogWarning("位置未找到:{Location}", ex.Location);
            throw new ToolExecutionException($"未找到位置:{ex.Location},请检查拼写或尝试其他表达方式");
        }
        catch (WeatherServiceException ex)
        {
            _logger.LogError(ex, "天气服务异常");
            throw new ToolExecutionException("天气服务暂时不可用,请稍后重试");
        }
    }
    
    private WeatherForecastResult FormatWeatherResponse(WeatherData data)
    {
        return new WeatherForecastResult
        {
            Location = data.Location,
            LastUpdated = data.LastUpdated,
            CurrentWeather = new CurrentWeatherInfo
            {
                Temperature = data.Current.Temperature,
                Description = data.Current.Description,
                Humidity = data.Current.Humidity,
                WindSpeed = data.Current.WindSpeed
            },
            Forecast = data.DailyForecasts.Select(f => new DailyForecast
            {
                Date = f.Date.ToString("yyyy-MM-dd"),
                HighTemperature = f.HighTemperature,
                LowTemperature = f.LowTemperature,
                Description = f.Description,
                ChanceOfRain = f.ChanceOfRain
            }).ToList()
        };
    }
}

小结

官方最佳实践为我们提供了MCP开发的"黄金标准":

安全最佳实践 🛡️

  1. 用户同意与控制 - 尊重用户的选择权
  2. 数据隐私保护 - 保护用户隐私不泄露
  3. 工具安全性 - 确保工具使用的安全性
  4. 参数验证 - 验证所有输入防止攻击
  5. 速率限制 - 防止滥用保证服务质量

实现最佳实践 ⚙️

  1. 能力协商 - 建立兼容的通信基础
  2. 工具设计 - 单一职责,专注高效

这些实践不是束缚,而是帮助我们构建更安全、更可靠、更易维护的MCP系统的指导原则。

🎯 实践建议:在你的项目中逐步实施这些最佳实践,不要试图一次性全部实现。先从安全性最关键的部分开始。


下一节工具设计最佳实践 - 学习如何设计优雅高效的工具