Skip to content

8.8 测试策略完全指南 🧪

"好的测试不是为了证明代码能工作,而是为了发现代码为什么不工作。"

想象测试就像是给你的MCP服务器做全面体检。单元测试是检查每个器官是否健康,集成测试是检查各器官如何协作,而端到端测试则是检查整个身体系统是否正常运转。

测试金字塔:分层测试策略 🏗️

测试架构设计

typescript
// 🌟 测试金字塔实现
// NOTE(review): `mock` and `spy` are not documented exports of '@jest/globals'; they are kept
// here only so untouched code keeps resolving the same names. Prefer `jest.fn()` / `jest.spyOn()`.
import { describe, it, expect, beforeEach, afterEach, jest, mock, spy } from '@jest/globals';
import { MockedFunction } from 'jest-mock';

// 测试配置接口
interface TestConfig {
  /** Test level of the suite; interpolated into the setup log message. */
  level: 'unit' | 'integration' | 'e2e';
  /** Per-attempt time limit in milliseconds, enforced by `BaseTest.runTest`. */
  timeout: number;
  /** Number of additional attempts `BaseTest.runTest` makes after a failed attempt. */
  retries: number;
  /** NOTE(review): not read by any code visible in this file — presumably a scheduling hint; confirm. */
  parallel: boolean;
  /** When false, `BaseTest.teardown` skips the `afterAllTests` hook. */
  cleanup: boolean;
}

// 测试结果接口
interface TestResult {
  /** Name passed to `runTest` for this test. */
  testName: string;
  /** True when any attempt completed without throwing or timing out. */
  passed: boolean;
  /** Milliseconds from the first attempt until the final outcome, retry waits included. */
  duration: number;
  /** NOTE(review): never populated by code visible in this file. */
  coverage?: number;
  /** One message per failed attempt, in attempt order. */
  errors: string[];
  /** NOTE(review): initialised empty and never appended to in the visible code. */
  warnings: string[];
}

// 基础测试类
/**
 * Common scaffolding shared by every test level: configuration defaults,
 * timed environment setup/teardown, per-test retry and timeout handling,
 * and aggregate statistics.
 *
 * Subclasses supply the environment hooks `beforeAllTests` and `afterAllTests`.
 */
abstract class BaseTest {
  protected config: TestConfig;
  protected testResults: TestResult[] = [];
  protected setupTime: number = 0;
  protected teardownTime: number = 0;

  /**
   * @param config Overrides merged on top of the defaults
   *   (unit level, 5000 ms timeout, 1 retry, not parallel, cleanup enabled).
   */
  constructor(config: Partial<TestConfig> = {}) {
    this.config = {
      level: 'unit',
      timeout: 5000,
      retries: 1,
      parallel: false,
      cleanup: true,
      ...config
    };
  }

  /** Runs the subclass `beforeAllTests` hook and records how long it took. */
  async setup(): Promise<void> {
    const startTime = Date.now();
    console.log(`🚀 开始${this.config.level}测试环境初始化...`);

    await this.beforeAllTests();

    this.setupTime = Date.now() - startTime;
    console.log(`✅ 测试环境初始化完成,耗时:${this.setupTime}ms`);
  }

  /**
   * Runs the subclass `afterAllTests` hook (only when `config.cleanup` is true)
   * and records how long teardown took.
   */
  async teardown(): Promise<void> {
    const startTime = Date.now();
    console.log(`🧹 开始清理测试环境...`);

    if (this.config.cleanup) {
      await this.afterAllTests();
    }

    this.teardownTime = Date.now() - startTime;
    console.log(`✅ 测试环境清理完成,耗时:${this.teardownTime}ms`);
  }

  /** Environment preparation hook, implemented by each concrete suite. */
  protected abstract beforeAllTests(): Promise<void>;
  /** Environment cleanup hook, implemented by each concrete suite. */
  protected abstract afterAllTests(): Promise<void>;

  /**
   * Executes one test with the configured timeout and retry policy.
   *
   * A failed attempt is retried up to `config.retries` more times, waiting one
   * second between attempts. Every failure message is collected in `errors`.
   * The result is also appended to `testResults`.
   *
   * @param testFn The test body; a rejection or a timeout counts as a failure.
   * @param testName Label used in log output and in the result.
   * @returns The recorded result; this method itself does not reject on test failure.
   */
  async runTest(testFn: () => Promise<void>, testName: string): Promise<TestResult> {
    const startTime = Date.now();
    const result: TestResult = {
      testName,
      passed: false,
      duration: 0,
      errors: [],
      warnings: []
    };

    let retries = this.config.retries;

    while (retries >= 0) {
      try {
        console.log(`🧪 执行测试:${testName}${retries < this.config.retries ? ` (重试 ${this.config.retries - retries})` : ''}`);

        await this.runWithTimeout(testFn);

        result.passed = true;
        break;

      } catch (error) {
        const errorMessage = error instanceof Error ? error.message : String(error);
        result.errors.push(errorMessage);

        if (retries === 0) {
          console.error(`❌ 测试失败:${testName} - ${errorMessage}`);
        } else {
          console.warn(`⚠️ 测试失败,准备重试:${testName} - ${errorMessage}`);
          // Give the environment a moment to settle before the next attempt.
          await this.delay(1000);
        }

        retries--;
      }
    }

    result.duration = Date.now() - startTime;
    this.testResults.push(result);

    if (result.passed) {
      console.log(`✅ 测试通过:${testName} (${result.duration}ms)`);
    }

    return result;
  }

  /**
   * Aggregates the recorded results.
   *
   * `passRate` is a fraction in [0, 1]; `totalDuration` includes setup and
   * teardown time, while `avgTestDuration` covers test time only.
   */
  getTestStatistics(): TestStatistics {
    const total = this.testResults.length;
    const passed = this.testResults.filter(r => r.passed).length;
    const failed = total - passed;
    const totalDuration = this.testResults.reduce((sum, r) => sum + r.duration, 0);
    const avgDuration = total > 0 ? totalDuration / total : 0;

    return {
      total,
      passed,
      failed,
      passRate: total > 0 ? passed / total : 0,
      totalDuration: totalDuration + this.setupTime + this.teardownTime,
      avgTestDuration: avgDuration,
      setupTime: this.setupTime,
      teardownTime: this.teardownTime
    };
  }

