8.2 官方最佳实践 📚
"听君一席话,胜读十年书" - 官方文档就是MCP世界的"武功秘籍",这里汇集了协议设计者的智慧结晶。
前言:为什么要听官方的?
想象一下,你要学做正宗的北京烤鸭,是听胡同里老师傅的,还是听网红博主的?当然是老师傅!MCP官方最佳实践就是这位"老师傅",它来自协议的设计者和维护者,代表着最权威、最前沿的实践经验。
官方安全最佳实践 🔐
1. 用户同意与控制 👤
核心要求:在访问数据或执行操作之前,始终需要明确的用户同意。
typescript
// 🌟 最佳实践示例
class UserConsentManager {
private userPermissions = new Map<string, Set<string>>();
async requestPermission(
userId: string,
action: string,
resource: string
): Promise<boolean> {
// 检查是否已有权限
const userPerms = this.userPermissions.get(userId) || new Set();
const permissionKey = `${action}:${resource}`;
if (userPerms.has(permissionKey)) {
return true; // 已授权
}
// 向用户显示权限请求
const consent = await this.showConsentDialog({
user: userId,
message: `应用程序想要${action}您的${resource},是否同意?`,
details: `这将允许应用程序访问${resource}并执行${action}操作。`,
allowRemember: true // 让用户选择是否记住选择
});
if (consent.granted) {
userPerms.add(permissionKey);
this.userPermissions.set(userId, userPerms);
// 记录授权日志
this.logPermissionGrant(userId, action, resource);
}
return consent.granted;
}
private async showConsentDialog(request: ConsentRequest): Promise<ConsentResponse> {
// 实际实现中,这里会显示用户界面
return {
granted: true, // 用户选择
remember: false // 是否记住选择
};
}
}
实际应用场景:
python
# 文件访问工具的权限检查
class FileAccessTool:
def __init__(self, consent_manager: UserConsentManager):
self.consent_manager = consent_manager
async def read_file(self, file_path: str, user_id: str) -> str:
# 🚨 关键:先获取用户同意
has_permission = await self.consent_manager.request_permission(
user_id, "读取", f"文件:{file_path}"
)
if not has_permission:
raise PermissionDeniedError("用户拒绝了文件访问请求")
# 权限验证通过,执行文件读取
with open(file_path, 'r', encoding='utf-8') as file:
content = file.read()
# 记录访问日志(用于审计)
self.log_file_access(user_id, file_path, "read")
return content
💡 用户体验提示:权限请求要清晰明了,告诉用户为什么需要这个权限,以及会用来做什么。
2. 数据隐私保护 🔒
核心要求:只有在明确同意的情况下才暴露用户数据,并使用适当的访问控制进行保护。
csharp
// 🌟 数据脱敏处理示例
public class DataPrivacyManager
{
private readonly Dictionary<DataSensitivityLevel, IDataMasker> _maskers;
public DataPrivacyManager()
{
_maskers = new Dictionary<DataSensitivityLevel, IDataMasker>
{
[DataSensitivityLevel.Public] = new NoMaskDataMasker(),
[DataSensitivityLevel.Internal] = new PartialMaskDataMasker(),
[DataSensitivityLevel.Confidential] = new FullMaskDataMasker(),
[DataSensitivityLevel.Restricted] = new RestrictedDataMasker()
};
}
public async Task<UserData> GetUserDataAsync(
string userId,
UserAccessLevel accessLevel,
bool hasExplicitConsent = false)
{
var rawData = await _dataRepository.GetUserDataAsync(userId);
if (!hasExplicitConsent)
{
// 没有明确同意,进行数据脱敏
return ApplyDataMasking(rawData, accessLevel);
}
// 有明确同意,但仍要根据访问级别过滤
return FilterDataByAccessLevel(rawData, accessLevel);
}
private UserData ApplyDataMasking(UserData data, UserAccessLevel accessLevel)
{
var maskedData = data.DeepClone();
// 根据访问级别应用不同的脱敏策略
switch (accessLevel)
{
case UserAccessLevel.Basic:
maskedData.Email = MaskEmail(data.Email);
maskedData.PhoneNumber = MaskPhoneNumber(data.PhoneNumber);
maskedData.CreditCardInfo = null; // 完全隐藏
break;
case UserAccessLevel.Advanced:
maskedData.CreditCardInfo = MaskCreditCard(data.CreditCardInfo);
break;
case UserAccessLevel.Admin:
// 管理员级别,但敏感信息仍需脱敏
maskedData.CreditCardInfo = PartialMaskCreditCard(data.CreditCardInfo);
break;
}
return maskedData;
}
private string MaskEmail(string email)
{
if (string.IsNullOrEmpty(email)) return email;
var parts = email.Split('@');
if (parts.Length != 2) return "***@***.***";
var username = parts[0];
var domain = parts[1];
// 保留首末字符,中间用*替代
var maskedUsername = username.Length > 2
? $"{username[0]}***{username[username.Length - 1]}"
: "***";
return $"{maskedUsername}@{domain}";
}
}
3. 工具安全性 🛠️
核心要求:在调用任何工具之前需要明确的用户同意,确保用户理解工具功能。
python
# 🌟 工具安全执行框架
class SecureToolExecutor:
def __init__(self):
self.tool_registry = {}
self.execution_logs = []
self.risk_assessor = ToolRiskAssessor()
def register_tool(self, tool: BaseTool):
"""注册工具时进行安全评估"""
# 评估工具风险等级
risk_level = self.risk_assessor.assess_tool_risk(tool)
self.tool_registry[tool.name] = {
'tool': tool,
'risk_level': risk_level,
'registration_time': datetime.now(),
'usage_count': 0
}
print(f"✅ 工具 '{tool.name}' 已注册,风险等级:{risk_level}")
async def execute_tool(
self,
tool_name: str,
parameters: dict,
user_context: UserContext
) -> ToolResult:
if tool_name not in self.tool_registry:
raise ToolNotFoundError(f"工具 '{tool_name}' 未找到")
tool_info = self.tool_registry[tool_name]
tool = tool_info['tool']
risk_level = tool_info['risk_level']
# 🚨 关键步骤:用户同意检查
consent_granted = await self._request_execution_consent(
tool, parameters, user_context, risk_level
)
if not consent_granted:
raise UserDeniedError("用户拒绝执行工具")
# 执行前的最后安全检查
self._perform_pre_execution_security_check(tool, parameters, user_context)
try:
# 记录执行开始
execution_id = self._log_execution_start(tool_name, parameters, user_context)
# 执行工具
result = await tool.execute(parameters, user_context)
# 记录执行完成
self._log_execution_complete(execution_id, result)
# 更新使用统计
tool_info['usage_count'] += 1
return result
except Exception as e:
# 记录执行错误
self._log_execution_error(execution_id, e)
raise ToolExecutionError(f"工具执行失败:{str(e)}")
async def _request_execution_consent(
self,
tool: BaseTool,
parameters: dict,
user_context: UserContext,
risk_level: RiskLevel
) -> bool:
"""请求用户执行同意"""
# 根据风险等级显示不同的警告信息
warning_message = self._get_risk_warning(risk_level)
consent_request = {
'tool_name': tool.name,
'tool_description': tool.description,
'parameters': self._sanitize_parameters_for_display(parameters),
'risk_level': risk_level.value,
'warning_message': warning_message,
'estimated_execution_time': tool.estimated_execution_time,
'required_permissions': tool.required_permissions
}
# 向用户显示同意对话框
return await self._show_consent_dialog(consent_request, user_context)
def _get_risk_warning(self, risk_level: RiskLevel) -> str:
warnings = {
RiskLevel.LOW: "ℹ️ 这是一个低风险操作。",
RiskLevel.MEDIUM: "⚠️ 这个操作可能会修改数据,请确认。",
RiskLevel.HIGH: "🚨 警告:这是一个高风险操作,可能会对系统造成重大影响!",
RiskLevel.CRITICAL: "💀 危险:这个操作具有系统级影响,请三思而后行!"
}
return warnings.get(risk_level, "未知风险等级")
4. 参数验证 ✅
核心要求:对所有工具调用强制执行验证,防止恶意或格式错误的输入。
java
// 🌟 全面的参数验证框架
public class ParameterValidator {
private static final Logger logger = LoggerFactory.getLogger(ParameterValidator.class);
public static class ValidationResult {
private final boolean isValid;
private final List<String> errors;
private final Map<String, Object> sanitizedParams;
// 构造函数和getter方法...
}
public ValidationResult validateToolParameters(
String toolName,
Map<String, Object> parameters,
ToolSchema schema) {
List<String> errors = new ArrayList<>();
Map<String, Object> sanitizedParams = new HashMap<>();
try {
// 1. 检查必需参数
validateRequiredParameters(parameters, schema.getRequiredFields(), errors);
// 2. 验证参数类型
validateParameterTypes(parameters, schema.getFieldDefinitions(), errors, sanitizedParams);
// 3. 验证参数值范围
validateParameterRanges(sanitizedParams, schema.getFieldDefinitions(), errors);
// 4. 检查安全风险
validateSecurityConstraints(sanitizedParams, schema.getSecurityConstraints(), errors);
// 5. 执行自定义验证规则
executeCustomValidationRules(toolName, sanitizedParams, schema.getCustomRules(), errors);
boolean isValid = errors.isEmpty();
if (!isValid) {
logger.warn("参数验证失败 - 工具: {}, 错误: {}", toolName, errors);
}
return new ValidationResult(isValid, errors, sanitizedParams);
} catch (Exception e) {
logger.error("参数验证过程中发生异常", e);
errors.add("验证过程异常:" + e.getMessage());
return new ValidationResult(false, errors, new HashMap<>());
}
}
private void validateRequiredParameters(
Map<String, Object> parameters,
Set<String> requiredFields,
List<String> errors) {
for (String required : requiredFields) {
if (!parameters.containsKey(required) || parameters.get(required) == null) {
errors.add(String.format("缺少必需参数:%s", required));
}
}
}
private void validateParameterTypes(
Map<String, Object> parameters,
Map<String, FieldDefinition> fieldDefs,
List<String> errors,
Map<String, Object> sanitizedParams) {
for (Map.Entry<String, Object> entry : parameters.entrySet()) {
String fieldName = entry.getKey();
Object value = entry.getValue();
FieldDefinition fieldDef = fieldDefs.get(fieldName);
if (fieldDef == null) {
errors.add(String.format("未知参数:%s", fieldName));
continue;
}
try {
// 类型转换和验证
Object sanitizedValue = convertAndValidateType(value, fieldDef);
sanitizedParams.put(fieldName, sanitizedValue);
} catch (TypeValidationException e) {
errors.add(String.format("参数 %s 类型错误:%s", fieldName, e.getMessage()));
}
}
}
private void validateSecurityConstraints(
Map<String, Object> parameters,
List<SecurityConstraint> constraints,
List<String> errors) {
for (SecurityConstraint constraint : constraints) {
switch (constraint.getType()) {
case SQL_INJECTION_CHECK:
checkSqlInjection(parameters, constraint, errors);
break;
case XSS_CHECK:
checkXssAttack(parameters, constraint, errors);
break;
case PATH_TRAVERSAL_CHECK:
checkPathTraversal(parameters, constraint, errors);
break;
case FILE_EXTENSION_CHECK:
checkFileExtension(parameters, constraint, errors);
break;
}
}
}
private void checkSqlInjection(
Map<String, Object> parameters,
SecurityConstraint constraint,
List<String> errors) {
// SQL注入模式检测
List<String> sqlPatterns = Arrays.asList(
"(?i)(union|select|insert|update|delete|drop|create|alter)\\s",
"(?i)(or|and)\\s+\\d+\\s*=\\s*\\d+",
"(?i)(script|javascript|vbscript):",
"(?i)(onload|onerror|onclick)\\s*="
);
for (String paramName : constraint.getApplicableFields()) {
Object value = parameters.get(paramName);
if (value instanceof String) {
String strValue = (String) value;
for (String pattern : sqlPatterns) {
if (strValue.matches(".*" + pattern + ".*")) {
errors.add(String.format(
"参数 %s 包含潜在的SQL注入内容", paramName));
break;
}
}
}
}
}
}
5. 速率限制 ⏱️
核心要求:实施速率限制以防止滥用并确保服务器资源的公平使用。
python
# 🌟 智能速率限制器
import time
import asyncio
from collections import defaultdict, deque
from typing import Dict, Optional, Tuple
class IntelligentRateLimiter:
"""智能速率限制器 - 根据用户行为动态调整限制"""
def __init__(self):
# 用户请求历史记录
self.user_request_history: Dict[str, deque] = defaultdict(lambda: deque(maxlen=1000))
# 用户信誉分数
self.user_reputation: Dict[str, float] = defaultdict(lambda: 1.0)
# 基础限制配置
self.base_limits = {
'requests_per_minute': 60,
'requests_per_hour': 1000,
'requests_per_day': 10000
}
# 用户等级配置
self.user_tier_multipliers = {
'free': 1.0,
'basic': 2.0,
'premium': 5.0,
'enterprise': 10.0
}
async def check_rate_limit(
self,
user_id: str,
user_tier: str = 'free',
endpoint: str = 'default'
) -> Tuple[bool, Optional[dict]]:
"""
检查速率限制
返回:(是否允许, 限制信息)
"""
current_time = time.time()
# 获取用户请求历史
user_history = self.user_request_history[user_id]
# 计算用户的动态限制
dynamic_limits = self._calculate_dynamic_limits(
user_id, user_tier, endpoint
)
# 检查各个时间窗口的限制
rate_check_results = {
'minute': self._check_time_window(user_history, current_time, 60, dynamic_limits['per_minute']),
'hour': self._check_time_window(user_history, current_time, 3600, dynamic_limits['per_hour']),
'day': self._check_time_window(user_history, current_time, 86400, dynamic_limits['per_day'])
}
# 判断是否超限
is_rate_limited = any(not result['allowed'] for result in rate_check_results.values())
if not is_rate_limited:
# 记录这次请求
user_history.append(current_time)
# 更新用户信誉(正常使用增加信誉)
self._update_user_reputation(user_id, 'normal_usage')
return True, None
else:
# 更新用户信誉(超限使用降低信誉)
self._update_user_reputation(user_id, 'rate_limit_hit')
# 计算重试时间
retry_after = max(
result['retry_after'] for result in rate_check_results.values()
if not result['allowed']
)
limit_info = {
'retry_after_seconds': retry_after,
'current_limits': dynamic_limits,
'usage_details': rate_check_results,
'reputation_score': self.user_reputation[user_id]
}
return False, limit_info
def _calculate_dynamic_limits(
self,
user_id: str,
user_tier: str,
endpoint: str
) -> dict:
"""根据用户等级和信誉计算动态限制"""
# 基础倍数
tier_multiplier = self.user_tier_multipliers.get(user_tier, 1.0)
# 信誉倍数
reputation_score = self.user_reputation[user_id]
reputation_multiplier = min(reputation_score, 2.0) # 最大2倍
# 端点特定倍数
endpoint_multiplier = self._get_endpoint_multiplier(endpoint)
# 综合倍数
total_multiplier = tier_multiplier * reputation_multiplier * endpoint_multiplier
return {
'per_minute': int(self.base_limits['requests_per_minute'] * total_multiplier),
'per_hour': int(self.base_limits['requests_per_hour'] * total_multiplier),
'per_day': int(self.base_limits['requests_per_day'] * total_multiplier)
}
def _check_time_window(
self,
user_history: deque,
current_time: float,
window_seconds: int,
limit: int
) -> dict:
"""检查指定时间窗口内的请求数量"""
# 清理过期记录
cutoff_time = current_time - window_seconds
while user_history and user_history[0] < cutoff_time:
user_history.popleft()
current_count = len(user_history)
allowed = current_count < limit
retry_after = 0
if not allowed and user_history:
# 计算需要等待的时间
oldest_request = user_history[0]
retry_after = int(oldest_request + window_seconds - current_time) + 1
return {
'allowed': allowed,
'current_count': current_count,
'limit': limit,
'retry_after': retry_after,
'window_seconds': window_seconds
}
def _update_user_reputation(self, user_id: str, action: str):
"""更新用户信誉分数"""
current_reputation = self.user_reputation[user_id]
if action == 'normal_usage':
# 正常使用,缓慢增加信誉
self.user_reputation[user_id] = min(current_reputation + 0.001, 2.0)
elif action == 'rate_limit_hit':
# 触发限制,降低信誉
self.user_reputation[user_id] = max(current_reputation - 0.1, 0.1)
elif action == 'suspicious_activity':
# 可疑活动,大幅降低信誉
self.user_reputation[user_id] = max(current_reputation - 0.5, 0.1)
def _get_endpoint_multiplier(self, endpoint: str) -> float:
"""获取端点特定的限制倍数"""
endpoint_multipliers = {
'chat': 1.0, # 聊天端点,标准限制
'tools': 0.5, # 工具执行,更严格的限制
'upload': 0.2, # 文件上传,严格限制
'admin': 2.0, # 管理端点,宽松限制
'default': 1.0
}
return endpoint_multipliers.get(endpoint, 1.0)
# 使用示例
rate_limiter = IntelligentRateLimiter()
async def handle_api_request(request):
user_id = request.user_id
user_tier = request.user_tier
endpoint = request.endpoint
# 检查速率限制
allowed, limit_info = await rate_limiter.check_rate_limit(
user_id, user_tier, endpoint
)
if not allowed:
return {
'error': 'Rate limit exceeded',
'message': f'请求过于频繁,请在 {limit_info["retry_after_seconds"]} 秒后重试',
'retry_after': limit_info['retry_after_seconds'],
'current_limits': limit_info['current_limits']
}, 429
# 继续处理请求
return await process_request(request)
官方实现最佳实践 ⚙️
1. 能力协商 🤝
核心要求:在连接建立期间,交换支持功能、协议版本、可用工具和资源的信息。
typescript
// 🌟 能力协商实现
interface ServerCapabilities {
protocolVersion: string;
supportedFeatures: string[];
availableTools: ToolInfo[];
availableResources: ResourceInfo[];
authentication: AuthenticationInfo;
rateLimits: RateLimitInfo;
}
class CapabilityNegotiator {
private serverCapabilities: ServerCapabilities;
constructor() {
this.serverCapabilities = {
protocolVersion: "2024-11-05",
supportedFeatures: [
"tools",
"resources",
"prompts",
"logging",
"progress_notifications",
"request_cancellation"
],
availableTools: this.getAvailableTools(),
availableResources: this.getAvailableResources(),
authentication: {
type: "api_key",
required: true,
scopes: ["read", "write", "admin"]
},
rateLimits: {
requestsPerMinute: 100,
requestsPerHour: 1000,
concurrentRequests: 10
}
};
}
async handleCapabilityExchange(clientCapabilities: any): Promise<NegotiationResult> {
// 1. 验证协议版本兼容性
const protocolCompatible = this.checkProtocolCompatibility(
clientCapabilities.protocolVersion
);
if (!protocolCompatible) {
throw new ProtocolIncompatibilityError(
`不支持的协议版本:${clientCapabilities.protocolVersion}`
);
}
// 2. 协商功能集合
const negotiatedFeatures = this.negotiateFeatures(
clientCapabilities.requestedFeatures
);
// 3. 过滤可用工具(基于客户端能力和权限)
const availableTools = this.filterToolsByClientCapabilities(
clientCapabilities
);
// 4. 过滤可用资源
const availableResources = this.filterResourcesByClientCapabilities(
clientCapabilities
);
// 5. 生成会话配置
const sessionConfig = {
sessionId: this.generateSessionId(),
negotiatedFeatures,
availableTools,
availableResources,
authentication: this.serverCapabilities.authentication,
rateLimits: this.calculateUserRateLimits(clientCapabilities.userTier)
};
return {
success: true,
sessionConfig,
serverCapabilities: this.serverCapabilities,
message: "能力协商成功"
};
}
private negotiateFeatures(requestedFeatures: string[]): string[] {
const supportedFeatures = this.serverCapabilities.supportedFeatures;
// 返回客户端请求且服务器支持的功能
return requestedFeatures.filter(feature =>
supportedFeatures.includes(feature)
);
}
private filterToolsByClientCapabilities(clientCaps: any): ToolInfo[] {
return this.serverCapabilities.availableTools.filter(tool => {
// 检查客户端是否支持工具的要求
if (tool.requiresFileAccess && !clientCaps.supportsFileAccess) {
return false;
}
if (tool.requiresInternetAccess && !clientCaps.supportsInternetAccess) {
return false;
}
// 检查权限要求
if (tool.requiredPermissions) {
const hasPermissions = tool.requiredPermissions.every(
permission => clientCaps.permissions?.includes(permission)
);
if (!hasPermissions) return false;
}
return true;
});
}
}
2. 工具设计原则 🔧
核心要求:创建专注的工具,做好一件事,而不是处理多个关注点的单体工具。
csharp
// ❌ 不好的设计:瑞士军刀式工具
public class SuperTool : ITool
{
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
var operation = request.Parameters["operation"].ToString();
switch (operation)
{
case "weather":
return await GetWeather(request.Parameters);
case "calculator":
return await Calculate(request.Parameters);
case "file_operations":
return await HandleFileOperations(request.Parameters);
case "database_query":
return await QueryDatabase(request.Parameters);
case "send_email":
return await SendEmail(request.Parameters);
// ... 更多功能
default:
throw new InvalidOperationException("未知操作");
}
}
}
// ✅ 好的设计:专注的单一职责工具
public class WeatherForecastTool : ITool
{
private readonly IWeatherService _weatherService;
private readonly ILogger<WeatherForecastTool> _logger;
public WeatherForecastTool(IWeatherService weatherService, ILogger<WeatherForecastTool> logger)
{
_weatherService = weatherService;
_logger = logger;
}
public string Name => "weather_forecast";
public string Description => "获取指定地区的详细天气预报";
public ToolDefinition GetDefinition()
{
return new ToolDefinition
{
Name = Name,
Description = Description,
Parameters = new Dictionary<string, ParameterDefinition>
{
["location"] = new ParameterDefinition
{
Type = ParameterType.String,
Description = "城市或地区名称(支持中英文)",
Examples = new[] { "北京", "Shanghai", "New York" }
},
["days"] = new ParameterDefinition
{
Type = ParameterType.Integer,
Description = "预报天数(1-7天)",
Default = 3,
Minimum = 1,
Maximum = 7
},
["units"] = new ParameterDefinition
{
Type = ParameterType.String,
Description = "温度单位",
Enum = new[] { "celsius", "fahrenheit" },
Default = "celsius"
}
},
Required = new[] { "location" }
};
}
public async Task<ToolResponse> ExecuteAsync(ToolRequest request)
{
try
{
// 参数提取和验证
var location = request.Parameters["location"].ToString();
var days = request.Parameters.ContainsKey("days")
? Convert.ToInt32(request.Parameters["days"])
: 3;
var units = request.Parameters.GetValueOrDefault("units", "celsius").ToString();
_logger.LogInformation("获取天气预报:{Location}, {Days}天, 单位:{Units}",
location, days, units);
// 调用天气服务
var forecast = await _weatherService.GetForecastAsync(location, days, units);
// 格式化返回结果
var formattedResult = FormatWeatherResponse(forecast);
return new ToolResponse
{
Content = new List<ContentItem>
{
new TextContent(JsonSerializer.Serialize(formattedResult, new JsonSerializerOptions
{
WriteIndented = true,
Encoder = JavaScriptEncoder.UnsafeRelaxedJsonEscaping // 支持中文
}))
}
};
}
catch (LocationNotFoundException ex)
{
_logger.LogWarning("位置未找到:{Location}", ex.Location);
throw new ToolExecutionException($"未找到位置:{ex.Location},请检查拼写或尝试其他表达方式");
}
catch (WeatherServiceException ex)
{
_logger.LogError(ex, "天气服务异常");
throw new ToolExecutionException("天气服务暂时不可用,请稍后重试");
}
}
private WeatherForecastResult FormatWeatherResponse(WeatherData data)
{
return new WeatherForecastResult
{
Location = data.Location,
LastUpdated = data.LastUpdated,
CurrentWeather = new CurrentWeatherInfo
{
Temperature = data.Current.Temperature,
Description = data.Current.Description,
Humidity = data.Current.Humidity,
WindSpeed = data.Current.WindSpeed
},
Forecast = data.DailyForecasts.Select(f => new DailyForecast
{
Date = f.Date.ToString("yyyy-MM-dd"),
HighTemperature = f.HighTemperature,
LowTemperature = f.LowTemperature,
Description = f.Description,
ChanceOfRain = f.ChanceOfRain
}).ToList()
};
}
}
小结
官方最佳实践为我们提供了MCP开发的"黄金标准":
安全最佳实践 🛡️
- 用户同意与控制 - 尊重用户的选择权
- 数据隐私保护 - 保护用户隐私不泄露
- 工具安全性 - 确保工具使用的安全性
- 参数验证 - 验证所有输入防止攻击
- 速率限制 - 防止滥用保证服务质量
实现最佳实践 ⚙️
- 能力协商 - 建立兼容的通信基础
- 工具设计 - 单一职责,专注高效
这些实践不是束缚,而是帮助我们构建更安全、更可靠、更易维护的MCP系统的指导原则。
🎯 实践建议:在你的项目中逐步实施这些最佳实践,不要试图一次性全部实现。先从安全性最关键的部分开始。
下一节:工具设计最佳实践 - 学习如何设计优雅高效的工具