Skip to content

8.10 工作流设计模式 🔄

"好的架构模式就像城市规划,看似无形,却让一切井然有序。"

想象工作流设计模式就像是建筑师的设计图纸。每种模式都解决特定的问题,就像住宅、商场、办公楼有不同的设计原则一样。掌握这些模式,就能构建出既美观又实用的MCP应用。

请求-响应模式:经典的同步交互 📞

基础请求-响应实现

typescript
// 🌟 请求-响应模式实现
import { EventEmitter } from 'events';
import { v4 as uuidv4 } from 'uuid';

interface Request {
  id: string;
  method: string;
  params?: any;
  timestamp: number;
}

interface Response {
  id: string;
  result?: any;
  error?: {
    code: number;
    message: string;
    data?: any;
  };
  timestamp: number;
}

interface RequestContext {
  request: Request;
  startTime: number;
  timeoutId: NodeJS.Timeout;
  resolve: (response: Response) => void;
  reject: (error: Error) => void;
}

class RequestResponseHandler extends EventEmitter {
  private pendingRequests = new Map<string, RequestContext>();
  private defaultTimeout: number;
  private maxConcurrentRequests: number;
  private requestStats = {
    total: 0,
    successful: 0,
    failed: 0,
    timeouts: 0,
    avgResponseTime: 0
  };
  
  constructor(options: {
    defaultTimeout?: number;
    maxConcurrentRequests?: number;
  } = {}) {
    super();
    this.defaultTimeout = options.defaultTimeout || 30000;
    this.maxConcurrentRequests = options.maxConcurrentRequests || 100;
  }
  
  /**
   * 发送请求
   */
  async sendRequest(
    method: string, 
    params?: any, 
    timeout?: number
  ): Promise<any> {
    // 检查并发限制
    if (this.pendingRequests.size >= this.maxConcurrentRequests) {
      throw new Error(`并发请求数超过限制:${this.maxConcurrentRequests}`);
    }
    
    const requestId = uuidv4();
    const request: Request = {
      id: requestId,
      method,
      params,
      timestamp: Date.now()
    };
    
    console.log(`📤 发送请求:${method} (${requestId.substring(0, 8)})`);
    
    return new Promise<any>((resolve, reject) => {
      const requestTimeout = timeout || this.defaultTimeout;
      
      // 设置超时处理
      const timeoutId = setTimeout(() => {
        this.handleTimeout(requestId);
      }, requestTimeout);
      
      // 存储请求上下文
      const context: RequestContext = {
        request,
        startTime: Date.now(),
        timeoutId,
        resolve: (response: Response) => {
          clearTimeout(timeoutId);
          this.pendingRequests.delete(requestId);
          
          // 更新统计
          this.updateStats(context, response);
          
          if (response.error) {
            const error = new Error(response.error.message);
            (error as any).code = response.error.code;
            (error as any).data = response.error.data;
            reject(error);
          } else {
            resolve(response.result);
          }
        },
        reject: (error: Error) => {
          clearTimeout(timeoutId);
          this.pendingRequests.delete(requestId);
          this.requestStats.failed++;
          reject(error);
        }
      };
      
      this.pendingRequests.set(requestId, context);
      this.requestStats.total++;
      
      // 发送请求事件
      this.emit('request', request);
    });
  }
  
  /**
   * 处理响应
   */
  handleResponse(response: Response): void {
    const context = this.pendingRequests.get(response.id);
    
    if (!context) {
      console.warn(`⚠️ 收到未知请求的响应:${response.id}`);
      return;
    }
    
    console.log(`📥 收到响应:${context.request.method} (${response.id.substring(0, 8)})`);
    context.resolve(response);
  }
  
  /**
   * 处理超时
   */
  private handleTimeout(requestId: string): void {
    const context = this.pendingRequests.get(requestId);
    
    if (!context) {
      return;
    }
    
    console.warn(`⏰ 请求超时:${context.request.method} (${requestId.substring(0, 8)})`);
    
    this.pendingRequests.delete(requestId);
    this.requestStats.timeouts++;
    this.requestStats.failed++;
    
    const timeoutError = new Error(
      `请求超时:${context.request.method} (${this.defaultTimeout}ms)`
    );
    (timeoutError as any).code = -32603; // JSON-RPC Internal error
    
    context.reject(timeoutError);
    
    // 发送超时事件
    this.emit('timeout', context.request);
  }
  
  /**
   * 更新统计信息
   */
  private updateStats(context: RequestContext, response: Response): void {
    const responseTime = Date.now() - context.startTime;
    
    if (response.error) {
      this.requestStats.failed++;
    } else {
      this.requestStats.successful++;
    }
    
    // 更新平均响应时间
    const total = this.requestStats.successful + this.requestStats.failed;
    this.requestStats.avgResponseTime = 
      (this.requestStats.avgResponseTime * (total - 1) + responseTime) / total;
  }
  
  /**
   * 获取统计信息
   */
  getStats() {
    return {
      ...this.requestStats,
      pendingRequests: this.pendingRequests.size,
      successRate: this.requestStats.total > 0 
        ? this.requestStats.successful / this.requestStats.total 
        : 0
    };
  }
  