  /**
   * Races the test body against the configured timeout.
   *
   * The timer is always cleared once the race settles; otherwise every finished
   * test would leave a pending timer that keeps the event loop alive for the
   * full timeout. A timed-out test body is not cancelled — it keeps running in
   * the background because promises cannot be aborted.
   */
  private async runWithTimeout(testFn: () => Promise<void>): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeoutPromise = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('测试超时')), this.config.timeout);
    });

    try {
      await Promise.race([testFn(), timeoutPromise]);
    } finally {
      clearTimeout(timer);
    }
  }

  /** Resolves after `ms` milliseconds. */
  private delay(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// 单元测试类
/**
 * Unit-level suite base: short timeout plus a registry of named Jest mocks
 * and spies so tests can assert on them by name.
 */
class UnitTest extends BaseTest {
  private mocks: Map<string, any> = new Map();
  private spies: Map<string, any> = new Map();

  constructor() {
    super({ level: 'unit', timeout: 2000, parallel: true });
  }

  protected async beforeAllTests(): Promise<void> {
    // Nothing to prepare up front; mocks are created on demand by each test.
    console.log('📋 初始化Mock对象...');
  }

  protected async afterAllTests(): Promise<void> {
    // Put the original methods back before dropping the references, otherwise
    // spied objects stay patched after the suite finishes.
    for (const spyFn of this.spies.values()) {
      if (spyFn && typeof spyFn.mockRestore === 'function') {
        spyFn.mockRestore();
      }
    }
    this.mocks.clear();
    this.spies.clear();
    console.log('🧹 清理Mock对象完成');
  }

  /**
   * Creates a Jest mock function and registers it under `name`.
   *
   * @param name Registry key used later by `expectMockCalled`.
   * @param mockImplementation Optional. A function becomes the mock's
   *   implementation; an object has its properties copied onto the mock
   *   function (e.g. `{ get: jest.fn() }` makes `mock.get` available).
   * @returns The registered mock function.
   */
  createMock<T>(name: string, mockImplementation?: Partial<T>): MockedFunction<any> {
    const mockFn =
      typeof mockImplementation === 'function'
        ? jest.fn(mockImplementation as (...args: any[]) => any)
        : Object.assign(jest.fn(), mockImplementation);
    this.mocks.set(name, mockFn);
    return mockFn as MockedFunction<any>;
  }

  /**
   * Spies on `object[method]` with `jest.spyOn` and registers the spy under `name`.
   * Registered spies are restored in `afterAllTests`.
   */
  createSpy<T extends object>(name: string, object: T, method: keyof T): any {
    const spyFn = jest.spyOn(object as Record<string, any>, method as string);
    this.spies.set(name, spyFn);
    return spyFn;
  }

  /**
   * Asserts that the named mock was called — exactly `times` times when given,
   * at least once otherwise.
   *
   * @throws Error when no mock is registered under `mockName`.
   */
  expectMockCalled(mockName: string, times?: number): void {
    const mockFn = this.mocks.get(mockName);
    if (!mockFn) {
      throw new Error(`Mock '${mockName}' 不存在`);
    }

    if (times !== undefined) {
      expect(mockFn).toHaveBeenCalledTimes(times);
    } else {
      expect(mockFn).toHaveBeenCalled();
    }
  }

  /**
   * Asserts that the named spy was called — exactly `times` times when given,
   * at least once otherwise.
   *
   * @throws Error when no spy is registered under `spyName`.
   */
  expectSpyCalled(spyName: string, times?: number): void {
    const spyFn = this.spies.get(spyName);
    if (!spyFn) {
      throw new Error(`Spy '${spyName}' 不存在`);
    }

    if (times !== undefined) {
      expect(spyFn).toHaveBeenCalledTimes(times);
    } else {
      expect(spyFn).toHaveBeenCalled();
    }
  }
}

/**
 * Example unit tests for the weather tool. Each test builds its own mocked
 * API client so no state leaks between tests.
 */
class MCPToolUnitTest extends UnitTest {
  /** Happy path: the service returns the API payload and calls the client once. */
  async testWeatherTool(): Promise<void> {
    console.log('🌤️ 测试天气工具...');

    // 1. Test data
    const mockWeatherData = {
      location: '北京',
      temperature: 25,
      description: '晴天',
      humidity: 60
    };

    // 2. Mock the client's `get` method itself, so the call-count assertion
    //    targets the function the service actually invokes.
    const getMock = this.createMock('apiClient.get');
    getMock.mockResolvedValue({
      data: mockWeatherData,
      status: 200
    });

    // 3. Inject the dependency
    const weatherService = new WeatherService({ get: getMock });

    // 4. Exercise
    const result = await weatherService.getWeather('北京');

    // 5. Verify the result
    expect(result).toEqual(mockWeatherData);
    this.expectMockCalled('apiClient.get', 1);

    // 6. Verify the request arguments
    expect(getMock).toHaveBeenCalledWith(
      '/weather',
      { params: { location: '北京' } }
    );

    console.log('✅ 天气工具单元测试通过');
  }

  /** API failures must propagate to the caller unchanged. */
  async testWeatherToolError(): Promise<void> {
    console.log('🌩️ 测试天气工具错误处理...');

    // 1. Make the API call reject
    const getMock = this.createMock('apiClient.get');
    getMock.mockRejectedValue(new Error('API错误'));

    const weatherService = new WeatherService({ get: getMock });

    // 2. The rejection must surface from the service
    await expect(weatherService.getWeather('北京'))
      .rejects
      .toThrow('API错误');

    console.log('✅ 错误处理测试通过');
  }

  /** Empty and null locations must be rejected with distinct messages. */
  async testParameterValidation(): Promise<void> {
    console.log('📋 测试参数验证...');

    const getMock = this.createMock('apiClient.get');
    const weatherService = new WeatherService({ get: getMock });

    // Empty string
    await expect(weatherService.getWeather(''))
      .rejects
      .toThrow('地点不能为空');

    // Null is deliberately forced past the compiler to exercise the runtime guard.
    await expect(weatherService.getWeather(null as unknown as string))
      .rejects
      .toThrow('地点参数无效');

    console.log('✅ 参数验证测试通过');
  }
}

// 集成测试类
/**
 * Integration-level suite base: starts a test database, a test server and a
 * test client before the tests, and shuts them down afterwards.
 *
 * The three `start…`/`create…` helpers are placeholders that return empty
 * objects; real implementations are omitted in this example.
 */
class IntegrationTest extends BaseTest {
  // Protected (not private) because concrete suites drive these handles directly.
  protected testDatabase: any;
  protected testServer: any;
  protected testClient: any;

  constructor() {
    super({ level: 'integration', timeout: 10000, retries: 2 });
  }

  /** Brings up database, server and client in that order, then waits for readiness. */
  protected async beforeAllTests(): Promise<void> {
    console.log('🔗 启动集成测试环境...');

    this.testDatabase = await this.startTestDatabase();
    this.testServer = await this.startTestServer();
    this.testClient = this.createTestClient();

    await this.waitForServicesReady();
  }

  /**
   * Shuts down client, server and database in that order.
   *
   * Every step is attempted even if an earlier one fails, so one broken handle
   * cannot leak the others; the first failure is rethrown afterwards.
   */
  protected async afterAllTests(): Promise<void> {
    console.log('🛑 关闭集成测试环境...');

    const shutdownSteps: Array<() => Promise<void>> = [
      async () => {
        if (this.testClient) {
          await this.testClient.disconnect();
        }
      },
      async () => {
        if (this.testServer) {
          await this.testServer.close();
        }
      },
      async () => {
        if (this.testDatabase) {
          await this.testDatabase.close();
        }
      }
    ];

    const failures: unknown[] = [];
    for (const step of shutdownSteps) {
      try {
        await step();
      } catch (error) {
        failures.push(error);
      }
    }

    if (failures.length > 0) {
      throw failures[0];
    }
  }

  /** Placeholder: would start an in-memory database or a database container. */
  private async startTestDatabase(): Promise<any> {
    console.log('📊 启动测试数据库...');
    // Implementation omitted
    return {};
  }

  /** Placeholder: would start the MCP server under test. */
  private async startTestServer(): Promise<any> {
    console.log('🚀 启动MCP测试服务器...');
    // Implementation omitted
    return {};
  }

  /** Placeholder: would create an MCP client connected to the test server. */
  private createTestClient(): any {
    console.log('🔌 创建MCP测试客户端...');
    // Implementation omitted
    return {};
  }

  /** Stand-in for a real health check: simply waits two seconds. */
  private async waitForServicesReady(): Promise<void> {
    console.log('⏳ 等待服务就绪...');
    await this.pause(2000);
  }

  /**
   * Resolves after `ms` milliseconds.
   * Deliberately not named `delay`: the base class already declares a private
   * member with that name, and TypeScript rejects a second private declaration.
   */
  private pause(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// MCP协议集成测试
/**
 * Protocol-level integration tests that drive the shared test client against
 * the test server: connection/handshake, tool execution, resource access and
 * error reporting.
 */
class MCPProtocolIntegrationTest extends IntegrationTest {
  /** Connects and performs the protocol handshake. */
  async testClientServerCommunication(): Promise<void> {
    console.log('🤝 测试客户端-服务器通信...');

    // 1. Connection
    const connected = await this.testClient.connect();
    expect(connected).toBe(true);

    // 2. Handshake
    const handshake = await this.testClient.handshake({
      protocolVersion: '2024-11-05',
      capabilities: {
        roots: { listChanged: true },
        sampling: {}
      }
    });

    expect(handshake.protocolVersion).toBe('2024-11-05');
    expect(handshake.capabilities).toBeDefined();

    console.log('✅ 客户端-服务器通信测试通过');
  }

  /** Lists the tools, checks that `get_weather` is offered, and calls it. */
  async testToolExecution(): Promise<void> {
    console.log('🔧 测试工具执行...');

    // 1. Tool listing
    const tools = await this.testClient.listTools();
    expect(tools.tools).toBeInstanceOf(Array);
    expect(tools.tools.length).toBeGreaterThan(0);

    // 2. Tool call. The callback parameter is typed explicitly because the
    //    client is untyped and would otherwise make it an implicit `any`.
    const weatherTool = tools.tools.find(
      (tool: { name: string }) => tool.name === 'get_weather'
    );
    expect(weatherTool).toBeDefined();

    const result = await this.testClient.callTool('get_weather', {
      location: '北京'
    });

    expect(result.isError).toBe(false);
    expect(result.content).toBeDefined();

    console.log('✅ 工具执行测试通过');
  }

  /** Lists roots and, when at least one exists, reads the first one. */
  async testResourceAccess(): Promise<void> {
    console.log('📁 测试资源访问...');

    // 1. Root listing
    const roots = await this.testClient.listRoots();
    expect(roots.roots).toBeInstanceOf(Array);

    // 2. Read the first root, if any
    if (roots.roots.length > 0) {
      const resource = await this.testClient.readResource(roots.roots[0].uri);
      expect(resource.contents).toBeDefined();
    }

    console.log('✅ 资源访问测试通过');
  }

  /** Unknown tools and invalid arguments must come back flagged as errors. */
  async testErrorHandling(): Promise<void> {
    console.log('⚠️ 测试错误处理...');

    // 1. Unknown tool
    const result = await this.testClient.callTool('nonexistent_tool', {});
    expect(result.isError).toBe(true);
    expect(result.content[0].type).toBe('text');
    expect(result.content[0].text).toContain('工具不存在');

    // 2. Invalid arguments
    const result2 = await this.testClient.callTool('get_weather', {
      invalid_param: 'value'
    });
    expect(result2.isError).toBe(true);

    console.log('✅ 错误处理测试通过');
  }
}

// 端到端测试类
/**
 * End-to-end suite base: starts a browser driver and a test user before the
 * tests and disposes of both afterwards.
 *
 * `startBrowser` is a placeholder that returns an empty object; the real
 * driver initialisation is omitted in this example.
 */
class EndToEndTest extends BaseTest {
  // Protected (not private) because concrete suites drive these directly.
  protected browserDriver: any;
  protected testUser: any;

  constructor() {
    super({ level: 'e2e', timeout: 30000, retries: 3 });
  }

  /** Starts the browser, creates the test user, then waits for the application. */
  protected async beforeAllTests(): Promise<void> {
    console.log('🌐 启动端到端测试环境...');

    this.browserDriver = await this.startBrowser();
    this.testUser = await this.createTestUser();

    await this.waitForApplicationReady();
  }

  /**
   * Removes the test user and quits the browser.
   * The browser is quit even when user cleanup fails, so a cleanup error
   * cannot leave a browser process behind.
   */
  protected async afterAllTests(): Promise<void> {
    console.log('🛑 关闭端到端测试环境...');

    try {
      if (this.testUser) {
        await this.cleanupTestUser();
      }
    } finally {
      if (this.browserDriver) {
        await this.browserDriver.quit();
      }
    }
  }

  /** Placeholder for browser driver initialisation. */
  private async startBrowser(): Promise<any> {
    console.log('🌐 启动浏览器...');
    // Driver initialisation omitted
    return {};
  }

  /** Returns the fixed credentials used by the end-to-end flows. */
  private async createTestUser(): Promise<any> {
    console.log('👤 创建测试用户...');
    return {
      username: 'test_user',
      email: 'test@example.com',
      password: 'test_password_123'
    };
  }

  /** Stand-in for a real readiness probe: simply waits three seconds. */
  private async waitForApplicationReady(): Promise<void> {
    console.log('⏳ 等待应用就绪...');
    await this.pause(3000);
  }

  /** Placeholder for removing the test user; the cleanup logic is omitted. */
  private async cleanupTestUser(): Promise<void> {
    console.log('🧹 清理测试用户...');
    // Cleanup logic omitted
  }

  /**
   * Resolves after `ms` milliseconds.
   * Deliberately not named `delay`: the base class already declares a private
   * member with that name, and TypeScript rejects a second private declaration.
   */
  private pause(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}

// MCP应用端到端测试
/**
 * Browser-driven end-to-end flows for the MCP application: user registration
 * and running the weather tool from the tools page.
 */
class MCPApplicationE2ETest extends EndToEndTest {
  /** Fills in and submits the registration form, then checks the success banner. */
  async testUserRegistrationFlow(): Promise<void> {
    console.log('📝 测试用户注册流程...');

    const { username, email, password } = this.testUser;

    // Open the page and complete the form
    await this.browserDriver.get('/register');
    await this.typeInto('#username', username);
    await this.typeInto('#email', email);
    await this.typeInto('#password', password);

    // Submit
    await this.clickOn('#register-btn');

    // The success banner must confirm the registration
    const banner = await this.browserDriver.findElement('.success-message');
    const bannerText = await banner.getText();
    expect(bannerText).toContain('注册成功');

    console.log('✅ 用户注册流程测试通过');
  }

  /** Logs in, runs the weather tool for 北京 and checks the rendered result. */
  async testMCPToolInteraction(): Promise<void> {
    console.log('🔧 测试MCP工具交互...');

    await this.login();

    // Navigate to the tool and run it
    await this.browserDriver.get('/tools');
    await this.clickOn('[data-tool="weather"]');
    await this.typeInto('#location-input', '北京');
    await this.clickOn('#execute-tool');

    // Wait until the result panel is present
    await this.waitFor('.tool-result', 10000);

    // The panel must mention the city and a temperature
    const panel = await this.browserDriver.findElement('.tool-result');
    const panelText = await panel.getText();
    expect(panelText).toContain('北京');
    expect(panelText).toContain('温度');

    console.log('✅ MCP工具交互测试通过');
  }

  /** Signs in with the shared test user and waits for the dashboard. */
  private async login(): Promise<void> {
    await this.browserDriver.get('/login');
    await this.typeInto('#username', this.testUser.username);
    await this.typeInto('#password', this.testUser.password);
    await this.clickOn('#login-btn');

    await this.waitFor('.dashboard', 5000);
  }

  /** Locates the element for `selector` and types `text` into it. */
  private async typeInto(selector: string, text: string): Promise<void> {
    await this.browserDriver.findElement(selector).sendKeys(text);
  }

  /** Locates the element for `selector` and clicks it. */
  private async clickOn(selector: string): Promise<void> {
    await this.browserDriver.findElement(selector).click();
  }

  /** Waits up to `timeoutMs` for the element matching `selector`. */
  private async waitFor(selector: string, timeoutMs: number): Promise<void> {
    await this.browserDriver.wait(
      () => this.browserDriver.findElement(selector),
      timeoutMs
    );
  }
}

// 性能测试类
/**
 * Load-style suite: fires batches of simulated weather-tool calls at rising
 * concurrency levels and prints a table of the collected metrics on teardown.
 */
class PerformanceTest extends BaseTest {
  private metrics: PerformanceMetrics[] = [];

  constructor() {
    super({ level: 'integration', timeout: 60000, retries: 1 });
  }

  protected async beforeAllTests(): Promise<void> {
    console.log('⚡ 初始化性能测试环境...');
    // Performance monitoring initialisation omitted
  }

  protected async afterAllTests(): Promise<void> {
    console.log('📊 生成性能测试报告...');
    this.generatePerformanceReport();
  }

  /**
   * Runs one batch per concurrency level (1, 5, 10, 20, 50) and records a
   * metrics row for each.
   *
   * Latency is measured per request. Dividing the batch wall-clock time by the
   * number of concurrent requests would understate the response time, because
   * the requests overlap.
   */
  async testConcurrentToolExecution(): Promise<void> {
    console.log('🔀 测试并发工具执行性能...');

    const concurrencyLevels = [1, 5, 10, 20, 50];

    for (const concurrency of concurrencyLevels) {
      console.log(`📈 测试并发度:${concurrency}`);

      const startTime = Date.now();
      const samples = await Promise.all(
        Array.from({ length: concurrency }, (_, i) => this.timeRequest(`测试城市${i}`))
      );
      const totalTime = Date.now() - startTime;

      const successful = samples.filter(sample => sample.succeeded).length;
      const failed = concurrency - successful;
      const latencySum = samples.reduce((sum, sample) => sum + sample.latencyMs, 0);
      const avgResponseTime = latencySum / concurrency;
      // Requests per second; guarded so an instantaneous batch cannot divide by zero.
      const throughput = totalTime > 0 ? (successful / totalTime) * 1000 : 0;

      this.metrics.push({
        concurrency,
        totalRequests: concurrency,
        successfulRequests: successful,
        failedRequests: failed,
        totalTime,
        avgResponseTime,
        throughput,
        errorRate: failed / concurrency
      });

      console.log(`✅ 并发度${concurrency}:成功${successful},失败${failed},平均响应时间${avgResponseTime.toFixed(2)}ms,吞吐量${throughput.toFixed(2)}req/s`);
    }
  }

  /**
   * Executes one simulated request and reports its outcome and latency.
   * Never rejects: a failed request is reported through `succeeded: false`
   * so it is counted rather than aborting the whole batch.
   */
  private async timeRequest(location: string): Promise<{ succeeded: boolean; latencyMs: number }> {
    const startedAt = Date.now();
    try {
      await this.executeWeatherTool(location);
      return { succeeded: true, latencyMs: Date.now() - startedAt };
    } catch {
      return { succeeded: false, latencyMs: Date.now() - startedAt };
    }
  }

  /** Simulated tool call: 500–1500 ms random latency and a 5% failure rate. */
  private async executeWeatherTool(location: string): Promise<any> {
    const delay = Math.random() * 1000 + 500;
    await new Promise(resolve => setTimeout(resolve, delay));

    if (Math.random() < 0.05) {
      throw new Error('模拟错误');
    }

    return { location, temperature: Math.floor(Math.random() * 40) };
  }

  /** Prints one fixed-width table row per recorded concurrency level. */
  private generatePerformanceReport(): void {
    console.log('\n📊 性能测试报告:');
    console.log('-'.repeat(80));
    console.log('并发度 | 总请求 | 成功 | 失败 | 总时间(ms) | 平均响应(ms) | 吞吐量(req/s) | 错误率');
    console.log('-'.repeat(80));

    for (const metric of this.metrics) {
      console.log(
        `${metric.concurrency.toString().padStart(6)} | ` +
        `${metric.totalRequests.toString().padStart(6)} | ` +
        `${metric.successfulRequests.toString().padStart(4)} | ` +
        `${metric.failedRequests.toString().padStart(4)} | ` +
        `${metric.totalTime.toString().padStart(10)} | ` +
        `${metric.avgResponseTime.toFixed(2).padStart(12)} | ` +
        `${metric.throughput.toFixed(2).padStart(12)} | ` +
        `${(metric.errorRate * 100).toFixed(2).padStart(6)}%`
      );
    }

    console.log('-'.repeat(80));
  }
}

// 测试运行器
/**
 * Runs registered suites one after another and aggregates their statistics
 * into a single summary.
 */
class TestRunner {
  private testSuites: BaseTest[] = [];

  /** Registers a suite; suites run in registration order. */
  addTestSuite(testSuite: BaseTest): void {
    this.testSuites.push(testSuite);
  }

  /**
   * Runs every registered suite: setup, each `test*` method through
   * `runTest`, then teardown.
   *
   * Teardown is always attempted, even when setup or a test threw, so a
   * partially started environment is still released. Statistics for the tests
   * that did run are always folded into the summary. A suite counts as failed
   * when any test failed or when setup/teardown threw.
   *
   * @returns The aggregated summary, which is also printed to the console.
   */
  async runAllTests(): Promise<TestSummary> {
    console.log('🚀 开始运行测试套件...\n');

    const summary: TestSummary = {
      totalSuites: this.testSuites.length,
      passedSuites: 0,
      failedSuites: 0,
      totalTests: 0,
      passedTests: 0,
      failedTests: 0,
      totalDuration: 0,
      suiteResults: []
    };

    const startTime = Date.now();

    for (const suite of this.testSuites) {
      const suiteName = suite.constructor.name;
      let crashed = false;

      console.log(`\n📋 运行测试套件:${suiteName}`);
      console.log('='.repeat(50));

      try {
        await suite.setup();

        for (const methodName of this.getTestMethods(suite)) {
          await suite.runTest(
            () => (suite as any)[methodName](),
            methodName
          );
        }
      } catch (error) {
        crashed = true;
        console.error(`💥 测试套件异常:${suiteName}`, error);
      }

      // Separate try block: a teardown failure must not hide the results of
      // tests that already ran, and teardown must run after a crash too.
      try {
        await suite.teardown();
      } catch (error) {
        crashed = true;
        console.error(`💥 测试套件异常:${suiteName}`, error);
      }

      const stats = suite.getTestStatistics();
      summary.suiteResults.push({
        suiteName,
        ...stats
      });

      summary.totalTests += stats.total;
      summary.passedTests += stats.passed;
      summary.failedTests += stats.failed;

      if (!crashed && stats.failed === 0) {
        summary.passedSuites++;
        console.log(`✅ 测试套件通过:${suiteName}`);
      } else {
        summary.failedSuites++;
        console.log(`❌ 测试套件失败:${suiteName}`);
      }
    }

    summary.totalDuration = Date.now() - startTime;

    this.printTestSummary(summary);

    return summary;
  }

  /**
   * Collects the names of all function members starting with `test`, walking
   * the whole prototype chain so inherited tests are included. Names appear
   * most-derived class first; an overridden name is listed once.
   */
  private getTestMethods(suite: BaseTest): string[] {
    const names = new Set<string>();

    for (
      let proto = Object.getPrototypeOf(suite);
      proto !== null && proto !== Object.prototype;
      proto = Object.getPrototypeOf(proto)
    ) {
      for (const name of Object.getOwnPropertyNames(proto)) {
        if (name.startsWith('test') && typeof (suite as any)[name] === 'function') {
          names.add(name);
        }
      }
    }

    return Array.from(names);
  }

  /** Prints suite/test totals, the pass rate, and the suites with failing tests. */
  private printTestSummary(summary: TestSummary): void {
    console.log('\n🏁 测试总结');
    console.log('='.repeat(60));
    console.log(`📊 测试套件:${summary.totalSuites}个,通过:${summary.passedSuites}个,失败:${summary.failedSuites}个`);
    console.log(`🧪 测试用例:${summary.totalTests}个,通过:${summary.passedTests}个,失败:${summary.failedTests}个`);
    console.log(`⏱️ 总耗时:${summary.totalDuration}ms`);
    // Math.max avoids a NaN pass rate when no test ran at all.
    console.log(`📈 通过率:${((summary.passedTests / Math.max(1, summary.totalTests)) * 100).toFixed(2)}%`);

    if (summary.failedTests > 0) {
      console.log('\n❌ 失败的测试套件:');
      summary.suiteResults
        .filter(result => result.failed > 0)
        .forEach(result => {
          console.log(`  - ${result.suiteName}: ${result.failed}个失败`);
        });
    }

    console.log('='.repeat(60));
  }
}

// 接口定义
interface TestStatistics {
  /** Number of recorded test results. */
  total: number;
  /** Number of results whose `passed` flag is true. */
  passed: number;
  /** `total - passed`. */
  failed: number;
  /** Fraction of passed tests in [0, 1]; 0 when no test ran. */
  passRate: number;
  /** Sum of all test durations plus setup and teardown time, in milliseconds. */
  totalDuration: number;
  /** Mean test duration in milliseconds (setup/teardown excluded); 0 when no test ran. */
  avgTestDuration: number;
  /** Milliseconds spent in `setup()`. */
  setupTime: number;
  /** Milliseconds spent in `teardown()`. */
  teardownTime: number;
}

interface TestSummary {
  /** Number of suites registered with the runner. */
  totalSuites: number;
  /** Suites that finished with no failed test. */
  passedSuites: number;
  /** Suites with at least one failed test, or that threw while running. */
  failedSuites: number;
  /** Test totals accumulated from the per-suite statistics. */
  totalTests: number;
  passedTests: number;
  failedTests: number;
  /** Wall-clock milliseconds for the whole run. */
  totalDuration: number;
  /** Per-suite statistics, each tagged with the suite's constructor name. */
  suiteResults: (TestStatistics & { suiteName: string })[];
}

interface PerformanceMetrics {
  /** Number of requests issued at the same time in this batch. */
  concurrency: number;
  /** Requests issued in the batch (equal to `concurrency` in the visible producer). */
  totalRequests: number;
  /** Requests that completed without error. */
  successfulRequests: number;
  /** Requests that ended in an error. */
  failedRequests: number;
  /** Wall-clock milliseconds for the whole batch. */
  totalTime: number;
  /** Average response time in milliseconds. */
  avgResponseTime: number;
  /** Successful requests per second. */
  throughput: number;
  /** Fraction of failed requests in [0, 1]. */
  errorRate: number;
}

// 模拟的WeatherService类(用于测试)
/**
 * Minimal weather service used as the subject of the example tests.
 * Delegates to an injected API client that exposes
 * `get(path, { params })` and resolves to an object with a `data` field.
 */
class WeatherService {
  constructor(private apiClient: any) {}

  /**
   * Fetches the weather for a location.
   *
   * @param location Place name to look up.
   * @returns The `data` field of the API response.
   * @throws Error `地点参数无效` when `location` is null or undefined.
   * @throws Error `地点不能为空` when `location` is an empty string.
   */
  async getWeather(location: string | null | undefined): Promise<any> {
    // The null/undefined check must come first: both values are falsy, so the
    // emptiness check below would otherwise swallow them and report the wrong error.
    if (location === null || location === undefined) {
      throw new Error('地点参数无效');
    }

    if (!location) {
      throw new Error('地点不能为空');
    }

    const response = await this.apiClient.get('/weather', {
      params: { location }
    });

    return response.data;
  }
}

// 使用示例
/**
 * Registers the example suites, runs them, and terminates the process with
 * exit code 1 when anything failed and 0 otherwise.
 *
 * A suite that crashed (for example during setup) may report zero failed
 * tests, so the exit code also takes `failedSuites` into account.
 */
async function runTestSuite() {
  const runner = new TestRunner();

  // Register one suite per test level
  runner.addTestSuite(new MCPToolUnitTest());
  runner.addTestSuite(new MCPProtocolIntegrationTest());
  runner.addTestSuite(new MCPApplicationE2ETest());
  runner.addTestSuite(new PerformanceTest());

  const summary = await runner.runAllTests();

  const hasFailures = summary.failedTests > 0 || summary.failedSuites > 0;
  process.exit(hasFailures ? 1 : 0);
}

// 导出测试类
export {
  BaseTest,
  UnitTest,
  IntegrationTest,
  EndToEndTest,
  PerformanceTest,
  TestRunner,
  MCPToolUnitTest,
  MCPProtocolIntegrationTest,
  MCPApplicationE2ETest
};

// 如果直接运行此文件,执行测试
if (require.main === module) {
  runTestSuite().catch((error: unknown) => {
    // A crash in the runner itself must fail the process instead of exiting with status 0.
    console.error(error);
    process.exit(1);
  });
}

测试驱动开发:红绿重构循环 🔄

TDD实践框架

python
# 🌟 测试驱动开发框架
import unittest
import asyncio
from typing import Any, Dict, List, Optional, Callable
from dataclasses import dataclass
from abc import ABC, abstractmethod
import time
import json

class TDDPhase:
    """String constants naming the three TDD phases (a plain class, not an ``enum.Enum``)."""
    RED = "red"      # Red: write a failing test
    GREEN = "green"  # Green: write the minimum code that makes the test pass
    REFACTOR = "refactor"  # Refactor: improve the code

@dataclass
class TDDCycle:
    """Record of one timed TDD phase for a feature."""
    feature_name: str
    # Phase name; TDDTracker derives the metrics key "<phase>_time" from it.
    phase: str
    # time.time() value captured when the cycle started.
    start_time: float
    # time.time() value captured when the cycle ended; None while still running.
    end_time: Optional[float] = None
    # NOTE(review): the three counters below are never updated by code visible in this file.
    test_count: int = 0
    line_count: int = 0
    complexity_score: int = 0

class TDDTracker:
    """Records timed TDD phases and derives per-phase time metrics."""

    def __init__(self):
        self.cycles: List[TDDCycle] = []
        self.current_cycle: Optional[TDDCycle] = None
        # Every key that get_tdd_metrics() can return exists from the start, so
        # callers may index the result even before any cycle has completed.
        self.metrics: Dict[str, Any] = {
            'red_time': 0,
            'green_time': 0,
            'refactor_time': 0,
            'total_cycles': 0,
            'avg_cycle_time': 0,
            'red_percentage': 0,
            'green_percentage': 0,
            'refactor_percentage': 0
        }

    def start_cycle(self, feature_name: str, phase: str) -> None:
        """Start timing a phase, first closing any cycle that is still open.

        Args:
            feature_name: Label of the feature being developed.
            phase: Phase name, normally one of the TDDPhase constants.
        """
        if self.current_cycle and not self.current_cycle.end_time:
            self.end_cycle()

        self.current_cycle = TDDCycle(
            feature_name=feature_name,
            phase=phase,
            start_time=time.time()
        )

        print(f"🔄 开始TDD循环:{feature_name} - {phase.upper()}阶段")

    def end_cycle(self) -> None:
        """Stop the open cycle, add its duration to the phase total and store it.

        Does nothing when no cycle is open.
        """
        if not self.current_cycle:
            return

        self.current_cycle.end_time = time.time()
        duration = self.current_cycle.end_time - self.current_cycle.start_time

        # Only the known phases have a "<phase>_time" bucket; others are stored
        # in self.cycles but not added to any per-phase total.
        phase_key = f"{self.current_cycle.phase}_time"
        if phase_key in self.metrics:
            self.metrics[phase_key] += duration

        self.cycles.append(self.current_cycle)

        print(f"✅ 完成TDD循环:{self.current_cycle.feature_name} - "
              f"{self.current_cycle.phase.upper()}阶段 "
              f"({duration:.2f}秒)")

        self.current_cycle = None

    def get_tdd_metrics(self) -> Dict[str, Any]:
        """Return the metrics dict, refreshed with counts, average and percentages.

        The percentage keys are always present; they are 0 when no time has
        been recorded yet.

        Returns:
            The tracker's own metrics dict (not a copy).
        """
        total_time = sum(
            cycle.end_time - cycle.start_time
            for cycle in self.cycles
            if cycle.end_time
        )
        cycle_count = len(self.cycles)

        def share(phase_seconds: float) -> float:
            # Percentage of the total recorded time; 0 avoids dividing by zero.
            return (phase_seconds / total_time * 100) if total_time > 0 else 0

        self.metrics.update({
            'total_cycles': cycle_count,
            'avg_cycle_time': total_time / cycle_count if cycle_count else 0,
            'red_percentage': share(self.metrics['red_time']),
            'green_percentage': share(self.metrics['green_time']),
            'refactor_percentage': share(self.metrics['refactor_time'])
        })

        return self.metrics

class TDDTestCase(unittest.TestCase):
    """unittest base class that walks a feature through red/green/refactor phases.

    Each phase is timed with the instance's own TDDTracker.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.tdd_tracker = TDDTracker()

    def _run_and_report(self, test_function: Callable, success_message: str, failure_prefix: str):
        """Run the test; print the matching message and re-raise on failure."""
        try:
            test_function()
            print(success_message)
        except Exception as exc:
            print(f"{failure_prefix}{exc}")
            raise

    def red_phase(self, feature_name: str, test_function: Callable):
        """Red phase: the test must fail before any implementation exists.

        An AssertionError, NotImplementedError or AttributeError raised by the
        test counts as the expected failure.
        """
        expected_failures = (AssertionError, NotImplementedError, AttributeError)
        self.tdd_tracker.start_cycle(feature_name, TDDPhase.RED)

        print(f"🔴 RED阶段:为 '{feature_name}' 编写失败测试")

        # Fails this phase when the test unexpectedly passes.
        self.assertRaises(expected_failures, test_function)

        print("✅ 测试按预期失败")
        self.tdd_tracker.end_cycle()

    def green_phase(self, feature_name: str, implementation_function: Callable, test_function: Callable):
        """Green phase: apply the minimal implementation, then the test must pass."""
        self.tdd_tracker.start_cycle(feature_name, TDDPhase.GREEN)

        print(f"🟢 GREEN阶段:为 '{feature_name}' 实现最少代码")

        implementation_function()
        self._run_and_report(test_function, "✅ 测试通过", "❌ 测试仍然失败:")

        self.tdd_tracker.end_cycle()

    def refactor_phase(self, feature_name: str, refactor_function: Callable, test_function: Callable):
        """Refactor phase: apply the refactoring, then the test must still pass."""
        self.tdd_tracker.start_cycle(feature_name, TDDPhase.REFACTOR)

        print(f"🔧 REFACTOR阶段:重构 '{feature_name}'")

        refactor_function()
        self._run_and_report(test_function, "✅ 重构后测试仍然通过", "❌ 重构破坏了测试:")

        self.tdd_tracker.end_cycle()

    def complete_tdd_cycle(
        self,
        feature_name: str,
        test_function: Callable,
        implementation_function: Callable,
        refactor_function: Optional[Callable] = None
    ):
        """Run red, green and (when a refactor function is given) refactor in order."""
        separator = "=" * 60
        print(f"\n🎯 开始完整TDD循环:{feature_name}")
        print(separator)

        self.red_phase(feature_name, test_function)
        self.green_phase(feature_name, implementation_function, test_function)

        # The refactor step is optional.
        if refactor_function:
            self.refactor_phase(feature_name, refactor_function, test_function)

        print(f"🎉 完成TDD循环:{feature_name}")
        print(separator)

# MCP天气工具TDD示例
class WeatherToolTDD(TDDTestCase):
    """Worked TDD example: builds a weather tool through red/green/refactor cycles."""

    def setUp(self):
        """Start every test without a tool so the red phase fails first."""
        self.weather_tool = None
        self.mock_api_response = {
            'temperature': 25,
            'description': '晴天',
            'humidity': 60,
            'location': '北京'
        }

    def test_weather_tool_complete_tdd(self):
        """Full cycle for the basic weather lookup."""

        def test_get_weather():
            """The lookup returns temperature, description and the requested location."""
            result = self.weather_tool.get_weather('北京')
            self.assertIsNotNone(result)
            self.assertIn('temperature', result)
            self.assertIn('description', result)
            self.assertEqual(result['location'], '北京')

        def implement_weather_tool():
            """Green step: the simplest implementation that satisfies the test."""
            class BasicWeatherTool:
                def get_weather(self, location):
                    # Hard-coded data is enough to make the test pass.
                    return {
                        'temperature': 25,
                        'description': '晴天',
                        'humidity': 60,
                        'location': location
                    }

            self.weather_tool = BasicWeatherTool()

        def refactor_weather_tool():
            """Refactor step: add input validation and a simple cache."""
            class ImprovedWeatherTool:
                def __init__(self):
                    self.cache = {}

                def get_weather(self, location):
                    # Reject empty or whitespace-only locations.
                    if not location or not location.strip():
                        raise ValueError("地点不能为空")

                    location = location.strip()

                    # Serve repeated lookups from the cache.
                    if location in self.cache:
                        return self.cache[location]

                    # Simulated API call.
                    result = {
                        'temperature': 25,
                        'description': '晴天',
                        'humidity': 60,
                        'location': location,
                        'timestamp': time.time()
                    }

                    self.cache[location] = result
                    return result

            self.weather_tool = ImprovedWeatherTool()

        self.complete_tdd_cycle(
            "获取天气信息",
            test_get_weather,
            implement_weather_tool,
            refactor_weather_tool
        )

    def test_weather_tool_error_handling_tdd(self):
        """Red/green cycle for rejecting empty locations."""

        def test_empty_location():
            """Empty string and None must both raise ValueError."""
            with self.assertRaises(ValueError):
                self.weather_tool.get_weather('')

            with self.assertRaises(ValueError):
                self.weather_tool.get_weather(None)

        def implement_error_handling():
            """Green step: wrap the existing lookup with validation."""
            if hasattr(self.weather_tool, 'get_weather'):
                original_method = self.weather_tool.get_weather

                def get_weather_with_validation(location):
                    if not location or not str(location).strip():
                        raise ValueError("地点不能为空")
                    return original_method(location)

                self.weather_tool.get_weather = get_weather_with_validation

        # Start from a basic tool without validation so the red phase fails.
        if not self.weather_tool:
            class BasicWeatherTool:
                def get_weather(self, location):
                    return {'temperature': 25, 'location': location}
            self.weather_tool = BasicWeatherTool()

        self.complete_tdd_cycle(
            "天气工具错误处理",
            test_empty_location,
            implement_error_handling
        )

    def tearDown(self):
        """Print the TDD timing metrics collected during the test.

        Keys are read with defaults: when a test fails before any cycle has
        completed, the percentage entries may be missing, and a KeyError here
        would add a spurious error on top of the real test failure.
        """
        if hasattr(self, 'tdd_tracker'):
            metrics = self.tdd_tracker.get_tdd_metrics()
            print(f"\n📊 TDD指标:")
            print(f"  总循环数:{metrics.get('total_cycles', 0)}")
            print(f"  平均循环时间:{metrics.get('avg_cycle_time', 0):.2f}秒")
            print(f"  RED阶段占比:{metrics.get('red_percentage', 0):.1f}%")
            print(f"  GREEN阶段占比:{metrics.get('green_percentage', 0):.1f}%")
            print(f"  REFACTOR阶段占比:{metrics.get('refactor_percentage', 0):.1f}%")

# 行为驱动开发(BDD)支持
class BDDScenario:
    """A fluent Given/When/Then scenario with a shared context dict.

    Step descriptions are collected per phase. Any callback passed with a
    step is invoked immediately, at registration time, with the shared
    ``context``; ``execute`` only prints the collected descriptions.
    """

    def __init__(self, title: str):
        self.title = title
        self.given_steps: List[str] = []
        self.when_steps: List[str] = []
        self.then_steps: List[str] = []
        self.context: Dict[str, Any] = {}

    def _register(self, bucket: List[str], step: str, callback: Optional[Callable]):
        """Record ``step`` in ``bucket``, then run ``callback`` on the context."""
        bucket.append(step)
        if callback:
            callback(self.context)
        return self

    def given(self, step: str, action: Optional[Callable] = None):
        """Add a Given step; ``action`` (if any) runs now. Returns self."""
        return self._register(self.given_steps, step, action)

    def when(self, step: str, action: Optional[Callable] = None):
        """Add a When step; ``action`` (if any) runs now. Returns self."""
        return self._register(self.when_steps, step, action)

    def then(self, step: str, assertion: Optional[Callable] = None):
        """Add a Then step; ``assertion`` (if any) runs now. Returns self."""
        return self._register(self.then_steps, step, assertion)

    def execute(self):
        """Print the scenario title followed by every recorded step."""
        print(f"\n🎭 执行BDD场景:{self.title}")
        print("-" * 50)

        # Steps are listed phase by phase: Given, then When, then Then.
        summary = [f"📋 Given: {step}" for step in self.given_steps]
        summary += [f"🎬 When: {step}" for step in self.when_steps]
        summary += [f"✅ Then: {step}" for step in self.then_steps]
        for line in summary:
            print(line)

        print("🎉 场景执行完成")

# BDD测试示例
class MCPToolBDDTest(unittest.TestCase):
    
    def test_weather_tool_bdd_scenario(self):
        """使用BDD风格测试天气工具"""
        
        scenario = BDDScenario("用户查询北京天气")
        
        # 执行场景
        scenario.given(
            "用户想要查询天气信息",
            lambda ctx: ctx.update({'user': 'test_user', 'tool': 'weather'})
        ).given(
            "天气服务正常运行",
            lambda ctx: ctx.update({'service_status': 'running'})
        ).when(
            "用户输入地点'北京'",
            lambda ctx: ctx.update({
                'input_location': '北京',
                'result': self.simulate_weather_query(ctx['input_location'])
            })
        ).then(
            "系统返回北京的天气信息",
            lambda ctx: self.assertIn('temperature', ctx['result'])
        ).then(
            "响应包含温度、描述和湿度",
            lambda ctx: self.assertTrue(
                all(key in ctx['result'] for key in ['temperature', 'description', 'humidity'])
            )
        ).then(
            "地点信息正确",
            lambda ctx: self.assertEqual(ctx['result']['location'], '北京')
        )
        
        scenario.execute()
    
    def simulate_weather_query(self, location: str) -> Dict[str, Any]:
        """模拟天气查询"""
        return {
            'temperature': 25,
            'description': '晴天',
            'humidity': 60,
            'location': location,
            'timestamp': time.time()
        }

# 测试覆盖率分析
class CoverageAnalyzer:
    """Line-coverage bookkeeping keyed by ``(file_path, line_number)``.

    ``total_lines`` holds every known line and ``executed_lines`` the subset
    that has been seen running. Lines can be declared ahead of time with
    ``register_lines`` so that never-executed lines count as uncovered.
    """

    def __init__(self):
        self.coverage_data = {}
        self.executed_lines = set()
        self.total_lines = set()

    def register_lines(self, file_path: str, line_numbers: List[int]):
        """Declare lines of ``file_path`` that count toward the total.

        Without this, every known line is also an executed line and the
        computed coverage can only ever be 100%.
        """
        self.total_lines.update((file_path, number) for number in line_numbers)

    def track_execution(self, file_path: str, line_number: int):
        """Mark one line as executed (and make sure it is in the total)."""
        self.executed_lines.add((file_path, line_number))
        self.total_lines.add((file_path, line_number))

    def calculate_coverage(self) -> Dict[str, float]:
        """Compute coverage figures from the tracked line sets.

        Returns:
            A dict with ``line_coverage`` (ratio in [0, 1]; 0.0 when no
            lines are known), ``executed_lines``, ``total_lines`` and
            ``uncovered_lines``. All four keys are always present so the
            report can be rendered even before anything was tracked.
        """
        executed = len(self.executed_lines)
        total = len(self.total_lines)

        return {
            # Guard the division: an empty analyzer reports 0% coverage.
            'line_coverage': executed / total if total else 0.0,
            'executed_lines': executed,
            'total_lines': total,
            'uncovered_lines': total - executed
        }

    def generate_coverage_report(self) -> str:
        """Render the coverage figures as a multi-line text report."""
        coverage = self.calculate_coverage()

        report = f"""
📊 测试覆盖率报告
==================
行覆盖率:{coverage['line_coverage']:.2%}
已执行行数:{coverage['executed_lines']}
总行数:{coverage['total_lines']}
未覆盖行数:{coverage['uncovered_lines']}
        """

        return report.strip()

# 运行所有TDD测试
if __name__ == '__main__':
    print("🧪 开始TDD测试...")

    # Gather both test case classes into a single suite.
    loader = unittest.TestLoader()
    suite = unittest.TestSuite()
    for case_class in (WeatherToolTDD, MCPToolBDDTest):
        suite.addTests(loader.loadTestsFromTestCase(case_class))

    # Run with verbose per-test output.
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    # Summary counts.
    print(f"\n🏁 测试完成!")
    print(f"运行:{result.testsRun}个测试")
    print(f"失败:{len(result.failures)}个")
    print(f"错误:{len(result.errors)}个")

    if not result.wasSuccessful():
        print("❌ 部分测试失败")
        # List every failed or errored test together with its traceback text.
        problems = result.failures + result.errors
        for test, error in problems:
            print(f"  - {test}: {error}")
    else:
        print("🎉 所有测试通过!")

小结

测试策略的核心要素:

🧪 测试层次

  1. 单元测试 - 验证单个组件功能
  2. 集成测试 - 验证组件间协作
  3. 端到端测试 - 验证完整用户流程
  4. 性能测试 - 验证系统性能指标
  5. 安全测试 - 验证安全防护措施

💡 测试实践

  • 遵循测试金字塔原则,底层测试多,顶层测试少
  • 采用TDD/BDD方法论驱动开发
  • 建立自动化测试流水线
  • 追求高质量的测试覆盖率
  • 定期重构和维护测试代码

🔬 测试哲学:测试不是质量的保证,而是质量问题的发现者。好的测试让你对代码充满信心。


下一节:自动化测试实战 - 构建高效的自动化测试体系