5.5 多模态集成
🎯 学习目标:掌握MCP服务器的多模态数据处理能力
⏱️ 预计时间:45分钟
📊 难度等级:⭐⭐⭐⭐
🌈 多模态的定义与价值
多模态(Multimodal)是指能够处理和理解多种类型数据的能力,包括:
- 📝 文本:自然语言处理
- 🖼️ 图像:计算机视觉
- 🎵 音频:语音识别与生成
- 📹 视频:动态视觉分析
- 📊 结构化数据:表格、图表等
🏗️ MCP多模态架构设计
📊 多模态处理架构图
整体流程:输入内容 → 模态识别器 → 对应模态处理器(文本/图像/音频/视频)→ 结果融合器 → 统一输出。
🔧 核心组件设计
| 组件 | 功能 | 技术栈 |
| --- | --- | --- |
| 模态识别器 | 自动识别输入数据类型 | Python-Magic, MIME检测 |
| 文本处理器 | NLP任务处理 | Transformers, spaCy |
| 图像处理器 | 图像分析与生成 | OpenCV, PIL, DALL-E API |
| 音频处理器 | 语音识别与合成 | Whisper, Azure Speech |
| 视频处理器 | 视频内容分析 | FFmpeg, MediaPipe |
| 结果融合器 | 多模态结果整合 | 自定义融合算法 |
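上表中各组件的协作方式可以用一小段示意代码勾勒:模态识别器负责路由,各处理器负责本模态的分析,结果融合器负责汇总。以下类名与接口均为示例,仅用于说明架构,并非固定实现:

```python
# 组件组合示意(架构草图):识别器路由 → 处理器分析 → 融合器汇总
from typing import Any, Callable, Dict, List


class ModalityRouter:
    """按模态类型把内容分发给对应处理器(示例类)"""

    def __init__(self):
        self.processors: Dict[str, Callable[[bytes], Dict[str, Any]]] = {}

    def register(self, modality: str, processor: Callable[[bytes], Dict[str, Any]]):
        """注册某一模态的处理器"""
        self.processors[modality] = processor

    def dispatch(self, modality: str, data: bytes) -> Dict[str, Any]:
        """把数据路由给对应模态的处理器"""
        if modality not in self.processors:
            raise ValueError(f"未注册的模态: {modality}")
        return self.processors[modality](data)


def fuse_results(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """结果融合器的最小示意:按模态类型归并各处理器的输出"""
    return {r["type"]: r for r in results}
```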
💻 Python多模态MCP服务器实现
📦 依赖安装
```bash
# requirements.txt
mcp>=1.0.0
pillow>=10.0.0
opencv-python>=4.8.0
librosa>=0.10.0
openai-whisper  # 注意:PyPI包名为openai-whisper(导入名仍为whisper)
transformers>=4.30.0
torch>=2.0.0
numpy>=1.24.0
requests>=2.31.0
python-magic>=0.4.27
moviepy>=1.0.3,<2.0  # moviepy 2.x 移除了 moviepy.editor 导入路径
azure-cognitiveservices-speech>=1.30.0
openai>=1.0.0
```
```bash
# 安装依赖
pip install -r requirements.txt
# 安装系统依赖 (Ubuntu/Debian)
sudo apt-get install libmagic1 ffmpeg
# 安装系统依赖 (macOS)
brew install libmagic ffmpeg
# 安装系统依赖 (Windows)
# 下载并安装FFmpeg,并将其加入系统PATH
```
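安装完成后,可以用下面的小脚本快速验证核心依赖能否正常导入(示意脚本,文件名 check_deps.py 为示例):

```python
# check_deps.py — 验证核心依赖是否安装成功(示意)
import importlib

# 注意:openai-whisper 的导入名是 whisper,python-magic 的导入名是 magic
for module in ["mcp", "PIL", "cv2", "librosa", "whisper",
               "transformers", "torch", "numpy", "magic", "moviepy"]:
    try:
        importlib.import_module(module)
        print(f"✅ {module}")
    except ImportError as e:
        print(f"❌ {module}: {e}")
```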
🛠️ 核心服务器实现
```python
# multimodal_server.py
import asyncio
import base64
import io
import json
import magic
import tempfile
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import cv2
import librosa
import numpy as np
import whisper
from PIL import Image
import torch
from transformers import pipeline
import openai
from moviepy.editor import VideoFileClip
from mcp.server.fastmcp import FastMCP
from mcp.types import TextContent
class MultimodalMCPServer:
"""多模态MCP服务器"""
def __init__(self):
self.app = FastMCP("Multimodal MCP Server")
self.setup_models()
self.setup_tools()
self.setup_resources()
def setup_models(self):
"""初始化AI模型"""
print("🤖 正在加载AI模型...")
# 文本分析模型
self.text_classifier = pipeline(
"text-classification",
model="cardiffnlp/twitter-roberta-base-sentiment-latest"
)
self.text_summarizer = pipeline(
"summarization",
model="facebook/bart-large-cnn"
)
# 图像分析模型
self.image_classifier = pipeline(
"image-classification",
model="google/vit-base-patch16-224"
)
self.image_captioner = pipeline(
"image-to-text",
model="Salesforce/blip-image-captioning-base"
)
# 语音识别模型
try:
self.whisper_model = whisper.load_model("base")
print("✅ Whisper模型加载成功")
except Exception as e:
print(f"⚠️ Whisper模型加载失败: {e}")
self.whisper_model = None
        # OpenAI API配置 (需要设置环境变量 OPENAI_API_KEY)
        # openai>=1.0.0 使用客户端实例,不再支持全局设置 openai.api_key
        api_key = os.getenv("OPENAI_API_KEY")
        self.openai_client = openai.OpenAI(api_key=api_key) if api_key else None
        print("✅ 模型初始化完成")
def setup_tools(self):
"""注册多模态工具"""
@self.app.tool()
async def analyze_content(
content: str,
content_type: str = "auto"
) -> List[TextContent]:
"""
分析多模态内容
Args:
content: Base64编码的内容或文本内容
content_type: 内容类型 (auto/text/image/audio/video)
"""
try:
if content_type == "auto":
content_type = self.detect_content_type(content)
result = await self.process_content(content, content_type)
return [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 内容分析失败: {str(e)}"
)]
@self.app.tool()
async def generate_image_description(
image_data: str,
detailed: bool = False
) -> List[TextContent]:
"""
生成图像描述
Args:
image_data: Base64编码的图像数据
detailed: 是否生成详细描述
"""
try:
# 解码图像
image_bytes = base64.b64decode(image_data)
image = Image.open(io.BytesIO(image_bytes))
# 基础图像分类
classification = self.image_classifier(image)
# 图像描述生成
caption = self.image_captioner(image)
# 详细分析 (如果需要)
if detailed:
analysis = await self.detailed_image_analysis(image)
else:
analysis = {}
result = {
"type": "image_analysis",
"classification": classification[:3], # 前3个结果
"caption": caption[0]["generated_text"],
"detailed_analysis": analysis,
"image_info": {
"size": image.size,
"mode": image.mode,
"format": image.format
}
}
return [TextContent(
type="text",
text=json.dumps(result, indent=2, ensure_ascii=False)
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 图像分析失败: {str(e)}"
)]
@self.app.tool()
async def transcribe_audio(
audio_data: str,
language: str = "auto"
) -> List[TextContent]:
"""
音频转文字
Args:
audio_data: Base64编码的音频数据
language: 语言代码 (auto/zh/en/ja等)
"""
try:
if not self.whisper_model:
raise Exception("Whisper模型未加载")
# 解码音频数据
audio_bytes = base64.b64decode(audio_data)
# 保存临时文件
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
tmp_file.write(audio_bytes)
tmp_path = tmp_file.name
try:
# 音频转录
if language == "auto":
result = self.whisper_model.transcribe(tmp_path)
else:
result = self.whisper_model.transcribe(tmp_path, language=language)
# 音频信息分析
audio_info = await self.analyze_audio_file(tmp_path)
response = {
"type": "audio_transcription",
"text": result["text"],
"language": result.get("language", "unknown"),
"segments": [
{
"start": seg["start"],
"end": seg["end"],
"text": seg["text"]
}
for seg in result.get("segments", [])
],
"audio_info": audio_info
}
return [TextContent(
type="text",
text=json.dumps(response, indent=2, ensure_ascii=False)
)]
finally:
# 清理临时文件
os.unlink(tmp_path)
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 音频转录失败: {str(e)}"
)]
@self.app.tool()
async def analyze_video(
video_data: str,
extract_frames: bool = True,
extract_audio: bool = True
) -> List[TextContent]:
"""
视频内容分析
Args:
video_data: Base64编码的视频数据
extract_frames: 是否提取关键帧
extract_audio: 是否提取音频
"""
try:
# 解码视频数据
video_bytes = base64.b64decode(video_data)
# 保存临时文件
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
tmp_file.write(video_bytes)
tmp_path = tmp_file.name
try:
analysis_result = {
"type": "video_analysis",
"metadata": {},
"frames": [],
"audio": None
}
# 视频元数据
with VideoFileClip(tmp_path) as video:
analysis_result["metadata"] = {
"duration": video.duration,
"fps": video.fps,
"size": video.size,
"has_audio": video.audio is not None
}
# 提取关键帧
if extract_frames:
frames = await self.extract_key_frames(video)
analysis_result["frames"] = frames
# 提取音频分析
if extract_audio and video.audio:
audio_analysis = await self.extract_video_audio(video)
analysis_result["audio"] = audio_analysis
return [TextContent(
type="text",
text=json.dumps(analysis_result, indent=2, ensure_ascii=False)
)]
finally:
# 清理临时文件
os.unlink(tmp_path)
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 视频分析失败: {str(e)}"
)]
@self.app.tool()
async def multimodal_search(
query: str,
modalities: List[str] = ["text", "image", "audio"],
limit: int = 10
) -> List[TextContent]:
"""
多模态内容搜索
Args:
query: 搜索查询
modalities: 搜索的模态类型
limit: 结果数量限制
"""
try:
search_results = {
"query": query,
"modalities": modalities,
"results": []
}
# 这里可以集成向量数据库进行相似性搜索
# 示例:使用模拟数据
for modality in modalities:
modal_results = await self.search_by_modality(query, modality, limit)
search_results["results"].extend(modal_results)
return [TextContent(
type="text",
text=json.dumps(search_results, indent=2, ensure_ascii=False)
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 多模态搜索失败: {str(e)}"
)]
@self.app.tool()
async def generate_multimodal_summary(
contents: List[Dict[str, Any]]
) -> List[TextContent]:
"""
生成多模态内容摘要
Args:
contents: 多模态内容列表,每个包含type和data字段
"""
try:
summary_parts = []
for content in contents:
content_type = content.get("type")
content_data = content.get("data")
if content_type == "text":
# 文本摘要
text_summary = self.text_summarizer(
content_data,
max_length=150,
min_length=50
)
summary_parts.append({
"type": "text",
"summary": text_summary[0]["summary_text"]
})
elif content_type == "image":
# 图像描述
image_bytes = base64.b64decode(content_data)
image = Image.open(io.BytesIO(image_bytes))
caption = self.image_captioner(image)
summary_parts.append({
"type": "image",
"summary": caption[0]["generated_text"]
})
elif content_type == "audio":
# 音频转录摘要
if self.whisper_model:
                        audio_bytes = base64.b64decode(content_data)
                        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
                            tmp_file.write(audio_bytes)
                            tmp_path = tmp_file.name
                        try:
                            # 文件句柄关闭后再转录,避免句柄占用导致的读写冲突
                            result = self.whisper_model.transcribe(tmp_path)
                        finally:
                            os.unlink(tmp_path)
# 对转录文本进行摘要
if len(result["text"]) > 100:
text_summary = self.text_summarizer(
result["text"],
max_length=100,
min_length=30
)
summary_text = text_summary[0]["summary_text"]
else:
summary_text = result["text"]
summary_parts.append({
"type": "audio",
"summary": summary_text
})
final_summary = {
"type": "multimodal_summary",
"total_contents": len(contents),
"summaries": summary_parts,
"combined_summary": self.combine_summaries(summary_parts)
}
return [TextContent(
type="text",
text=json.dumps(final_summary, indent=2, ensure_ascii=False)
)]
except Exception as e:
return [TextContent(
type="text",
text=f"❌ 多模态摘要生成失败: {str(e)}"
)]
    def setup_resources(self):
        """注册资源(FastMCP 使用 @resource 装饰器注册资源,替代低层 Server 的 list/read 回调)"""

        @self.app.resource(
            "multimodal://models/info",
            name="AI模型信息",
            description="当前加载的AI模型信息",
            mime_type="application/json"
        )
        def models_info() -> str:
            """列出当前加载的AI模型"""
            model_info = {
                "text_models": {
                    "classifier": "cardiffnlp/twitter-roberta-base-sentiment-latest",
                    "summarizer": "facebook/bart-large-cnn"
                },
                "image_models": {
                    "classifier": "google/vit-base-patch16-224",
                    "captioner": "Salesforce/blip-image-captioning-base"
                },
                "audio_models": {
                    "transcriber": "whisper-base" if self.whisper_model else "unavailable"
                },
                "status": "loaded"
            }
            return json.dumps(model_info, indent=2, ensure_ascii=False)

        @self.app.resource(
            "multimodal://capabilities",
            name="多模态能力",
            description="支持的多模态处理能力",
            mime_type="application/json"
        )
        def get_capabilities() -> str:
            """描述服务器支持的多模态处理能力"""
            capabilities = {
                "supported_modalities": ["text", "image", "audio", "video"],
                "text_capabilities": ["classification", "summarization", "sentiment_analysis"],
                "image_capabilities": ["classification", "captioning", "object_detection"],
                "audio_capabilities": ["transcription", "speech_recognition"],
                "video_capabilities": ["frame_extraction", "audio_extraction", "metadata_analysis"],
                "integration_features": ["multimodal_search", "content_fusion", "cross_modal_analysis"]
            }
            return json.dumps(capabilities, indent=2, ensure_ascii=False)

        @self.app.resource(
            "multimodal://examples",
            name="使用示例",
            description="多模态工具使用示例",
            mime_type="application/json"
        )
        def get_examples() -> str:
            """提供各个工具的调用示例"""
            examples = {
                "text_analysis": {
                    "tool": "analyze_content",
                    "example": {
                        "content": "今天天气真好,心情愉快!",
                        "content_type": "text"
                    }
                },
                "image_analysis": {
                    "tool": "generate_image_description",
                    "example": {
                        "image_data": "<base64_encoded_image>",
                        "detailed": True
                    }
                },
                "audio_transcription": {
                    "tool": "transcribe_audio",
                    "example": {
                        "audio_data": "<base64_encoded_audio>",
                        "language": "zh"
                    }
                },
                "video_analysis": {
                    "tool": "analyze_video",
                    "example": {
                        "video_data": "<base64_encoded_video>",
                        "extract_frames": True,
                        "extract_audio": True
                    }
                }
            }
            return json.dumps(examples, indent=2, ensure_ascii=False)
# 辅助方法
    def detect_content_type(self, content: str) -> str:
        """自动检测内容类型"""
        try:
            # validate=True 进行严格base64校验,避免把普通文本误判为二进制
            decoded = base64.b64decode(content, validate=True)
            mime_type = magic.from_buffer(decoded, mime=True)
            if mime_type.startswith('image/'):
                return 'image'
            elif mime_type.startswith('audio/'):
                return 'audio'
            elif mime_type.startswith('video/'):
                return 'video'
            else:
                return 'text'
        except Exception:
            # 无法按base64解码时视为纯文本
            return 'text'
async def process_content(self, content: str, content_type: str) -> Dict[str, Any]:
"""处理多模态内容"""
if content_type == 'text':
return await self.process_text(content)
elif content_type == 'image':
return await self.process_image(content)
elif content_type == 'audio':
return await self.process_audio(content)
elif content_type == 'video':
return await self.process_video(content)
else:
raise ValueError(f"不支持的内容类型: {content_type}")
async def process_text(self, text: str) -> Dict[str, Any]:
"""处理文本内容"""
# 情感分析
sentiment = self.text_classifier(text)
# 文本摘要(如果文本较长)
summary = None
if len(text) > 200:
summary_result = self.text_summarizer(text, max_length=100, min_length=30)
summary = summary_result[0]["summary_text"]
return {
"type": "text_analysis",
"sentiment": sentiment[0],
"summary": summary,
"length": len(text),
"word_count": len(text.split())
}
async def process_image(self, image_data: str) -> Dict[str, Any]:
"""处理图像内容"""
image_bytes = base64.b64decode(image_data)
image = Image.open(io.BytesIO(image_bytes))
# 图像分类
classification = self.image_classifier(image)
# 图像描述
caption = self.image_captioner(image)
return {
"type": "image_analysis",
"classification": classification[:3],
"caption": caption[0]["generated_text"],
"size": image.size,
"mode": image.mode
}
async def process_audio(self, audio_data: str) -> Dict[str, Any]:
"""处理音频内容"""
if not self.whisper_model:
return {"type": "audio_analysis", "error": "Whisper模型未加载"}
audio_bytes = base64.b64decode(audio_data)
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
tmp_file.write(audio_bytes)
tmp_path = tmp_file.name
try:
# 音频转录
result = self.whisper_model.transcribe(tmp_path)
# 音频特征分析
y, sr = librosa.load(tmp_path)
duration = librosa.get_duration(y=y, sr=sr)
return {
"type": "audio_analysis",
"transcription": result["text"],
"language": result.get("language", "unknown"),
"duration": duration,
"sample_rate": sr
}
finally:
os.unlink(tmp_path)
async def process_video(self, video_data: str) -> Dict[str, Any]:
"""处理视频内容"""
video_bytes = base64.b64decode(video_data)
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as tmp_file:
tmp_file.write(video_bytes)
tmp_path = tmp_file.name
try:
with VideoFileClip(tmp_path) as video:
return {
"type": "video_analysis",
"duration": video.duration,
"fps": video.fps,
"size": video.size,
"has_audio": video.audio is not None
}
finally:
os.unlink(tmp_path)
async def detailed_image_analysis(self, image: Image.Image) -> Dict[str, Any]:
"""详细图像分析"""
# 转换为OpenCV格式
opencv_image = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)
# 图像统计信息
height, width = opencv_image.shape[:2]
# 颜色分析
mean_color = np.mean(opencv_image, axis=(0, 1))
# 边缘检测
gray = cv2.cvtColor(opencv_image, cv2.COLOR_BGR2GRAY)
edges = cv2.Canny(gray, 100, 200)
edge_ratio = np.sum(edges > 0) / (height * width)
return {
"dimensions": {"width": width, "height": height},
"mean_color": {"b": float(mean_color[0]), "g": float(mean_color[1]), "r": float(mean_color[2])},
"edge_density": float(edge_ratio),
"brightness": float(np.mean(gray))
}
async def analyze_audio_file(self, file_path: str) -> Dict[str, Any]:
"""分析音频文件"""
y, sr = librosa.load(file_path)
# 音频特征提取
duration = librosa.get_duration(y=y, sr=sr)
tempo, _ = librosa.beat.beat_track(y=y, sr=sr)
spectral_centroids = librosa.feature.spectral_centroid(y=y, sr=sr)
return {
"duration": float(duration),
"sample_rate": int(sr),
"tempo": float(tempo),
"spectral_centroid_mean": float(np.mean(spectral_centroids)),
"rms_energy": float(np.mean(librosa.feature.rms(y=y)))
}
async def extract_key_frames(self, video: VideoFileClip) -> List[Dict[str, Any]]:
"""提取视频关键帧"""
frames = []
duration = video.duration
# 每10秒提取一帧
for t in range(0, int(duration), 10):
frame = video.get_frame(t)
# 转换为PIL Image
pil_image = Image.fromarray(frame)
# 生成描述
caption = self.image_captioner(pil_image)
frames.append({
"timestamp": t,
"description": caption[0]["generated_text"]
})
return frames
    async def extract_video_audio(self, video: VideoFileClip) -> Optional[Dict[str, Any]]:
"""提取视频音频并分析"""
if not video.audio or not self.whisper_model:
return None
# 提取音频到临时文件
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
video.audio.write_audiofile(tmp_file.name, verbose=False, logger=None)
try:
# 转录音频
result = self.whisper_model.transcribe(tmp_file.name)
return {
"transcription": result["text"],
"language": result.get("language", "unknown"),
"segments": len(result.get("segments", []))
}
finally:
os.unlink(tmp_file.name)
async def search_by_modality(self, query: str, modality: str, limit: int) -> List[Dict[str, Any]]:
"""按模态类型搜索(模拟实现)"""
# 这里应该连接到实际的向量数据库或搜索引擎
# 示例返回模拟数据
mock_results = []
for i in range(min(limit, 3)):
mock_results.append({
"id": f"{modality}_{i}",
"type": modality,
"relevance": 0.9 - i * 0.1,
"title": f"{modality.title()} Result {i+1}",
"description": f"Mock {modality} content related to '{query}'"
})
return mock_results
def combine_summaries(self, summary_parts: List[Dict[str, Any]]) -> str:
"""合并多模态摘要"""
combined = "多模态内容摘要:\n\n"
for i, part in enumerate(summary_parts, 1):
modality = part["type"]
summary = part["summary"]
combined += f"{i}. {modality.title()}内容: {summary}\n"
return combined
    async def run(self, transport_type: str = "stdio"):
        """启动服务器"""
        if transport_type == "stdio":
            # FastMCP 自带 stdio 运行入口,无需手动管理读写流
            await self.app.run_stdio_async()
        else:
            raise ValueError(f"不支持的传输类型: {transport_type}")
# 启动服务器
async def main():
print("🚀 启动多模态MCP服务器...")
server = MultimodalMCPServer()
await server.run()
if __name__ == "__main__":
    asyncio.run(main())
```
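首次启动时,transformers 与 Whisper 会自动从网络下载模型权重(体积从数百MB到数GB不等)。可以先用下面的脚本预热本地模型缓存,避免首次请求超时(脚本名为示例):

```python
# warmup_models.py — 预下载模型权重到本地缓存(示意)
import whisper
from transformers import pipeline

pipeline("text-classification", model="cardiffnlp/twitter-roberta-base-sentiment-latest")
pipeline("summarization", model="facebook/bart-large-cnn")
pipeline("image-classification", model="google/vit-base-patch16-224")
pipeline("image-to-text", model="Salesforce/blip-image-captioning-base")
whisper.load_model("base")
print("✅ 模型缓存预热完成")
```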
🧪 多模态客户端测试
📝 Python测试客户端
```python
# multimodal_client_test.py
import asyncio
import base64
import json
from pathlib import Path
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
class MultimodalClientTest:
    def __init__(self):
        self.session = None
        self._stdio_ctx = None
        self._session_ctx = None

    async def connect(self):
        """连接到多模态MCP服务器"""
        try:
            server_params = StdioServerParameters(
                command="python",
                args=["multimodal_server.py"]
            )
            # stdio_client 是异步上下文管理器,进入后得到读写流
            self._stdio_ctx = stdio_client(server_params)
            read_stream, write_stream = await self._stdio_ctx.__aenter__()
            # 用读写流构造 ClientSession,再初始化会话
            self._session_ctx = ClientSession(read_stream, write_stream)
            self.session = await self._session_ctx.__aenter__()
            await self.session.initialize()
            print("✅ 成功连接到多模态MCP服务器")
            return True
        except Exception as e:
            print(f"❌ 连接失败: {e}")
            return False
async def test_text_analysis(self):
"""测试文本分析"""
print("\n🔤 测试文本分析...")
test_text = "今天阳光明媚,心情特别好!准备去公园散步,享受这美好的时光。"
result = await self.session.call_tool(
"analyze_content",
{"content": test_text, "content_type": "text"}
)
print("📊 文本分析结果:")
print(result.content[0].text)
async def test_image_analysis(self):
"""测试图像分析"""
print("\n🖼️ 测试图像分析...")
# 加载测试图像
image_path = Path("test_image.jpg")
if not image_path.exists():
print("⚠️ 未找到测试图像 test_image.jpg,跳过图像测试")
return
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode()
result = await self.session.call_tool(
"generate_image_description",
{"image_data": image_data, "detailed": True}
)
print("📊 图像分析结果:")
print(result.content[0].text)
async def test_audio_transcription(self):
"""测试音频转录"""
print("\n🎵 测试音频转录...")
# 加载测试音频
audio_path = Path("test_audio.wav")
if not audio_path.exists():
print("⚠️ 未找到测试音频 test_audio.wav,跳过音频测试")
return
with open(audio_path, "rb") as f:
audio_data = base64.b64encode(f.read()).decode()
result = await self.session.call_tool(
"transcribe_audio",
{"audio_data": audio_data, "language": "zh"}
)
print("📊 音频转录结果:")
print(result.content[0].text)
async def test_multimodal_summary(self):
"""测试多模态摘要"""
print("\n📋 测试多模态摘要...")
# 准备多模态内容
contents = [
{
"type": "text",
"data": "人工智能正在改变我们的生活方式。从智能手机到自动驾驶汽车,AI技术无处不在。"
}
]
# 如果有图像文件,添加到内容中
image_path = Path("test_image.jpg")
if image_path.exists():
with open(image_path, "rb") as f:
image_data = base64.b64encode(f.read()).decode()
contents.append({
"type": "image",
"data": image_data
})
result = await self.session.call_tool(
"generate_multimodal_summary",
{"contents": contents}
)
print("📊 多模态摘要结果:")
print(result.content[0].text)
async def test_multimodal_search(self):
"""测试多模态搜索"""
print("\n🔍 测试多模态搜索...")
result = await self.session.call_tool(
"multimodal_search",
{
"query": "美丽的风景",
"modalities": ["text", "image"],
"limit": 5
}
)
print("📊 多模态搜索结果:")
print(result.content[0].text)
async def list_resources(self):
"""列出可用资源"""
print("\n📁 可用资源:")
resources = await self.session.list_resources()
for resource in resources.resources:
print(f" - {resource.name}: {resource.description}")
async def run_comprehensive_test(self):
"""运行综合测试"""
if not await self.connect():
return
try:
print("🧪 开始多模态MCP服务器综合测试")
print("=" * 50)
# 列出资源
await self.list_resources()
# 运行各项测试
await self.test_text_analysis()
await self.test_image_analysis()
await self.test_audio_transcription()
await self.test_multimodal_summary()
await self.test_multimodal_search()
print("\n✅ 所有测试完成!")
        finally:
            # 按与建立连接相反的顺序关闭会话与stdio通道
            if self._session_ctx:
                await self._session_ctx.__aexit__(None, None, None)
            if self._stdio_ctx:
                await self._stdio_ctx.__aexit__(None, None, None)
# 运行测试
async def main():
test_client = MultimodalClientTest()
await test_client.run_comprehensive_test()
if __name__ == "__main__":
    asyncio.run(main())
```
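测试客户端会通过 stdio 自行拉起服务器进程,因此准备好可选的测试素材(test_image.jpg、test_audio.wav)后,直接运行客户端脚本即可:

```bash
# 运行综合测试(客户端自动启动并连接服务器)
python multimodal_client_test.py
```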
🚀 Docker部署配置
📦 Dockerfile
```dockerfile
# Dockerfile
FROM python:3.11-slim
# 安装系统依赖
RUN apt-get update && apt-get install -y \
libmagic1 \
ffmpeg \
libsndfile1 \
&& rm -rf /var/lib/apt/lists/*
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 暴露端口(如果使用HTTP传输)
EXPOSE 8000
# 启动命令
CMD ["python", "multimodal_server.py"]
```
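构建并运行镜像的参考命令如下(镜像名 multimodal-mcp 为示例;stdio 模式下服务器通常由MCP客户端拉起,这里仅作镜像冒烟测试):

```bash
# 构建镜像
docker build -t multimodal-mcp .
# 运行容器并传入API密钥
docker run --rm -e OPENAI_API_KEY=$OPENAI_API_KEY multimodal-mcp
```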
🐳 Docker Compose配置
```yaml
# docker-compose.yml
version: '3.8'
services:
multimodal-mcp:
build: .
container_name: multimodal-mcp-server
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- AZURE_SPEECH_KEY=${AZURE_SPEECH_KEY}
- AZURE_SPEECH_REGION=${AZURE_SPEECH_REGION}
volumes:
- ./models:/app/models
- ./data:/app/data
- ./logs:/app/logs
ports:
- "8000:8000"
restart: unless-stopped
    # 健康检查仅在服务器以HTTP传输方式运行并提供 /health 端点时有效
    healthcheck:
test: ["CMD", "python", "-c", "import requests; requests.get('http://localhost:8000/health')"]
interval: 30s
timeout: 10s
retries: 3
# Redis缓存(可选)
redis:
image: redis:7-alpine
container_name: multimodal-redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
restart: unless-stopped
# PostgreSQL数据库(可选)
postgres:
image: postgres:15-alpine
container_name: multimodal-postgres
environment:
- POSTGRES_DB=multimodal_mcp
- POSTGRES_USER=mcp_user
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
ports:
- "5432:5432"
restart: unless-stopped
volumes:
redis_data:
  postgres_data:
```
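docker-compose.yml 中引用的环境变量可以集中放在同目录的 .env 文件里,Docker Compose 会自动读取(以下取值均为占位符):

```bash
# .env — 值均为示例占位符
OPENAI_API_KEY=sk-xxxxxxxx
AZURE_SPEECH_KEY=your-azure-speech-key
AZURE_SPEECH_REGION=eastasia
POSTGRES_PASSWORD=change-me
```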
🎯 性能优化建议
🚀 模型优化
```python
# model_optimizer.py
import torch
from transformers import pipeline
import onnxruntime as ort
class ModelOptimizer:
"""AI模型优化器"""
@staticmethod
def optimize_pytorch_model(model_name: str, task: str):
"""优化PyTorch模型"""
# 加载模型
model = pipeline(task, model=model_name)
        # 转换为ONNX格式
        # 注意:Transformer模型的输入是整数token id而非浮点张量;
        # 生产环境建议使用 HuggingFace optimum 等工具完成导出,此处仅为示意
        dummy_input = torch.randint(0, 1000, (1, 128))  # 根据实际词表与序列长度调整
        onnx_path = f"{model_name.replace('/', '_')}.onnx"
        torch.onnx.export(
            model.model,
            dummy_input,
            onnx_path,
            export_params=True,
            opset_version=14,
            do_constant_folding=True,
            input_names=['input_ids'],
            output_names=['output']
        )
        return onnx_path
@staticmethod
def create_onnx_session(onnx_path: str):
"""创建ONNX推理会话"""
providers = ['CPUExecutionProvider']
# 如果有GPU,使用CUDA
if torch.cuda.is_available():
providers.insert(0, 'CUDAExecutionProvider')
session = ort.InferenceSession(onnx_path, providers=providers)
return session
# 缓存策略
import redis
import pickle
import hashlib
class MultimodalCache:
"""多模态内容缓存"""
def __init__(self, redis_host='localhost', redis_port=6379):
self.redis_client = redis.Redis(host=redis_host, port=redis_port)
self.ttl = 3600 # 1小时缓存
def get_cache_key(self, content_type: str, content_hash: str) -> str:
"""生成缓存键"""
return f"multimodal:{content_type}:{content_hash}"
def get_content_hash(self, content: bytes) -> str:
"""计算内容哈希"""
return hashlib.sha256(content).hexdigest()[:16]
async def get_cached_result(self, content_type: str, content: bytes):
"""获取缓存结果"""
content_hash = self.get_content_hash(content)
cache_key = self.get_cache_key(content_type, content_hash)
cached_data = self.redis_client.get(cache_key)
if cached_data:
return pickle.loads(cached_data)
return None
async def cache_result(self, content_type: str, content: bytes, result: dict):
"""缓存结果"""
content_hash = self.get_content_hash(content)
cache_key = self.get_cache_key(content_type, content_hash)
serialized_result = pickle.dumps(result)
        self.redis_client.setex(cache_key, self.ttl, serialized_result)
```
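把缓存接入处理流程的方式大致如下:先按内容哈希查缓存,未命中再调用模型并回填。示意代码沿用上文服务器与缓存类的接口,函数名 process_image_cached 为示例:

```python
# 在服务器中使用缓存的示意:先查缓存,未命中再做模型推理
import base64


async def process_image_cached(server, cache: MultimodalCache, image_data: str) -> dict:
    content = base64.b64decode(image_data)
    # 命中缓存则直接返回,避免重复推理
    cached = await cache.get_cached_result("image", content)
    if cached is not None:
        return cached
    # 未命中:执行真实的模型推理并写入缓存
    result = await server.process_image(image_data)
    await cache.cache_result("image", content, result)
    return result
```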
🔐 企业级安全增强
```python
# security_manager.py
import jwt
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import logging
class MultimodalSecurityManager:
"""多模态内容安全管理器"""
def __init__(self, secret_key: str):
self.secret_key = secret_key
self.logger = logging.getLogger(__name__)
def validate_content_type(self, content: bytes, expected_type: str) -> bool:
"""验证内容类型"""
import magic
detected_type = magic.from_buffer(content, mime=True)
        # 注意:使用标准MIME类型(如 audio/mpeg 而非 audio/mp3),与python-magic的检测结果一致
        type_mapping = {
            'image': ['image/jpeg', 'image/png', 'image/gif', 'image/webp'],
            'audio': ['audio/x-wav', 'audio/wav', 'audio/mpeg', 'audio/ogg', 'audio/mp4'],
            'video': ['video/mp4', 'video/x-msvideo', 'video/quicktime', 'video/x-matroska']
        }
if expected_type in type_mapping:
return detected_type in type_mapping[expected_type]
return True
def scan_for_malicious_content(self, content: bytes, content_type: str) -> Dict[str, Any]:
"""恶意内容扫描"""
scan_result = {
"is_safe": True,
"threats": [],
"confidence": 1.0
}
# 文件大小检查
max_sizes = {
'image': 10 * 1024 * 1024, # 10MB
'audio': 50 * 1024 * 1024, # 50MB
'video': 100 * 1024 * 1024 # 100MB
}
if len(content) > max_sizes.get(content_type, 5 * 1024 * 1024):
scan_result["threats"].append("文件过大")
scan_result["is_safe"] = False
# 这里可以集成更多安全扫描逻辑
# 例如:病毒扫描、内容审核等
return scan_result
def generate_access_token(self, user_id: str, permissions: List[str]) -> str:
"""生成访问令牌"""
payload = {
"user_id": user_id,
"permissions": permissions,
"exp": datetime.utcnow() + timedelta(hours=24),
"iat": datetime.utcnow()
}
return jwt.encode(payload, self.secret_key, algorithm="HS256")
def verify_access_token(self, token: str) -> Optional[Dict[str, Any]]:
"""验证访问令牌"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
self.logger.warning("令牌已过期")
return None
except jwt.InvalidTokenError:
self.logger.warning("无效令牌")
            return None
```
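把上述安全检查串进工具调用的前置流程,大致如下(密钥与权限名均为示例):

```python
# 使用示意:先校验令牌与内容,再进入多模态处理
import base64

security = MultimodalSecurityManager(secret_key="replace-with-your-secret")


def check_request(token: str, content_b64: str, expected_type: str) -> bool:
    """工具调用前的统一安全检查(示例流程)"""
    payload = security.verify_access_token(token)
    if payload is None or "multimodal:read" not in payload["permissions"]:
        return False  # 未认证或权限不足
    content = base64.b64decode(content_b64)
    if not security.validate_content_type(content, expected_type):
        return False  # 声明的类型与实际MIME不符
    return security.scan_for_malicious_content(content, expected_type)["is_safe"]


# token = security.generate_access_token("user_001", ["multimodal:read"])
# allowed = check_request(token, some_image_b64, "image")
```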
🎯 本节小结
通过本节学习,你已经掌握了:
✅ 多模态架构:理解多模态MCP服务器的设计原理
✅ AI模型集成:文本、图像、音频、视频处理能力
✅ 实际应用开发:完整的多模态服务器实现
✅ 性能优化:模型优化和缓存策略
✅ 安全防护:企业级安全管理机制
✅ 部署配置:Docker容器化部署
🤔 思考题
- 模态融合:如何设计一个算法来融合不同模态的分析结果?
- 实时处理:对于大型视频文件,如何实现流式处理?
- 模型选择:在资源受限的环境中,如何平衡模型性能和资源消耗?
恭喜! 🎉 你已经掌握了MCP多模态集成开发。这为构建更智能、更全面的AI应用奠定了基础。