  /**
   * 取消所有待处理的请求
   */
  cancelAllRequests(reason: string = '系统关闭'): void {
    console.log(`🚫 取消所有待处理请求:${reason}`);
    
    for (const [requestId, context] of this.pendingRequests) {
      clearTimeout(context.timeoutId);
      context.reject(new Error(`请求被取消:${reason}`));
    }
    
    this.pendingRequests.clear();
  }
}

// MCP工具请求-响应包装器
class MCPToolInvoker {
  private requestHandler: RequestResponseHandler;
  private toolRegistry = new Map<string, ToolDefinition>();
  
  constructor() {
    this.requestHandler = new RequestResponseHandler({
      defaultTimeout: 10000,
      maxConcurrentRequests: 50
    });
    
    // 监听请求事件
    this.requestHandler.on('request', this.handleToolRequest.bind(this));
  }
  
  /**
   * 注册工具
   */
  registerTool(name: string, definition: ToolDefinition): void {
    this.toolRegistry.set(name, definition);
    console.log(`🔧 注册工具:${name}`);
  }
  
  /**
   * 调用工具
   */
  async invokeTool(
    toolName: string, 
    parameters: any = {}, 
    timeout?: number
  ): Promise<any> {
    const tool = this.toolRegistry.get(toolName);
    
    if (!tool) {
      throw new Error(`工具不存在:${toolName}`);
    }
    
    // 参数验证
    this.validateParameters(parameters, tool.inputSchema);
    
    console.log(`🎯 调用工具:${toolName},参数:`, parameters);
    
    try {
      const result = await this.requestHandler.sendRequest(
        'tools/call',
        { name: toolName, arguments: parameters },
        timeout
      );
      
      console.log(`✅ 工具调用成功:${toolName}`);
      return result;
      
    } catch (error) {
      console.error(`❌ 工具调用失败:${toolName}`, error);
      throw error;
    }
  }
  
  /**
   * 处理工具请求
   */
  private async handleToolRequest(request: Request): Promise<void> {
    if (request.method !== 'tools/call') {
      return;
    }
    
    const { name: toolName, arguments: parameters } = request.params;
    const tool = this.toolRegistry.get(toolName);
    
    if (!tool) {
      this.requestHandler.handleResponse({
        id: request.id,
        error: {
          code: -32601,
          message: `工具不存在:${toolName}`
        },
        timestamp: Date.now()
      });
      return;
    }
    
    try {
      // 执行工具逻辑
      const result = await tool.handler(parameters);
      
      this.requestHandler.handleResponse({
        id: request.id,
        result,
        timestamp: Date.now()
      });
      
    } catch (error) {
      this.requestHandler.handleResponse({
        id: request.id,
        error: {
          code: -32603,
          message: error instanceof Error ? error.message : '工具执行失败',
          data: error instanceof Error ? error.stack : undefined
        },
        timestamp: Date.now()
      });
    }
  }
  
  /**
   * 参数验证
   */
  private validateParameters(parameters: any, schema: any): void {
    // 简单的JSON Schema验证实现
    if (schema.type === 'object' && schema.properties) {
      for (const [key, propSchema] of Object.entries(schema.properties as any)) {
        if (schema.required?.includes(key) && parameters[key] === undefined) {
          throw new Error(`缺少必需参数:${key}`);
        }
      }
    }
  }
  
  /**
   * 获取工具列表
   */
  getTools(): ToolDefinition[] {
    return Array.from(this.toolRegistry.values());
  }
  
  /**
   * 获取调用统计
   */
  getInvocationStats() {
    return this.requestHandler.getStats();
  }
}

interface ToolDefinition {
  name: string;
  description: string;
  inputSchema: any;
  handler: (parameters: any) => Promise<any>;
}

// 使用示例
const toolInvoker = new MCPToolInvoker();

// 注册天气工具
toolInvoker.registerTool('get_weather', {
  name: 'get_weather',
  description: '获取指定地点的天气信息',
  inputSchema: {
    type: 'object',
    properties: {
      location: {
        type: 'string',
        description: '地点名称'
      }
    },
    required: ['location']
  },
  handler: async (params) => {
    // 模拟API调用
    await new Promise(resolve => setTimeout(resolve, 1000));
    
    return {
      location: params.location,
      temperature: Math.floor(Math.random() * 40),
      description: ['晴天', '多云', '雨天'][Math.floor(Math.random() * 3)],
      humidity: Math.floor(Math.random() * 100),
      timestamp: new Date().toISOString()
    };
  }
});

事件驱动模式:异步消息处理 📡

事件驱动架构实现

python
# 🌟 事件驱动模式实现
import asyncio
import json
import time
from typing import Any, Dict, List, Optional, Callable, Union
from dataclasses import dataclass, field
from enum import Enum
import weakref
from collections import defaultdict
import logging

class EventPriority(Enum):
    """事件优先级"""
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Event:
    """事件对象"""
    type: str
    data: Dict[str, Any]
    priority: EventPriority = EventPriority.NORMAL
    timestamp: float = field(default_factory=time.time)
    source: Optional[str] = None
    correlation_id: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 3
    
    def to_dict(self) -> Dict[str, Any]:
        return {
            'type': self.type,
            'data': self.data,
            'priority': self.priority.value,
            'timestamp': self.timestamp,
            'source': self.source,
            'correlation_id': self.correlation_id
        }

