8.6 性能优化技巧 ⚡
"性能就像功夫,不是一朝一夕能练成的,但掌握了正确的方法,就能事半功倍。"
想象一下,你的MCP服务器就像是一家餐厅。如果菜品制作速度太慢,客人就会不耐烦地离开。性能优化就是让你的"厨师"(服务器)更快地"做菜"(处理请求),让"客人"(用户)满意。
缓存策略:让数据飞起来 🚀
多层缓存架构
缓存就像是把常用的东西放在手边,不用每次都跑到仓库去拿。
python
# 🌟 多层缓存系统实现
import asyncio
import functools
import hashlib
import inspect
import json
from abc import ABC, abstractmethod
from collections import OrderedDict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict, Optional, Union

import redis.asyncio as redis
@dataclass
class CacheEntry:
    """A single cached value plus expiry and access-tracking metadata."""

    data: Any                                 # the cached payload
    created_at: datetime                      # when the entry was stored
    expires_at: Optional[datetime] = None     # None means "never expires"
    access_count: int = 0                     # number of successful reads
    # FIX: was annotated `datetime` with a None default; it is None until the
    # first touch(), so the honest annotation is Optional[datetime].
    last_accessed: Optional[datetime] = None

    def is_expired(self) -> bool:
        """Return True when the entry has an expiry time that has passed."""
        if self.expires_at is None:
            return False
        return datetime.now() > self.expires_at

    def touch(self) -> None:
        """Record one access: bump the hit counter and last-access time."""
        self.last_accessed = datetime.now()
        self.access_count += 1
class CacheLayer(ABC):
    """Abstract interface shared by every cache tier (L1 memory, L2 Redis)."""

    @abstractmethod
    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None on a miss."""
        pass

    @abstractmethod
    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store *value* under *key* with an optional TTL in seconds."""
        pass

    @abstractmethod
    async def delete(self, key: str) -> bool:
        """Remove *key*; return True when something was deleted."""
        pass

    @abstractmethod
    async def clear(self) -> bool:
        """Drop every entry in this layer."""
        pass
class MemoryCache(CacheLayer):
    """In-process L1 cache with per-entry TTL and LRU eviction.

    Fixes over the original list-based version:
    - LRU order is tracked by an OrderedDict (O(1) promotion/eviction
      via move_to_end/popitem) instead of O(n) list.remove() on every hit.
    - Overwriting an existing key no longer triggers capacity eviction,
      which used to evict an unrelated entry even though the size did
      not grow.
    """

    def __init__(self, max_size: int = 1000, default_ttl: int = 300):
        self.max_size = max_size
        self.default_ttl = default_ttl
        # Insertion order doubles as LRU order: least-recently-used first.
        self.cache: "OrderedDict[str, CacheEntry]" = OrderedDict()

    async def get(self, key: str) -> Optional[Any]:
        """Return the live value for *key*, or None on miss or expiry."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        if entry.is_expired():
            # Expired entries are dropped lazily, on access.
            await self.delete(key)
            return None
        entry.touch()
        self.cache.move_to_end(key)  # mark as most recently used
        return entry.data

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store *value*; a non-positive effective TTL means no expiry."""
        # Only make room when inserting a genuinely new key.
        if key not in self.cache:
            self._ensure_capacity()
        ttl_seconds = ttl or self.default_ttl
        expires_at = (
            datetime.now() + timedelta(seconds=ttl_seconds)
            if ttl_seconds > 0
            else None
        )
        self.cache[key] = CacheEntry(
            data=value,
            created_at=datetime.now(),
            expires_at=expires_at,
        )
        self.cache.move_to_end(key)  # a write also counts as recent use
        return True

    async def delete(self, key: str) -> bool:
        """Remove *key*; True when an entry actually existed."""
        return self.cache.pop(key, None) is not None

    async def clear(self) -> bool:
        """Drop every entry."""
        self.cache.clear()
        return True

    def _ensure_capacity(self) -> None:
        """Evict least-recently-used entries until one slot is free."""
        while len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)  # FIFO end == LRU end

    def get_stats(self) -> Dict[str, Any]:
        """Snapshot of entry counts and utilization for this layer."""
        total_entries = len(self.cache)
        expired_count = sum(1 for entry in self.cache.values() if entry.is_expired())
        return {
            'total_entries': total_entries,
            'active_entries': total_entries - expired_count,
            'expired_entries': expired_count,
            'cache_utilization': total_entries / self.max_size,
            'memory_layer': 'L1'
        }
