package goroutine_pool import ( "fmt" "strconv" "strings" "sync" "sync/atomic" "testing" "time" ) // ==================== 测试辅助函数 ==================== // 测试任务函数1: 字符串处理 func testStringTask(param string) (string, error) { time.Sleep(10 * time.Millisecond) // 模拟处理时间 return "Processed: " + param, nil } // 测试任务函数2: 数值计算 func testNumberTask(param string) (string, error) { n, err := strconv.Atoi(param) if err != nil { return "", fmt.Errorf("invalid number: %s", param) } time.Sleep(5 * time.Millisecond) // 模拟计算时间 return strconv.Itoa(n * n), nil } // 测试任务函数3: 错误测试 func testErrorTask(param string) (string, error) { time.Sleep(3 * time.Millisecond) return "", fmt.Errorf("simulated error for: %s", param) } // 测试任务函数4: 长时任务 func testLongTask(param string) (string, error) { time.Sleep(100 * time.Millisecond) return "Long task completed: " + param, nil } // 测试任务函数5: 内存密集型 func testMemoryIntensiveTask(param string) (string, error) { // 分配一些内存 data := make([]byte, 1024*1024) // 1MB for i := range data { data[i] = byte(i % 256) } time.Sleep(20 * time.Millisecond) return fmt.Sprintf("Allocated %d bytes", len(data)), nil } // ==================== 完整功能测试 ==================== // TestFullFunctionality 测试完整功能 func TestFullFunctionality(t *testing.T) { fmt.Println("========== 开始完整功能测试 ==========") // 1. 初始化池 fmt.Println("\n1. 初始化协程池...") err := Initialize(3, 10, `{ "minWorkers": 3, "maxWorkers": 10, "queueSize": 50, "taskTimeoutMs": 30000, "workerIdleTimeoutMs": 60000, "memoryLimitMB": 512, "shutdownTimeoutMs": 10000, "enableMetrics": true, "enableAutoScaling": true, "priorityQueue": true }`) if err != nil { t.Fatalf("初始化失败: %v", err) } fmt.Println("✓ 池初始化成功") // 等待池完全启动 time.Sleep(100 * time.Millisecond) // 2. 注册任务函数 fmt.Println("\n2. 注册任务函数...") RegisterFunction("string_task", testStringTask) RegisterFunction("number_task", testNumberTask) RegisterFunction("error_task", testErrorTask) RegisterFunction("long_task", testLongTask) RegisterFunction("memory_task", testMemoryIntensiveTask) fmt.Println("✓ 任务函数注册完成") // 3. 基础任务提交测试 fmt.Println("\n3. 基础任务提交测试...") testBasicTasks(t) //// 4. 并发测试 //fmt.Println("\n4. 并发任务测试...") //testConcurrentTasks(t) // //// 5. 工作者控制测试 //fmt.Println("\n5. 工作者控制测试...") //testWorkerControl(t) //// 6. 池控制测试 //fmt.Println("\n6. 池控制测试...") //testPoolControl(t) //// 7. 优先级测试 //fmt.Println("\n7. 优先级任务测试...") //testPriorityTasks(t) //// 8. 错误处理测试 //fmt.Println("\n8. 错误处理测试...") //testErrorHandling(t) ////9. 内存和性能测试 //fmt.Println("\n9. 内存和性能测试...") //testMemoryAndPerformance(t) // 10. 优雅关闭测试 fmt.Println("\n10. 优雅关闭测试...") testGracefulShutdown(t) // //// 11. 重新初始化测试 //fmt.Println("\n11. 重新初始化测试...") //testReinitialization(t) fmt.Println("\n========== 所有测试完成 ==========") // 确保最终清理 if IsInitialized() { Shutdown() } } // testBasicTasks 测试基础任务功能 func testBasicTasks(t *testing.T) { // 提交单个任务 taskID, err := SubmitTask(0, "string_task", "Hello World", PriorityNormal) if err != nil { t.Errorf("提交任务失败: %v", err) } else { fmt.Printf(" ✓ 任务提交成功: ID=%d\n", taskID) } // 等待任务完成 time.Sleep(50 * time.Millisecond) // 获取任务信息 info, err := GetTaskInfo(taskID) if err != nil { t.Errorf("获取任务信息失败: %v", err) } else { fmt.Printf(" 任务信息: ID=%v, 状态=%v\n", info["id"], info["status"]) } // 批量提交任务 var wg sync.WaitGroup for i := 1; i <= 5; i++ { wg.Add(1) go func(index int) { defer wg.Done() taskID, err := SubmitTask(0, "number_task", strconv.Itoa(index), PriorityNormal) if err != nil { fmt.Printf(" 批量任务%d提交失败: %v\n", index, err) } else { fmt.Printf(" ✓ 批量任务%d提交成功: ID=%d\n", index, taskID) } }(i) } wg.Wait() // 等待所有任务完成 time.Sleep(200 * time.Millisecond) // 获取统计信息 stats, err := GetPoolStats() if err != nil { t.Errorf("获取统计信息失败: %v", err) } else { fmt.Printf(" 池统计: 总任务=%v, 成功=%v, 失败=%v, 排队=%v\n", stats["totalTasks"], stats["successTasks"], stats["failedTasks"], stats["queuedTasks"]) } } // testConcurrentTasks 测试并发任务 func testConcurrentTasks(t *testing.T) { const concurrentCount = 50 var completed int32 var errors int32 var wg sync.WaitGroup startTime := time.Now() // 提交大量并发任务 for i := 0; i < concurrentCount; i++ { wg.Add(1) go func(taskNum int) { defer wg.Done() taskID, err := SubmitTask(0, "number_task", strconv.Itoa(taskNum), PriorityNormal) if err != nil { atomic.AddInt32(&errors, 1) fmt.Printf(" 并发任务%d提交失败: %v\n", taskNum, err) return } // 等待任务完成 for retry := 0; retry < 10; retry++ { info, err := GetTaskInfo(taskID) if err == nil { status := info["status"].(int32) if status == TaskStatusCompleted || status == TaskStatusFailed { atomic.AddInt32(&completed, 1) break } } time.Sleep(50 * time.Millisecond) } }(i) } wg.Wait() duration := time.Since(startTime) fmt.Printf(" ✓ 并发测试完成: 总数=%d, 完成=%d, 错误=%d, 耗时=%v\n", concurrentCount, completed, errors, duration) fmt.Printf(" 平均吞吐量: %.2f 任务/秒\n", float64(completed)/duration.Seconds()) } // testWorkerControl 测试工作者控制 func testWorkerControl(t *testing.T) { fmt.Println(" 创建新工作者...") workerID, err := CreateWorker() if err != nil { t.Errorf("创建工作者失败: %v", err) return } fmt.Printf(" ✓ 工作者创建成功: ID=%d\n", workerID) // 等待工作者完全启动 time.Sleep(500 * time.Millisecond) // 先获取状态 status, err := GetWorkerStatus(workerID) if err != nil { t.Errorf("获取工作者状态失败: %v", err) return } fmt.Printf(" ✓ 初始状态: %v\n", status) // 暂停工作者 fmt.Println(" 暂停工作者...") err = PauseWorker(workerID) if err != nil { // 如果是"worker not paused"错误,可能是因为状态已经是暂停 if strings.Contains(err.Error(), "not paused") { fmt.Println(" ⚠ 工作者可能已经暂停") } else { t.Errorf("暂停工作者失败: %v", err) return } } else { fmt.Println(" ✓ 工作者已暂停") } // 再次检查状态 time.Sleep(200 * time.Millisecond) status, err = GetWorkerStatus(workerID) if err != nil { t.Errorf("获取工作者状态失败: %v", err) return } fmt.Printf(" ✓ 暂停后状态: %v\n", status) // 恢复工作者 fmt.Println(" 恢复工作者...") err = ResumeWorker(workerID) if err != nil { // 如果是"already running"错误,可以接受 if strings.Contains(err.Error(), "already running") { fmt.Println(" ⚠ 工作者已经在运行状态") } else { t.Errorf("恢复工作者失败: %v", err) return } } else { fmt.Println(" ✓ 工作者已恢复") } // 等待恢复完成 time.Sleep(200 * time.Millisecond) status, err = GetWorkerStatus(workerID) if err != nil { t.Errorf("获取工作者状态失败: %v", err) return } fmt.Printf(" ✓ 恢复后状态: %v\n", status) // 停止工作者 fmt.Println(" 停止工作者...") err = StopWorker(workerID) if err != nil { // 检查是否为可接受的错误 acceptableErrors := []string{ "worker not found", "worker is stopped", "worker is already stopped", "worker context cancelled", "timeout", "control channel", "no response", } isAcceptable := false errStr := strings.ToLower(err.Error()) for _, acceptable := range acceptableErrors { if strings.Contains(errStr, acceptable) { isAcceptable = true break } } if !isAcceptable { t.Errorf("停止工作者失败: %v", err) return } fmt.Printf(" ⚠ 工作者停止(可能已成功): %v\n", err) } else { fmt.Println(" ✓ 工作者已停止") } // 等待停止完成 time.Sleep(500 * time.Millisecond) // 验证工作者确实已停止 status, err = GetWorkerStatus(workerID) if err != nil { // 如果工作者不存在了,也是正常情况 if strings.Contains(err.Error(), "worker not found") { fmt.Println(" ✓ 工作者已从池中移除") } else { t.Errorf("获取工作者状态失败: %v", err) } } else { // 如果工作者还存在,检查状态 // 注意:status 已经是 map[string]interface{} 类型,不需要类型断言 if s, ok := status["status"].(float64); ok && int(s) == WorkerStatusStopped { fmt.Println(" ✓ 工作者状态为已停止") } else { fmt.Printf(" ⚠ 工作者状态为: %v\n", status["status"]) } } } // testPoolControl 测试池控制 func testPoolControl(t *testing.T) { // 暂停所有工作者 fmt.Println(" 暂停所有工作者...") count, err := PauseAllWorkers() if err != nil { t.Errorf("暂停所有工作者失败: %v", err) } else { fmt.Printf(" ✓ 已暂停 %d 个工作者\n", count) } // 等待确保所有工作者都已暂停 time.Sleep(100 * time.Millisecond) // 检查工作者状态 for i := 1; i <= 3; i++ { status, err := GetWorkerStatus(i) if err == nil { if s, ok := status["status"].(float64); ok && int(s) == WorkerStatusPaused { fmt.Printf(" ✓ 工作者 %d 已暂停\n", i) } else { fmt.Printf(" ! 工作者 %d 状态异常: %v\n", i, status) } } } // 提交任务应该失败 taskID, err := SubmitTask(0, "string_task", "During Pause", PriorityNormal) if err != nil { fmt.Printf(" ✓ 提交任务失败(预期): %v\n", err) } else { t.Errorf("暂停状态下任务不应提交成功,但得到了任务ID: %d", taskID) } // 恢复所有工作者 fmt.Println(" 恢复所有工作者...") count, err = ResumeAllWorkers() if err != nil { t.Errorf("恢复所有工作者失败: %v", err) } else { fmt.Printf(" ✓ 已恢复 %d 个工作者\n", count) } // 等待工作者完全恢复 time.Sleep(100 * time.Millisecond) // 检查工作者是否恢复 for i := 1; i <= 3; i++ { status, err := GetWorkerStatus(i) if err == nil { if s, ok := status["status"].(float64); ok && (int(s) == WorkerStatusRunning || int(s) == WorkerStatusIdle) { fmt.Printf(" ✓ 工作者 %d 已恢复\n", i) } else { fmt.Printf(" ! 工作者 %d 未正确恢复,状态: %v\n", i, status) } } } // 现在工作者恢复了,提交一个新任务 taskID, err = SubmitTask(0, "string_task", "After Resume", PriorityNormal) if err != nil { t.Errorf("恢复后提交任务失败: %v", err) } else { fmt.Printf(" ✓ 恢复后任务已提交: ID=%d\n", taskID) // 等待任务执行 time.Sleep(500 * time.Millisecond) info, err := GetTaskInfo(taskID) if err != nil { fmt.Printf(" ! 获取任务信息失败: %v\n", err) } else if info != nil { if status, ok := info["status"].(int32); ok { if status == TaskStatusCompleted { fmt.Println(" ✓ 任务已执行完成") } else if status == TaskStatusRunning { fmt.Println(" ! 任务仍在运行中") } else if status == TaskStatusPending { fmt.Println(" ! 任务仍在排队") } else { fmt.Printf(" ! 任务状态异常: %d\n", status) } } } } } // testPriorityTasks 测试优先级任务 func testPriorityTasks(t *testing.T) { fmt.Println(" 提交不同优先级的任务...") // 提交低优先级任务 for i := 1; i <= 3; i++ { param := fmt.Sprintf("Low Priority %d", i) taskID, err := SubmitTask(0, "string_task", param, PriorityLow) if err != nil { t.Errorf("低优先级任务%d提交失败: %v", i, err) } else { fmt.Printf(" ✓ 低优先级任务%d: ID=%d\n", i, taskID) } } // 提交高优先级任务 for i := 1; i <= 2; i++ { param := fmt.Sprintf("High Priority %d", i) taskID, err := SubmitTask(0, "string_task", param, PriorityHigh) if err != nil { t.Errorf("高优先级任务%d提交失败: %v", i, err) } else { fmt.Printf(" ✓ 高优先级任务%d: ID=%d\n", i, taskID) } } // 提交紧急优先级任务 urgentTaskID, err := SubmitTask(0, "string_task", "Urgent Task", PriorityUrgent) if err != nil { t.Errorf("紧急优先级任务提交失败: %v", err) } else { fmt.Printf(" ✓ 紧急优先级任务: ID=%d\n", urgentTaskID) } time.Sleep(300 * time.Millisecond) fmt.Println(" 优先级验证完成(需检查执行时间戳)") } // testErrorHandling 测试错误处理 func testErrorHandling(t *testing.T) { // 提交会失败的任务 taskID, err := SubmitTask(0, "error_task", "Test Error", PriorityNormal) if err != nil { t.Errorf("错误任务提交失败: %v", err) } else { fmt.Printf(" ✓ 错误任务已提交: ID=%d\n", taskID) } time.Sleep(50 * time.Millisecond) // 检查错误任务状态 info, err := GetTaskInfo(taskID) if err != nil { t.Errorf("获取错误任务信息失败: %v", err) } else { status := info["status"].(int32) if status == TaskStatusFailed { fmt.Println(" ✓ 任务失败处理正确") } } // 测试取消任务 taskID, err = SubmitTask(0, "long_task", "To be cancelled", PriorityNormal) if err != nil { t.Errorf("可取消任务提交失败: %v", err) } else { fmt.Printf(" ✓ 可取消任务已提交: ID=%d\n", taskID) } // 立即取消 err = CancelTask(taskID) if err != nil { t.Errorf("取消任务失败: %v", err) } else { fmt.Println(" ✓ 任务取消请求已发送") } time.Sleep(100 * time.Millisecond) // 检查取消状态 info, err = GetTaskInfo(taskID) if err == nil { status := info["status"].(int32) if status == TaskStatusCancelled { fmt.Println(" ✓ 任务已成功取消") } } } // testMemoryAndPerformance 测试内存和性能 func testMemoryAndPerformance(t *testing.T) { fmt.Println(" 开始内存密集型任务测试...") startTime := time.Now() const memoryTasks = 20 var wg sync.WaitGroup for i := 0; i < memoryTasks; i++ { wg.Add(1) go func(index int) { defer wg.Done() taskID, err := SubmitTask(0, "memory_task", fmt.Sprintf("Memory Test %d", index), PriorityNormal) if err != nil { fmt.Printf(" 内存任务%d提交失败: %v\n", index, err) } else { // 等待完成 for retry := 0; retry < 20; retry++ { info, err := GetTaskInfo(taskID) if err == nil && (info["status"].(int32) == TaskStatusCompleted || info["status"].(int32) == TaskStatusFailed) { break } time.Sleep(50 * time.Millisecond) } } }(i) } wg.Wait() duration := time.Since(startTime) fmt.Printf(" ✓ 内存测试完成: 耗时=%v\n", duration) } // testGracefulShutdown 测试优雅关闭 func testGracefulShutdown(t *testing.T) { // 提交一些任务 fmt.Println(" 提交一些任务以测试优雅关闭...") taskIDs := make([]int64, 0, 5) for i := 1; i <= 5; i++ { taskID, err := SubmitTask(0, "string_task", fmt.Sprintf("Shutdown Test %d", i), PriorityNormal) if err != nil { t.Errorf("关闭测试任务%d提交失败: %v", i, err) } else { fmt.Printf(" ✓ 关闭测试任务%d: ID=%d\n", i, taskID) taskIDs = append(taskIDs, taskID) } } // 等待一小会儿让任务开始执行 time.Sleep(200 * time.Millisecond) // 检查任务状态 runningTasks := 0 for _, taskID := range taskIDs { info, err := GetTaskInfo(taskID) if err == nil && info != nil { if status, ok := info["status"].(int32); ok { if status == TaskStatusRunning || status == TaskStatusPending { runningTasks++ } } } } fmt.Printf(" 当前有 %d 个任务正在运行或排队\n", runningTasks) // 开始优雅关闭(同步等待) fmt.Println(" 开始优雅关闭...") startTime := time.Now() err := GracefulShutdown() shutdownDuration := time.Since(startTime) if err != nil { t.Errorf("优雅关闭失败: %v", err) fmt.Printf(" 优雅关闭失败: %v (耗时: %v)\n", err, shutdownDuration) } else { fmt.Printf(" ✓ 优雅关闭成功 (耗时: %v)\n", shutdownDuration) } // 等待一小段时间确保状态更新 time.Sleep(100 * time.Millisecond) // 检查池状态 if !IsInitialized() { fmt.Println(" ✓ 池已正确关闭") // 检查任务是否都执行完成 completedCount := 0 for _, taskID := range taskIDs { info, err := GetTaskInfo(taskID) if err == nil && info != nil { if status, ok := info["status"].(int32); ok { if status == TaskStatusCompleted || status == TaskStatusFailed { completedCount++ } } } } fmt.Printf(" ✓ %d/%d 个任务执行完成\n", completedCount, len(taskIDs)) } else { // 如果池未关闭,可能是还在处理中,等待更长时间 fmt.Println(" ! 池仍在运行,等待额外时间...") time.Sleep(1 * time.Second) if !IsInitialized() { fmt.Println(" ✓ 池已正确关闭(等待后)") } else { t.Error("池未正确关闭") // 强制关闭 Shutdown() } } } // testReinitialization 测试重新初始化 func testReinitialization(t *testing.T) { fmt.Println(" 测试重新初始化...") // 重新初始化 err := Initialize(2, 5, `{ "minWorkers": 2, "maxWorkers": 5, "queueSize": 20, "name": "ReinitializedPool" }`) if err != nil { t.Fatalf("重新初始化失败: %v", err) } fmt.Println(" ✓ 池重新初始化成功") // 重新注册函数 RegisterFunction("string_task", testStringTask) RegisterFunction("number_task", testNumberTask) // 测试新池功能 taskID, err := SubmitTask(0, "string_task", "Reinitialization Test", PriorityNormal) if err != nil { t.Errorf("重新初始化后提交任务失败: %v", err) } else { fmt.Printf(" ✓ 重新初始化后任务提交成功: ID=%d\n", taskID) } time.Sleep(100 * time.Millisecond) // 获取任务信息 info, err := GetTaskInfo(taskID) if err == nil && info["status"].(int32) == TaskStatusCompleted { fmt.Println(" ✓ 重新初始化后任务执行成功") } } // ==================== 高级测试 ==================== // TestAdvancedFeatures2 测试高级功能(重命名避免冲突) func TestAdvancedFeatures2(t *testing.T) { fmt.Println("\n========== 开始高级功能测试 ==========") // 初始化 err := Initialize(4, 8, `{ "minWorkers": 4, "maxWorkers": 8, "queueSize": 100, "priorityQueue": true }`) if err != nil { t.Fatalf("初始化失败: %v", err) } RegisterFunction("string_task", testStringTask) RegisterFunction("number_task", testNumberTask) // 1. 动态扩缩容测试 fmt.Println("\n1. 动态扩缩容测试...") testAutoScaling2(t) // 2. 故障恢复测试 fmt.Println("\n2. 故障恢复测试...") testFaultRecovery2(t) // 3. 配置热更新测试 fmt.Println("\n3. 配置热更新测试...") testHotConfigUpdate2(t) fmt.Println("\n========== 高级测试完成 ==========") // 确保清理 if IsInitialized() { Shutdown() } } // testAutoScaling2 测试自动扩缩容 func testAutoScaling2(t *testing.T) { // 获取初始统计 stats, err := GetPoolStats() if err != nil { t.Errorf("获取统计失败: %v", err) return } initialWorkers := stats["totalWorkers"].(int32) fmt.Printf(" 初始工作者数: %d\n", initialWorkers) // 提交大量任务触发扩容 const burstTasks = 100 var wg sync.WaitGroup fmt.Printf(" 提交 %d 个任务触发扩容...\n", burstTasks) for i := 0; i < burstTasks; i++ { wg.Add(1) go func(index int) { defer wg.Done() SubmitTask(0, "number_task", strconv.Itoa(index), PriorityNormal) }(i) } wg.Wait() time.Sleep(500 * time.Millisecond) // 检查扩容 stats, err = GetPoolStats() if err != nil { t.Errorf("获取统计失败: %v", err) return } finalWorkers := stats["totalWorkers"].(int32) fmt.Printf(" 扩容后工作者数: %d (增加: %d)\n", finalWorkers, finalWorkers-initialWorkers) } // testFaultRecovery2 测试故障恢复 func testFaultRecovery2(t *testing.T) { // 创建专门用于测试的工作者 workerID, err := CreateWorker() if err != nil { t.Errorf("创建测试工作者失败: %v", err) return } fmt.Printf(" 创建测试工作者: ID=%d\n", workerID) // 注册一个会panic的任务函数 panicFunc := func(param string) (string, error) { panic("simulated panic in task") } RegisterFunction("panic_task", panicFunc) // 提交会panic的任务 taskID, err := SubmitTask(workerID, "panic_task", "Trigger Panic", PriorityNormal) if err != nil { t.Errorf("提交panic任务失败: %v", err) return } fmt.Printf(" ✓ 提交panic任务: ID=%d\n", taskID) // 等待恢复 time.Sleep(2 * time.Second) // 提交正常任务测试恢复后的功能 taskID, err = SubmitTask(workerID, "string_task", "After Recovery", PriorityNormal) if err == nil { fmt.Printf(" ✓ 恢复后任务提交成功: ID=%d\n", taskID) time.Sleep(100 * time.Millisecond) } } // testHotConfigUpdate2 测试配置热更新 func testHotConfigUpdate2(t *testing.T) { fmt.Println(" 更新池配置...") newConfig := `{ "minWorkers": 6, "maxWorkers": 12, "queueSize": 200, "taskTimeoutMs": 10000, "workerIdleTimeoutMs": 60000, "memoryLimitMB": 1024, "name": "UpdatedPool" }` err := UpdatePoolConfig(newConfig) if err != nil { t.Errorf("配置热更新失败: %v", err) } else { fmt.Println(" ✓ 配置热更新成功") } } // ==================== 集成测试示例 ==================== // TestExampleUsage 使用示例 func TestExampleUsage(t *testing.T) { fmt.Println("========== 协程池使用示例 ==========") // 1. 初始化 err := Initialize(2, 5, "") if err != nil { t.Fatalf("初始化失败: %v", err) } defer Shutdown() // 2. 注册任务函数 RegisterFunction("process_data", func(param string) (string, error) { // 模拟数据处理 time.Sleep(20 * time.Millisecond) return "Processed: " + param, nil }) // 3. 提交任务 taskIDs := make([]int64, 0, 10) for i := 0; i < 10; i++ { taskID, err := SubmitTask(0, "process_data", fmt.Sprintf("Data item %d", i), PriorityNormal) if err == nil { taskIDs = append(taskIDs, taskID) } } fmt.Printf("提交了 %d 个任务\n", len(taskIDs)) // 4. 等待任务完成 time.Sleep(1 * time.Second) // 5. 获取统计 stats, _ := GetPoolStats() fmt.Printf("池统计: 总任务=%v, 成功=%v, 失败=%v\n", stats["totalTasks"], stats["successTasks"], stats["failedTasks"]) fmt.Println("示例完成") } // ==================== 主测试函数 ==================== // TestAll 运行所有测试 func TestAll(t *testing.T) { // 设置恢复 defer func() { if r := recover(); r != nil { t.Errorf("测试发生panic: %v", r) } // 最终清理 if IsInitialized() { Shutdown() } }() fmt.Println("开始运行协程池完整测试套件") fmt.Println("======================================") // 运行基础功能测试 TestFullFunctionality(t) // 运行高级功能测试 TestAdvancedFeatures2(t) // 运行使用示例 TestExampleUsage(t) fmt.Println("\n所有测试运行完成!") fmt.Println("======================================") } func TestDebugBasic(t *testing.T) { fmt.Println("=== 调试测试开始 ===") // 1. 简单初始化 err := Initialize(2, 2, `{ "minWorkers": 2, "maxWorkers": 2, "queueSize": 10, "taskTimeoutMs": 1000, "workerIdleTimeoutMs": 1000, "enableMetrics": false }`) if err != nil { t.Fatalf("初始化失败: %v", err) } defer Shutdown() // 等待池启动 time.Sleep(100 * time.Millisecond) // 2. 注册简单任务 RegisterFunction("echo", func(param string) (string, error) { fmt.Printf("任务执行: %s\n", param) return "Echo: " + param, nil }) // 3. 提交单个任务 taskID, err := SubmitTask(0, "echo", "Test1", PriorityNormal) if err != nil { t.Fatalf("提交任务失败: %v", err) } fmt.Printf("任务提交成功: ID=%d\n", taskID) // 4. 等待执行 time.Sleep(200 * time.Millisecond) // 5. 检查状态 stats, err := GetPoolStats() if err != nil { t.Fatalf("获取统计失败: %v", err) } fmt.Printf("池统计: %+v\n", stats) // 6. 获取任务信息 info, err := GetTaskInfo(taskID) if err != nil { fmt.Printf("获取任务信息失败: %v\n", err) } else { fmt.Printf("任务信息: %+v\n", info) } fmt.Println("=== 调试测试结束 ===") }