@dataclass
class EventHandler:
    """事件处理器"""
    callback: Callable[[Event], Any]
    priority: int = 0
    once: bool = False
    condition: Optional[Callable[[Event], bool]] = None
    
    def can_handle(self, event: Event) -> bool:
        """检查是否可以处理该事件"""
        if self.condition:
            return self.condition(event)
        return True

class EventBus:
    """事件总线"""
    
    def __init__(self):
        self._handlers: Dict[str, List[EventHandler]] = defaultdict(list)
        self._wildcard_handlers: List[EventHandler] = []
        self._event_queue = asyncio.PriorityQueue()
        self._running = False
        self._worker_tasks: List[asyncio.Task] = []
        self._stats = {
            'events_published': 0,
            'events_processed': 0,
            'events_failed': 0,
            'handlers_count': 0
        }
        self._middleware: List[Callable] = []
        self.logger = logging.getLogger(__name__)
    
    def subscribe(
        self,
        event_type: str,
        handler: Callable[[Event], Any],
        priority: int = 0,
        once: bool = False,
        condition: Optional[Callable[[Event], bool]] = None
    ) -> Callable[[], None]:
        """订阅事件"""
        event_handler = EventHandler(
            callback=handler,
            priority=priority,
            once=once,
            condition=condition
        )
        
        if event_type == '*':
            self._wildcard_handlers.append(event_handler)
        else:
            self._handlers[event_type].append(event_handler)
            # 按优先级排序
            self._handlers[event_type].sort(key=lambda h: h.priority, reverse=True)
        
        self._stats['handlers_count'] += 1
        self.logger.debug(f"订阅事件: {event_type}, 处理器数量: {len(self._handlers[event_type])}")
        
        # 返回取消订阅函数
        def unsubscribe():
            if event_type == '*':
                if event_handler in self._wildcard_handlers:
                    self._wildcard_handlers.remove(event_handler)
            else:
                if event_handler in self._handlers[event_type]:
                    self._handlers[event_type].remove(event_handler)
            self._stats['handlers_count'] -= 1
        
        return unsubscribe
    
    def use_middleware(self, middleware: Callable[[Event, Callable], Any]):
        """添加中间件"""
        self._middleware.append(middleware)
    
    async def publish(self, event: Union[Event, str], data: Optional[Dict] = None) -> None:
        """发布事件"""
        if isinstance(event, str):
            event = Event(type=event, data=data or {})
        
        self._stats['events_published'] += 1
        
        # 应用中间件
        for middleware in self._middleware:
            try:
                event = await self._apply_middleware(middleware, event)
                if event is None:
                    return  # 中间件阻止了事件
            except Exception as e:
                self.logger.error(f"中间件处理失败: {e}")
                continue
        
        # 将事件加入队列(优先级队列)
        priority_value = -event.priority.value  # 负数用于优先级队列(数值越小优先级越高)
        await self._event_queue.put((priority_value, time.time(), event))
        
        self.logger.debug(f"发布事件: {event.type}, 优先级: {event.priority.name}")
    
    async def _apply_middleware(self, middleware: Callable, event: Event) -> Optional[Event]:
        """应用中间件"""
        if asyncio.iscoroutinefunction(middleware):
            return await middleware(event, lambda e: e)
        else:
            return middleware(event, lambda e: e)
    
    async def start_processing(self, num_workers: int = 3) -> None:
        """启动事件处理"""
        if self._running:
            return
        
        self._running = True
        self.logger.info(f"启动事件处理,工作线程数: {num_workers}")
        
        # 创建工作线程
        for i in range(num_workers):
            task = asyncio.create_task(self._worker(f"worker-{i}"))
            self._worker_tasks.append(task)
    
    async def stop_processing(self) -> None:
        """停止事件处理"""
        if not self._running:
            return
        
        self._running = False
        self.logger.info("停止事件处理...")
        
        # 取消所有工作任务
        for task in self._worker_tasks:
            task.cancel()
        
        # 等待任务完成
        await asyncio.gather(*self._worker_tasks, return_exceptions=True)
        self._worker_tasks.clear()
        
        self.logger.info("事件处理已停止")
    
    async def _worker(self, worker_name: str) -> None:
        """事件处理工作线程"""
        self.logger.debug(f"工作线程启动: {worker_name}")
        
        while self._running:
            try:
                # 从队列获取事件(带超时)
                priority, timestamp, event = await asyncio.wait_for(
                    self._event_queue.get(),
                    timeout=1.0
                )
                
                await self._process_event(event, worker_name)
                
            except asyncio.TimeoutError:
                continue
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"工作线程 {worker_name} 异常: {e}")
        
        self.logger.debug(f"工作线程停止: {worker_name}")
    
    async def _process_event(self, event: Event, worker_name: str) -> None:
        """处理单个事件"""
        self.logger.debug(f"处理事件: {event.type} (工作线程: {worker_name})")
        
        try:
            # 获取事件处理器
            handlers = self._get_handlers(event)
            
            if not handlers:
                self.logger.warning(f"没有找到事件处理器: {event.type}")
                return
            
            # 执行处理器
            results = []
            for handler in handlers:
                try:
                    if handler.can_handle(event):
                        if asyncio.iscoroutinefunction(handler.callback):
                            result = await handler.callback(event)
                        else:
                            result = handler.callback(event)
                        results.append(result)
                        
                        # 如果是一次性处理器,移除它
                        if handler.once:
                            self._remove_handler(event.type, handler)
                
                except Exception as e:
                    self.logger.error(f"事件处理器执行失败: {e}")
                    self._stats['events_failed'] += 1
                    
                    # 重试逻辑
                    if event.retry_count < event.max_retries:
                        event.retry_count += 1
                        self.logger.info(f"重试事件: {event.type} ({event.retry_count}/{event.max_retries})")
                        await self.publish(event)
                    else:
                        self.logger.error(f"事件处理失败,已达最大重试次数: {event.type}")
                        await self.publish(Event(
                            type='event.failed',
                            data={
                                'original_event': event.to_dict(),
                                'error': str(e),
                                'worker': worker_name
                            }
                        ))
            
            self._stats['events_processed'] += 1
            
            # 发布处理完成事件
            await self.publish(Event(
                type='event.processed',
                data={
                    'original_event': event.to_dict(),
                    'results': results,
                    'worker': worker_name
                }
            ))
            
        except Exception as e:
            self.logger.error(f"处理事件时发生异常: {e}")
            self._stats['events_failed'] += 1
    
    def _get_handlers(self, event: Event) -> List[EventHandler]:
        """获取事件处理器"""
        handlers = []
        
        # 具体事件类型的处理器
        if event.type in self._handlers:
            handlers.extend(self._handlers[event.type])
        
        # 通配符处理器
        handlers.extend(self._wildcard_handlers)
        
        return handlers
    
    def _remove_handler(self, event_type: str, handler: EventHandler) -> None:
        """移除处理器"""
        if event_type == '*':
            if handler in self._wildcard_handlers:
                self._wildcard_handlers.remove(handler)
        else:
            if handler in self._handlers[event_type]:
                self._handlers[event_type].remove(handler)
        
        self._stats['handlers_count'] -= 1
    
    def get_stats(self) -> Dict[str, Any]:
        """获取统计信息"""
        return {
            **self._stats,
            'queue_size': self._event_queue.qsize(),
            'active_workers': len(self._worker_tasks),
            'running': self._running
        }