class RedisCache(CacheLayer):
    """Redis-backed L2 cache layer storing JSON-serialized values."""

    def __init__(self, redis_url: str = "redis://localhost:6379", prefix: str = "mcp:"):
        self.redis_url = redis_url
        self.prefix = prefix  # namespaces this app's keys inside shared Redis
        self.redis_client: Optional[redis.Redis] = None

    async def connect(self):
        """Lazily create the Redis client on first use."""
        if self.redis_client is None:
            self.redis_client = redis.from_url(self.redis_url)

    async def disconnect(self):
        """Close the Redis connection if one was opened."""
        if self.redis_client:
            await self.redis_client.close()

    def _make_key(self, key: str) -> str:
        """Prefix *key* so multiple applications can share one Redis."""
        return f"{self.prefix}{key}"

    async def get(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value for *key*, or None on a miss."""
        await self.connect()
        redis_key = self._make_key(key)
        data = await self.redis_client.get(redis_key)
        if data is None:
            return None
        try:
            return json.loads(data.decode('utf-8'))
        except (json.JSONDecodeError, UnicodeDecodeError):
            # Corrupted payload: drop it so callers fall through to the source.
            await self.delete(key)
            return None

    async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """JSON-encode *value* and store it, with an optional TTL in seconds."""
        await self.connect()
        redis_key = self._make_key(key)
        try:
            serialized_data = json.dumps(value, ensure_ascii=False)
            if ttl and ttl > 0:
                result = await self.redis_client.setex(redis_key, ttl, serialized_data)
            else:
                result = await self.redis_client.set(redis_key, serialized_data)
            return bool(result)
        # BUG FIX: the original caught `json.JSONEncodeError`, which does not
        # exist in the json module -- evaluating that except clause would have
        # raised AttributeError.  json.dumps signals failure with TypeError
        # (unserializable type) or ValueError, so catch those instead.
        except (TypeError, ValueError) as e:
            print(f"Redis序列化错误: {str(e)}")
            return False

    async def delete(self, key: str) -> bool:
        """Delete *key*; True when Redis removed at least one key."""
        await self.connect()
        redis_key = self._make_key(key)
        deleted_count = await self.redis_client.delete(redis_key)
        return deleted_count > 0

    async def clear(self) -> bool:
        """Delete every key under this cache's prefix.

        NOTE(review): KEYS scans the whole keyspace and blocks Redis on large
        datasets; SCAN-based iteration would be kinder in production.
        """
        await self.connect()
        pattern = f"{self.prefix}*"
        keys = await self.redis_client.keys(pattern)
        if keys:
            deleted_count = await self.redis_client.delete(*keys)
            return deleted_count > 0
        return True
class HybridCache:
    """Two-tier cache facade combining L1 (memory) and L2 (Redis)."""

    def __init__(
        self,
        l1_cache: MemoryCache,
        l2_cache: RedisCache,
        l1_hit_promote: bool = True  # refresh the L2 entry on every L1 hit
    ):
        self.l1 = l1_cache
        self.l2 = l2_cache
        self.l1_hit_promote = l1_hit_promote
        # Hit/miss counters surfaced by get_cache_stats().
        self.stats = {
            'l1_hits': 0,
            'l2_hits': 0,
            'misses': 0,
            'total_requests': 0
        }

    async def get(self, key: str) -> Optional[Any]:
        """Look *key* up in L1 first, then L2; promote L2 hits into L1."""
        self.stats['total_requests'] += 1
        # L1 first: cheapest lookup.
        l1_result = await self.l1.get(key)
        if l1_result is not None:
            self.stats['l1_hits'] += 1
            # NOTE(review): this rewrites the value into L2 on *every* L1 hit,
            # using L1's default TTL rather than an L2 TTL -- it adds a Redis
            # round-trip per hit and can shorten the L2 lifetime.  Confirm
            # this "promote" semantic is intended.
            if self.l1_hit_promote:
                await self.l2.set(key, l1_result, ttl=self.l1.default_ttl)
            return l1_result
        # L1 missed; fall back to L2.
        l2_result = await self.l2.get(key)
        if l2_result is not None:
            self.stats['l2_hits'] += 1
            # Warm the faster tier for subsequent requests.
            await self.l1.set(key, l2_result)
            return l2_result
        # Missed both tiers.
        self.stats['misses'] += 1
        return None

    async def set(self, key: str, value: Any, l1_ttl: Optional[int] = None, l2_ttl: Optional[int] = None) -> bool:
        """Write to both tiers concurrently; success if either tier succeeds."""
        l1_task = self.l1.set(key, value, l1_ttl)
        l2_task = self.l2.set(key, value, l2_ttl)
        l1_result, l2_result = await asyncio.gather(l1_task, l2_task, return_exceptions=True)
        # return_exceptions keeps failures as values, so one healthy tier wins.
        return (l1_result is True) or (l2_result is True)

    async def delete(self, key: str) -> bool:
        """Delete from both tiers concurrently; True if either tier deleted."""
        l1_task = self.l1.delete(key)
        l2_task = self.l2.delete(key)
        l1_result, l2_result = await asyncio.gather(l1_task, l2_task, return_exceptions=True)
        return (l1_result is True) or (l2_result is True)

    async def clear(self) -> bool:
        """Empty both tiers and reset the hit/miss counters."""
        l1_task = self.l1.clear()
        l2_task = self.l2.clear()
        await asyncio.gather(l1_task, l2_task, return_exceptions=True)
        self.stats = {
            'l1_hits': 0,
            'l2_hits': 0,
            'misses': 0,
            'total_requests': 0
        }
        return True

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return hit-rate ratios alongside the raw counters."""
        total = self.stats['total_requests']
        if total == 0:
            # No traffic yet: avoid division by zero.
            return {
                'l1_hit_rate': 0.0,
                'l2_hit_rate': 0.0,
                'overall_hit_rate': 0.0,
                **self.stats
            }
        l1_hit_rate = self.stats['l1_hits'] / total
        l2_hit_rate = self.stats['l2_hits'] / total
        overall_hit_rate = (self.stats['l1_hits'] + self.stats['l2_hits']) / total
        return {
            'l1_hit_rate': l1_hit_rate,
            'l2_hit_rate': l2_hit_rate,
            'overall_hit_rate': overall_hit_rate,
            **self.stats,
            'l1_stats': self.l1.get_stats()
        }
# Smart caching decorator for async functions.
def cached(
    key_template: str,                  # key template, e.g. "weather:{location}:{days}"
    l1_ttl: int = 300,                  # L1 (memory) TTL in seconds
    l2_ttl: int = 3600,                 # L2 (Redis) TTL in seconds
    cache_instance: Optional[HybridCache] = None
):
    """Memoize an async function through the hybrid cache.

    The cache key is derived from *key_template* plus a hash of all bound
    arguments (see generate_cache_key).  None results are never cached, so
    "no data" is always recomputed.
    """
    def decorator(func: Callable) -> Callable:
        # FIX: functools.wraps preserves the wrapped function's __name__,
        # __doc__ and signature -- without it every cached function showed up
        # as "wrapper" in logs and introspection.
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            cache = cache_instance or get_default_cache()
            cache_key = generate_cache_key(key_template, func, *args, **kwargs)
            # Fast path: serve from cache when possible.
            cached_result = await cache.get(cache_key)
            if cached_result is not None:
                return cached_result
            # Miss: run the real function.
            result = await func(*args, **kwargs)
            # Only cache real results; None means "nothing to cache".
            if result is not None:
                await cache.set(cache_key, result, l1_ttl, l2_ttl)
            return result
        return wrapper
    return decorator
def generate_cache_key(template: str, func: Callable, *args, **kwargs) -> str:
    """Build a cache key: ``<func name>:<filled template>:<8-char hash>``.

    The hash covers *all* bound arguments (defaults applied, sorted,
    JSON-encoded), so two calls that differ only in a parameter missing from
    the template still map to distinct keys.
    """
    # Resolve positional/keyword/default arguments into one mapping.
    sig = inspect.signature(func)
    bound_args = sig.bind(*args, **kwargs)
    bound_args.apply_defaults()
    # default=str lets non-JSON-serializable argument values (custom objects,
    # datetimes, ...) participate in the key instead of raising TypeError.
    param_str = json.dumps(
        dict(bound_args.arguments), sort_keys=True, ensure_ascii=False, default=str
    )
    # md5 is fine here: the digest only disambiguates keys, it is not security.
    param_hash = hashlib.md5(param_str.encode()).hexdigest()[:8]
    # Fill the human-readable part of the key from the same arguments.
    key = template.format(**bound_args.arguments)
    return f"{func.__name__}:{key}:{param_hash}"
# Module-level singleton so every @cached function shares one cache.
_default_cache: Optional[HybridCache] = None

def get_default_cache() -> HybridCache:
    """Return the process-wide HybridCache, building it lazily on first use.

    NOTE(review): first-call construction is not guarded against concurrent
    coroutines; two racing callers could each build a cache -- harmless here
    since both instances would be equivalent, but worth confirming.
    """
    global _default_cache
    if _default_cache is None:
        l1_cache = MemoryCache(max_size=1000, default_ttl=300)
        l2_cache = RedisCache()
        _default_cache = HybridCache(l1_cache, l2_cache)
    return _default_cache
# Usage example
class WeatherService:
    """Demo service showing @cached against a simulated weather API."""

    def __init__(self):
        # Counts real "API" invocations so the demo can show cache savings.
        self.api_call_count = 0

    @cached(
        key_template="weather:{location}:{days}",
        l1_ttl=300,   # keep in memory for 5 minutes
        l2_ttl=1800   # keep in Redis for 30 minutes
    )
    async def get_weather(self, location: str, days: int = 1) -> Dict[str, Any]:
        """Fetch a (simulated) weather report; results are cached."""
        self.api_call_count += 1
        print(f"🌐 调用天气API(第{self.api_call_count}次): {location}, {days}天")
        await asyncio.sleep(0.5)  # simulated network latency
        # NOTE(review): only two hard-coded forecast entries exist, so
        # days > 2 still returns at most two items -- fine for a demo.
        return {
            'location': location,
            'days': days,
            'temperature': 25,
            'description': '晴天',
            'forecast': [
                {'date': '2024-03-15', 'temp': 25, 'desc': '晴天'},
                {'date': '2024-03-16', 'temp': 23, 'desc': '多云'}
            ][:days],
            'fetched_at': datetime.now().isoformat()
        }

    async def get_weather_no_cache(self, location: str, days: int = 1) -> Dict[str, Any]:
        """Same simulated fetch without caching, for benchmark comparison."""
        self.api_call_count += 1
        print(f"🌐 直接调用天气API(第{self.api_call_count}次): {location}, {days}天")
        await asyncio.sleep(0.5)
        return {
            'location': location,
            'days': days,
            'temperature': 25,
            'description': '晴天',
            'fetched_at': datetime.now().isoformat()
        }
# Performance-test example
async def cache_performance_test():
    """Benchmark cached vs uncached calls and print hit-rate statistics."""
    weather_service = WeatherService()
    cache = get_default_cache()
    print("🧪 开始缓存性能测试...")
    # Scenario 1: the same request repeated -- should hit the cache after call 1.
    print("\n📊 场景1:相同请求多次调用")
    start_time = asyncio.get_event_loop().time()
    for i in range(5):
        result = await weather_service.get_weather("北京", 3)
        print(f"第{i+1}次请求完成,温度:{result['temperature']}°C")
    cached_time = asyncio.get_event_loop().time() - start_time
    print(f"有缓存总耗时:{cached_time:.2f}秒")
    # Reset the API-call counter between scenarios.
    weather_service.api_call_count = 0
    # Baseline: the same five requests without any caching.
    print("\n📊 对比:无缓存情况")
    start_time = asyncio.get_event_loop().time()
    for i in range(5):
        result = await weather_service.get_weather_no_cache("北京", 3)
        print(f"第{i+1}次请求完成,温度:{result['temperature']}°C")
    no_cache_time = asyncio.get_event_loop().time() - start_time
    print(f"无缓存总耗时:{no_cache_time:.2f}秒")
    improvement = (no_cache_time - cached_time) / no_cache_time * 100
    print(f"🚀 性能提升:{improvement:.1f}%")
    # Dump the hit/miss statistics collected by the hybrid cache.
    print("\n📈 缓存统计:")
    stats = cache.get_cache_stats()
    for key, value in stats.items():
        if isinstance(value, float):
            print(f" {key}: {value:.2%}")
        else:
            print(f" {key}: {value}")

# Entry point: run the benchmark when executed as a script.
if __name__ == "__main__":
    asyncio.run(cache_performance_test())
异步处理:并发的艺术 🎭
高效的异步编程模式
typescript
// 🌟 异步处理最佳实践
import { EventEmitter } from 'events';
/**
 * Concurrency-limited task runner with de-duplication, priority queueing,
 * per-attempt timeouts and retry with exponential backoff.
 *
 * NOTE(review): createTaskPromise() is *invoked* in executeTask() before the
 * concurrency check, and calling an async function starts it immediately --
 * so "queued" tasks actually begin running right away, and maxConcurrency
 * only limits the bookkeeping, not real execution.  To truly defer work the
 * queue would have to hold the task *function* and call it in startTask().
 */
class AsyncTaskManager extends EventEmitter {
  private runningTasks = new Map<string, Promise<any>>();
  private taskQueue: Array<QueuedTask> = [];
  private maxConcurrency: number;
  private currentConcurrency = 0;

  constructor(maxConcurrency: number = 10) {
    super();
    this.maxConcurrency = maxConcurrency;
  }

  /**
   * Run (or queue) a task.  Identical taskIds share one in-flight promise
   * when `deduplicate` is enabled.
   */
  async executeTask<T>(
    taskId: string,
    taskFn: () => Promise<T>,
    options: TaskOptions = {}
  ): Promise<T> {
    const {
      priority = 5,
      timeout = 30000,
      retries = 3,
      deduplicate = true
    } = options;
    // De-duplication: reuse the promise of an identical in-flight task.
    if (deduplicate && this.runningTasks.has(taskId)) {
      console.log(`⚡ 任务去重:${taskId}`);
      return this.runningTasks.get(taskId) as Promise<T>;
    }
    // Build the task promise (see class NOTE: this starts execution now).
    const taskPromise = this.createTaskPromise(taskId, taskFn, { timeout, retries });
    // Below the concurrency ceiling: account for it immediately.
    if (this.currentConcurrency < this.maxConcurrency) {
      return this.startTask(taskId, taskPromise);
    }
    // Otherwise park it in the priority queue.
    return new Promise((resolve, reject) => {
      this.taskQueue.push({
        taskId,
        taskPromise,
        priority,
        resolve,
        reject,
        createdAt: Date.now()
      });
      // Higher priority first.
      this.taskQueue.sort((a, b) => b.priority - a.priority);
      console.log(`📋 任务加入队列:${taskId},队列长度:${this.taskQueue.length}`);
    });
  }

  /** Track a running task, emit lifecycle events, then drain the queue. */
  private async startTask<T>(taskId: string, taskPromise: Promise<T>): Promise<T> {
    this.currentConcurrency++;
    this.runningTasks.set(taskId, taskPromise);
    console.log(`🚀 开始执行任务:${taskId},当前并发:${this.currentConcurrency}`);
    this.emit('taskStarted', { taskId, concurrency: this.currentConcurrency });
    try {
      const result = await taskPromise;
      console.log(`✅ 任务完成:${taskId}`);
      this.emit('taskCompleted', { taskId, result });
      return result;
    } catch (error) {
      console.error(`❌ 任务失败:${taskId}`, error);
      this.emit('taskFailed', { taskId, error });
      throw error;
    } finally {
      // Release the slot and hand it to the next queued task.
      this.runningTasks.delete(taskId);
      this.currentConcurrency--;
      this.processQueue();
    }
  }

  /** Start the highest-priority queued task when a slot is free. */
  private processQueue() {
    if (this.taskQueue.length === 0 || this.currentConcurrency >= this.maxConcurrency) {
      return;
    }
    const queuedTask = this.taskQueue.shift()!;
    // Bridge the queued task's outcome back to its original caller.
    this.startTask(queuedTask.taskId, queuedTask.taskPromise)
      .then(queuedTask.resolve)
      .catch(queuedTask.reject);
  }

  /**
   * Wrap taskFn with a timeout race and up to `retries` attempts using
   * exponential backoff (1s, 2s, 4s, ... capped at 10s).
   *
   * NOTE(review): a timed-out attempt does not cancel the underlying
   * taskFn call -- it keeps running in the background.
   */
  private async createTaskPromise<T>(
    taskId: string,
    taskFn: () => Promise<T>,
    options: { timeout: number; retries: number }
  ): Promise<T> {
    const { timeout, retries } = options;
    for (let attempt = 1; attempt <= retries; attempt++) {
      try {
        // Race the work against a timeout rejection.
        const result = await Promise.race([
          taskFn(),
          new Promise<never>((_, reject) =>
            setTimeout(() => reject(new Error(`任务超时:${taskId}`)), timeout)
          )
        ]);
        return result;
      } catch (error) {
        console.warn(`🔄 任务重试:${taskId},第${attempt}/${retries}次`);
        if (attempt === retries) {
          throw error;
        }
        // Exponential backoff between attempts.
        const delay = Math.min(1000 * Math.pow(2, attempt - 1), 10000);
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
    throw new Error(`任务执行失败:${taskId}`);
  }

  /**
   * Execute many tasks through the manager.
   *
   * NOTE(review): with failFast=false and collectResults=true (the defaults)
   * the mapping below still re-throws the first rejection it meets, so the
   * "tolerant" mode only truly tolerates failures when collectResults=false.
   */
  async executeBatch<T>(
    tasks: Array<{ id: string; fn: () => Promise<T>; options?: TaskOptions }>,
    batchOptions: {
      failFast?: boolean;       // stop at the first rejection
      collectResults?: boolean; // map results back in task order
    } = {}
  ): Promise<T[]> {
    const { failFast = false, collectResults = true } = batchOptions;
    console.log(`🔀 批量执行任务:${tasks.length}个`);
    const taskPromises = tasks.map(task =>
      this.executeTask(task.id, task.fn, task.options)
    );
    if (failFast) {
      // Fail-fast: reject as soon as any task rejects.
      return await Promise.all(taskPromises);
    } else {
      // Tolerant: wait for every task to settle first.
      const results = await Promise.allSettled(taskPromises);
      if (collectResults) {
        return results.map((result, index) => {
          if (result.status === 'fulfilled') {
            return result.value;
          } else {
            console.error(`批量任务失败:${tasks[index].id}`, result.reason);
            throw result.reason;
          }
        });
      }
      // Keep fulfilled values only, dropping failures.
      return results
        .filter((result): result is PromiseFulfilledResult<T> => result.status === 'fulfilled')
        .map(result => result.value);
    }
  }

  /** Snapshot of queue depth, concurrency and slot utilization. */
  getStatus() {
    return {
      runningTasks: this.runningTasks.size,
      queuedTasks: this.taskQueue.length,
      currentConcurrency: this.currentConcurrency,
      maxConcurrency: this.maxConcurrency,
      utilizationRate: this.currentConcurrency / this.maxConcurrency
    };
  }

  /**
   * Graceful shutdown: discard the queue, then wait up to `timeout` ms for
   * the in-flight tasks to settle.
   *
   * NOTE(review): discarded queued tasks never get their resolve/reject
   * invoked, so their callers' promises hang forever.
   */
  async shutdown(timeout: number = 30000): Promise<void> {
    console.log('🛑 开始关闭任务管理器...');
    // Stop accepting queued work.
    this.taskQueue.length = 0;
    // Wait for in-flight tasks.
    const runningTaskPromises = Array.from(this.runningTasks.values());
    if (runningTaskPromises.length > 0) {
      console.log(`⏳ 等待${runningTaskPromises.length}个任务完成...`);
      try {
        await Promise.race([
          Promise.allSettled(runningTaskPromises),
          new Promise((_, reject) =>
            setTimeout(() => reject(new Error('关闭超时')), timeout)
          )
        ]);
      } catch (error) {
        console.warn('⚠️ 强制关闭,部分任务可能未完成');
      }
    }
    console.log('✅ 任务管理器已关闭');
  }
}
/** Per-task execution options (defaults applied in executeTask). */
interface TaskOptions {
  priority?: number;     // larger runs earlier when queued (default 5)
  timeout?: number;      // per-attempt timeout in ms (default 30000)
  retries?: number;      // max attempts including the first (default 3)
  deduplicate?: boolean; // share the promise of an identical in-flight task
}

/** Bookkeeping record for a task parked in the priority queue. */
interface QueuedTask {
  taskId: string;
  taskPromise: Promise<any>;
  priority: number;
  resolve: (value: any) => void; // completes the caller's promise
  reject: (reason: any) => void;
  createdAt: number;             // enqueue timestamp (ms since epoch)
}
// Usage example: fan out several API calls per user through the task manager.
class DataAggregationService {
  private taskManager = new AsyncTaskManager(5); // at most 5 concurrent tasks

  /**
   * Fetch profile/orders/preferences/analytics concurrently and merge them.
   *
   * NOTE(review): executeBatch defaults to collectResults=true, which
   * re-throws the first failure -- so despite the trailing comment, one
   * failed fetch still rejects the whole aggregation.
   */
  async aggregateUserData(userId: string): Promise<UserDataSummary> {
    console.log(`📊 开始聚合用户数据:${userId}`);
    // Results come back in task order, matching this destructuring.
    const [
      profile,
      orders,
      preferences,
      analytics
    ] = await this.taskManager.executeBatch([
      {
        id: `profile_${userId}`,
        fn: () => this.fetchUserProfile(userId),
        options: { priority: 10, timeout: 5000 } // highest priority
      },
      {
        id: `orders_${userId}`,
        fn: () => this.fetchUserOrders(userId),
        options: { priority: 8, timeout: 10000 }
      },
      {
        id: `preferences_${userId}`,
        fn: () => this.fetchUserPreferences(userId),
        options: { priority: 6, timeout: 3000 }
      },
      {
        id: `analytics_${userId}`,
        fn: () => this.fetchUserAnalytics(userId),
        options: { priority: 4, timeout: 15000 }
      }
    ], { failFast: false }); // tolerant mode intended (see NOTE above)
    // Merge the four results into one summary.
    return this.combineUserData(profile, orders, preferences, analytics);
  }

  /** Simulated profile lookup (~200 ms). */
  private async fetchUserProfile(userId: string): Promise<UserProfile> {
    console.log(`👤 获取用户资料:${userId}`);
    await this.simulateApiCall(200);
    return { id: userId, name: '用户名', email: 'user@example.com' };
  }

  /** Simulated order lookup (~800 ms). */
  private async fetchUserOrders(userId: string): Promise<Order[]> {
    console.log(`🛒 获取用户订单:${userId}`);
    await this.simulateApiCall(800);
    return [{ id: '1', amount: 100 }, { id: '2', amount: 200 }];
  }

  /** Simulated preferences lookup (~300 ms). */
  private async fetchUserPreferences(userId: string): Promise<UserPreferences> {
    console.log(`⚙️ 获取用户偏好:${userId}`);
    await this.simulateApiCall(300);
    return { theme: 'dark', language: 'zh-CN' };
  }

  /** Simulated analytics lookup (~1200 ms, the slowest call). */
  private async fetchUserAnalytics(userId: string): Promise<UserAnalytics> {
    console.log(`📈 获取用户分析:${userId}`);
    await this.simulateApiCall(1200);
    return { totalPurchases: 15, lastLogin: new Date() };
  }

  /** Sleep helper standing in for network latency. */
  private async simulateApiCall(delay: number): Promise<void> {
    await new Promise(resolve => setTimeout(resolve, delay));
  }

  /** Merge the four fetch results into one summary object. */
  private combineUserData(
    profile: UserProfile,
    orders: Order[],
    preferences: UserPreferences,
    analytics: UserAnalytics
  ): UserDataSummary {
    return {
      profile,
      orderSummary: {
        totalOrders: orders.length,
        totalAmount: orders.reduce((sum, order) => sum + order.amount, 0)
      },
      preferences,
      analytics
    };
  }
}
// Data type definitions for the aggregation example.

/** Basic user identity record. */
interface UserProfile {
  id: string;
  name: string;
  email: string;
}

/** A single purchase. */
interface Order {
  id: string;
  amount: number;
}

/** UI/locale settings. */
interface UserPreferences {
  theme: string;
  language: string;
}

/** Usage metrics. */
interface UserAnalytics {
  totalPurchases: number;
  lastLogin: Date;
}

/** Everything aggregateUserData() returns. */
interface UserDataSummary {
  profile: UserProfile;
  orderSummary: {
    totalOrders: number;
    totalAmount: number;
  };
  preferences: UserPreferences;
  analytics: UserAnalytics;
}
// Performance test: single aggregation vs five concurrent aggregations.
async function testAsyncPerformance() {
  const dataService = new DataAggregationService();
  console.log('🧪 开始异步性能测试...\n');
  // Test 1: one user, establishing the single-request baseline.
  console.log('📊 测试1:单个用户数据聚合');
  const startTime = Date.now();
  const userData = await dataService.aggregateUserData('user123');
  const singleUserTime = Date.now() - startTime;
  console.log(`✅ 单用户数据聚合完成,耗时:${singleUserTime}ms`);
  console.log(`📦 数据摘要:${userData.orderSummary.totalOrders}个订单,总金额:${userData.orderSummary.totalAmount}元\n`);
  // Test 2: five users at once, sharing the 5-slot task manager.
  console.log('📊 测试2:多用户并发聚合');
  const batchStartTime = Date.now();
  const userIds = ['user1', 'user2', 'user3', 'user4', 'user5'];
  const batchPromises = userIds.map(id => dataService.aggregateUserData(id));
  const batchResults = await Promise.all(batchPromises);
  const batchTime = Date.now() - batchStartTime;
  console.log(`✅ 批量用户数据聚合完成,耗时:${batchTime}ms`);
  console.log(`👥 处理了${batchResults.length}个用户`);
  // Speedup = estimated serial time / measured parallel time.
  const expectedSequentialTime = singleUserTime * userIds.length;
  const speedup = expectedSequentialTime / batchTime;
  const efficiency = speedup / userIds.length;
  console.log(`\n📈 性能分析:`);
  console.log(` 预期串行时间:${expectedSequentialTime}ms`);
  console.log(` 实际并行时间:${batchTime}ms`);
  console.log(` 加速比:${speedup.toFixed(2)}x`);
  console.log(` 并行效率:${(efficiency * 100).toFixed(1)}%`);
}

// Kick off the benchmark (top-level invocation).
testAsyncPerformance().catch(console.error);
资源管理优化 💾
智能的资源池管理
java
// 🌟 智能资源池实现
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.time.LocalDateTime;
import java.time.Duration;
/**
 * Generic resource pool with min/max sizing, validation, idle-expiry and
 * background maintenance (cleanup, replenishment, statistics logging).
 *
 * NOTE(review): {@code logger} is used throughout but never declared in this
 * file (no SLF4J/Log4j field or import is visible), so the class will not
 * compile until a logger field is added.  {@code ResourceExhaustedException}
 * and {@code ResourcePoolConfig} are likewise referenced but not defined here.
 */
public class SmartResourcePool<T> implements AutoCloseable {
    // Idle resources ready to be borrowed (bounded FIFO).
    private final BlockingQueue<PooledResource<T>> availableResources;
    // Every live resource, whether idle or borrowed.
    private final Set<PooledResource<T>> allResources;
    private final ResourceFactory<T> factory;
    private final ResourceValidator<T> validator;
    private final ScheduledExecutorService maintenanceExecutor;

    // Configuration parameters.
    private final int minSize;
    private final int maxSize;
    private final Duration maxIdleTime;
    private final Duration validationInterval;

    // Statistics counters (thread-safe atomics).
    private final AtomicInteger currentSize = new AtomicInteger(0);
    private final AtomicInteger activeCount = new AtomicInteger(0);
    private final AtomicLong totalCreated = new AtomicLong(0);
    private final AtomicLong totalDestroyed = new AtomicLong(0);
    private final AtomicLong borrowCount = new AtomicLong(0);
    private final AtomicLong returnCount = new AtomicLong(0);

    public SmartResourcePool(ResourcePoolConfig<T> config) {
        this.minSize = config.getMinSize();
        this.maxSize = config.getMaxSize();
        this.maxIdleTime = config.getMaxIdleTime();
        this.validationInterval = config.getValidationInterval();
        this.factory = config.getFactory();
        this.validator = config.getValidator();
        // Bounded queue so the idle set can never outgrow maxSize.
        this.availableResources = new ArrayBlockingQueue<>(maxSize);
        this.allResources = ConcurrentHashMap.newKeySet();
        // Single daemon thread runs all periodic maintenance work.
        this.maintenanceExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ResourcePool-Maintenance");
            t.setDaemon(true);
            return t;
        });
        // Pre-create the minimum number of resources.
        initializeMinResources();
        // Schedule the recurring maintenance tasks.
        startMaintenanceTasks();
    }

    /**
     * Borrow a resource, waiting up to {@code timeout}.
     *
     * NOTE(review): this is a poll-and-sleep loop (10 ms naps) rather than a
     * blocking {@code poll(timeout)}; simple but adds latency/CPU under
     * contention, and Thread.sleep's InterruptedException escapes as a plain
     * Exception per the signature.
     */
    public PooledResource<T> borrowResource(Duration timeout) throws Exception {
        borrowCount.incrementAndGet();
        long startTime = System.currentTimeMillis();
        long timeoutMs = timeout.toMillis();
        while (System.currentTimeMillis() - startTime < timeoutMs) {
            // Try to take an idle resource first.
            PooledResource<T> resource = availableResources.poll();
            if (resource != null) {
                // Health-check before handing it out.
                if (isResourceValid(resource)) {
                    resource.markAsBorrowed();
                    activeCount.incrementAndGet();
                    return resource;
                } else {
                    // Stale or broken: destroy and keep looking.
                    destroyResource(resource);
                    continue;
                }
            }
            // Nothing idle; grow the pool if below the ceiling.
            if (currentSize.get() < maxSize) {
                resource = createNewResource();
                if (resource != null) {
                    resource.markAsBorrowed();
                    activeCount.incrementAndGet();
                    return resource;
                }
            }
            // Back off briefly before retrying.
            Thread.sleep(10);
        }
        throw new ResourceExhaustedException(
            String.format("无法在%dms内获取资源,当前池大小:%d,活跃资源:%d",
                timeoutMs, currentSize.get(), activeCount.get())
        );
    }

    /**
     * Return a previously borrowed resource to the pool.
     */
    public void returnResource(PooledResource<T> resource) {
        if (resource == null || !allResources.contains(resource)) {
            return; // ignore null or foreign handles
        }
        returnCount.incrementAndGet();
        activeCount.decrementAndGet();
        try {
            // Mark idle and refresh the last-used timestamp.
            resource.markAsReturned();
            // Only healthy resources go back on the shelf.
            if (isResourceValid(resource)) {
                if (!availableResources.offer(resource)) {
                    // Idle queue full: shed the surplus resource.
                    destroyResource(resource);
                }
            } else {
                destroyResource(resource);
            }
        } catch (Exception e) {
            logger.error("归还资源时发生错误", e);
            destroyResource(resource);
        }
    }

    /**
     * Create and register a new pooled resource; returns null on factory failure.
     */
    private PooledResource<T> createNewResource() {
        try {
            T rawResource = factory.create();
            PooledResource<T> pooledResource = new PooledResource<>(
                rawResource,
                System.currentTimeMillis()
            );
            allResources.add(pooledResource);
            currentSize.incrementAndGet();
            totalCreated.incrementAndGet();
            logger.debug("创建新资源,当前池大小:{}", currentSize.get());
            return pooledResource;
        } catch (Exception e) {
            logger.error("创建资源失败", e);
            return null;
        }
    }

    /**
     * Unregister and physically destroy a resource (idempotent via remove()).
     */
    private void destroyResource(PooledResource<T> resource) {
        if (allResources.remove(resource)) {
            currentSize.decrementAndGet();
            totalDestroyed.incrementAndGet();
            try {
                factory.destroy(resource.getResource());
            } catch (Exception e) {
                logger.error("销毁资源时发生错误", e);
            }
            logger.debug("销毁资源,当前池大小:{}", currentSize.get());
        }
    }

    /**
     * A resource is valid when it has not idle-expired and passes the validator.
     */
    private boolean isResourceValid(PooledResource<T> resource) {
        try {
            // Idle longer than maxIdleTime?
            if (Duration.ofMillis(System.currentTimeMillis() - resource.getLastUsedTime())
                    .compareTo(maxIdleTime) > 0) {
                return false;
            }
            // Delegate the real health check.
            return validator.isValid(resource.getResource());
        } catch (Exception e) {
            logger.warn("验证资源时发生错误", e);
            return false;
        }
    }

    /**
     * Pre-populate the pool up to minSize.
     */
    private void initializeMinResources() {
        for (int i = 0; i < minSize; i++) {
            PooledResource<T> resource = createNewResource();
            if (resource != null) {
                availableResources.offer(resource);
            }
        }
        logger.info("初始化资源池,最小大小:{},当前大小:{}", minSize, currentSize.get());
    }

    /**
     * Schedule periodic cleanup, replenishment and statistics logging.
     */
    private void startMaintenanceTasks() {
        // Evict idle-expired / invalid resources.
        maintenanceExecutor.scheduleWithFixedDelay(
            this::cleanupExpiredResources,
            validationInterval.toSeconds(),
            validationInterval.toSeconds(),
            TimeUnit.SECONDS
        );
        // Keep the pool topped up to minSize.
        maintenanceExecutor.scheduleWithFixedDelay(
            this::ensureMinResources,
            30, 30, TimeUnit.SECONDS
        );
        // Periodic statistics line.
        maintenanceExecutor.scheduleWithFixedDelay(
            this::logStatistics,
            60, 60, TimeUnit.SECONDS
        );
    }

    /**
     * Remove invalid/expired resources from the idle queue and destroy them.
     * Borrowed resources are never touched here.
     */
    private void cleanupExpiredResources() {
        logger.debug("开始清理过期资源");
        List<PooledResource<T>> resourcesToRemove = new ArrayList<>();
        // Walk the idle queue collecting stale entries.
        Iterator<PooledResource<T>> iterator = availableResources.iterator();
        while (iterator.hasNext()) {
            PooledResource<T> resource = iterator.next();
            if (!isResourceValid(resource)) {
                iterator.remove();
                resourcesToRemove.add(resource);
            }
        }
        // Destroy outside the iteration.
        for (PooledResource<T> resource : resourcesToRemove) {
            destroyResource(resource);
        }
        if (!resourcesToRemove.isEmpty()) {
            logger.info("清理了{}个过期资源", resourcesToRemove.size());
        }
    }

    /**
     * Recreate resources when the pool has shrunk below minSize.
     */
    private void ensureMinResources() {
        int current = currentSize.get();
        int needed = minSize - current;
        if (needed > 0) {
            logger.debug("补充资源,当前:{},需要:{}", current, needed);
            for (int i = 0; i < needed; i++) {
                PooledResource<T> resource = createNewResource();
                if (resource != null) {
                    availableResources.offer(resource);
                }
            }
        }
    }

    /**
     * Log a one-line utilization snapshot.
     *
     * NOTE(review): SLF4J-style "{}" placeholders do not support printf
     * precision like "{:.1f}" -- that token would be printed literally.
     */
    private void logStatistics() {
        ResourcePoolStats stats = getStatistics();
        logger.info("资源池统计 - 当前大小:{},可用:{},活跃:{},利用率:{:.1f}%",
            stats.getCurrentSize(),
            stats.getAvailableCount(),
            stats.getActiveCount(),
            stats.getUtilizationRate() * 100
        );
    }

    /**
     * Immutable snapshot of all pool counters.
     */
    public ResourcePoolStats getStatistics() {
        return new ResourcePoolStats(
            currentSize.get(),
            availableResources.size(),
            activeCount.get(),
            totalCreated.get(),
            totalDestroyed.get(),
            borrowCount.get(),
            returnCount.get()
        );
    }

    /**
     * Shut down maintenance and destroy every tracked resource,
     * including ones still borrowed.
     */
    @Override
    public void close() throws Exception {
        logger.info("关闭资源池...");
        // Stop the maintenance thread first.
        maintenanceExecutor.shutdown();
        try {
            if (!maintenanceExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                maintenanceExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            maintenanceExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        // Destroy everything we still track.
        for (PooledResource<T> resource : allResources) {
            try {
                factory.destroy(resource.getResource());
            } catch (Exception e) {
                logger.error("关闭资源时发生错误", e);
            }
        }
        allResources.clear();
        availableResources.clear();
        logger.info("资源池已关闭,总计创建:{},销毁:{}",
            totalCreated.get(), totalDestroyed.get());
    }
}
/**
 * Wrapper pairing a pooled raw resource with its lifecycle timestamps
 * and an in-use flag.  Timestamps are volatile because borrow/return
 * and maintenance run on different threads.
 */
class PooledResource<T> {

    private final T resource;
    private final long createdTime;
    private volatile long lastUsedTime;
    private volatile boolean inUse;

    public PooledResource(T resource, long createdTime) {
        this.resource = resource;
        this.createdTime = createdTime;
        this.lastUsedTime = createdTime;
        this.inUse = false;
    }

    /** Both transitions refresh the last-used timestamp. */
    private void stamp(boolean borrowed) {
        this.inUse = borrowed;
        this.lastUsedTime = System.currentTimeMillis();
    }

    public void markAsBorrowed() {
        stamp(true);
    }

    public void markAsReturned() {
        stamp(false);
    }

    public T getResource() {
        return resource;
    }

    public long getCreatedTime() {
        return createdTime;
    }

    public long getLastUsedTime() {
        return lastUsedTime;
    }

    public boolean isInUse() {
        return inUse;
    }
}
// Factory: creates and disposes the raw resources the pool manages.
interface ResourceFactory<T> {
    /** Create a fresh resource (e.g. open a connection). */
    T create() throws Exception;
    /** Release a resource permanently (e.g. close a connection). */
    void destroy(T resource) throws Exception;
}

// Validator: health-checks a resource before it is handed out or re-pooled.
interface ResourceValidator<T> {
    /** @return true when the resource is still safe to use */
    boolean isValid(T resource);
}
// Immutable snapshot of the resource-pool counters.
class ResourcePoolStats {
    private final int currentSize;
    private final int availableCount;
    private final int activeCount;
    private final long totalCreated;
    private final long totalDestroyed;
    private final long borrowCount;
    private final long returnCount;

    // FIX: the original left "constructor and getters..." as a placeholder
    // comment, so the final fields were never assigned (a compile error) and
    // the getters called by SmartResourcePool.logStatistics()/getStatistics()
    // callers did not exist.  Argument order matches the call site in
    // SmartResourcePool.getStatistics().
    public ResourcePoolStats(int currentSize, int availableCount, int activeCount,
                             long totalCreated, long totalDestroyed,
                             long borrowCount, long returnCount) {
        this.currentSize = currentSize;
        this.availableCount = availableCount;
        this.activeCount = activeCount;
        this.totalCreated = totalCreated;
        this.totalDestroyed = totalDestroyed;
        this.borrowCount = borrowCount;
        this.returnCount = returnCount;
    }

    public int getCurrentSize() { return currentSize; }
    public int getAvailableCount() { return availableCount; }
    public int getActiveCount() { return activeCount; }
    public long getTotalCreated() { return totalCreated; }
    public long getTotalDestroyed() { return totalDestroyed; }
    public long getBorrowCount() { return borrowCount; }
    public long getReturnCount() { return returnCount; }

    /** Fraction of pooled resources currently borrowed (0 when pool empty). */
    public double getUtilizationRate() {
        return currentSize > 0 ? (double) activeCount / currentSize : 0.0;
    }

    /**
     * Returns/borrows ratio.
     * NOTE(review): named "hit rate" but it actually measures how many
     * borrows have been returned -- confirm the intended metric.
     */
    public double getHitRate() {
        return borrowCount > 0 ? (double) returnCount / borrowCount : 0.0;
    }
}
// Example: a JDBC connection pool built on SmartResourcePool.
class DatabaseConnectionPool extends SmartResourcePool<Connection> {
    /**
     * Pool of 5..20 connections, idle-expired after 10 minutes and
     * re-validated every minute.
     *
     * NOTE(review): ResourcePoolConfig, DatabaseConnectionFactory and
     * DatabaseConnectionValidator are referenced but not defined in this
     * file -- presumably they live elsewhere in the project.
     */
    public DatabaseConnectionPool(String url, String username, String password) {
        super(ResourcePoolConfig.<Connection>builder()
            .minSize(5)
            .maxSize(20)
            .maxIdleTime(Duration.ofMinutes(10))
            .validationInterval(Duration.ofMinutes(1))
            .factory(new DatabaseConnectionFactory(url, username, password))
            .validator(new DatabaseConnectionValidator())
            .build());
    }
}
小结
性能优化的核心要点:
⚡ 优化策略
- 智能缓存 - 多层缓存提升数据访问速度
- 异步处理 - 并发执行提高吞吐量
- 资源池化 - 复用昂贵资源减少创建开销
- 连接管理 - 合理管理数据库连接等资源
- 监控指标 - 实时监控性能瓶颈
💡 实践要点
- 根据业务场景选择合适的缓存策略
- 控制并发度避免资源争抢
- 实施资源池管理避免资源泄漏
- 定期监控和调优性能参数
- 建立性能基准线和告警机制
🚀 性能哲学:性能优化是一门艺术,需要在速度、稳定性和资源消耗之间找到平衡点。
下一节:安全实现指南 - 构建牢不可破的安全防线