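Real-World Case Studies 🎯

"Theory is gray, but the tree of practice is evergreen. Only by working through real projects can you truly master the essence of best practices."

Analyzing real project cases makes it easier to see how MCP best practices apply in day-to-day development. Each case is a complete story that walks through the whole process, from problem analysis to implementing the solution.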

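Case 1: Intelligent Customer Service Assistant 🤖

Project Background

An e-commerce company wants to build an intelligent customer service assistant that handles user inquiries, order lookups, after-sales service, and similar scenarios. The system must integrate several external APIs, including the order, inventory, and logistics systems.

Architecture Design and Implementation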

typescript
// 🌟 Intelligent customer service assistant: system architecture
import { EventEmitter } from 'events';
import { z } from 'zod';

// Core data models
interface CustomerInquiry {
  id: string;
  customerId: string;
  message: string;
  intent: string;
  entities: Record<string, any>;
  timestamp: Date;
  sessionId: string;
}

interface ServiceResponse {
  type: 'text' | 'card' | 'list' | 'action';
  content: any;
  actions?: Array<{
    type: string;
    label: string;
    data: any;
  }>;
}

// Intent recognition service
class IntentRecognitionService {
  private patterns = new Map<string, RegExp[]>([
    ['order_query', [
      /订单.*查询|查看.*订单|订单.*状态/i,
      /order.*status|check.*order/i
    ]],
    ['refund_request', [
      /退款|申请退款|要退货/i,
      /refund|return.*product/i
    ]],
    ['product_inquiry', [
      /产品.*咨询|商品.*问题|这个.*怎么样|有什么特点/i,
      /product.*question|about.*item/i
    ]],
    ['shipping_query', [
      /物流.*信息|快递.*查询|什么时候.*/i,
      /shipping.*status|delivery.*time/i
    ]]
  ]);

  private entityExtractors = new Map<string, (text: string) => Record<string, any>>([
    ['order_query', this.extractOrderInfo.bind(this)],
    ['shipping_query', this.extractShippingInfo.bind(this)],
    ['product_inquiry', this.extractProductInfo.bind(this)]
  ]);

  async recognizeIntent(message: string): Promise<{
    intent: string;
    confidence: number;
    entities: Record<string, any>;
  }> {
    console.log(`🧠 分析用户意图: "${message}"`);
    
    let bestMatch = { intent: 'unknown', confidence: 0 };
    
    for (const [intent, patterns] of this.patterns) {
      for (const pattern of patterns) {
        if (pattern.test(message)) {
          const confidence = this.calculateConfidence(message, pattern);
          if (confidence > bestMatch.confidence) {
            bestMatch = { intent, confidence };
          }
        }
      }
    }
    
    // Extract entities for the recognized intent
    const entities = this.entityExtractors.get(bestMatch.intent)?.(message) || {};
    
    console.log(`🎯 识别结果: ${bestMatch.intent} (置信度: ${bestMatch.confidence.toFixed(2)})`);
    
    return {
      intent: bestMatch.intent,
      confidence: bestMatch.confidence,
      entities
    };
  }

  private calculateConfidence(text: string, pattern: RegExp): number {
    const matches = text.match(pattern);
    if (!matches) return 0;
    
    // Simple heuristic: twice the matched fraction of the text, capped at 0.95
    const matchLength = matches[0].length;
    const textLength = text.length;
    
    return Math.min(0.95, (matchLength / textLength) * 2);
  }

  private extractOrderInfo(text: string): Record<string, any> {
    const orderIdMatch = text.match(/(\d{10,})/);
    return orderIdMatch ? { orderId: orderIdMatch[1] } : {};
  }

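  private extractShippingInfo(text: string): Record<string, any> {
    const trackingMatch = text.match(/([A-Z]{2}\d{9}[A-Z]{2})/);
    if (trackingMatch) return { trackingNumber: trackingMatch[1] };
    // Fall back to an order ID so the `entities.orderId` branch for shipping queries is reachable
    return this.extractOrderInfo(text);
  }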

  private extractProductInfo(text: string): Record<string, any> {
    // Simplified product-info extraction (brand keywords only)
    const productMatch = text.match(/(iPhone|小米|华为|三星)/i);
    return productMatch ? { productBrand: productMatch[1] } : {};
  }
}

// MCP tool collection
class CustomerServiceTools {
  private orderApiClient: any;
  private inventoryApiClient: any;
  private shippingApiClient: any;
  
  constructor() {
    // Initialize API clients (a real project would inject real clients here)
    this.orderApiClient = this.createMockApiClient('Order API');
    this.inventoryApiClient = this.createMockApiClient('Inventory API');
    this.shippingApiClient = this.createMockApiClient('Shipping API');
  }

  private createMockApiClient(name: string) {
    return {
      get: async (path: string, params?: any) => {
        console.log(`🌐 调用 ${name}: GET ${path}`, params);
        await this.simulateApiDelay();
        return this.generateMockResponse(path, params);
      },
      post: async (path: string, data?: any) => {
        console.log(`🌐 调用 ${name}: POST ${path}`, data);
        await this.simulateApiDelay();
        return { success: true, id: Date.now().toString() };
      }
    };
  }

  private async simulateApiDelay(): Promise<void> {
    await new Promise(resolve => setTimeout(resolve, 500 + Math.random() * 1000));
  }

  private generateMockResponse(path: string, params?: any): any {
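    if (path.includes('orders')) {
      return {
        id: params?.orderId || '1234567890',
        status: '已发货',
        items: [
          { name: 'iPhone 15 Pro', quantity: 1, price: 8999 }
        ],
        totalAmount: 8999,
        orderDate: '2024-03-15',
        estimatedDelivery: '2024-03-18',
        // trackShipping reads this field when it is given only an order ID
        trackingNumber: 'SF1234567890'
      };
    }
    
    // trackShipping requests `/tracking/:number`, so match that path here
    if (path.includes('tracking') || path.includes('shipping')) {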
      return {
        trackingNumber: params?.trackingNumber || 'SF1234567890',
        status: '运输中',
        currentLocation: '上海分拣中心',
        estimatedArrival: '2024-03-18 15:00',
        trackingHistory: [
          { time: '2024-03-15 10:00', location: '深圳', status: '已揽收' },
          { time: '2024-03-16 14:30', location: '广州', status: '运输中' },
          { time: '2024-03-17 09:15', location: '上海', status: '到达分拣中心' }
        ]
      };
    }
    
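    // getProductInfo requests `/products/:id`, so match that path and
    // return the fields its response card reads
    if (path.includes('products') || path.includes('inventory')) {
      return {
        id: params?.productId || 'iphone15pro',
        name: 'iPhone 15 Pro',
        price: 8999,
        stock: 156,
        availability: '有货',
        warehouse: '上海仓库'
      };
    }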
    
    return {};
  }

  // MCP tool: query an order
  async queryOrder(params: { orderId: string }): Promise<any> {
    const orderSchema = z.object({
      orderId: z.string().min(10, '订单号至少10位数字')
    });

    try {
      // Validate inside the try block so invalid input also gets a friendly reply
      const validated = orderSchema.parse(params);
      const orderInfo = await this.orderApiClient.get(`/orders/${validated.orderId}`);
      
      return {
        type: 'card',
        content: {
          title: `订单 #${orderInfo.id}`,
          status: orderInfo.status,
          items: orderInfo.items,
          totalAmount: `¥${orderInfo.totalAmount}`,
          orderDate: orderInfo.orderDate,
          estimatedDelivery: orderInfo.estimatedDelivery
        },
        actions: [
          {
            type: 'track_shipping',
            label: '查看物流',
            data: { orderId: orderInfo.id }
          },
          {
            type: 'contact_service',
            label: '联系客服',
            data: { orderId: orderInfo.id }
          }
        ]
      };
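    } catch (error) {
      // `error` is `unknown` under strict settings, so narrow it before reading `.message`
      const reason = error instanceof Error ? error.message : String(error);
      return {
        type: 'text',
        content: `订单查询失败:${reason}。请检查订单号是否正确。`
      };
    }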
  }

  // MCP tool: track a shipment
  async trackShipping(params: { orderId?: string; trackingNumber?: string }): Promise<any> {
    try {
      let trackingInfo;
      
      if (params.orderId) {
        // Resolve the tracking number from the order first
        const orderInfo = await this.orderApiClient.get(`/orders/${params.orderId}`);
        trackingInfo = await this.shippingApiClient.get(`/tracking/${orderInfo.trackingNumber}`);
      } else if (params.trackingNumber) {
        // Look up directly by tracking number
        trackingInfo = await this.shippingApiClient.get(`/tracking/${params.trackingNumber}`);
      } else {
        throw new Error('需要提供订单号或运单号');
      }

      return {
        type: 'list',
        content: {
          title: `物流追踪 - ${trackingInfo.trackingNumber}`,
          status: trackingInfo.status,
          currentLocation: trackingInfo.currentLocation,
          estimatedArrival: trackingInfo.estimatedArrival,
          trackingHistory: trackingInfo.trackingHistory.map((item: any) => ({
            time: item.time,
            location: item.location,
            status: item.status
          }))
        }
      };
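    } catch (error) {
      // `error` is `unknown` under strict settings, so narrow it before reading `.message`
      const reason = error instanceof Error ? error.message : String(error);
      return {
        type: 'text',
        content: `物流查询失败:${reason}`
      };
    }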
  }

  // MCP tool: request a refund
  async requestRefund(params: { orderId: string; reason: string; amount?: number }): Promise<any> {
    const refundSchema = z.object({
      orderId: z.string().min(10),
      reason: z.string().min(5, '退款原因至少5个字符'),
      amount: z.number().positive().optional()
    });

    try {
      // Validate inside the try block so invalid input also gets a friendly reply
      const validated = refundSchema.parse(params);

      // Check the order status first
      const orderInfo = await this.orderApiClient.get(`/orders/${validated.orderId}`);
      
      if (orderInfo.status === '已退款') {
        return {
          type: 'text',
          content: '该订单已经退款,无需重复申请。'
        };
      }

      // Submit the refund request
      const refundRequest = await this.orderApiClient.post('/refunds', {
        orderId: validated.orderId,
        reason: validated.reason,
        amount: validated.amount || orderInfo.totalAmount
      });

      return {
        type: 'card',
        content: {
          title: '退款申请已提交',
          refundId: refundRequest.id,
          amount: `¥${validated.amount || orderInfo.totalAmount}`,
          status: '处理中',
          estimatedTime: '3-5个工作日'
        },
        actions: [
          {
            type: 'check_refund_status',
            label: '查看进度',
            data: { refundId: refundRequest.id }
          }
        ]
      };
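    } catch (error) {
      // `error` is `unknown` under strict settings, so narrow it before reading `.message`
      const reason = error instanceof Error ? error.message : String(error);
      return {
        type: 'text',
        content: `退款申请失败:${reason}。请稍后重试或联系人工客服。`
      };
    }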
  }

  // MCP tool: product inquiry
  async getProductInfo(params: { productId?: string; productName?: string }): Promise<any> {
    try {
      let productInfo;
      
      if (params.productId) {
        productInfo = await this.inventoryApiClient.get(`/products/${params.productId}`);
      } else if (params.productName) {
        // Simulated product search
        productInfo = {
          id: 'iphone15pro',
          name: 'iPhone 15 Pro',
          price: 8999,
          description: '搭载A17 Pro芯片的专业级iPhone',
          specifications: {
            screen: '6.1英寸 Super Retina XDR显示屏',
            chip: 'A17 Pro芯片',
            camera: '48MP主摄像头系统',
            storage: '128GB/256GB/512GB/1TB'
          },
          availability: '有现货'
        };
      } else {
        throw new Error('需要提供产品ID或产品名称');
      }

      return {
        type: 'card',
        content: {
          title: productInfo.name,
          price: `¥${productInfo.price}`,
          description: productInfo.description,
          specifications: productInfo.specifications,
          availability: productInfo.availability
        },
        actions: [
          {
            type: 'add_to_cart',
            label: '加入购物车',
            data: { productId: productInfo.id }
          },
          {
            type: 'check_similar',
            label: '查看同类产品',
            data: { category: 'smartphones' }
          }
        ]
      };
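    } catch (error) {
      // `error` is `unknown` under strict settings, so narrow it before reading `.message`
      const reason = error instanceof Error ? error.message : String(error);
      return {
        type: 'text',
        content: `产品信息查询失败:${reason}`
      };
    }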
  }
}

// Conversation manager
class ConversationManager extends EventEmitter {
  private sessions = new Map<string, ConversationSession>();
  private intentService: IntentRecognitionService;
  private tools: CustomerServiceTools;

  constructor() {
    super();
    this.intentService = new IntentRecognitionService();
    this.tools = new CustomerServiceTools();
  }

  async processInquiry(inquiry: CustomerInquiry): Promise<ServiceResponse> {
    console.log(`💬 处理客户咨询: ${inquiry.message}`);

    // Get the existing session or create a new one
    let session = this.sessions.get(inquiry.sessionId);
    if (!session) {
      session = new ConversationSession(inquiry.sessionId, inquiry.customerId);
      this.sessions.set(inquiry.sessionId, session);
    }

    // Recognize the intent
    const intentResult = await this.intentService.recognizeIntent(inquiry.message);

    // Update the session context
    session.addMessage(inquiry.message, intentResult);

    // Generate the response
    const response = await this.generateResponse(intentResult, session);

    // Record the interaction history
    session.addResponse(response);

    // Emit an event for listeners such as monitoring
    this.emit('inquiry_processed', {
      inquiry,
      intentResult,
      response,
      sessionId: inquiry.sessionId
    });

    return response;
  }

  private async generateResponse(
    intentResult: { intent: string; confidence: number; entities: Record<string, any> },
    session: ConversationSession
  ): Promise<ServiceResponse> {
    const { intent, entities, confidence } = intentResult;

    // Confidence too low: hand off to a human agent
    if (confidence < 0.3) {
      return {
        type: 'text',
        content: '抱歉,我没有完全理解您的问题。让我为您转接人工客服。',
        actions: [{
          type: 'transfer_to_human',
          label: '转人工客服',
          data: { sessionId: session.id }
        }]
      };
    }

    // Dispatch to the matching tool based on the intent
    switch (intent) {
      case 'order_query':
        if (entities.orderId) {
          return await this.tools.queryOrder({ orderId: entities.orderId });
        } else {
          return {
            type: 'text',
            content: '请提供您的订单号,我来帮您查询订单状态。'
          };
        }

      case 'shipping_query':
        if (entities.trackingNumber) {
          return await this.tools.trackShipping({ trackingNumber: entities.trackingNumber });
        } else if (entities.orderId) {
          return await this.tools.trackShipping({ orderId: entities.orderId });
        } else {
          return {
            type: 'text',
            content: '请提供您的订单号或运单号,我来帮您查询物流信息。'
          };
        }

      case 'refund_request':
        if (entities.orderId && entities.reason) {
          return await this.tools.requestRefund({
            orderId: entities.orderId,
            reason: entities.reason
          });
        } else {
          return {
            type: 'text',
            content: '申请退款需要您提供订单号和退款原因。请问是哪个订单需要退款?'
          };
        }

      case 'product_inquiry':
        if (entities.productBrand) {
          return await this.tools.getProductInfo({ productName: entities.productBrand });
        } else {
          return {
            type: 'text',
            content: '请告诉我您想了解哪款产品,我来为您介绍详细信息。'
          };
        }

      default:
        return {
          type: 'text',
          content: '我是您的智能客服助手,可以帮您查询订单、物流、申请退款和产品咨询。请问有什么可以帮您的?'
        };
    }
  }

  getSessionStats(): any {
    return {
      activeSessions: this.sessions.size,
      totalInquiries: Array.from(this.sessions.values())
        .reduce((sum, session) => sum + session.messageCount, 0)
    };
  }
}

// Conversation session
class ConversationSession {
  public readonly id: string;
  public readonly customerId: string;
  public readonly startTime: Date;
  private messages: Array<{ message: string; intent: any; timestamp: Date }> = [];
  private responses: Array<{ response: ServiceResponse; timestamp: Date }> = [];

  constructor(sessionId: string, customerId: string) {
    this.id = sessionId;
    this.customerId = customerId;
    this.startTime = new Date();
  }

  addMessage(message: string, intent: any): void {
    this.messages.push({
      message,
      intent,
      timestamp: new Date()
    });
  }

  addResponse(response: ServiceResponse): void {
    this.responses.push({
      response,
      timestamp: new Date()
    });
  }

  get messageCount(): number {
    return this.messages.length;
  }

  get lastIntent(): string {
    return this.messages[this.messages.length - 1]?.intent?.intent || 'unknown';
  }

  getContext(): any {
    return {
      sessionId: this.id,
      customerId: this.customerId,
      messageCount: this.messageCount,
      lastIntent: this.lastIntent,
      duration: Date.now() - this.startTime.getTime()
    };
  }
}

// System startup and demo run
async function runCustomerServiceDemo() {
  console.log('🚀 启动智能客服助手系统...\n');
  
  const conversationManager = new ConversationManager();
  
  // Listen for processing events
  conversationManager.on('inquiry_processed', (data) => {
    console.log(`📊 处理完成 - 意图: ${data.intentResult.intent}, 置信度: ${data.intentResult.confidence.toFixed(2)}\n`);
  });

  // Simulated customer inquiries
  const testInquiries = [
    {
      id: '1',
      customerId: 'user123',
      message: '我想查询订单1234567890的状态',
      sessionId: 'session1'
    },
    {
      id: '2',
      customerId: 'user123',
      message: '物流信息怎么查看?',
      sessionId: 'session1'
    },
    {
      id: '3',
      customerId: 'user456',
      message: '我要申请退款,订单9876543210有质量问题',
      sessionId: 'session2'
    },
    {
      id: '4',
      customerId: 'user789',
      message: 'iPhone 15 Pro有什么特点?',
      sessionId: 'session3'
    }
  ];

  // Process the inquiries one by one
  for (const inquiry of testInquiries) {
    console.log(`👤 客户咨询: "${inquiry.message}"`);
    
    const response = await conversationManager.processInquiry({
      ...inquiry,
      intent: '',
      entities: {},
      timestamp: new Date()
    });
    
    console.log(`🤖 助手回复:`, JSON.stringify(response, null, 2));
    console.log('-'.repeat(80));
    
    // Simulate a pause between inquiries
    await new Promise(resolve => setTimeout(resolve, 1000));
  }

  // Print session statistics
  const stats = conversationManager.getSessionStats();
  console.log(`\n📈 系统统计:`);
  console.log(`  活跃会话: ${stats.activeSessions}`);
  console.log(`  总咨询数: ${stats.totalInquiries}`);
}

// Run the demo
runCustomerServiceDemo().catch(console.error);
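
To see how the confidence heuristic in `calculateConfidence` behaves, here is a minimal standalone sketch. It copies the formula from the listing above (twice the matched fraction of the message, capped at 0.95); the variable names and the choice of sample messages are illustrative assumptions.

```typescript
// Standalone copy of the heuristic from IntentRecognitionService:
// twice the matched fraction of the text, capped at 0.95.
function calculateConfidence(text: string, pattern: RegExp): number {
  const match = text.match(pattern);
  if (!match) return 0;
  return Math.min(0.95, (match[0].length / text.length) * 2);
}

const refundPattern = /退款|申请退款|要退货/i;

// The whole message is the match, so the score hits the 0.95 cap.
const shortScore = calculateConfidence('申请退款', refundPattern);

// The same 4-character match inside a 24-character message: (4 / 24) * 2 ≈ 0.33,
// only just above the 0.3 hand-off threshold used in generateResponse.
const longScore = calculateConfidence('我要申请退款,订单9876543210有质量问题', refundPattern);

console.log(shortScore, longScore.toFixed(2));
```

Because the score shrinks as the message grows, a long but clearly worded request can drop below the threshold and be routed to a human agent. Scoring by keyword hits rather than by length may be worth evaluating as an alternative.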

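Best Practices Summary

1. Architecture design highlights:

  • Intent recognition and entity extraction: regular expressions and rule-based matching identify the user's intent
  • Modular tools: each business function is wrapped as an independent MCP tool
  • Session management: each session keeps the user's conversation context, the basis for multi-turn dialogue
  • Error handling: exceptions are caught and turned into fallback replies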
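
The session-management point can be made concrete with a small sketch. The listing stores each turn in `ConversationSession` but does not show entities being reused across turns, so the `EntityMemory` class below is a hypothetical helper (its name and API are assumptions, not part of the original code). It shows one way a follow-up question could inherit an order ID from an earlier turn.

```typescript
type Entities = Record<string, string>;

// Hypothetical helper: remembers entities seen earlier in a session.
class EntityMemory {
  private known: Entities = {};

  // Merge the current turn into memory; newer values override older ones,
  // and older values fill in whatever the current turn left out.
  merge(current: Entities): Entities {
    this.known = { ...this.known, ...current };
    return { ...this.known };
  }
}

const memory = new EntityMemory();

// Turn 1: "我想查询订单1234567890的状态" yields an order ID.
const turn1 = memory.merge({ orderId: '1234567890' });

// Turn 2: "物流信息怎么查看?" carries no entities, but the order ID is still available.
const turn2 = memory.merge({});

console.log(turn1.orderId, turn2.orderId);
```

With something like this in place, the shipping query in the second demo turn could be answered directly instead of prompting for the order number again.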

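2. Code quality safeguards:

  • Parameter validation: input parameters are validated with Zod
  • Type safety: TypeScript provides static type checking
  • Asynchronous processing: async/await handles asynchronous operations
  • Event-driven design: EventEmitter decouples components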
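
As an illustration of the type-safety point, the sketch below replaces `content: any` with a discriminated union so the compiler checks each response shape. The type names (`TextResponse`, `CardResponse`, `TypedResponse`) and the `render` and `toErrorMessage` helpers are assumptions made for this example, not part of the original listing.

```typescript
interface TextResponse {
  type: 'text';
  content: string;
}

interface CardResponse {
  type: 'card';
  content: { title: string; status: string };
}

// The `type` field is the discriminant the compiler narrows on.
type TypedResponse = TextResponse | CardResponse;

function render(response: TypedResponse): string {
  switch (response.type) {
    case 'text':
      return response.content; // narrowed to string
    case 'card':
      return `${response.content.title} [${response.content.status}]`;
  }
}

// Caught values are `unknown` under strict settings, so narrow before use.
function toErrorMessage(error: unknown): string {
  return error instanceof Error ? error.message : String(error);
}

const card: TypedResponse = {
  type: 'card',
  content: { title: '订单 #1234567890', status: '已发货' },
};

console.log(render(card));
console.log(toErrorMessage(new Error('timeout')));
```

Adding a new response type to the union then makes the compiler flag every `switch` that does not yet handle it.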

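3. Extensibility:

  • Pluggable tools: new features can be added as new tools
  • Configurable rules: intent recognition rules can be moved into external configuration
  • API abstraction: external service calls go through an abstract interface
  • Monitoring integration: hooks are reserved for monitoring and logging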
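
The "configurable rules" idea can be sketched as follows. The rule format (`IntentRule`) and the `compileRules` and `matchIntent` helpers are hypothetical. The original hard-codes its patterns in `IntentRecognitionService`, and this only shows one way they might be moved into plain data such as a JSON file.

```typescript
interface IntentRule {
  intent: string;
  patterns: string[]; // regular-expression sources, stored as plain strings
}

// Hypothetical rule set; in practice this could be loaded from a JSON file.
const rules: IntentRule[] = [
  { intent: 'order_query', patterns: ['订单.*状态', 'order.*status'] },
  { intent: 'refund_request', patterns: ['退款', 'refund'] },
];

// Compile the string patterns once, keyed by intent.
function compileRules(config: IntentRule[]): Map<string, RegExp[]> {
  const compiled = new Map<string, RegExp[]>();
  for (const rule of config) {
    compiled.set(rule.intent, rule.patterns.map((p) => new RegExp(p, 'i')));
  }
  return compiled;
}

// Return the first intent (in insertion order) with a matching pattern.
function matchIntent(message: string, compiled: Map<string, RegExp[]>): string {
  let found = 'unknown';
  compiled.forEach((patterns, intent) => {
    if (found === 'unknown' && patterns.some((p) => p.test(message))) {
      found = intent;
    }
  });
  return found;
}

const compiled = compileRules(rules);
console.log(matchIntent('我想查订单的状态', compiled));
console.log(matchIntent('I want a refund', compiled));
```

Keeping the rules as data means new intents can be added without touching the matching code, at the cost of validating the patterns when they are loaded.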

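Case 2: Multi-Language Code Analysis Platform 📊

Project Background

A software company needs to build a code quality analysis platform that supports static analysis, code review, and security scanning across multiple programming languages. The system must process large numbers of source files and generate detailed analysis reports.

Core Implementation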

python
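# 🌟 Multi-language code analysis platform
import ast  # imported at module level because ComplexityVisitor subclasses ast.NodeVisitor
import asyncio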
import json
import os
import tempfile
import subprocess
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import hashlib
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import logging

class Language(Enum):
    """Supported programming languages."""
    PYTHON = "python"
    JAVASCRIPT = "javascript"
    TYPESCRIPT = "typescript"
    JAVA = "java"
    CSHARP = "csharp"
    GO = "go"
    RUST = "rust"

class AnalysisType(Enum):
    """Analysis types."""
    SYNTAX = "syntax"          # syntax check
    STYLE = "style"            # code style
    COMPLEXITY = "complexity"  # complexity analysis
    SECURITY = "security"      # security scan
    DUPLICATION = "duplication"  # duplicate code detection
    DEPENDENCIES = "dependencies"  # dependency analysis

@dataclass
class CodeFile:
    """A source file and its metadata."""
    path: str
    content: str
    language: Language
    size: int
    hash: str = field(init=False)
    
    def __post_init__(self):
        self.hash = hashlib.md5(self.content.encode()).hexdigest()

@dataclass
class AnalysisResult:
    """Result of one analysis run on one file."""
    file_path: str
    analysis_type: AnalysisType
    language: Language
    issues: List[Dict[str, Any]] = field(default_factory=list)
    metrics: Dict[str, Any] = field(default_factory=dict)
    execution_time: float = 0.0
    success: bool = True
    error_message: Optional[str] = None

class LanguageDetector:
    """Language detector."""
    
    EXTENSIONS = {
        '.py': Language.PYTHON,
        '.js': Language.JAVASCRIPT,
        '.ts': Language.TYPESCRIPT,
        '.java': Language.JAVA,
        '.cs': Language.CSHARP,
        '.go': Language.GO,
        '.rs': Language.RUST,
    }
    
    SHEBANG_PATTERNS = {
        'python': Language.PYTHON,
        'node': Language.JAVASCRIPT,
        'java': Language.JAVA,
    }
    
    @classmethod
    def detect_language(cls, file_path: str, content: str) -> Optional[Language]:
        """Detect the language of a file."""
        # 1. Detect by file extension
        ext = Path(file_path).suffix.lower()
        if ext in cls.EXTENSIONS:
            return cls.EXTENSIONS[ext]
        
        # 2. Detect by shebang line
        first_line = content.split('\n')[0] if content else ''
        if first_line.startswith('#!'):
            for pattern, language in cls.SHEBANG_PATTERNS.items():
                if pattern in first_line.lower():
                    return language
        
        # 3. Fall back to content heuristics
        return cls._detect_by_content(content)
    
    @classmethod
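    def _detect_by_content(cls, content: str) -> Optional[Language]:
        """Detect the language from characteristic keywords in the content."""
        # Simplified keyword heuristics; the first matching rule wins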
        if 'def ' in content and 'import ' in content:
            return Language.PYTHON
        elif 'function ' in content and ('var ' in content or 'let ' in content):
            return Language.JAVASCRIPT
        elif 'interface ' in content and ': ' in content:
            return Language.TYPESCRIPT
        elif 'public class ' in content and 'public static void main' in content:
            return Language.JAVA
        elif 'using ' in content and 'namespace ' in content:
            return Language.CSHARP
        elif 'func ' in content and 'package ' in content:
            return Language.GO
        elif 'fn ' in content and 'use ' in content:
            return Language.RUST
        
        return None

class BaseAnalyzer:
    """Base class for language analyzers."""
    
    def __init__(self, language: Language):
        self.language = language
        self.logger = logging.getLogger(f"{self.__class__.__name__}.{language.value}")
    
    async def analyze(self, code_file: CodeFile, analysis_types: List[AnalysisType]) -> List[AnalysisResult]:
        """Run each requested analysis and collect the results."""
        results = []
        
        for analysis_type in analysis_types:
            start_time = time.time()
            
            try:
                result = await self._run_analysis(code_file, analysis_type)
                result.execution_time = time.time() - start_time
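                # Keep the analyzer's own success flag: forcing it to True here
                # would hide failures that an analyzer handled and reported itself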
                
            except Exception as e:
                self.logger.error(f"分析失败 {analysis_type.value}: {str(e)}")
                result = AnalysisResult(
                    file_path=code_file.path,
                    analysis_type=analysis_type,
                    language=self.language,
                    execution_time=time.time() - start_time,
                    success=False,
                    error_message=str(e)
                )
            
            results.append(result)
        
        return results
    
    async def _run_analysis(self, code_file: CodeFile, analysis_type: AnalysisType) -> AnalysisResult:
        """Dispatch to the handler for one analysis type."""
        if analysis_type == AnalysisType.SYNTAX:
            return await self._analyze_syntax(code_file)
        elif analysis_type == AnalysisType.STYLE:
            return await self._analyze_style(code_file)
        elif analysis_type == AnalysisType.COMPLEXITY:
            return await self._analyze_complexity(code_file)
        elif analysis_type == AnalysisType.SECURITY:
            return await self._analyze_security(code_file)
        elif analysis_type == AnalysisType.DUPLICATION:
            return await self._analyze_duplication(code_file)
        elif analysis_type == AnalysisType.DEPENDENCIES:
            return await self._analyze_dependencies(code_file)
        else:
            raise ValueError(f"不支持的分析类型: {analysis_type}")
    
    # Hooks that language-specific subclasses must implement
    async def _analyze_syntax(self, code_file: CodeFile) -> AnalysisResult:
        raise NotImplementedError
    
    async def _analyze_style(self, code_file: CodeFile) -> AnalysisResult:
        raise NotImplementedError
    
    async def _analyze_complexity(self, code_file: CodeFile) -> AnalysisResult:
        raise NotImplementedError
    
    async def _analyze_security(self, code_file: CodeFile) -> AnalysisResult:
        raise NotImplementedError
    
    async def _analyze_duplication(self, code_file: CodeFile) -> AnalysisResult:
        raise NotImplementedError
    
    async def _analyze_dependencies(self, code_file: CodeFile) -> AnalysisResult:
        raise NotImplementedError

class PythonAnalyzer(BaseAnalyzer):
    """Analyzer for Python code."""
    
    def __init__(self):
        super().__init__(Language.PYTHON)
    
    async def _analyze_syntax(self, code_file: CodeFile) -> AnalysisResult:
        """Python syntax check."""
        issues = []
        
        try:
            # Use Python's ast module to check the syntax
            import ast
            ast.parse(code_file.content)
            
        except SyntaxError as e:
            issues.append({
                'type': 'syntax_error',
                'line': e.lineno,
                'column': e.offset,
                'message': str(e),
                'severity': 'error'
            })
        
        return AnalysisResult(
            file_path=code_file.path,
            analysis_type=AnalysisType.SYNTAX,
            language=self.language,
            issues=issues,
            metrics={'syntax_errors': len(issues)}
        )
    
    async def _analyze_style(self, code_file: CodeFile) -> AnalysisResult:
        """Python code style check."""
        issues = []
        
        # Simplified stand-in for a flake8-style check
        lines = code_file.content.split('\n')
        
        for i, line in enumerate(lines, 1):
            # Check line length
            if len(line) > 88:  # 88 is Black's default limit; PEP 8 itself recommends 79
                issues.append({
                    'type': 'line_too_long',
                    'line': i,
                    'message': f'Line too long ({len(line)} > 88 characters)',
                    'severity': 'warning'
                })
            
            # Check for trailing whitespace
            if line.endswith(' ') or line.endswith('\t'):
                issues.append({
                    'type': 'trailing_whitespace',
                    'line': i,
                    'message': 'Trailing whitespace',
                    'severity': 'info'
                })
            
            # Check import style
            if line.strip().startswith('import ') and ', ' in line:
                issues.append({
                    'type': 'multiple_imports',
                    'line': i,
                    'message': 'Multiple imports on one line',
                    'severity': 'warning'
                })
        
        return AnalysisResult(
            file_path=code_file.path,
            analysis_type=AnalysisType.STYLE,
            language=self.language,
            issues=issues,
            metrics={
                'style_issues': len(issues),
                'line_count': len(lines),
                'avg_line_length': sum(len(line) for line in lines) / len(lines) if lines else 0
            }
        )
    
    async def _analyze_complexity(self, code_file: CodeFile) -> AnalysisResult:
        """Python complexity analysis."""
        try:
            import ast
            tree = ast.parse(code_file.content)
            
            complexity_visitor = ComplexityVisitor()
            complexity_visitor.visit(tree)
            
            issues = []
            for func_name, complexity in complexity_visitor.complexities.items():
                if complexity > 10:  # complexity threshold
                    issues.append({
                        'type': 'high_complexity',
                        'function': func_name,
                        'complexity': complexity,
                        'message': f'Function {func_name} has high complexity: {complexity}',
                        'severity': 'warning' if complexity <= 15 else 'error'
                    })
            
            return AnalysisResult(
                file_path=code_file.path,
                analysis_type=AnalysisType.COMPLEXITY,
                language=self.language,
                issues=issues,
                metrics={
                    'avg_complexity': sum(complexity_visitor.complexities.values()) / len(complexity_visitor.complexities) if complexity_visitor.complexities else 0,
                    'max_complexity': max(complexity_visitor.complexities.values()) if complexity_visitor.complexities else 0,
                    'function_count': len(complexity_visitor.complexities)
                }
            )
            
        except Exception as e:
            return AnalysisResult(
                file_path=code_file.path,
                analysis_type=AnalysisType.COMPLEXITY,
                language=self.language,
                success=False,
                error_message=str(e)
            )
    
    async def _analyze_security(self, code_file: CodeFile) -> AnalysisResult:
        """Python security analysis."""
        issues = []
        content = code_file.content
        
        # Check for common security problems
        security_patterns = [
            (r'eval\s*\(', 'Use of eval() is dangerous'),
            (r'exec\s*\(', 'Use of exec() is dangerous'),
            (r'subprocess\.call\s*\(.*shell\s*=\s*True', 'Shell injection vulnerability'),
            (r'pickle\.loads?\s*\(', 'Pickle deserialization vulnerability'),
            (r'yaml\.load\s*\((?!.*Loader)', 'Unsafe YAML loading'),
        ]
        
        import re
        lines = content.split('\n')
        
        for pattern, message in security_patterns:
            for i, line in enumerate(lines, 1):
                if re.search(pattern, line):
                    issues.append({
                        'type': 'security_issue',
                        'line': i,
                        'message': message,
                        'severity': 'error',
                        'code_snippet': line.strip()
                    })
        
        return AnalysisResult(
            file_path=code_file.path,
            analysis_type=AnalysisType.SECURITY,
            language=self.language,
            issues=issues,
            metrics={'security_issues': len(issues)}
        )
    
    async def _analyze_duplication(self, code_file: CodeFile) -> AnalysisResult:
        """Duplicate code detection."""
        # Simplified duplicate detection: compares individual lines only
        lines = code_file.content.split('\n')
        line_hashes = {}
        duplicates = []
        
        for i, line in enumerate(lines, 1):
            stripped_line = line.strip()
            if len(stripped_line) > 10:  # ignore very short lines
                line_hash = hashlib.md5(stripped_line.encode()).hexdigest()
                
                if line_hash in line_hashes:
                    duplicates.append({
                        'type': 'duplicate_line',
                        'line': i,
                        'duplicate_of': line_hashes[line_hash],
                        'message': f'Duplicate line found (original at line {line_hashes[line_hash]})',
                        'severity': 'info'
                    })
                else:
                    line_hashes[line_hash] = i
        
        return AnalysisResult(
            file_path=code_file.path,
            analysis_type=AnalysisType.DUPLICATION,
            language=self.language,
            issues=duplicates,
            metrics={
                'duplicate_lines': len(duplicates),
                'duplication_ratio': len(duplicates) / len(lines) if lines else 0
            }
        )
    
    async def _analyze_dependencies(self, code_file: CodeFile) -> AnalysisResult:
        """Dependency analysis."""
        import ast
        import re
        
        try:
            tree = ast.parse(code_file.content)
            
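            # Record the name each import binds, because that is what later code refers to
            imports = []
            bound_names = {}
            for node in ast.walk(tree):
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        imports.append(alias.name)
                        # `import a.b` binds `a`; `import a.b as c` binds `c`
                        bound_names[alias.asname or alias.name.split('.')[0]] = alias.name
                elif isinstance(node, ast.ImportFrom):
                    if node.module:
                        imports.append(node.module)
                    for alias in node.names:
                        bound_names[alias.asname or alias.name] = node.module or alias.name
            
            # Heuristic check for unused imports: a bound name that is never
            # referenced anywhere in the module is probably unused
            used_names = {n.id for n in ast.walk(tree) if isinstance(n, ast.Name)}
            unused_imports = []
            
            for name, imp in bound_names.items():
                if name != '*' and name not in used_names:
                    unused_imports.append({
                        'type': 'unused_import',
                        'import': imp,
                        'message': f'Imported name "{name}" (from "{imp}") appears to be unused',
                        'severity': 'info'
                    })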
            
            return AnalysisResult(
                file_path=code_file.path,
                analysis_type=AnalysisType.DEPENDENCIES,
                language=self.language,
                issues=unused_imports,
                metrics={
                    'total_imports': len(imports),
                    'unused_imports': len(unused_imports),
                    'unique_imports': len(set(imports))
                }
            )
            
        except Exception as e:
            return AnalysisResult(
                file_path=code_file.path,
                analysis_type=AnalysisType.DEPENDENCIES,
                language=self.language,
                success=False,
                error_message=str(e)
            )

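class ComplexityVisitor(ast.NodeVisitor):
    """Visitor that computes a simplified complexity score (if/while/for) per function."""
    
    def __init__(self):
        self.complexities = {}
        self.current_function = None
        self.current_complexity = 0
    
    def visit_FunctionDef(self, node):
        # Save the enclosing function's state so nested functions are scored separately
        old_function = self.current_function
        old_complexity = self.current_complexity
        
        self.current_function = node.name
        self.current_complexity = 1  # base complexity
        
        self.generic_visit(node)
        
        self.complexities[node.name] = self.current_complexity
        
        self.current_function = old_function
        self.current_complexity = old_complexity
    
    # "async def" functions are scored the same way
    visit_AsyncFunctionDef = visit_FunctionDef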
    
    def visit_If(self, node):
        if self.current_function:
            self.current_complexity += 1
        self.generic_visit(node)
    
    def visit_While(self, node):
        if self.current_function:
            self.current_complexity += 1
        self.generic_visit(node)
    
    def visit_For(self, node):
        if self.current_function:
            self.current_complexity += 1
        self.generic_visit(node)

class AnalysisEngine:
    """Analysis engine."""
    
    def __init__(self, max_workers: int = 4):
        self.analyzers = {
            Language.PYTHON: PythonAnalyzer(),
            # Analyzers for other languages can be registered here
        }
        self.max_workers = max_workers
        self.thread_executor = ThreadPoolExecutor(max_workers=max_workers)
        self.process_executor = ProcessPoolExecutor(max_workers=max_workers)
    
    async def analyze_files(
        self,
        files: List[CodeFile],
        analysis_types: List[AnalysisType]
    ) -> Dict[str, List[AnalysisResult]]:
        """Analyze multiple files."""
        
        print(f"🔍 Analyzing {len(files)} files...")
        
        # Group files by language
        files_by_language = {}
        for file in files:
            if file.language not in files_by_language:
                files_by_language[file.language] = []
            files_by_language[file.language].append(file)
        
        all_results = {}
        
        # Process each language's files concurrently
        tasks = []
        for language, language_files in files_by_language.items():
            if language in self.analyzers:
                task = self._analyze_language_files(language, language_files, analysis_types)
                tasks.append(task)
            else:
                print(f"⚠️ Unsupported language: {language.value}")
        
        # Wait for all tasks to finish
        results_list = await asyncio.gather(*tasks, return_exceptions=True)
        
        # Merge the results
        for results in results_list:
            if isinstance(results, dict):
                all_results.update(results)
            else:
                print(f"❌ Analysis task failed: {results}")
        
        return all_results
    
    async def _analyze_language_files(
        self,
        language: Language,
        files: List[CodeFile],
        analysis_types: List[AnalysisType]
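    ) -> Dict[str, List[AnalysisResult]]:
        """Analyze the files of one language."""
        
        analyzer = self.analyzers[language]
        results = {}
        
        # Analyze files concurrently, bounded by a semaphore
        semaphore = asyncio.Semaphore(self.max_workers)
        
        async def analyze_single_file(file: CodeFile) -> tuple[str, List[AnalysisResult]]:
            async with semaphore:
                print(f"  📄 Analyzing file: {file.path} ({language.value})")
                result = await analyzer.analyze(file, analysis_types)
                return file.path, result
        
        tasks = [analyze_single_file(file) for file in files]
        # return_exceptions=True keeps one failing file from aborting the others
        file_results = await asyncio.gather(*tasks, return_exceptions=True)
        
        for file, outcome in zip(files, file_results):
            if isinstance(outcome, BaseException):
                print(f"  ❌ Analysis failed: {file.path}: {outcome}")
                continue
            file_path, file_result = outcome
            results[file_path] = file_result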
        
        return results
    
    def generate_report(self, results: Dict[str, List[AnalysisResult]]) -> Dict[str, Any]:
        """Generate the analysis report."""
        
        report = {
            'summary': {
                'total_files': len(results),
                'total_issues': 0,
                'issues_by_severity': {'error': 0, 'warning': 0, 'info': 0},
                'issues_by_type': {},
                'languages': {},
                'analysis_types': {}
            },
            'files': {},
            'generated_at': time.time()
        }
        
        for file_path, file_results in results.items():
            file_language = file_results[0].language.value if file_results else 'unknown'
            file_report = {
                'path': file_path,
                'language': file_language,
                'analyses': {}
            }
            
            # Count each file once per language (not once per analysis result)
            if file_results:
                languages = report['summary']['languages']
                languages[file_language] = languages.get(file_language, 0) + 1
            
            for result in file_results:
                # Update the overall totals
                report['summary']['total_issues'] += len(result.issues)
                
                # Count by analysis type
                analysis_type = result.analysis_type.value
                if analysis_type not in report['summary']['analysis_types']:
                    report['summary']['analysis_types'][analysis_type] = {'files': 0, 'issues': 0}
                report['summary']['analysis_types'][analysis_type]['files'] += 1
                report['summary']['analysis_types'][analysis_type]['issues'] += len(result.issues)
                
                # Count issues by severity and type
                for issue in result.issues:
                    severity = issue.get('severity', 'info')
                    if severity in report['summary']['issues_by_severity']:
                        report['summary']['issues_by_severity'][severity] += 1
                    
                    issue_type = issue.get('type', 'unknown')
                    if issue_type not in report['summary']['issues_by_type']:
                        report['summary']['issues_by_type'][issue_type] = 0
                    report['summary']['issues_by_type'][issue_type] += 1
                
                # Per-file analysis result
                file_report['analyses'][analysis_type] = {
                    'success': result.success,
                    'execution_time': result.execution_time,
                    'issues_count': len(result.issues),
                    'issues': result.issues,
                    'metrics': result.metrics,
                    'error_message': result.error_message
                }
            
            report['files'][file_path] = file_report
        
        return report

# MCP tool interface
class CodeAnalysisTools:
    """MCP tool set for code analysis."""
    
    def __init__(self):
        self.engine = AnalysisEngine()
        self.detector = LanguageDetector()
    
    async def analyze_code(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: analyze code."""
        try:
            # Validate parameters
            if 'files' not in params:
                raise ValueError("Missing required parameter: files")
            
            file_specs = params['files']
            analysis_types = [AnalysisType(t) for t in params.get('analysis_types', ['syntax', 'style'])]
            
            # Build the CodeFile objects
            code_files = []
            for file_spec in file_specs:
                if 'path' not in file_spec or 'content' not in file_spec:
                    continue
                
                language = None
                if 'language' in file_spec:
                    language = Language(file_spec['language'])
                else:
                    language = self.detector.detect_language(file_spec['path'], file_spec['content'])
                
                if language:
                    code_file = CodeFile(
                        path=file_spec['path'],
                        content=file_spec['content'],
                        language=language,
                        size=len(file_spec['content'])
                    )
                    code_files.append(code_file)
            
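            if not code_files:
                # Include 'success' so callers can always branch on it
                return {'success': False, 'error': 'No analyzable code files found'}
            
            # Run the analysis
            results = await self.engine.analyze_files(code_files, analysis_types)
            
            # Generate the report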
            report = self.engine.generate_report(results)
            
            return {
                'success': True,
                'report': report,
                'analyzed_files': len(code_files),
                'analysis_types': [t.value for t in analysis_types]
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    
    async def detect_language(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """Tool: detect the language."""
        try:
            if 'file_path' not in params or 'content' not in params:
                raise ValueError("Missing required parameters: file_path, content")
            
            language = self.detector.detect_language(params['file_path'], params['content'])
            
            return {
                'success': True,
                'language': language.value if language else None,
                'confidence': 0.9 if language else 0.0  # simplified, fixed confidence value
            }
            
        except Exception as e:
            return {
                'success': False,
                'error': str(e)
            }
    
    async def get_supported_languages(self, params: Dict[str, Any] = None) -> Dict[str, Any]:
        """Return the list of supported languages."""
        return {
            'success': True,
            'supported_languages': [lang.value for lang in Language],
            'available_analyzers': [lang.value for lang in self.engine.analyzers.keys()],
            'analysis_types': [at.value for at in AnalysisType]
        }

# Demo run
async def run_code_analysis_demo():
    """Run the code analysis demo."""
    print("🚀 Starting the code analysis platform demo...\n")
    
    tools = CodeAnalysisTools()
    
    # Sample code files
    sample_files = [
        {
            'path': 'example.py',
            'content': '''
import os, sys
import requests

def calculate_complexity(x, y):
    if x > 0:
        if y > 0:
            if x > y:
                if x > 10:
                    return x * y
                else:
                    return x + y
            else:
                return y - x
        else:
            return x
    else:
        return 0

def unused_function():
    pass

eval("print('dangerous')")
            '''.strip()
        },
        {
            'path': 'example.js',
            'content': '''
function longFunction() {
    var a = 1;
    var b = 2;    
    if (a > 0) {
        if (b > 0) {
            return a + b;
        }
    }
    return 0;
}
            '''.strip()
        }
    ]
    
    # Run the analysis
    result = await tools.analyze_code({
        'files': sample_files,
        'analysis_types': ['syntax', 'style', 'complexity', 'security', 'dependencies']
    })
    
    if result['success']:
        report = result['report']
        
        print("📊 Report summary:")
        print(f"  Total files: {report['summary']['total_files']}")
        print(f"  Total issues: {report['summary']['total_issues']}")
        print(f"  Errors: {report['summary']['issues_by_severity']['error']}")
        print(f"  Warnings: {report['summary']['issues_by_severity']['warning']}")
        print(f"  Info: {report['summary']['issues_by_severity']['info']}")
        print()
        
        print("📈 Files per language:")
        for lang, count in report['summary']['languages'].items():
            print(f"  {lang}: {count} file(s)")
        print()
        
        print("🔍 Per analysis type:")
        for analysis_type, stats in report['summary']['analysis_types'].items():
            print(f"  {analysis_type}: {stats['files']} file(s), {stats['issues']} issue(s)")
        print()
        
        # Show detailed issues
        for file_path, file_report in report['files'].items():
            print(f"📄 File: {file_path} ({file_report['language']})")
            
            for analysis_type, analysis_result in file_report['analyses'].items():
                if analysis_result['issues']:
                    print(f"  {analysis_type}:")
                    for issue in analysis_result['issues'][:3]:  # show only the first 3 issues
                        print(f"    - {issue.get('message', 'Unknown issue')} (line {issue.get('line', '?')})")
                    
                    if len(analysis_result['issues']) > 3:
                        print(f"    ... and {len(analysis_result['issues']) - 3} more issue(s)")
            print()
    else:
        print(f"❌ Analysis failed: {result['error']}")

# Run the demo
if __name__ == "__main__":
    asyncio.run(run_code_analysis_demo())

Best Practices Summary

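1. Modular design:

  • Language detector: identifies each file's programming language from its path and content
  • Pluggable analyzers: each language gets its own analyzer implementation
  • Decoupled analysis types: syntax, style, complexity, and other analyses run independently
  • Uniform result format: a standardized data structure for analysis results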
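
One way to picture the plug-in analyzer design and the uniform result format is a registry keyed by language. The sketch below is illustrative only: `Analyzer`, `register_analyzer`, `PythonLineLengthAnalyzer`, and the module-level `analyze` function are hypothetical names, not part of the case study's code.

```python
from typing import Callable, Dict, List, Protocol


class Analyzer(Protocol):
    """Hypothetical plug-in interface: one analyzer per language."""

    def analyze(self, content: str) -> List[dict]: ...


# Registry mapping a language name to its analyzer instance (assumed design).
_ANALYZERS: Dict[str, Analyzer] = {}


def register_analyzer(language: str) -> Callable[[type], type]:
    """Class decorator that registers an analyzer for one language."""
    def decorator(cls: type) -> type:
        _ANALYZERS[language] = cls()
        return cls
    return decorator


@register_analyzer("python")
class PythonLineLengthAnalyzer:
    """Toy analyzer: flags lines longer than 79 characters."""

    def analyze(self, content: str) -> List[dict]:
        return [
            {"type": "line_too_long", "line": number, "severity": "info"}
            for number, line in enumerate(content.splitlines(), start=1)
            if len(line) > 79
        ]


def analyze(language: str, content: str) -> Dict[str, object]:
    """Dispatch to the registered analyzer; every outcome uses the same result shape."""
    analyzer = _ANALYZERS.get(language)
    if analyzer is None:
        return {"success": False, "error": f"unsupported language: {language}", "issues": []}
    return {"success": True, "error": None, "issues": analyzer.analyze(content)}
```

Adding support for another language then only requires a new decorated class; the dispatch code and the result shape stay unchanged.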

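2. Performance optimization:

  • Concurrency: analyze files concurrently with asyncio and thread pools
  • Memory management: process large files in chunks to avoid running out of memory
  • Caching: cache results by file hash to avoid re-analyzing unchanged files
  • Resource limits: a semaphore caps the number of concurrent analyses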
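
The caching and resource-limit points can be sketched together. The example below is a minimal illustration under stated assumptions: `CachedAnalyzer`, its `cache_hits` counter, and the TODO-flagging placeholder analysis are all hypothetical and stand in for the platform's real analyzers.

```python
import asyncio
import hashlib
from typing import Dict, List


class CachedAnalyzer:
    """Hypothetical wrapper: caches results by content hash and bounds concurrency."""

    def __init__(self, max_concurrency: int = 4):
        self._cache: Dict[str, List[dict]] = {}
        self._max_concurrency = max_concurrency
        self.cache_hits = 0

    async def _analyze_uncached(self, content: str) -> List[dict]:
        # Placeholder for a real analysis; here we only flag TODO markers.
        await asyncio.sleep(0)  # yield control, as a real async analyzer would
        return [
            {"type": "todo", "line": number}
            for number, line in enumerate(content.splitlines(), start=1)
            if "TODO" in line
        ]

    async def analyze_many(self, contents: List[str]) -> List[List[dict]]:
        # Create the semaphore inside the running event loop.
        semaphore = asyncio.Semaphore(self._max_concurrency)

        async def analyze_one(content: str) -> List[dict]:
            key = hashlib.sha256(content.encode("utf-8")).hexdigest()
            if key in self._cache:
                self.cache_hits += 1
                return self._cache[key]
            async with semaphore:  # at most max_concurrency analyses at once
                result = await self._analyze_uncached(content)
            self._cache[key] = result
            return result

        return list(await asyncio.gather(*(analyze_one(c) for c in contents)))
```

Note that identical contents submitted in the same batch may both miss the cache, because the lookup happens before either analysis finishes; the cache pays off on later runs over unchanged files.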

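3. Error handling:

  • Graceful degradation: a failure in one file does not affect the rest of the run
  • Detailed error messages: provide specific error context
  • Timeout control: keep analysis tasks from running indefinitely
  • Resource cleanup: make sure temporary files and process resources are released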
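
Timeout control and graceful degradation can be combined by wrapping each per-file analysis so that it always returns a result instead of raising. This is a sketch under assumptions: `analyze_with_timeout`, `fake_analyze`, `analyze_all`, and the file names are hypothetical, and the 0.05-second timeout is chosen only to keep the demo fast.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def analyze_with_timeout(
    path: str,
    analyze: Callable[[str], Awaitable[List[dict]]],
    timeout_seconds: float,
) -> Dict[str, object]:
    """Run one analysis; turn timeouts and errors into a failed result instead of raising."""
    try:
        issues = await asyncio.wait_for(analyze(path), timeout=timeout_seconds)
        return {"path": path, "success": True, "issues": issues, "error": None}
    except asyncio.TimeoutError:
        message = f"timed out after {timeout_seconds}s"
        return {"path": path, "success": False, "issues": [], "error": message}
    except Exception as exc:  # degrade gracefully: report the error, keep going
        return {"path": path, "success": False, "issues": [], "error": str(exc)}


async def fake_analyze(path: str) -> List[dict]:
    """Stand-in analyzer for the demo: one slow file, one broken file."""
    if path == "slow.py":
        await asyncio.sleep(1.0)
    if path == "broken.py":
        raise ValueError("cannot parse file")
    return []


async def analyze_all(paths: List[str]) -> List[Dict[str, object]]:
    """Analyze every path concurrently; failures stay local to their file."""
    return list(await asyncio.gather(
        *(analyze_with_timeout(p, fake_analyze, timeout_seconds=0.05) for p in paths)
    ))
```

Calling `asyncio.run(analyze_all(["ok.py", "slow.py", "broken.py"]))` yields one successful result and two failed ones, each carrying its own error message, so the caller can still build a report for the files that did succeed.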

Summary

These two case studies show how MCP best practices apply in real projects:

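🎯 Key points

  1. Architecture: a modular, extensible system architecture
  2. Tool encapsulation: wrap business logic in independent MCP tools
  3. Asynchronous processing: use asynchronous programming judiciously to improve performance
  4. Error handling: thorough exception handling and fallback strategies
  5. Monitoring and logging: comprehensive runtime monitoring and logging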

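💡 Lessons from practice

  • Let business requirements drive tool granularity
  • Pay attention to user experience and interaction design
  • Establish solid testing and deployment processes
  • Continuously monitor and tune system performance
  • Keep the code readable and maintainable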

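🏆 Practical philosophy: best practices are not dogma but distilled experience. Only by applying and refining them in real projects can you truly master MCP development.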


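Congratulations! 🎉 You have completed Chapter 8, "MCP Development Best Practices". These case studies and best practices will help you build high-quality MCP applications in real projects.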