# MCP事件驱动工具系统
class MCPEventDrivenSystem:
    """MCP事件驱动系统"""
    
    def __init__(self):
        self.event_bus = EventBus()
        self.tools = {}
        self.setup_system_events()
    
    def setup_system_events(self):
        """设置系统事件"""
        # 工具注册事件
        self.event_bus.subscribe('tool.registered', self._on_tool_registered)
        
        # 工具调用事件
        self.event_bus.subscribe('tool.called', self._on_tool_called)
        self.event_bus.subscribe('tool.completed', self._on_tool_completed)
        self.event_bus.subscribe('tool.failed', self._on_tool_failed)
        
        # 系统监控事件
        self.event_bus.subscribe('system.health_check', self._on_health_check)
        
        # 添加日志中间件
        self.event_bus.use_middleware(self._logging_middleware)
    
    async def _logging_middleware(self, event: Event, next_fn: Callable) -> Event:
        """日志中间件"""
        print(f"🎭 事件中间件: {event.type} at {time.strftime('%H:%M:%S')}")
        return await next_fn(event)
    
    async def _on_tool_registered(self, event: Event):
        """工具注册事件处理"""
        tool_name = event.data.get('name')
        print(f"🔧 工具已注册: {tool_name}")
        
        # 发布工具可用事件
        await self.event_bus.publish(Event(
            type='tool.available',
            data={'name': tool_name, 'timestamp': time.time()}
        ))
    
    async def _on_tool_called(self, event: Event):
        """工具调用事件处理"""
        tool_name = event.data.get('name')
        parameters = event.data.get('parameters', {})
        
        print(f"🎯 工具调用开始: {tool_name}")
        
        try:
            # 执行工具逻辑
            tool_func = self.tools.get(tool_name)
            if not tool_func:
                raise ValueError(f"工具不存在: {tool_name}")
            
            result = await tool_func(parameters)
            
            # 发布完成事件
            await self.event_bus.publish(Event(
                type='tool.completed',
                data={
                    'name': tool_name,
                    'parameters': parameters,
                    'result': result,
                    'correlation_id': event.correlation_id
                }
            ))
            
        except Exception as e:
            # 发布失败事件
            await self.event_bus.publish(Event(
                type='tool.failed',
                data={
                    'name': tool_name,
                    'parameters': parameters,
                    'error': str(e),
                    'correlation_id': event.correlation_id
                }
            ))
    
    async def _on_tool_completed(self, event: Event):
        """工具完成事件处理"""
        tool_name = event.data.get('name')
        result = event.data.get('result')
        
        print(f"✅ 工具执行完成: {tool_name}")
        
        # 可以在这里添加结果缓存、指标收集等逻辑
    
    async def _on_tool_failed(self, event: Event):
        """工具失败事件处理"""
        tool_name = event.data.get('name')
        error = event.data.get('error')
        
        print(f"❌ 工具执行失败: {tool_name} - {error}")
        
        # 可以在这里添加错误日志、告警等逻辑
    
    async def _on_health_check(self, event: Event):
        """健康检查事件处理"""
        stats = self.event_bus.get_stats()
        
        health_status = {
            'status': 'healthy' if stats['running'] else 'unhealthy',
            'timestamp': time.time(),
            'stats': stats,
            'tools_count': len(self.tools)
        }
        
        # 发布健康状态事件
        await self.event_bus.publish(Event(
            type='system.health_status',
            data=health_status
        ))
    
    def register_tool(self, name: str, func: Callable):
        """注册工具"""
        self.tools[name] = func
        
        # 发布工具注册事件
        asyncio.create_task(self.event_bus.publish(Event(
            type='tool.registered',
            data={'name': name, 'timestamp': time.time()}
        )))
    
    async def call_tool(self, name: str, parameters: Dict[str, Any] = None) -> Any:
        """调用工具"""
        correlation_id = f"call_{int(time.time() * 1000)}"
        
        # 发布工具调用事件
        await self.event_bus.publish(Event(
            type='tool.called',
            data={
                'name': name,
                'parameters': parameters or {},
                'correlation_id': correlation_id
            }
        ))
        
        # 在实际应用中,这里需要等待结果或使用回调机制
        return f"工具 {name} 调用已发起"
    
    async def start(self):
        """启动系统"""
        await self.event_bus.start_processing()
        print("🚀 MCP事件驱动系统已启动")
    
    async def stop(self):
        """停止系统"""
        await self.event_bus.stop_processing()
        print("🛑 MCP事件驱动系统已停止")

