8.8 测试策略完全指南 🧪
"好的测试不是为了证明代码能工作,而是为了发现代码为什么不工作。"
想象测试就像是给你的MCP服务器做全面体检。单元测试是检查每个器官是否健康,集成测试是检查各器官如何协作,而端到端测试则是检查整个身体系统是否正常运转。
测试金字塔:分层测试策略 🏗️
测试架构设计
typescript
// 🌟 测试金字塔实现
import { describe, it, expect, beforeEach, afterEach, mock, spy } from '@jest/globals';
import { MockedFunction } from 'jest-mock';
// 测试配置接口
interface TestConfig {
level: 'unit' | 'integration' | 'e2e';
timeout: number;
retries: number;
parallel: boolean;
cleanup: boolean;
}
// 测试结果接口
interface TestResult {
testName: string;
passed: boolean;
duration: number;
coverage?: number;
errors: string[];
warnings: string[];
}
// 基础测试类
abstract class BaseTest {
protected config: TestConfig;
protected testResults: TestResult[] = [];
protected setupTime: number = 0;
protected teardownTime: number = 0;
constructor(config: Partial<TestConfig> = {}) {
this.config = {
level: 'unit',
timeout: 5000,
retries: 1,
parallel: false,
cleanup: true,
...config
};
}
// 测试生命周期管理
async setup(): Promise<void> {
const startTime = Date.now();
console.log(`🚀 开始${this.config.level}测试环境初始化...`);
await this.beforeAllTests();
this.setupTime = Date.now() - startTime;
console.log(`✅ 测试环境初始化完成,耗时:${this.setupTime}ms`);
}
async teardown(): Promise<void> {
const startTime = Date.now();
console.log(`🧹 开始清理测试环境...`);
if (this.config.cleanup) {
await this.afterAllTests();
}
this.teardownTime = Date.now() - startTime;
console.log(`✅ 测试环境清理完成,耗时:${this.teardownTime}ms`);
}
// 抽象方法,子类实现
protected abstract beforeAllTests(): Promise<void>;
protected abstract afterAllTests(): Promise<void>;
// 运行单个测试
async runTest(testFn: () => Promise<void>, testName: string): Promise<TestResult> {
const startTime = Date.now();
const result: TestResult = {
testName,
passed: false,
duration: 0,
errors: [],
warnings: []
};
let retries = this.config.retries;
while (retries >= 0) {
try {
console.log(`🧪 执行测试:${testName}${retries < this.config.retries ? ` (重试 ${this.config.retries - retries})` : ''}`);
// 设置测试超时
const timeoutPromise = new Promise<never>((_, reject) => {
setTimeout(() => reject(new Error('测试超时')), this.config.timeout);
});
await Promise.race([testFn(), timeoutPromise]);
result.passed = true;
break;
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);
result.errors.push(errorMessage);
if (retries === 0) {
console.error(`❌ 测试失败:${testName} - ${errorMessage}`);
} else {
console.warn(`⚠️ 测试失败,准备重试:${testName} - ${errorMessage}`);
// 等待一段时间后重试
await this.delay(1000);
}
retries--;
}
}
result.duration = Date.now() - startTime;
this.testResults.push(result);
if (result.passed) {
console.log(`✅ 测试通过:${testName} (${result.duration}ms)`);
}
return result;
}
// 获取测试统计
getTestStatistics(): TestStatistics {
const total = this.testResults.length;
const passed = this.testResults.filter(r => r.passed).length;
const failed = total - passed;
const totalDuration = this.testResults.reduce((sum, r) => sum + r.duration, 0);
const avgDuration = total > 0 ? totalDuration / total : 0;
return {
total,
passed,
failed,
passRate: total > 0 ? passed / total : 0,
totalDuration: totalDuration + this.setupTime + this.teardownTime,
avgTestDuration: avgDuration,
setupTime: this.setupTime,
teardownTime: this.teardownTime
};
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// 单元测试类
class UnitTest extends BaseTest {
private mocks: Map<string, any> = new Map();
private spies: Map<string, any> = new Map();
constructor() {
super({ level: 'unit', timeout: 2000, parallel: true });
}
protected async beforeAllTests(): Promise<void> {
// 初始化Mock对象
console.log('📋 初始化Mock对象...');
}
protected async afterAllTests(): Promise<void> {
// 清理Mock对象
this.mocks.clear();
this.spies.clear();
console.log('🧹 清理Mock对象完成');
}
// 创建Mock对象
createMock<T>(name: string, mockImplementation?: Partial<T>): MockedFunction<any> {
const mockFn = mock.fn(mockImplementation);
this.mocks.set(name, mockFn);
return mockFn as MockedFunction<any>;
}
// 创建Spy对象
createSpy<T extends object>(name: string, object: T, method: keyof T): any {
const spyFn = spy(object, method);
this.spies.set(name, spyFn);
return spyFn;
}
// 验证Mock调用
expectMockCalled(mockName: string, times?: number): void {
const mockFn = this.mocks.get(mockName);
if (!mockFn) {
throw new Error(`Mock '${mockName}' 不存在`);
}
if (times !== undefined) {
expect(mockFn).toHaveBeenCalledTimes(times);
} else {
expect(mockFn).toHaveBeenCalled();
}
}
// 验证Spy调用
expectSpyCalled(spyName: string, times?: number): void {
const spyFn = this.spies.get(spyName);
if (!spyFn) {
throw new Error(`Spy '${spyName}' 不存在`);
}
if (times !== undefined) {
expect(spyFn).toHaveBeenCalledTimes(times);
} else {
expect(spyFn).toHaveBeenCalled();
}
}
}
// MCP工具单元测试示例
class MCPToolUnitTest extends UnitTest {
private weatherService: any;
private mockApiClient: MockedFunction<any>;
async testWeatherTool(): Promise<void> {
console.log('🌤️ 测试天气工具...');
// 1. 准备测试数据
const mockWeatherData = {
location: '北京',
temperature: 25,
description: '晴天',
humidity: 60
};
// 2. 创建Mock API客户端
this.mockApiClient = this.createMock('apiClient', {
get: mock.fn().mockResolvedValue({
data: mockWeatherData,
status: 200
})
});
// 3. 注入依赖
this.weatherService = new WeatherService(this.mockApiClient);
// 4. 执行测试
const result = await this.weatherService.getWeather('北京');
// 5. 验证结果
expect(result).toEqual(mockWeatherData);
this.expectMockCalled('apiClient', 1);
// 6. 验证API调用参数
expect(this.mockApiClient.get).toHaveBeenCalledWith(
'/weather',
{ params: { location: '北京' } }
);
console.log('✅ 天气工具单元测试通过');
}
async testWeatherToolError(): Promise<void> {
console.log('🌩️ 测试天气工具错误处理...');
// 1. Mock API错误
this.mockApiClient = this.createMock('apiClient', {
get: mock.fn().mockRejectedValue(new Error('API错误'))
});
this.weatherService = new WeatherService(this.mockApiClient);
// 2. 验证错误处理
await expect(this.weatherService.getWeather('北京'))
.rejects
.toThrow('API错误');
console.log('✅ 错误处理测试通过');
}
async testParameterValidation(): Promise<void> {
console.log('📋 测试参数验证...');
this.mockApiClient = this.createMock('apiClient');
this.weatherService = new WeatherService(this.mockApiClient);
// 测试空参数
await expect(this.weatherService.getWeather(''))
.rejects
.toThrow('地点不能为空');
// 测试无效参数
await expect(this.weatherService.getWeather(null))
.rejects
.toThrow('地点参数无效');
console.log('✅ 参数验证测试通过');
}
}
// 集成测试类
class IntegrationTest extends BaseTest {
private testDatabase: any;
private testServer: any;
private testClient: any;
constructor() {
super({ level: 'integration', timeout: 10000, retries: 2 });
}
protected async beforeAllTests(): Promise<void> {
console.log('🔗 启动集成测试环境...');
// 启动测试数据库
this.testDatabase = await this.startTestDatabase();
// 启动测试服务器
this.testServer = await this.startTestServer();
// 创建测试客户端
this.testClient = this.createTestClient();
// 等待服务就绪
await this.waitForServicesReady();
}
protected async afterAllTests(): Promise<void> {
console.log('🛑 关闭集成测试环境...');
if (this.testClient) {
await this.testClient.disconnect();
}
if (this.testServer) {
await this.testServer.close();
}
if (this.testDatabase) {
await this.testDatabase.close();
}
}
private async startTestDatabase(): Promise<any> {
// 启动内存数据库或测试数据库容器
console.log('📊 启动测试数据库...');
// 实现省略
return {};
}
private async startTestServer(): Promise<any> {
// 启动MCP服务器
console.log('🚀 启动MCP测试服务器...');
// 实现省略
return {};
}
private createTestClient(): any {
// 创建MCP客户端
console.log('🔌 创建MCP测试客户端...');
// 实现省略
return {};
}
private async waitForServicesReady(): Promise<void> {
console.log('⏳ 等待服务就绪...');
// 健康检查逻辑
await this.delay(2000);
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// MCP协议集成测试
class MCPProtocolIntegrationTest extends IntegrationTest {
async testClientServerCommunication(): Promise<void> {
console.log('🤝 测试客户端-服务器通信...');
// 1. 测试连接建立
const connected = await this.testClient.connect();
expect(connected).toBe(true);
// 2. 测试协议握手
const handshake = await this.testClient.handshake({
protocolVersion: '2024-11-05',
capabilities: {
roots: { listChanged: true },
sampling: {}
}
});
expect(handshake.protocolVersion).toBe('2024-11-05');
expect(handshake.capabilities).toBeDefined();
console.log('✅ 客户端-服务器通信测试通过');
}
async testToolExecution(): Promise<void> {
console.log('🔧 测试工具执行...');
// 1. 获取可用工具列表
const tools = await this.testClient.listTools();
expect(tools.tools).toBeInstanceOf(Array);
expect(tools.tools.length).toBeGreaterThan(0);
// 2. 执行工具
const weatherTool = tools.tools.find(t => t.name === 'get_weather');
expect(weatherTool).toBeDefined();
const result = await this.testClient.callTool('get_weather', {
location: '北京'
});
expect(result.isError).toBe(false);
expect(result.content).toBeDefined();
console.log('✅ 工具执行测试通过');
}
async testResourceAccess(): Promise<void> {
console.log('📁 测试资源访问...');
// 1. 列出根资源
const roots = await this.testClient.listRoots();
expect(roots.roots).toBeInstanceOf(Array);
// 2. 读取资源
if (roots.roots.length > 0) {
const resource = await this.testClient.readResource(roots.roots[0].uri);
expect(resource.contents).toBeDefined();
}
console.log('✅ 资源访问测试通过');
}
async testErrorHandling(): Promise<void> {
console.log('⚠️ 测试错误处理...');
// 1. 测试不存在的工具
const result = await this.testClient.callTool('nonexistent_tool', {});
expect(result.isError).toBe(true);
expect(result.content[0].type).toBe('text');
expect(result.content[0].text).toContain('工具不存在');
// 2. 测试无效参数
const result2 = await this.testClient.callTool('get_weather', {
invalid_param: 'value'
});
expect(result2.isError).toBe(true);
console.log('✅ 错误处理测试通过');
}
}
// 端到端测试类
class EndToEndTest extends BaseTest {
private browserDriver: any;
private testUser: any;
constructor() {
super({ level: 'e2e', timeout: 30000, retries: 3 });
}
protected async beforeAllTests(): Promise<void> {
console.log('🌐 启动端到端测试环境...');
// 启动浏览器
this.browserDriver = await this.startBrowser();
// 创建测试用户
this.testUser = await this.createTestUser();
// 等待应用就绪
await this.waitForApplicationReady();
}
protected async afterAllTests(): Promise<void> {
console.log('🛑 关闭端到端测试环境...');
if (this.testUser) {
await this.cleanupTestUser();
}
if (this.browserDriver) {
await this.browserDriver.quit();
}
}
private async startBrowser(): Promise<any> {
console.log('🌐 启动浏览器...');
// 浏览器驱动初始化
return {};
}
private async createTestUser(): Promise<any> {
console.log('👤 创建测试用户...');
return {
username: 'test_user',
email: 'test@example.com',
password: 'test_password_123'
};
}
private async waitForApplicationReady(): Promise<void> {
console.log('⏳ 等待应用就绪...');
await this.delay(3000);
}
private async cleanupTestUser(): Promise<void> {
console.log('🧹 清理测试用户...');
// 清理逻辑
}
private delay(ms: number): Promise<void> {
return new Promise(resolve => setTimeout(resolve, ms));
}
}
// MCP应用端到端测试
class MCPApplicationE2ETest extends EndToEndTest {
async testUserRegistrationFlow(): Promise<void> {
console.log('📝 测试用户注册流程...');
// 1. 打开注册页面
await this.browserDriver.get('/register');
// 2. 填写注册表单
await this.browserDriver.findElement('#username').sendKeys(this.testUser.username);
await this.browserDriver.findElement('#email').sendKeys(this.testUser.email);
await this.browserDriver.findElement('#password').sendKeys(this.testUser.password);
// 3. 提交表单
await this.browserDriver.findElement('#register-btn').click();
// 4. 验证注册成功
const successMessage = await this.browserDriver.findElement('.success-message');
expect(await successMessage.getText()).toContain('注册成功');
console.log('✅ 用户注册流程测试通过');
}
async testMCPToolInteraction(): Promise<void> {
console.log('🔧 测试MCP工具交互...');
// 1. 登录
await this.login();
// 2. 打开工具页面
await this.browserDriver.get('/tools');
// 3. 选择天气工具
await this.browserDriver.findElement('[data-tool="weather"]').click();
// 4. 输入参数
await this.browserDriver.findElement('#location-input').sendKeys('北京');
// 5. 执行工具
await this.browserDriver.findElement('#execute-tool').click();
// 6. 等待结果
await this.browserDriver.wait(
() => this.browserDriver.findElement('.tool-result'),
10000
);
// 7. 验证结果
const result = await this.browserDriver.findElement('.tool-result');
const resultText = await result.getText();
expect(resultText).toContain('北京');
expect(resultText).toContain('温度');
console.log('✅ MCP工具交互测试通过');
}
private async login(): Promise<void> {
await this.browserDriver.get('/login');
await this.browserDriver.findElement('#username').sendKeys(this.testUser.username);
await this.browserDriver.findElement('#password').sendKeys(this.testUser.password);
await this.browserDriver.findElement('#login-btn').click();
// 等待登录完成
await this.browserDriver.wait(
() => this.browserDriver.findElement('.dashboard'),
5000
);
}
}
// 性能测试类
class PerformanceTest extends BaseTest {
private metrics: PerformanceMetrics[] = [];
constructor() {
super({ level: 'integration', timeout: 60000, retries: 1 });
}
protected async beforeAllTests(): Promise<void> {
console.log('⚡ 初始化性能测试环境...');
// 性能监控初始化
}
protected async afterAllTests(): Promise<void> {
console.log('📊 生成性能测试报告...');
this.generatePerformanceReport();
}
async testConcurrentToolExecution(): Promise<void> {
console.log('🔀 测试并发工具执行性能...');
const concurrencyLevels = [1, 5, 10, 20, 50];
for (const concurrency of concurrencyLevels) {
console.log(`📈 测试并发度:${concurrency}`);
const startTime = Date.now();
const promises: Promise<any>[] = [];
for (let i = 0; i < concurrency; i++) {
promises.push(this.executeWeatherTool(`测试城市${i}`));
}
const results = await Promise.allSettled(promises);
const endTime = Date.now();
const successful = results.filter(r => r.status === 'fulfilled').length;
const failed = results.filter(r => r.status === 'rejected').length;
const totalTime = endTime - startTime;
const avgResponseTime = totalTime / concurrency;
const throughput = (successful / totalTime) * 1000; // 每秒请求数
this.metrics.push({
concurrency,
totalRequests: concurrency,
successfulRequests: successful,
failedRequests: failed,
totalTime,
avgResponseTime,
throughput,
errorRate: failed / concurrency
});
console.log(`✅ 并发度${concurrency}:成功${successful},失败${failed},平均响应时间${avgResponseTime.toFixed(2)}ms,吞吐量${throughput.toFixed(2)}req/s`);
}
}
private async executeWeatherTool(location: string): Promise<any> {
// 模拟工具执行
const delay = Math.random() * 1000 + 500; // 500-1500ms随机延迟
await new Promise(resolve => setTimeout(resolve, delay));
if (Math.random() < 0.05) { // 5%错误率
throw new Error('模拟错误');
}
return { location, temperature: Math.floor(Math.random() * 40) };
}
private generatePerformanceReport(): void {
console.log('\n📊 性能测试报告:');
console.log('-'.repeat(80));
console.log('并发度 | 总请求 | 成功 | 失败 | 总时间(ms) | 平均响应(ms) | 吞吐量(req/s) | 错误率');
console.log('-'.repeat(80));
for (const metric of this.metrics) {
console.log(
`${metric.concurrency.toString().padStart(6)} | ` +
`${metric.totalRequests.toString().padStart(6)} | ` +
`${metric.successfulRequests.toString().padStart(4)} | ` +
`${metric.failedRequests.toString().padStart(4)} | ` +
`${metric.totalTime.toString().padStart(10)} | ` +
`${metric.avgResponseTime.toFixed(2).padStart(12)} | ` +
`${metric.throughput.toFixed(2).padStart(12)} | ` +
`${(metric.errorRate * 100).toFixed(2).padStart(6)}%`
);
}
console.log('-'.repeat(80));
}
}
// 测试运行器
class TestRunner {
private testSuites: BaseTest[] = [];
addTestSuite(testSuite: BaseTest): void {
this.testSuites.push(testSuite);
}
async runAllTests(): Promise<TestSummary> {
console.log('🚀 开始运行测试套件...\n');
const summary: TestSummary = {
totalSuites: this.testSuites.length,
passedSuites: 0,
failedSuites: 0,
totalTests: 0,
passedTests: 0,
failedTests: 0,
totalDuration: 0,
suiteResults: []
};
const startTime = Date.now();
for (const suite of this.testSuites) {
try {
console.log(`\n📋 运行测试套件:${suite.constructor.name}`);
console.log('='.repeat(50));
await suite.setup();
// 运行测试套件中的所有测试方法
const testMethods = this.getTestMethods(suite);
for (const methodName of testMethods) {
await suite.runTest(
() => (suite as any)[methodName](),
methodName
);
}
await suite.teardown();
const stats = suite.getTestStatistics();
summary.suiteResults.push({
suiteName: suite.constructor.name,
...stats
});
summary.totalTests += stats.total;
summary.passedTests += stats.passed;
summary.failedTests += stats.failed;
if (stats.failed === 0) {
summary.passedSuites++;
console.log(`✅ 测试套件通过:${suite.constructor.name}`);
} else {
summary.failedSuites++;
console.log(`❌ 测试套件失败:${suite.constructor.name}`);
}
} catch (error) {
summary.failedSuites++;
console.error(`💥 测试套件异常:${suite.constructor.name}`, error);
}
}
summary.totalDuration = Date.now() - startTime;
this.printTestSummary(summary);
return summary;
}
private getTestMethods(suite: BaseTest): string[] {
// 获取所有以'test'开头的方法名
const proto = Object.getPrototypeOf(suite);
const methods = Object.getOwnPropertyNames(proto);
return methods.filter(method =>
method.startsWith('test') &&
typeof (suite as any)[method] === 'function'
);
}
private printTestSummary(summary: TestSummary): void {
console.log('\n🏁 测试总结');
console.log('='.repeat(60));
console.log(`📊 测试套件:${summary.totalSuites}个,通过:${summary.passedSuites}个,失败:${summary.failedSuites}个`);
console.log(`🧪 测试用例:${summary.totalTests}个,通过:${summary.passedTests}个,失败:${summary.failedTests}个`);
console.log(`⏱️ 总耗时:${summary.totalDuration}ms`);
console.log(`📈 通过率:${((summary.passedTests / Math.max(1, summary.totalTests)) * 100).toFixed(2)}%`);
if (summary.failedTests > 0) {
console.log('\n❌ 失败的测试套件:');
summary.suiteResults
.filter(result => result.failed > 0)
.forEach(result => {
console.log(` - ${result.suiteName}: ${result.failed}个失败`);
});
}
console.log('='.repeat(60));
}
}
// 接口定义
interface TestStatistics {
total: number;
passed: number;
failed: number;
passRate: number;
totalDuration: number;
avgTestDuration: number;
setupTime: number;
teardownTime: number;
}
interface TestSummary {
totalSuites: number;
passedSuites: number;
failedSuites: number;
totalTests: number;
passedTests: number;
failedTests: number;
totalDuration: number;
suiteResults: (TestStatistics & { suiteName: string })[];
}
interface PerformanceMetrics {
concurrency: number;
totalRequests: number;
successfulRequests: number;
failedRequests: number;
totalTime: number;
avgResponseTime: number;
throughput: number;
errorRate: number;
}
// 模拟的WeatherService类(用于测试)
class WeatherService {
constructor(private apiClient: any) {}
async getWeather(location: string): Promise<any> {
if (!location) {
throw new Error('地点不能为空');
}
if (location === null || location === undefined) {
throw new Error('地点参数无效');
}
const response = await this.apiClient.get('/weather', {
params: { location }
});
return response.data;
}
}
// 使用示例
async function runTestSuite() {
const runner = new TestRunner();
// 添加各种测试套件
runner.addTestSuite(new MCPToolUnitTest());
runner.addTestSuite(new MCPProtocolIntegrationTest());
runner.addTestSuite(new MCPApplicationE2ETest());
runner.addTestSuite(new PerformanceTest());
// 运行所有测试
const summary = await runner.runAllTests();
// 根据测试结果设置退出码
process.exit(summary.failedTests > 0 ? 1 : 0);
}
// 导出测试类
export {
BaseTest,
UnitTest,
IntegrationTest,
EndToEndTest,
PerformanceTest,
TestRunner,
MCPToolUnitTest,
MCPProtocolIntegrationTest,
MCPApplicationE2ETest
};
// 如果直接运行此文件,执行测试
if (require.main === module) {
runTestSuite().catch(console.error);
}
测试驱动开发:红绿重构循环 🔄
TDD实践框架
python
# 🌟 测试驱动开发框架
import unittest
import asyncio
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import json
class TDDPhase:
"""TDD阶段枚举"""
RED = "red" # 红:写失败测试
GREEN = "green" # 绿:写最少代码让测试通过
REFACTOR = "refactor" # 重构:优化代码
@dataclass
class TDDCycle:
"""TDD循环记录"""
feature_name: str
phase: str
start_time: float
end_time: Optional[float] = None
test_count: int = 0
line_count: int = 0
complexity_score: int = 0
class TDDTracker:
"""TDD过程跟踪器"""
def __init__(self):
self.cycles: List[TDDCycle] = []
self.current_cycle: Optional[TDDCycle] = None
self.metrics = {
'red_time': 0,
'green_time': 0,
'refactor_time': 0,
'total_cycles': 0,
'avg_cycle_time': 0
}
def start_cycle(self, feature_name: str, phase: str) -> None:
"""开始新的TDD循环"""
if self.current_cycle and not self.current_cycle.end_time:
self.end_cycle()
self.current_cycle = TDDCycle(
feature_name=feature_name,
phase=phase,
start_time=time.time()
)
print(f"🔄 开始TDD循环:{feature_name} - {phase.upper()}阶段")
def end_cycle(self) -> None:
"""结束当前TDD循环"""
if not self.current_cycle:
return
self.current_cycle.end_time = time.time()
duration = self.current_cycle.end_time - self.current_cycle.start_time
# 更新指标
phase_key = f"{self.current_cycle.phase}_time"
if phase_key in self.metrics:
self.metrics[phase_key] += duration
self.cycles.append(self.current_cycle)
print(f"✅ 完成TDD循环:{self.current_cycle.feature_name} - "
f"{self.current_cycle.phase.upper()}阶段 "
f"({duration:.2f}秒)")
self.current_cycle = None
def get_tdd_metrics(self) -> Dict[str, Any]:
"""获取TDD指标"""
if not self.cycles:
return self.metrics
total_time = sum(
cycle.end_time - cycle.start_time
for cycle in self.cycles
if cycle.end_time
)
self.metrics.update({
'total_cycles': len(self.cycles),
'avg_cycle_time': total_time / len(self.cycles) if self.cycles else 0,
'red_percentage': (self.metrics['red_time'] / total_time * 100) if total_time > 0 else 0,
'green_percentage': (self.metrics['green_time'] / total_time * 100) if total_time > 0 else 0,
'refactor_percentage': (self.metrics['refactor_time'] / total_time * 100) if total_time > 0 else 0
})
return self.metrics
class TDDTestCase(unittest.TestCase):
"""TDD测试基类"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.tdd_tracker = TDDTracker()
def red_phase(self, feature_name: str, test_function: Callable):
"""红阶段:写失败测试"""
self.tdd_tracker.start_cycle(feature_name, TDDPhase.RED)
print(f"🔴 RED阶段:为 '{feature_name}' 编写失败测试")
# 执行测试,期望失败
with self.assertRaises((AssertionError, NotImplementedError, AttributeError)):
test_function()
print("✅ 测试按预期失败")
self.tdd_tracker.end_cycle()
def green_phase(self, feature_name: str, implementation_function: Callable, test_function: Callable):
"""绿阶段:写最少代码让测试通过"""
self.tdd_tracker.start_cycle(feature_name, TDDPhase.GREEN)
print(f"🟢 GREEN阶段:为 '{feature_name}' 实现最少代码")
# 执行实现
implementation_function()
# 运行测试,期望通过
try:
test_function()
print("✅ 测试通过")
except Exception as e:
print(f"❌ 测试仍然失败:{e}")
raise
self.tdd_tracker.end_cycle()
def refactor_phase(self, feature_name: str, refactor_function: Callable, test_function: Callable):
"""重构阶段:优化代码"""
self.tdd_tracker.start_cycle(feature_name, TDDPhase.REFACTOR)
print(f"🔧 REFACTOR阶段:重构 '{feature_name}'")
# 执行重构
refactor_function()
# 确保测试仍然通过
try:
test_function()
print("✅ 重构后测试仍然通过")
except Exception as e:
print(f"❌ 重构破坏了测试:{e}")
raise
self.tdd_tracker.end_cycle()
def complete_tdd_cycle(
self,
feature_name: str,
test_function: Callable,
implementation_function: Callable,
refactor_function: Optional[Callable] = None
):
"""完整的TDD循环"""
print(f"\n🎯 开始完整TDD循环:{feature_name}")
print("=" * 60)
# 红阶段
self.red_phase(feature_name, test_function)
# 绿阶段
self.green_phase(feature_name, implementation_function, test_function)
# 重构阶段(可选)
if refactor_function:
self.refactor_phase(feature_name, refactor_function, test_function)
print(f"🎉 完成TDD循环:{feature_name}")
print("=" * 60)
# MCP天气工具TDD示例
class WeatherToolTDD(TDDTestCase):
def setUp(self):
"""测试设置"""
self.weather_tool = None
self.mock_api_response = {
'temperature': 25,
'description': '晴天',
'humidity': 60,
'location': '北京'
}
def test_weather_tool_complete_tdd(self):
"""完整的天气工具TDD开发"""
# 定义测试函数
def test_get_weather():
"""测试获取天气功能"""
result = self.weather_tool.get_weather('北京')
self.assertIsNotNone(result)
self.assertIn('temperature', result)
self.assertIn('description', result)
self.assertEqual(result['location'], '北京')
# 定义实现函数
def implement_weather_tool():
"""实现天气工具最基本功能"""
class BasicWeatherTool:
def get_weather(self, location):
# 最简单的实现,返回硬编码数据
return {
'temperature': 25,
'description': '晴天',
'humidity': 60,
'location': location
}
self.weather_tool = BasicWeatherTool()
# 定义重构函数
def refactor_weather_tool():
"""重构天气工具,添加错误处理和验证"""
class ImprovedWeatherTool:
def __init__(self):
self.cache = {}
def get_weather(self, location):
# 参数验证
if not location or not location.strip():
raise ValueError("地点不能为空")
location = location.strip()
# 简单缓存
if location in self.cache:
return self.cache[location]
# 模拟API调用
result = {
'temperature': 25,
'description': '晴天',
'humidity': 60,
'location': location,
'timestamp': time.time()
}
self.cache[location] = result
return result
self.weather_tool = ImprovedWeatherTool()
# 执行完整TDD循环
self.complete_tdd_cycle(
"获取天气信息",
test_get_weather,
implement_weather_tool,
refactor_weather_tool
)
def test_weather_tool_error_handling_tdd(self):
"""天气工具错误处理TDD"""
def test_empty_location():
"""测试空地点参数"""
with self.assertRaises(ValueError):
self.weather_tool.get_weather('')
with self.assertRaises(ValueError):
self.weather_tool.get_weather(None)
def implement_error_handling():
"""实现错误处理"""
if hasattr(self.weather_tool, 'get_weather'):
# 已有实现,需要修改
original_method = self.weather_tool.get_weather
def get_weather_with_validation(location):
if not location or not str(location).strip():
raise ValueError("地点不能为空")
return original_method(location)
self.weather_tool.get_weather = get_weather_with_validation
# 先实现基本功能(如果还没有)
if not self.weather_tool:
class BasicWeatherTool:
def get_weather(self, location):
return {'temperature': 25, 'location': location}
self.weather_tool = BasicWeatherTool()
# 执行错误处理的TDD循环
self.complete_tdd_cycle(
"天气工具错误处理",
test_empty_location,
implement_error_handling
)
def tearDown(self):
"""测试清理"""
if hasattr(self, 'tdd_tracker'):
metrics = self.tdd_tracker.get_tdd_metrics()
print(f"\n📊 TDD指标:")
print(f" 总循环数:{metrics['total_cycles']}")
print(f" 平均循环时间:{metrics['avg_cycle_time']:.2f}秒")
print(f" RED阶段占比:{metrics['red_percentage']:.1f}%")
print(f" GREEN阶段占比:{metrics['green_percentage']:.1f}%")
print(f" REFACTOR阶段占比:{metrics['refactor_percentage']:.1f}%")
# 行为驱动开发(BDD)支持
class BDDScenario:
"""BDD场景类"""
def __init__(self, title: str):
self.title = title
self.given_steps: List[str] = []
self.when_steps: List[str] = []
self.then_steps: List[str] = []
self.context: Dict[str, Any] = {}
def given(self, step: str, action: Optional[Callable] = None):
"""Given步骤"""
self.given_steps.append(step)
if action:
action(self.context)
return self
def when(self, step: str, action: Optional[Callable] = None):
"""When步骤"""
self.when_steps.append(step)
if action:
action(self.context)
return self
def then(self, step: str, assertion: Optional[Callable] = None):
"""Then步骤"""
self.then_steps.append(step)
if assertion:
assertion(self.context)
return self
def execute(self):
"""执行BDD场景"""
print(f"\n🎭 执行BDD场景:{self.title}")
print("-" * 50)
for step in self.given_steps:
print(f"📋 Given: {step}")
for step in self.when_steps:
print(f"🎬 When: {step}")
for step in self.then_steps:
print(f"✅ Then: {step}")
print("🎉 场景执行完成")
# BDD测试示例
class MCPToolBDDTest(unittest.TestCase):
def test_weather_tool_bdd_scenario(self):
"""使用BDD风格测试天气工具"""
scenario = BDDScenario("用户查询北京天气")
# 执行场景
scenario.given(
"用户想要查询天气信息",
lambda ctx: ctx.update({'user': 'test_user', 'tool': 'weather'})
).given(
"天气服务正常运行",
lambda ctx: ctx.update({'service_status': 'running'})
).when(
"用户输入地点'北京'",
lambda ctx: ctx.update({
'input_location': '北京',
'result': self.simulate_weather_query(ctx['input_location'])
})
).then(
"系统返回北京的天气信息",
lambda ctx: self.assertIn('temperature', ctx['result'])
).then(
"响应包含温度、描述和湿度",
lambda ctx: self.assertTrue(
all(key in ctx['result'] for key in ['temperature', 'description', 'humidity'])
)
).then(
"地点信息正确",
lambda ctx: self.assertEqual(ctx['result']['location'], '北京')
)
scenario.execute()
def simulate_weather_query(self, location: str) -> Dict[str, Any]:
"""模拟天气查询"""
return {
'temperature': 25,
'description': '晴天',
'humidity': 60,
'location': location,
'timestamp': time.time()
}
# 测试覆盖率分析
class CoverageAnalyzer:
"""测试覆盖率分析器"""
def __init__(self):
self.coverage_data = {}
self.executed_lines = set()
self.total_lines = set()
def track_execution(self, file_path: str, line_number: int):
"""跟踪代码执行"""
self.executed_lines.add((file_path, line_number))
self.total_lines.add((file_path, line_number))
def calculate_coverage(self) -> Dict[str, float]:
"""计算覆盖率"""
if not self.total_lines:
return {'line_coverage': 0.0}
line_coverage = len(self.executed_lines) / len(self.total_lines)
return {
'line_coverage': line_coverage,
'executed_lines': len(self.executed_lines),
'total_lines': len(self.total_lines),
'uncovered_lines': len(self.total_lines) - len(self.executed_lines)
}
def generate_coverage_report(self) -> str:
"""生成覆盖率报告"""
coverage = self.calculate_coverage()
report = f"""
📊 测试覆盖率报告
==================
行覆盖率:{coverage['line_coverage']:.2%}
已执行行数:{coverage['executed_lines']}
总行数:{coverage['total_lines']}
未覆盖行数:{coverage['uncovered_lines']}
"""
return report.strip()
# 运行所有TDD测试
if __name__ == '__main__':
print("🧪 开始TDD测试...")
# 创建测试套件
loader = unittest.TestLoader()
suite = unittest.TestSuite()
# 添加测试类
suite.addTests(loader.loadTestsFromTestCase(WeatherToolTDD))
suite.addTests(loader.loadTestsFromTestCase(MCPToolBDDTest))
# 运行测试
runner = unittest.TextTestRunner(verbosity=2)
result = runner.run(suite)
# 输出结果
print(f"\n🏁 测试完成!")
print(f"运行:{result.testsRun}个测试")
print(f"失败:{len(result.failures)}个")
print(f"错误:{len(result.errors)}个")
if result.wasSuccessful():
print("🎉 所有测试通过!")
else:
print("❌ 部分测试失败")
for test, error in result.failures + result.errors:
print(f" - {test}: {error}")
小结
测试策略的核心要素:
🧪 测试层次
- 单元测试 - 验证单个组件功能
- 集成测试 - 验证组件间协作
- 端到端测试 - 验证完整用户流程
- 性能测试 - 验证系统性能指标
- 安全测试 - 验证安全防护措施
💡 测试实践
- 遵循测试金字塔原则,底层测试多,顶层测试少
- 采用TDD/BDD方法论驱动开发
- 建立自动化测试流水线
- 追求高质量的测试覆盖率
- 定期重构和维护测试代码
🔬 测试哲学:测试不是质量的保证,而是质量的发现者。好的测试让你对代码充满信心。
下一节:自动化测试实战 - 构建高效的自动化测试体系