5.3 流式处理和实时通信
🎯 学习目标:实现高性能的流式处理和实时通信,支持大数据处理和实时交互场景
⏱️ 预计时间:50分钟
📊 难度等级:⭐⭐⭐⭐⭐
🌊 为什么需要流式处理?
想象一下这些场景:
- 📊 大数据处理:分析几GB的日志文件,不能一次性加载到内存
- 💬 实时对话:AI回复需要逐字显示,提升用户体验
- 📈 实时监控:系统状态需要持续更新,而非定期刷新
- 🔄 长时间任务:任务进度需要实时反馈给用户
传统的请求-响应模式在这些场景下会显得笨重且用户体验较差。流式处理正是为了解决这些问题!
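🏗️ 流式处理架构
本节代码分为三层:`DataStream`/`StreamManager` 负责消息分发与流的生命周期管理;`SSEHandler` 和 `WebSocketHandler` 作为订阅者把消息推送给客户端;`FileStreamProcessor` 作为生产者把文件内容分块写入流。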
🎯 核心流式处理框架
🌊 基础流式接口
python
"""
streaming.py - MCP流式处理核心框架
"""
from abc import ABC, abstractmethod
from typing import AsyncIterator, Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass, field
from enum import Enum
import asyncio
import json
import time
import uuid
from datetime import datetime, timedelta
import gzip
import weakref
from loguru import logger
class StreamType(Enum):
    """Category of payload carried by a stream.

    The string value of each member is what ``StreamMessage.to_dict``
    writes into the ``"type"`` key.
    """

    TEXT = "text"
    JSON = "json"
    BINARY = "binary"
    EVENT = "event"
    LOG = "log"
    METRICS = "metrics"
class StreamState(Enum):
    """Lifecycle state of a stream.

    A stream is constructed in ``CREATED``; the remaining members are
    assigned by the ``DataStream`` lifecycle methods (start, pause,
    complete, error, cancel).
    """

    CREATED = "created"
    ACTIVE = "active"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    CANCELLED = "cancelled"
@dataclass
class StreamMessage:
    """A single message travelling through a stream.

    Every field has a default, so a message can be built from keyword
    arguments alone; ``id`` defaults to a fresh UUID4 string and
    ``timestamp`` to the (naive, local) time of construction.
    """

    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    stream_id: str = ""
    type: StreamType = StreamType.TEXT
    data: Any = None
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: datetime = field(default_factory=datetime.now)
    sequence: int = 0
    is_final: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict view of the message.

        The enum is reduced to its string value and the timestamp to an
        ISO-8601 string; ``data`` and ``metadata`` are passed through
        by reference, not copied.
        """
        serialized: Dict[str, Any] = dict(
            id=self.id,
            stream_id=self.stream_id,
            type=self.type.value,
            data=self.data,
            metadata=self.metadata,
            timestamp=self.timestamp.isoformat(),
            sequence=self.sequence,
            is_final=self.is_final,
        )
        return serialized

    def to_json(self) -> str:
        """Serialize :meth:`to_dict` to JSON, keeping non-ASCII text unescaped."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, ensure_ascii=False)
@dataclass
class StreamConfig:
    """Tunable settings for one stream.

    Within the streaming module shown here only ``buffer_size``,
    ``max_message_size``, ``persist_messages`` and ``ttl`` are read;
    ``compression``, ``flow_control`` and ``rate_limit`` are stored but
    not consulted by the visible code.
    """

    # Capacity of the stream's message queue.
    buffer_size: int = 1000
    # Upper bound, in bytes, on one JSON-encoded message (1 MiB).
    max_message_size: int = 1 << 20
    compression: bool = True
    # When true, every message is also kept in the stream's history.
    persist_messages: bool = False
    flow_control: bool = True
    # Messages per second; None means unset.
    rate_limit: Optional[int] = None
    # Maximum stream age in seconds; None means unset.
    ttl: Optional[int] = None
class StreamSubscriber(ABC):
    """Interface every stream consumer must implement.

    Concrete subscribers receive data through :meth:`on_message` and are
    told about lifecycle changes through the three ``on_stream_*``
    callbacks. ``subscribed_streams`` holds the ids of the streams this
    subscriber is currently attached to.
    """

    def __init__(self, subscriber_id: str):
        self.subscribed_streams: set = set()
        self.subscriber_id = subscriber_id

    @abstractmethod
    async def on_message(self, message: StreamMessage):
        """Handle one message delivered by a stream."""

    @abstractmethod
    async def on_stream_started(self, stream_id: str):
        """Called when the stream identified by ``stream_id`` starts."""

    @abstractmethod
    async def on_stream_completed(self, stream_id: str):
        """Called when the stream identified by ``stream_id`` completes."""

    @abstractmethod
    async def on_stream_error(self, stream_id: str, error: Exception):
        """Called when the stream identified by ``stream_id`` reports ``error``."""
class DataStream:
    """An in-process stream that fans messages out to its subscribers.

    Messages accepted by :meth:`send_message` are placed in a bounded
    buffer and, unless the stream is paused, immediately drained to every
    subscriber. While paused the buffer fills up (at most
    ``config.buffer_size`` messages; overflow is dropped with a warning)
    and is flushed in order by :meth:`resume`.

    Subscribers are held in a ``WeakSet``, so a subscriber that is no
    longer referenced anywhere else silently disappears from the stream.
    """

    # Once a stream is in one of these states, pause() and cancel() leave
    # it untouched (complete() already behaved this way).
    _TERMINAL_STATES = (StreamState.COMPLETED, StreamState.ERROR, StreamState.CANCELLED)

    def __init__(self,
                 stream_id: str,
                 stream_type: StreamType,
                 config: Optional[StreamConfig] = None):
        """Create a stream in the ``CREATED`` state.

        Args:
            stream_id: Identifier stamped on every message of this stream.
            stream_type: Payload category stamped on every message.
            config: Settings; a default ``StreamConfig`` is used when omitted.
        """
        self.stream_id = stream_id
        self.stream_type = stream_type
        self.config = config or StreamConfig()
        self.state = StreamState.CREATED
        self.created_at = datetime.now()
        self.updated_at = datetime.now()
        # Accepted-but-undelivered messages (only accumulates while paused).
        self.message_buffer: asyncio.Queue = asyncio.Queue(maxsize=self.config.buffer_size)
        # Delivered messages, kept only when config.persist_messages is set.
        self.message_history: List[StreamMessage] = []
        self.sequence_counter = 0
        # Subscriber management.
        self.subscribers: weakref.WeakSet = weakref.WeakSet()
        # Flow control.
        self.is_paused = False
        self.completion_event = asyncio.Event()
        self.error_event = asyncio.Event()
        self.last_activity = datetime.now()
        # Statistics.
        self.total_messages = 0
        self.total_bytes = 0
        self.error_count = 0

    async def _notify_subscribers(self, callback_name: str, failure_label: str, *args: Any) -> None:
        """Invoke one lifecycle callback on every subscriber, logging failures.

        Iterates over a snapshot: subscribers may be added or removed by
        other tasks while a callback is awaited, and mutating the live
        WeakSet during iteration raises ``RuntimeError``.
        """
        for subscriber in list(self.subscribers):
            try:
                await getattr(subscriber, callback_name)(*args)
            except Exception as e:
                logger.error(f"{failure_label}: {e}")

    async def start(self):
        """Move the stream from ``CREATED`` to ``ACTIVE`` and notify subscribers.

        Raises:
            ValueError: If the stream is in any state other than ``CREATED``.
        """
        if self.state != StreamState.CREATED:
            raise ValueError(f"流已启动或已完成: {self.state}")
        self.state = StreamState.ACTIVE
        self.updated_at = datetime.now()
        await self._notify_subscribers("on_stream_started", "通知订阅者流开始失败", self.stream_id)
        logger.info(f"数据流已启动: {self.stream_id}")

    async def send_message(self, data: Any, metadata: Dict[str, Any] = None, is_final: bool = False):
        """Accept one message and deliver it (or buffer it while paused).

        Args:
            data: Payload; must be JSON-serializable because the size check
                encodes the whole message as JSON.
            metadata: Optional extra fields attached to the message.
            is_final: When true, the stream is completed right after this
                message has been delivered.

        Raises:
            ValueError: If the stream is finished, or the encoded message
                exceeds ``config.max_message_size``.
        """
        # PAUSED is accepted so that the buffering promised by the warning
        # below can actually happen.
        if self.state not in (StreamState.CREATED, StreamState.ACTIVE, StreamState.PAUSED):
            raise ValueError(f"流状态不允许发送消息: {self.state}")
        if self.is_paused:
            logger.warning(f"流已暂停,消息将缓冲: {self.stream_id}")

        message = StreamMessage(
            stream_id=self.stream_id,
            type=self.stream_type,
            data=data,
            metadata=metadata or {},
            sequence=self.sequence_counter,
            is_final=is_final
        )

        message_size = len(message.to_json().encode('utf-8'))
        if message_size > self.config.max_message_size:
            raise ValueError(f"消息过大: {message_size} > {self.config.max_message_size}")
        # Only a validated message consumes a sequence number, so a rejected
        # oversize message leaves no gap. A message dropped below does leave
        # a gap, which lets subscribers detect the loss.
        self.sequence_counter += 1

        # put_nowait: `await put()` would block forever on a full queue
        # instead of raising QueueFull.
        try:
            self.message_buffer.put_nowait(message)
        except asyncio.QueueFull:
            logger.warning(f"流缓冲区已满,丢弃消息: {self.stream_id}")
            return

        self.total_messages += 1
        self.total_bytes += message_size
        self.last_activity = datetime.now()

        if not self.is_paused:
            await self._flush_buffer()

    async def _flush_buffer(self) -> None:
        """Deliver buffered messages in order until empty or paused again."""
        while not self.is_paused:
            try:
                message = self.message_buffer.get_nowait()
            except asyncio.QueueEmpty:
                return
            # History records delivered messages only, so a subscriber that
            # joins during a pause is not sent the backlog twice.
            if self.config.persist_messages:
                self.message_history.append(message)
            await self._distribute_message(message)
            if message.is_final:
                await self.complete()

    async def _distribute_message(self, message: StreamMessage):
        """Deliver ``message`` to every subscriber; drop subscribers that raise."""
        failed_subscribers = []
        # Snapshot: see _notify_subscribers for why the live set is not iterated.
        for subscriber in list(self.subscribers):
            try:
                await subscriber.on_message(message)
            except Exception as e:
                logger.error(f"订阅者处理消息失败: {e}")
                failed_subscribers.append(subscriber)
        for subscriber in failed_subscribers:
            self.subscribers.discard(subscriber)
            # Keep the subscriber's own bookkeeping in step with the removal.
            subscriber.subscribed_streams.discard(self.stream_id)

    async def subscribe(self, subscriber: StreamSubscriber):
        """Attach ``subscriber`` and replay the persisted history to it."""
        # Snapshot the history before registering: anything delivered after
        # this point reaches the subscriber live, so nothing is replayed twice.
        history = list(self.message_history) if self.config.persist_messages else []
        self.subscribers.add(subscriber)
        subscriber.subscribed_streams.add(self.stream_id)
        for message in history:
            try:
                await subscriber.on_message(message)
            except Exception as e:
                logger.error(f"发送历史消息失败: {e}")
        logger.debug(f"订阅者已添加到流: {subscriber.subscriber_id} -> {self.stream_id}")

    async def unsubscribe(self, subscriber: StreamSubscriber):
        """Detach ``subscriber``; a no-op if it was not subscribed."""
        self.subscribers.discard(subscriber)
        subscriber.subscribed_streams.discard(self.stream_id)
        logger.debug(f"订阅者已从流移除: {subscriber.subscriber_id} -> {self.stream_id}")

    async def pause(self):
        """Stop delivering; subsequent messages are buffered until resume()."""
        if self.state in self._TERMINAL_STATES:
            # Pausing must not bring a finished stream back to life.
            logger.warning(f"流已结束,无法暂停: {self.stream_id}")
            return
        self.is_paused = True
        self.state = StreamState.PAUSED
        self.updated_at = datetime.now()
        logger.info(f"流已暂停: {self.stream_id}")

    async def resume(self):
        """Resume a paused stream and deliver everything buffered meanwhile."""
        if self.state == StreamState.PAUSED:
            self.is_paused = False
            self.state = StreamState.ACTIVE
            self.updated_at = datetime.now()
            logger.info(f"流已恢复: {self.stream_id}")
            await self._flush_buffer()

    async def complete(self):
        """Mark the stream completed and notify subscribers (idempotent)."""
        if self.state in self._TERMINAL_STATES:
            return
        self.state = StreamState.COMPLETED
        self.updated_at = datetime.now()
        self.completion_event.set()
        await self._notify_subscribers("on_stream_completed", "通知订阅者流完成失败", self.stream_id)
        logger.info(f"流已完成: {self.stream_id}, 总消息数: {self.total_messages}")

    async def error(self, error: Exception):
        """Put the stream into ``ERROR`` and forward ``error`` to subscribers."""
        self.state = StreamState.ERROR
        self.updated_at = datetime.now()
        self.error_count += 1
        self.error_event.set()
        await self._notify_subscribers("on_stream_error", "通知订阅者流错误失败", self.stream_id, error)
        logger.error(f"流发生错误: {self.stream_id}, 错误: {error}")

    async def cancel(self):
        """Cancel the stream; ignored once it has already finished."""
        if self.state in self._TERMINAL_STATES:
            return
        self.state = StreamState.CANCELLED
        self.updated_at = datetime.now()
        logger.info(f"流已取消: {self.stream_id}")

    def get_statistics(self) -> Dict[str, Any]:
        """Return counters and throughput figures for this stream.

        Rates are averaged over the time since creation, with the duration
        floored at one second to avoid inflated values for young streams.
        """
        duration = (datetime.now() - self.created_at).total_seconds()
        return {
            "stream_id": self.stream_id,
            "state": self.state.value,
            "type": self.stream_type.value,
            "total_messages": self.total_messages,
            "total_bytes": self.total_bytes,
            "error_count": self.error_count,
            "subscriber_count": len(self.subscribers),
            "duration_seconds": duration,
            "messages_per_second": self.total_messages / max(duration, 1),
            "bytes_per_second": self.total_bytes / max(duration, 1),
            "last_activity": self.last_activity.isoformat()
        }
class StreamManager:
    """Registry that creates, looks up, expires and deletes ``DataStream`` objects."""

    def __init__(self,
                 cleanup_interval: float = 300.0,
                 completed_retention: float = 3600.0,
                 failed_retention: float = 1800.0):
        """Create an empty manager.

        Args:
            cleanup_interval: Seconds between two expiry sweeps.
            completed_retention: Seconds a COMPLETED stream may stay idle
                before it is removed.
            failed_retention: Seconds an ERROR or CANCELLED stream may stay
                idle before it is removed.
        """
        self.streams: Dict[str, DataStream] = {}
        self.stream_registry: Dict[str, Dict[str, Any]] = {}
        self.cleanup_task: Optional[asyncio.Task] = None
        self.cleanup_interval = cleanup_interval
        self.completed_retention = completed_retention
        self.failed_retention = failed_retention

    async def initialize(self):
        """Start the background expiry sweep (safe to call more than once)."""
        # A second call must not spawn a second sweep and orphan the first.
        if self.cleanup_task is None or self.cleanup_task.done():
            self.cleanup_task = asyncio.create_task(self._cleanup_expired_streams())
        logger.info("流管理器已初始化")

    async def create_stream(self,
                            stream_id: Optional[str] = None,
                            stream_type: StreamType = StreamType.TEXT,
                            config: StreamConfig = None) -> DataStream:
        """Create and register a new stream (it is not started).

        Args:
            stream_id: Desired id; a random ``stream_<8 hex>`` id is
                generated when omitted.
            stream_type: Payload category of the stream.
            config: Optional per-stream settings.

        Raises:
            ValueError: If ``stream_id`` is already in use.
        """
        if stream_id is None:
            stream_id = f"stream_{uuid.uuid4().hex[:8]}"
        if stream_id in self.streams:
            raise ValueError(f"流ID已存在: {stream_id}")
        stream = DataStream(stream_id, stream_type, config)
        self.streams[stream_id] = stream
        self.stream_registry[stream_id] = {
            "created_at": stream.created_at,
            "type": stream_type.value,
            # Copy, so later changes to the live config do not rewrite the
            # registry entry recorded at creation time.
            "config": dict(vars(config)) if config is not None else {}
        }
        logger.info(f"新流已创建: {stream_id}")
        return stream

    async def get_stream(self, stream_id: str) -> Optional[DataStream]:
        """Return the stream registered under ``stream_id``, or ``None``."""
        return self.streams.get(stream_id)

    async def delete_stream(self, stream_id: str) -> bool:
        """Remove a stream, cancelling it first if it is still running.

        Returns:
            ``True`` if the stream existed, ``False`` otherwise.
        """
        stream = self.streams.get(stream_id)
        if stream is None:
            return False
        if stream.state in (StreamState.ACTIVE, StreamState.PAUSED):
            await stream.cancel()
        # pop() rather than del: the entry may already have been removed by
        # another task while cancel() was awaited.
        self.streams.pop(stream_id, None)
        self.stream_registry.pop(stream_id, None)
        logger.info(f"流已删除: {stream_id}")
        return True

    async def list_streams(self,
                           state_filter: Optional[StreamState] = None,
                           type_filter: Optional[StreamType] = None) -> List[Dict[str, Any]]:
        """Return the statistics of every stream matching the given filters."""
        return [
            stream.get_statistics()
            for stream in self.streams.values()
            if not (state_filter and stream.state != state_filter)
            and not (type_filter and stream.stream_type != type_filter)
        ]

    def _is_expired(self, stream: DataStream, now: datetime) -> bool:
        """Tell whether ``stream`` exceeded its TTL or its idle retention."""
        if stream.config.ttl:
            age = (now - stream.created_at).total_seconds()
            if age > stream.config.ttl:
                return True
        inactive_time = (now - stream.last_activity).total_seconds()
        if stream.state == StreamState.COMPLETED:
            return inactive_time > self.completed_retention
        if stream.state in (StreamState.ERROR, StreamState.CANCELLED):
            return inactive_time > self.failed_retention
        return False

    async def _cleanup_expired_streams(self):
        """Background loop that periodically deletes expired streams."""
        while True:
            try:
                now = datetime.now()
                # Collect first, delete afterwards: delete_stream mutates
                # self.streams and must not run while it is being iterated.
                expired_streams = [
                    stream_id
                    for stream_id, stream in self.streams.items()
                    if self._is_expired(stream, now)
                ]
                for stream_id in expired_streams:
                    await self.delete_stream(stream_id)
                    logger.debug(f"清理过期流: {stream_id}")
                await asyncio.sleep(self.cleanup_interval)
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"清理过期流时出错: {e}")
                await asyncio.sleep(self.cleanup_interval)

    async def close(self):
        """Stop the expiry sweep and delete every remaining stream."""
        if self.cleanup_task:
            self.cleanup_task.cancel()
            try:
                await self.cleanup_task
            except asyncio.CancelledError:
                pass
            # Allows initialize() to start a fresh sweep after a close().
            self.cleanup_task = None
        for stream_id in list(self.streams.keys()):
            await self.delete_stream(stream_id)
        logger.info("流管理器已关闭")
🌐 Server-Sent Events 实现
📡 SSE协议支持
python
"""
sse_handler.py - Server-Sent Events处理器
"""
from typing import Dict, List, Any, Optional, AsyncIterator
import asyncio
import json
from datetime import datetime

from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from loguru import logger
from starlette.background import BackgroundTask

from .streaming import StreamManager, StreamSubscriber, StreamMessage, StreamType
class SSESubscriber(StreamSubscriber):
    """Subscriber that turns stream callbacks into SSE event dicts on a queue.

    The queue is read by the HTTP response generator. Every enqueue is
    non-blocking: if the client reads too slowly and the queue is full,
    the event is dropped with a warning rather than stalling the stream
    (and every other subscriber) behind this one connection.
    """

    def __init__(self, subscriber_id: str, response_queue: asyncio.Queue):
        super().__init__(subscriber_id)
        self.response_queue = response_queue
        self.is_active = True

    def _enqueue(self, event: Dict[str, Any]) -> None:
        """Queue ``event`` without waiting; drop it if the queue is full."""
        # put_nowait: `await put()` would suspend on a full queue and never
        # raise QueueFull, making the drop branch unreachable.
        try:
            self.response_queue.put_nowait(event)
        except asyncio.QueueFull:
            logger.warning(f"SSE队列已满,丢弃消息: {self.subscriber_id}")

    async def on_message(self, message: StreamMessage):
        """Queue a stream message as an SSE event named after its type."""
        if not self.is_active:
            return
        self._enqueue({
            "event": message.type.value,
            "data": message.data,
            "metadata": message.metadata,
            "id": message.id,
            "sequence": message.sequence
        })

    async def on_stream_started(self, stream_id: str):
        """Queue a ``stream_started`` event."""
        self._enqueue({
            "event": "stream_started",
            "data": {"stream_id": stream_id, "timestamp": datetime.now().isoformat()},
            "id": f"start_{stream_id}"
        })

    async def on_stream_completed(self, stream_id: str):
        """Queue a ``stream_completed`` event."""
        self._enqueue({
            "event": "stream_completed",
            "data": {"stream_id": stream_id, "timestamp": datetime.now().isoformat()},
            "id": f"complete_{stream_id}"
        })

    async def on_stream_error(self, stream_id: str, error: Exception):
        """Queue a ``stream_error`` event carrying ``str(error)``."""
        self._enqueue({
            "event": "stream_error",
            "data": {
                "stream_id": stream_id,
                "error": str(error),
                "timestamp": datetime.now().isoformat()
            },
            "id": f"error_{stream_id}"
        })

    def close(self):
        """Mark the subscriber inactive so further messages are ignored."""
        self.is_active = False
class SSEHandler:
    """Registers the HTTP routes that expose streams over Server-Sent Events."""

    def __init__(self, stream_manager: StreamManager):
        self.stream_manager = stream_manager
        # Open SSE connections, keyed by subscriber id.
        self.active_connections: Dict[str, SSESubscriber] = {}

    def setup_routes(self, app: FastAPI):
        """Attach the SSE and stream-management endpoints to ``app``."""

        @app.get("/stream/{stream_id}")
        async def stream_events(request: Request, stream_id: str):
            """Subscribe the caller to a stream and push its events as SSE."""
            stream = await self.stream_manager.get_stream(stream_id)
            if not stream:
                raise HTTPException(status_code=404, detail="流未找到")

            response_queue = asyncio.Queue(maxsize=1000)
            # request.client is None when the server cannot determine the peer.
            client_host = request.client.host if request.client else "unknown"
            subscriber_id = f"sse_{client_host}_{id(request)}"

            subscriber = SSESubscriber(subscriber_id, response_queue)
            self.active_connections[subscriber_id] = subscriber
            await stream.subscribe(subscriber)

            async def event_generator():
                try:
                    # Connection acknowledgement.
                    yield self._format_sse_message({
                        "event": "connected",
                        "data": {
                            "subscriber_id": subscriber_id,
                            "stream_id": stream_id,
                            "timestamp": datetime.now().isoformat()
                        },
                        "id": f"connect_{subscriber_id}"
                    })
                    while subscriber.is_active:
                        try:
                            message = await asyncio.wait_for(
                                response_queue.get(),
                                timeout=30.0
                            )
                            yield self._format_sse_message(message)
                        except asyncio.TimeoutError:
                            # Nothing to send for 30 s: emit a heartbeat so the
                            # connection does not look idle.
                            yield self._format_sse_message({
                                "event": "heartbeat",
                                "data": {"timestamp": datetime.now().isoformat()},
                                "id": f"heartbeat_{int(datetime.now().timestamp())}"
                            })
                        except Exception as e:
                            logger.error(f"SSE消息处理错误: {e}")
                            break
                except asyncio.CancelledError:
                    logger.info(f"SSE连接已取消: {subscriber_id}")
                except Exception as e:
                    logger.error(f"SSE生成器错误: {e}")
                finally:
                    await stream.unsubscribe(subscriber)
                    subscriber.close()
                    self.active_connections.pop(subscriber_id, None)

            return StreamingResponse(
                event_generator(),
                media_type="text/event-stream",
                headers={
                    "Cache-Control": "no-cache",
                    "Connection": "keep-alive",
                    "Access-Control-Allow-Origin": "*",
                    "Access-Control-Allow-Methods": "GET",
                    "Access-Control-Allow-Headers": "Cache-Control"
                },
                background=BackgroundTask(self._cleanup_connection, subscriber_id)
            )

        @app.post("/stream/{stream_id}/message")
        async def send_message(stream_id: str, message_data: Dict[str, Any]):
            """Publish one message to an existing stream."""
            stream = await self.stream_manager.get_stream(stream_id)
            if not stream:
                raise HTTPException(status_code=404, detail="流未找到")
            try:
                await stream.send_message(
                    data=message_data.get("data"),
                    metadata=message_data.get("metadata", {}),
                    is_final=message_data.get("is_final", False)
                )
                return {"success": True, "message": "消息已发送"}
            except Exception as e:
                raise HTTPException(status_code=400, detail=str(e))

        @app.get("/streams")
        async def list_streams():
            """Return the statistics of every stream."""
            streams = await self.stream_manager.list_streams()
            return {"streams": streams}

        @app.post("/streams")
        async def create_stream(stream_data: Dict[str, Any]):
            """Create and start a stream; any failure is reported as HTTP 400."""
            try:
                stream_type = StreamType(stream_data.get("type", "text"))
                stream = await self.stream_manager.create_stream(
                    stream_id=stream_data.get("stream_id"),
                    stream_type=stream_type
                )
                await stream.start()
                return {
                    "success": True,
                    "stream_id": stream.stream_id,
                    "stream_url": f"/stream/{stream.stream_id}"
                }
            except Exception as e:
                raise HTTPException(status_code=400, detail=str(e))

    @staticmethod
    def _single_line(value: Any) -> str:
        """Render ``value`` as text with CR/LF removed.

        ``id`` and ``event`` values embed caller-chosen stream ids; a line
        break inside them would start a new SSE field and let a crafted id
        inject arbitrary ``data:``/``event:`` lines into the frame.
        """
        return str(value).replace("\r", "").replace("\n", "")

    def _format_sse_message(self, message_data: Dict[str, Any]) -> str:
        """Encode an event dict as one SSE frame.

        Only the ``id``, ``event`` and ``data`` keys are written; ``data``
        is JSON-encoded (so it is always a single line). The frame ends
        with the blank line that terminates an SSE event.
        """
        lines = []
        if "id" in message_data:
            lines.append(f"id: {self._single_line(message_data['id'])}")
        if "event" in message_data:
            lines.append(f"event: {self._single_line(message_data['event'])}")
        if "data" in message_data:
            data_str = json.dumps(message_data["data"], ensure_ascii=False)
            lines.append(f"data: {data_str}")
        lines.append("")  # blank line marks the end of the event
        return "\n".join(lines) + "\n"

    async def _cleanup_connection(self, subscriber_id: str):
        """Forget a connection and deactivate its subscriber, if still present."""
        subscriber = self.active_connections.pop(subscriber_id, None)
        if subscriber:
            subscriber.close()
        logger.debug(f"SSE连接已清理: {subscriber_id}")
🔗 WebSocket实时通信
💬 WebSocket处理器
python
"""
websocket_handler.py - WebSocket实时通信处理器
"""
from typing import Dict, List, Any, Optional, Set
import asyncio
import json
from datetime import datetime

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.websockets import WebSocketState
from loguru import logger

from .streaming import StreamManager, StreamSubscriber, StreamMessage, StreamType
class WebSocketSubscriber(StreamSubscriber):
    """Subscriber that forwards stream callbacks to one WebSocket as JSON.

    All four callbacks share the same guard and failure policy: nothing
    is sent once the subscriber is inactive or the socket is no longer
    connected, and the first failed send deactivates the subscriber so
    later callbacks do not keep writing to a broken socket.
    """

    def __init__(self, subscriber_id: str, websocket: WebSocket):
        super().__init__(subscriber_id)
        self.websocket = websocket
        self.is_active = True

    async def _send(self, payload: Dict[str, Any], failure_label: str) -> None:
        """Send ``payload`` if the connection is usable; never raises."""
        if not self.is_active or self.websocket.client_state != WebSocketState.CONNECTED:
            return
        try:
            await self.websocket.send_json(payload)
        except Exception as e:
            logger.error(f"{failure_label}: {e}")
            self.is_active = False

    async def on_message(self, message: StreamMessage):
        """Forward a stream message wrapped in a ``stream_message`` envelope."""
        await self._send({
            "type": "stream_message",
            "stream_id": message.stream_id,
            "message": message.to_dict()
        }, "WebSocket发送消息失败")

    async def on_stream_started(self, stream_id: str):
        """Forward a ``stream_started`` notification."""
        await self._send({
            "type": "stream_started",
            "stream_id": stream_id,
            "timestamp": datetime.now().isoformat()
        }, "WebSocket发送流开始事件失败")

    async def on_stream_completed(self, stream_id: str):
        """Forward a ``stream_completed`` notification."""
        await self._send({
            "type": "stream_completed",
            "stream_id": stream_id,
            "timestamp": datetime.now().isoformat()
        }, "WebSocket发送流完成事件失败")

    async def on_stream_error(self, stream_id: str, error: Exception):
        """Forward a ``stream_error`` notification carrying ``str(error)``."""
        await self._send({
            "type": "stream_error",
            "stream_id": stream_id,
            "error": str(error),
            "timestamp": datetime.now().isoformat()
        }, "WebSocket发送流错误事件失败")

    def close(self):
        """Mark the subscriber inactive so nothing more is sent."""
        self.is_active = False
class WebSocketHandler:
    """Serves the ``/ws`` endpoint and executes the JSON commands clients send.

    Supported command types: ``subscribe``, ``unsubscribe``,
    ``send_message``, ``create_stream``, ``list_streams`` and ``ping``.
    """

    def __init__(self, stream_manager: StreamManager):
        self.stream_manager = stream_manager
        # Open connections, keyed by connection id.
        self.active_connections: Dict[str, WebSocketSubscriber] = {}
        # stream_id -> ids of the connections subscribed to that stream.
        self.connection_groups: Dict[str, Set[str]] = {}

    def setup_routes(self, app: FastAPI):
        """Attach the WebSocket endpoint to ``app``."""

        @app.websocket("/ws")
        async def websocket_endpoint(websocket: WebSocket):
            """Accept a connection and process client commands until it closes."""
            await websocket.accept()
            connection_id = f"ws_{id(websocket)}"
            subscriber = WebSocketSubscriber(connection_id, websocket)
            self.active_connections[connection_id] = subscriber
            try:
                await websocket.send_json({
                    "type": "connected",
                    "connection_id": connection_id,
                    "timestamp": datetime.now().isoformat()
                })
                while True:
                    try:
                        data = await websocket.receive_json()
                        await self._handle_client_message(subscriber, data)
                    except WebSocketDisconnect:
                        logger.info(f"WebSocket连接断开: {connection_id}")
                        break
                    except Exception as e:
                        logger.error(f"WebSocket消息处理错误: {e}")
                        # If the socket itself is broken this send raises and
                        # the outer handler ends the connection.
                        await websocket.send_json({
                            "type": "error",
                            "error": str(e),
                            "timestamp": datetime.now().isoformat()
                        })
            except Exception as e:
                logger.error(f"WebSocket连接错误: {e}")
            finally:
                await self._cleanup_connection(connection_id)

    async def _handle_client_message(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]):
        """Dispatch one client command to its handler.

        Unknown (or non-string) ``type`` values are answered with an
        ``error`` message instead of raising.
        """
        message_type = data.get("type")
        handlers = {
            "subscribe": self._on_subscribe,
            "unsubscribe": self._on_unsubscribe,
            "send_message": self._on_send_message,
            "create_stream": self._on_create_stream,
            "list_streams": self._on_list_streams,
            "ping": self._on_ping,
        }
        # isinstance guard: an unhashable type value (e.g. a list) must end
        # up in the "unknown" branch, not raise from the dict lookup.
        handler = handlers.get(message_type) if isinstance(message_type, str) else None
        if handler is None:
            await self._send_error(subscriber, f"Unknown message type: {message_type}")
            return
        await handler(subscriber, data)

    async def _send_error(self, subscriber: WebSocketSubscriber, text: str) -> None:
        """Send an ``error`` message with the given text to the client."""
        await subscriber.websocket.send_json({
            "type": "error",
            "error": text
        })

    def _leave_group(self, stream_id: str, connection_id: str) -> None:
        """Remove a connection from a stream's group, dropping empty groups."""
        group = self.connection_groups.get(stream_id)
        if group is None:
            return
        group.discard(connection_id)
        if not group:
            del self.connection_groups[stream_id]

    async def _on_subscribe(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]) -> None:
        """Handle ``subscribe``: attach the connection to an existing stream."""
        stream_id = data.get("stream_id")
        if not stream_id:
            await self._send_error(subscriber, "stream_id is required")
            return
        stream = await self.stream_manager.get_stream(stream_id)
        if not stream:
            await self._send_error(subscriber, f"Stream not found: {stream_id}")
            return
        await stream.subscribe(subscriber)
        self.connection_groups.setdefault(stream_id, set()).add(subscriber.subscriber_id)
        await subscriber.websocket.send_json({
            "type": "subscribed",
            "stream_id": stream_id,
            "timestamp": datetime.now().isoformat()
        })

    async def _on_unsubscribe(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]) -> None:
        """Handle ``unsubscribe``: detach the connection from a stream."""
        stream_id = data.get("stream_id")
        if not stream_id:
            # Same reply as subscribe, instead of silently ignoring the request.
            await self._send_error(subscriber, "stream_id is required")
            return
        stream = await self.stream_manager.get_stream(stream_id)
        if stream:
            await stream.unsubscribe(subscriber)
        else:
            # The stream is already gone; still clear the local bookkeeping.
            subscriber.subscribed_streams.discard(stream_id)
        self._leave_group(stream_id, subscriber.subscriber_id)
        await subscriber.websocket.send_json({
            "type": "unsubscribed",
            "stream_id": stream_id,
            "timestamp": datetime.now().isoformat()
        })

    async def _on_send_message(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]) -> None:
        """Handle ``send_message``: publish client data to a stream."""
        stream_id = data.get("stream_id")
        message_data = data.get("data")
        if not stream_id or message_data is None:
            await self._send_error(subscriber, "stream_id and data are required")
            return
        stream = await self.stream_manager.get_stream(stream_id)
        if not stream:
            await self._send_error(subscriber, f"Stream not found: {stream_id}")
            return
        try:
            await stream.send_message(
                data=message_data,
                metadata=data.get("metadata", {}),
                is_final=data.get("is_final", False)
            )
            await subscriber.websocket.send_json({
                "type": "message_sent",
                "stream_id": stream_id,
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            await self._send_error(subscriber, str(e))

    async def _on_create_stream(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]) -> None:
        """Handle ``create_stream``: create and start a new stream."""
        try:
            stream_type = StreamType(data.get("stream_type", "text"))
            stream = await self.stream_manager.create_stream(
                stream_id=data.get("stream_id"),
                stream_type=stream_type
            )
            await stream.start()
            await subscriber.websocket.send_json({
                "type": "stream_created",
                "stream_id": stream.stream_id,
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            await self._send_error(subscriber, str(e))

    async def _on_list_streams(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]) -> None:
        """Handle ``list_streams``: reply with the statistics of all streams."""
        streams = await self.stream_manager.list_streams()
        await subscriber.websocket.send_json({
            "type": "streams_list",
            "streams": streams,
            "timestamp": datetime.now().isoformat()
        })

    async def _on_ping(self, subscriber: WebSocketSubscriber, data: Dict[str, Any]) -> None:
        """Handle ``ping``: reply with ``pong``."""
        await subscriber.websocket.send_json({
            "type": "pong",
            "timestamp": datetime.now().isoformat()
        })

    async def _cleanup_connection(self, connection_id: str):
        """Unsubscribe a connection from every stream and forget it."""
        subscriber = self.active_connections.pop(connection_id, None)
        if not subscriber:
            return
        # Copy: stream.unsubscribe() mutates subscribed_streams.
        for stream_id in list(subscriber.subscribed_streams):
            stream = await self.stream_manager.get_stream(stream_id)
            if stream:
                await stream.unsubscribe(subscriber)
            self._leave_group(stream_id, connection_id)
        subscriber.close()
        logger.debug(f"WebSocket连接已清理: {connection_id}")

    async def broadcast_to_stream(self, stream_id: str, message: Dict[str, Any]):
        """Send ``message`` to every connection subscribed to ``stream_id``.

        Connections that are gone, inactive, or fail to receive the
        message are cleaned up afterwards.
        """
        if stream_id not in self.connection_groups:
            return
        failed_connections = []
        # Snapshot: other tasks may subscribe/unsubscribe while a send is
        # awaited, and mutating the live set mid-iteration raises RuntimeError.
        for connection_id in list(self.connection_groups[stream_id]):
            subscriber = self.active_connections.get(connection_id)
            if not subscriber or not subscriber.is_active:
                failed_connections.append(connection_id)
                continue
            try:
                await subscriber.websocket.send_json(message)
            except Exception as e:
                logger.error(f"广播消息失败: {e}")
                failed_connections.append(connection_id)
        for connection_id in failed_connections:
            await self._cleanup_connection(connection_id)
📁 文件流处理器
🗂️ 大文件流式处理
python
"""
file_streaming.py - 文件流式处理器
"""
from typing import AsyncIterator, Dict, Any, Optional
import asyncio
import aiofiles
from pathlib import Path
import mimetypes
import hashlib
from .streaming import DataStream, StreamType, StreamConfig, StreamMessage
class FileStreamProcessor:
    """Publishes file contents and directory listings into a ``DataStream``.

    Each public method reports problems by calling ``stream.error(...)``
    rather than raising, and marks its last message ``is_final``.
    """

    def __init__(self, chunk_size: int = 8192):
        # Characters (text mode) or bytes (binary mode) requested per read.
        self.chunk_size = chunk_size

    @staticmethod
    def _progress(bytes_read: int, file_size: int) -> float:
        """Return the percentage read, kept within 0-100.

        ``file_size`` is sampled once before reading; if the file changes
        size meanwhile the raw ratio could exceed 100 or divide by zero.
        """
        if file_size <= 0:
            return 100.0
        return min(100.0, (bytes_read / file_size) * 100)

    async def stream_file_content(self,
                                  file_path: str,
                                  stream: DataStream,
                                  encoding: str = 'utf-8') -> None:
        """Stream a text file as ``file_start`` / ``file_chunk`` / ``file_complete``.

        Args:
            file_path: Path of the file to read.
            stream: Destination stream.
            encoding: Text encoding used to decode the file.
        """
        path = Path(file_path)
        if not path.exists():
            await stream.error(FileNotFoundError(f"文件不存在: {file_path}"))
            return
        if not path.is_file():
            await stream.error(ValueError(f"路径不是文件: {file_path}"))
            return
        try:
            file_size = path.stat().st_size
            bytes_read = 0
            await stream.send_message({
                "type": "file_start",
                "file_path": str(path),
                "file_size": file_size,
                "mime_type": mimetypes.guess_type(str(path))[0]
            })
            async with aiofiles.open(path, 'r', encoding=encoding) as f:
                while True:
                    chunk = await f.read(self.chunk_size)
                    if not chunk:
                        break
                    # Text mode yields characters; re-encode to count bytes.
                    bytes_read += len(chunk.encode(encoding))
                    await stream.send_message({
                        "type": "file_chunk",
                        "content": chunk,
                        "progress": self._progress(bytes_read, file_size),
                        "bytes_read": bytes_read,
                        "total_bytes": file_size
                    })
            await stream.send_message({
                "type": "file_complete",
                "total_bytes": bytes_read
            }, is_final=True)
        except Exception as e:
            await stream.error(e)

    async def stream_binary_file(self,
                                 file_path: str,
                                 stream: DataStream) -> None:
        """Stream a binary file as base64 chunks followed by its MD5 digest.

        Args:
            file_path: Path of the file to read.
            stream: Destination stream.
        """
        import base64  # local import, hoisted out of the read loop

        path = Path(file_path)
        if not path.exists():
            await stream.error(FileNotFoundError(f"文件不存在: {file_path}"))
            return
        # Same validation as stream_file_content: reject directories up
        # front instead of failing later inside open().
        if not path.is_file():
            await stream.error(ValueError(f"路径不是文件: {file_path}"))
            return
        try:
            file_size = path.stat().st_size
            bytes_read = 0
            hasher = hashlib.md5()
            await stream.send_message({
                "type": "binary_file_start",
                "file_path": str(path),
                "file_size": file_size,
                "mime_type": mimetypes.guess_type(str(path))[0]
            })
            async with aiofiles.open(path, 'rb') as f:
                while True:
                    chunk = await f.read(self.chunk_size)
                    if not chunk:
                        break
                    bytes_read += len(chunk)
                    hasher.update(chunk)
                    # Messages are JSON-encoded, so raw bytes travel as base64.
                    chunk_b64 = base64.b64encode(chunk).decode('ascii')
                    await stream.send_message({
                        "type": "binary_chunk",
                        "content": chunk_b64,
                        "progress": self._progress(bytes_read, file_size),
                        "bytes_read": bytes_read,
                        "total_bytes": file_size
                    })
            await stream.send_message({
                "type": "binary_file_complete",
                "total_bytes": bytes_read,
                "md5_hash": hasher.hexdigest()
            }, is_final=True)
        except Exception as e:
            await stream.error(e)

    async def stream_directory_tree(self,
                                    directory_path: str,
                                    stream: DataStream,
                                    max_depth: int = 10) -> None:
        """Stream one ``directory_item`` message per entry under a directory.

        Args:
            directory_path: Root directory to list.
            stream: Destination stream.
            max_depth: Number of directory levels to descend into.
        """
        path = Path(directory_path)
        if not path.exists():
            await stream.error(FileNotFoundError(f"目录不存在: {directory_path}"))
            return
        if not path.is_dir():
            await stream.error(ValueError(f"路径不是目录: {directory_path}"))
            return
        try:
            await stream.send_message({
                "type": "directory_start",
                "directory_path": str(path)
            })
            async for item_info in self._walk_directory(path, max_depth):
                await stream.send_message({
                    "type": "directory_item",
                    **item_info
                })
            await stream.send_message({
                "type": "directory_complete"
            }, is_final=True)
        except Exception as e:
            await stream.error(e)

    async def _walk_directory(self, path: Path, max_depth: int, current_depth: int = 0) -> AsyncIterator[Dict[str, Any]]:
        """Yield an info dict for every entry below ``path``, depth-first.

        Entries whose metadata cannot be read are reported as dicts with an
        ``error`` key instead of aborting the walk. Directories at
        ``max_depth`` or deeper are not entered.
        """
        if current_depth >= max_depth:
            return
        try:
            for item in path.iterdir():
                try:
                    stat_info = item.stat()
                    # Query the entry kind once and reuse it below.
                    is_file = item.is_file()
                    is_dir = item.is_dir()
                    item_info = {
                        "name": item.name,
                        "path": str(item),
                        "is_file": is_file,
                        "is_dir": is_dir,
                        "size": stat_info.st_size if is_file else None,
                        "modified": stat_info.st_mtime,
                        "depth": current_depth
                    }
                    if is_file:
                        item_info["mime_type"] = mimetypes.guess_type(str(item))[0]
                    yield item_info
                    if is_dir:
                        async for sub_item in self._walk_directory(item, max_depth, current_depth + 1):
                            yield sub_item
                except (PermissionError, OSError) as e:
                    yield {
                        "name": item.name,
                        "path": str(item),
                        "error": str(e),
                        "depth": current_depth
                    }
        except (PermissionError, OSError) as e:
            yield {
                "path": str(path),
                "error": str(e),
                "depth": current_depth
            }
🎯 本节小结
通过这一小节,你已经构建了一个完整的流式处理和实时通信系统:
✅ 核心流式框架:支持多种数据类型的流式处理
✅ Server-Sent Events:单向实时数据推送
✅ WebSocket通信:双向实时交互
✅ 文件流处理:大文件的流式读取和处理
✅ 流管理系统:完整的流生命周期管理
🚀 使用示例
python
import asyncio

# Module-level objects, so an ASGI server can import `app` from this module.
stream_manager = StreamManager()

# Set up the SSE and WebSocket handlers and register their routes.
sse_handler = SSEHandler(stream_manager)
ws_handler = WebSocketHandler(stream_manager)
app = FastAPI()
sse_handler.setup_routes(app)
ws_handler.setup_routes(app)


async def main() -> None:
    """Create a text stream and push a file through it.

    `await` is only valid inside a coroutine, so the asynchronous steps
    live here instead of at module top level.
    """
    # Start the manager's background cleanup task.
    await stream_manager.initialize()
    try:
        # Create and start a text stream.
        text_stream = await stream_manager.create_stream(
            stream_type=StreamType.TEXT,
            config=StreamConfig(buffer_size=500)
        )
        await text_stream.start()
        # Stream the file into it chunk by chunk.
        file_processor = FileStreamProcessor()
        await file_processor.stream_file_content(
            "large_file.txt",
            text_stream
        )
    finally:
        # Cancel the cleanup task and release every stream.
        await stream_manager.close()


if __name__ == "__main__":
    asyncio.run(main())
📊 性能优势
- 内存效率:流式处理避免大文件完全加载到内存
- 实时响应:WebSocket和SSE提供毫秒级的实时通信
- 高并发:基于 asyncio 的异步设计可同时服务大量流连接(实际上限取决于部署环境和订阅者的处理速度)
- 可扩展性:模块化设计支持自定义流处理器
现在你的MCP服务器具备了企业级的流式处理和实时通信能力!