# 使用示例
async def main():
    system = MCPEventDrivenSystem()
    
    # 注册示例工具
    async def weather_tool(params):
        location = params.get('location', '未知')
        await asyncio.sleep(1)  # 模拟API调用
        return {
            'location': location,
            'temperature': 25,
            'description': '晴天'
        }
    
    system.register_tool('get_weather', weather_tool)
    
    # 启动系统
    await system.start()
    
    # 调用工具
    await system.call_tool('get_weather', {'location': '北京'})
    
    # 触发健康检查
    await system.event_bus.publish(Event(type='system.health_check', data={}))
    
    # 等待处理完成
    await asyncio.sleep(3)
    
    # 显示统计信息
    stats = system.event_bus.get_stats()
    print(f"\n📊 系统统计:")
    for key, value in stats.items():
        print(f"  {key}: {value}")
    
    # 停止系统
    await system.stop()

if __name__ == "__main__":
    asyncio.run(main())

发布-订阅模式:解耦的消息传递 📢

发布-订阅实现

csharp
// 🌟 发布-订阅模式实现
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IMessage
{
    string Type { get; }
    DateTime Timestamp { get; }
    string CorrelationId { get; }
    object Data { get; }
}

public class MCPMessage : IMessage
{
    public string Type { get; set; }
    public DateTime Timestamp { get; set; } = DateTime.UtcNow;
    public string CorrelationId { get; set; } = Guid.NewGuid().ToString();
    public object Data { get; set; }
    public Dictionary<string, object> Headers { get; set; } = new();
    public int Priority { get; set; } = 0;
    public TimeSpan? TimeToLive { get; set; }
}

public interface IMessageHandler<in T> where T : IMessage
{
    Task HandleAsync(T message, CancellationToken cancellationToken = default);
}

public interface IMessagePublisher
{
    Task PublishAsync<T>(T message, CancellationToken cancellationToken = default) where T : IMessage;
    Task PublishAsync(string topic, object data, CancellationToken cancellationToken = default);
}

public interface IMessageSubscriber
{
    IDisposable Subscribe<T>(IMessageHandler<T> handler) where T : IMessage;
    IDisposable Subscribe<T>(Func<T, Task> handler) where T : IMessage;
    IDisposable Subscribe(string topic, Func<IMessage, Task> handler);
}

public class MessageBroker : IMessagePublisher, IMessageSubscriber, IDisposable
{
    private readonly ConcurrentDictionary<Type, ConcurrentBag<IMessageHandler<IMessage>>> _typeHandlers = new();
    private readonly ConcurrentDictionary<string, ConcurrentBag<Func<IMessage, Task>>> _topicHandlers = new();
    private readonly SemaphoreSlim _publishSemaphore;
    private readonly MessageBrokerOptions _options;
    private volatile bool _disposed;

    // 统计信息
    private long _publishedCount;
    private long _processedCount;
    private long _failedCount;

    public MessageBroker(MessageBrokerOptions options = null)
    {
        _options = options ?? new MessageBrokerOptions();
        _publishSemaphore = new SemaphoreSlim(_options.MaxConcurrency, _options.MaxConcurrency);
    }

    public async Task PublishAsync<T>(T message, CancellationToken cancellationToken = default) where T : IMessage
    {
        if (_disposed) throw new ObjectDisposedException(nameof(MessageBroker));
        if (message == null) throw new ArgumentNullException(nameof(message));

        await _publishSemaphore.WaitAsync(cancellationToken);
        
        try
        {
            Interlocked.Increment(ref _publishedCount);
            
            Console.WriteLine($"📢 发布消息: {message.Type} ({message.CorrelationId[..8]})");

            // 检查消息是否过期
            if (message is MCPMessage mcpMessage && mcpMessage.TimeToLive.HasValue)
            {
                var age = DateTime.UtcNow - message.Timestamp;
                if (age > mcpMessage.TimeToLive.Value)
                {
                    Console.WriteLine($"⏰ 消息已过期: {message.Type}");
                    return;
                }
            }

            // 处理类型订阅者
            var messageType = typeof(T);
            if (_typeHandlers.TryGetValue(messageType, out var handlers))
            {
                var tasks = handlers.Select(handler => ProcessMessageAsync(handler, message, cancellationToken));
                await Task.WhenAll(tasks);
            }

            // 处理主题订阅者
            if (_topicHandlers.TryGetValue(message.Type, out var topicHandlers))
            {
                var tasks = topicHandlers.Select(handler => ProcessTopicMessageAsync(handler, message, cancellationToken));
                await Task.WhenAll(tasks);
            }

            // 处理通配符订阅者
            if (_topicHandlers.TryGetValue("*", out var wildcardHandlers))
            {
                var tasks = wildcardHandlers.Select(handler => ProcessTopicMessageAsync(handler, message, cancellationToken));
                await Task.WhenAll(tasks);
            }
        }
        finally
        {
            _publishSemaphore.Release();
        }
    }

    public async Task PublishAsync(string topic, object data, CancellationToken cancellationToken = default)
    {
        var message = new MCPMessage
        {
            Type = topic,
            Data = data
        };

        await PublishAsync(message, cancellationToken);
    }

    public IDisposable Subscribe<T>(IMessageHandler<T> handler) where T : IMessage
    {
        if (_disposed) throw new ObjectDisposedException(nameof(MessageBroker));
        if (handler == null) throw new ArgumentNullException(nameof(handler));

        var messageType = typeof(T);
        var wrappedHandler = new MessageHandlerWrapper<T>(handler);
        
        _typeHandlers.AddOrUpdate(
            messageType,
            new ConcurrentBag<IMessageHandler<IMessage>> { wrappedHandler },
            (key, existing) => 
            {
                existing.Add(wrappedHandler);
                return existing;
            });

        Console.WriteLine($"📝 订阅消息类型: {messageType.Name}");

        return new Subscription(() => RemoveTypeHandler(messageType, wrappedHandler));
    }

    public IDisposable Subscribe<T>(Func<T, Task> handler) where T : IMessage
    {
        return Subscribe(new FuncMessageHandler<T>(handler));
    }

    public IDisposable Subscribe(string topic, Func<IMessage, Task> handler)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(MessageBroker));
        if (string.IsNullOrEmpty(topic)) throw new ArgumentException("Topic cannot be null or empty", nameof(topic));
        if (handler == null) throw new ArgumentNullException(nameof(handler));

        _topicHandlers.AddOrUpdate(
            topic,
            new ConcurrentBag<Func<IMessage, Task>> { handler },
            (key, existing) =>
            {
                existing.Add(handler);
                return existing;
            });

        Console.WriteLine($"📝 订阅主题: {topic}");

        return new Subscription(() => RemoveTopicHandler(topic, handler));
    }

    private async Task ProcessMessageAsync(IMessageHandler<IMessage> handler, IMessage message, CancellationToken cancellationToken)
    {
        try
        {
            await handler.HandleAsync(message, cancellationToken);
            Interlocked.Increment(ref _processedCount);
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _failedCount);
            Console.WriteLine($"❌ 消息处理失败: {message.Type} - {ex.Message}");
            
            if (_options.EnableDeadLetterQueue)
            {
                await HandleDeadLetter(message, ex);
            }
        }
    }

    private async Task ProcessTopicMessageAsync(Func<IMessage, Task> handler, IMessage message, CancellationToken cancellationToken)
    {
        try
        {
            await handler(message);
            Interlocked.Increment(ref _processedCount);
        }
        catch (Exception ex)
        {
            Interlocked.Increment(ref _failedCount);
            Console.WriteLine($"❌ 主题消息处理失败: {message.Type} - {ex.Message}");
            
            if (_options.EnableDeadLetterQueue)
            {
                await HandleDeadLetter(message, ex);
            }
        }
    }

    private async Task HandleDeadLetter(IMessage message, Exception exception)
    {
        var deadLetterMessage = new MCPMessage
        {
            Type = "system.dead_letter",
            Data = new
            {
                OriginalMessage = message,
                Error = exception.Message,
                StackTrace = exception.StackTrace,
                Timestamp = DateTime.UtcNow
            }
        };

        // 发布到死信队列主题
        if (_topicHandlers.TryGetValue("system.dead_letter", out var deadLetterHandlers))
        {
            var tasks = deadLetterHandlers.Select(handler => handler(deadLetterMessage));
            await Task.WhenAll(tasks);
        }
    }

    private void RemoveTypeHandler(Type messageType, IMessageHandler<IMessage> handler)
    {
        if (_typeHandlers.TryGetValue(messageType, out var handlers))
        {
            // 由于ConcurrentBag不支持Remove,这里创建新的集合
            var updatedHandlers = new ConcurrentBag<IMessageHandler<IMessage>>(
                handlers.Where(h => !ReferenceEquals(h, handler))
            );
            _typeHandlers.TryUpdate(messageType, updatedHandlers, handlers);
        }
    }

    private void RemoveTopicHandler(string topic, Func<IMessage, Task> handler)
    {
        if (_topicHandlers.TryGetValue(topic, out var handlers))
        {
            var updatedHandlers = new ConcurrentBag<Func<IMessage, Task>>(
                handlers.Where(h => !ReferenceEquals(h, handler))
            );
            _topicHandlers.TryUpdate(topic, updatedHandlers, handlers);
        }
    }

    public MessageBrokerStats GetStats()
    {
        return new MessageBrokerStats
        {
            PublishedCount = Interlocked.Read(ref _publishedCount),
            ProcessedCount = Interlocked.Read(ref _processedCount),
            FailedCount = Interlocked.Read(ref _failedCount),
            TypeSubscriptionsCount = _typeHandlers.Count,
            TopicSubscriptionsCount = _topicHandlers.Count,
            IsDisposed = _disposed
        };
    }

    public void Dispose()
    {
        if (_disposed) return;

        _disposed = true;
        _publishSemaphore?.Dispose();
        
        Console.WriteLine("🛑 消息代理已关闭");
    }
}

// 支持类和接口
public class MessageBrokerOptions
{
    public int MaxConcurrency { get; set; } = Environment.ProcessorCount * 2;
    public bool EnableDeadLetterQueue { get; set; } = true;
    public TimeSpan DefaultTimeout { get; set; } = TimeSpan.FromSeconds(30);
}

public class MessageBrokerStats
{
    public long PublishedCount { get; set; }
    public long ProcessedCount { get; set; }
    public long FailedCount { get; set; }
    public int TypeSubscriptionsCount { get; set; }
    public int TopicSubscriptionsCount { get; set; }
    public bool IsDisposed { get; set; }
    
    public double SuccessRate => ProcessedCount + FailedCount > 0 
        ? (double)ProcessedCount / (ProcessedCount + FailedCount) 
        : 0;
}

// 消息处理器包装类
internal class MessageHandlerWrapper<T> : IMessageHandler<IMessage> where T : IMessage
{
    private readonly IMessageHandler<T> _innerHandler;

    public MessageHandlerWrapper(IMessageHandler<T> innerHandler)
    {
        _innerHandler = innerHandler;
    }

    public async Task HandleAsync(IMessage message, CancellationToken cancellationToken = default)
    {
        if (message is T typedMessage)
        {
            await _innerHandler.HandleAsync(typedMessage, cancellationToken);
        }
    }
}

internal class FuncMessageHandler<T> : IMessageHandler<T> where T : IMessage
{
    private readonly Func<T, Task> _handler;

    public FuncMessageHandler(Func<T, Task> handler)
    {
        _handler = handler;
    }

    public Task HandleAsync(T message, CancellationToken cancellationToken = default)
    {
        return _handler(message);
    }
}

// 订阅管理
internal class Subscription : IDisposable
{
    private readonly Action _unsubscribe;
    private bool _disposed;

    public Subscription(Action unsubscribe)
    {
        _unsubscribe = unsubscribe;
    }

    public void Dispose()
    {
        if (_disposed) return;
        
        _disposed = true;
        _unsubscribe?.Invoke();
    }
}

// MCP工具相关消息类型
public class ToolCallMessage : MCPMessage
{
    public ToolCallMessage()
    {
        Type = "tool.call";
    }

    public string ToolName { get; set; }
    public Dictionary<string, object> Parameters { get; set; } = new();
}

public class ToolResultMessage : MCPMessage
{
    public ToolResultMessage()
    {
        Type = "tool.result";
    }

    public string ToolName { get; set; }
    public object Result { get; set; }
    public bool IsSuccess { get; set; }
    public string ErrorMessage { get; set; }
}

// MCP发布-订阅工具系统
public class MCPPubSubSystem : IDisposable
{
    private readonly MessageBroker _broker;
    private readonly Dictionary<string, Func<Dictionary<string, object>, Task<object>>> _tools = new();

    public MCPPubSubSystem()
    {
        _broker = new MessageBroker(new MessageBrokerOptions
        {
            MaxConcurrency = 10,
            EnableDeadLetterQueue = true
        });

        SetupSystemHandlers();
    }

    private void SetupSystemHandlers()
    {
        // 订阅工具调用消息
        _broker.Subscribe<ToolCallMessage>(async message =>
        {
            Console.WriteLine($"🎯 处理工具调用: {message.ToolName}");

            if (_tools.TryGetValue(message.ToolName, out var toolFunc))
            {
                try
                {
                    var result = await toolFunc(message.Parameters);
                    
                    await _broker.PublishAsync(new ToolResultMessage
                    {
                        ToolName = message.ToolName,
                        Result = result,
                        IsSuccess = true,
                        CorrelationId = message.CorrelationId
                    });
                }
                catch (Exception ex)
                {
                    await _broker.PublishAsync(new ToolResultMessage
                    {
                        ToolName = message.ToolName,
                        IsSuccess = false,
                        ErrorMessage = ex.Message,
                        CorrelationId = message.CorrelationId
                    });
                }
            }
            else
            {
                await _broker.PublishAsync(new ToolResultMessage
                {
                    ToolName = message.ToolName,
                    IsSuccess = false,
                    ErrorMessage = $"工具不存在: {message.ToolName}",
                    CorrelationId = message.CorrelationId
                });
            }
        });

        // 订阅工具结果消息
        _broker.Subscribe<ToolResultMessage>(async message =>
        {
            if (message.IsSuccess)
            {
                Console.WriteLine($"✅ 工具执行成功: {message.ToolName}");
            }
            else
            {
                Console.WriteLine($"❌ 工具执行失败: {message.ToolName} - {message.ErrorMessage}");
            }
        });

        // 订阅死信消息
        _broker.Subscribe("system.dead_letter", async message =>
        {
            Console.WriteLine($"💀 死信消息: {message.Type}");
            // 这里可以添加死信处理逻辑,如重试、告警等
        });
    }

    public void RegisterTool(string name, Func<Dictionary<string, object>, Task<object>> toolFunc)
    {
        _tools[name] = toolFunc;
        Console.WriteLine($"🔧 注册工具: {name}");
    }

    public async Task CallToolAsync(string toolName, Dictionary<string, object> parameters = null)
    {
        await _broker.PublishAsync(new ToolCallMessage
        {
            ToolName = toolName,
            Parameters = parameters ?? new Dictionary<string, object>()
        });
    }

    public IDisposable SubscribeToResults(Action<ToolResultMessage> handler)
    {
        return _broker.Subscribe<ToolResultMessage>(message =>
        {
            handler(message);
            return Task.CompletedTask;
        });
    }

    public MessageBrokerStats GetStats() => _broker.GetStats();

    public void Dispose()
    {
        _broker?.Dispose();
    }
}

// 使用示例
class Program
{
    static async Task Main(string[] args)
    {
        using var system = new MCPPubSubSystem();

        // 注册示例工具
        system.RegisterTool("get_weather", async parameters =>
        {
            var location = parameters.GetValueOrDefault("location", "未知").ToString();
            
            // 模拟API调用延迟
            await Task.Delay(1000);
            
            return new
            {
                location,
                temperature = new Random().Next(15, 35),
                description = new[] { "晴天", "多云", "雨天" }[new Random().Next(3)],
                timestamp = DateTime.UtcNow
            };
        });

        system.RegisterTool("calculate", async parameters =>
        {
            var a = Convert.ToDouble(parameters.GetValueOrDefault("a", 0));
            var b = Convert.ToDouble(parameters.GetValueOrDefault("b", 0));
            var operation = parameters.GetValueOrDefault("operation", "add").ToString();

            return operation switch
            {
                "add" => a + b,
                "subtract" => a - b,
                "multiply" => a * b,
                "divide" => b != 0 ? a / b : throw new DivideByZeroException("除数不能为零"),
                _ => throw new ArgumentException($"不支持的操作: {operation}")
            };
        });

        // 订阅结果
        using var subscription = system.SubscribeToResults(result =>
        {
            Console.WriteLine($"📋 收到结果通知: {result.ToolName} - 成功: {result.IsSuccess}");
        });

        // 调用工具
        await system.CallToolAsync("get_weather", new Dictionary<string, object>
        {
            ["location"] = "北京"
        });

        await system.CallToolAsync("calculate", new Dictionary<string, object>
        {
            ["a"] = 10,
            ["b"] = 5,
            ["operation"] = "multiply"
        });

        // 测试错误情况
        await system.CallToolAsync("nonexistent_tool");

        // 等待处理完成
        await Task.Delay(3000);

        // 显示统计信息
        var stats = system.GetStats();
        Console.WriteLine($"\n📊 系统统计:");
        Console.WriteLine($"  发布消息数: {stats.PublishedCount}");
        Console.WriteLine($"  处理消息数: {stats.ProcessedCount}");
        Console.WriteLine($"  失败消息数: {stats.FailedCount}");
        Console.WriteLine($"  成功率: {stats.SuccessRate:P2}");
    }
}

小结

工作流设计模式的核心思想:

🔄 模式选择指南

  1. 请求-响应模式 - 适用于需要同步结果的场景
  2. 事件驱动模式 - 适用于松耦合、异步处理的场景
  3. 发布-订阅模式 - 适用于一对多消息传递的场景
  4. 混合模式 - 根据具体需求组合使用多种模式

💡 设计原则

  • 选择合适的模式解决特定问题
  • 保持模式的纯洁性,避免过度复杂化
  • 考虑可扩展性和可维护性
  • 建立完善的错误处理和监控机制
  • 重视性能和资源管理

🏗️ 架构哲学:好的架构模式不是银弹,而是工具箱中的利器。关键是在正确的场景使用正确的模式。


下一节实战案例分析 - 通过真实案例掌握最佳实践应用