daShangDao_kfzgw-info/goroutine-pool/goroutine_pool_test.go
2026-01-13 16:21:38 +08:00

950 lines
24 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package goroutine_pool
import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
// ==================== 测试辅助函数 ====================
// 测试任务函数1: 字符串处理
func testStringTask(param string) (string, error) {
time.Sleep(10 * time.Millisecond) // 模拟处理时间
return "Processed: " + param, nil
}
// 测试任务函数2: 数值计算
func testNumberTask(param string) (string, error) {
n, err := strconv.Atoi(param)
if err != nil {
return "", fmt.Errorf("invalid number: %s", param)
}
time.Sleep(5 * time.Millisecond) // 模拟计算时间
return strconv.Itoa(n * n), nil
}
// 测试任务函数3: 错误测试
func testErrorTask(param string) (string, error) {
time.Sleep(3 * time.Millisecond)
return "", fmt.Errorf("simulated error for: %s", param)
}
// 测试任务函数4: 长时任务
func testLongTask(param string) (string, error) {
time.Sleep(100 * time.Millisecond)
return "Long task completed: " + param, nil
}
// 测试任务函数5: 内存密集型
func testMemoryIntensiveTask(param string) (string, error) {
// 分配一些内存
data := make([]byte, 1024*1024) // 1MB
for i := range data {
data[i] = byte(i % 256)
}
time.Sleep(20 * time.Millisecond)
return fmt.Sprintf("Allocated %d bytes", len(data)), nil
}
// ==================== 完整功能测试 ====================
// TestFullFunctionality 测试完整功能
func TestFullFunctionality(t *testing.T) {
fmt.Println("========== 开始完整功能测试 ==========")
// 1. 初始化池
fmt.Println("\n1. 初始化协程池...")
err := Initialize(3, 10, `{
"minWorkers": 3,
"maxWorkers": 10,
"queueSize": 50,
"taskTimeoutMs": 30000,
"workerIdleTimeoutMs": 60000,
"memoryLimitMB": 512,
"shutdownTimeoutMs": 10000,
"enableMetrics": true,
"enableAutoScaling": true,
"priorityQueue": true
}`)
if err != nil {
t.Fatalf("初始化失败: %v", err)
}
fmt.Println("✓ 池初始化成功")
// 等待池完全启动
time.Sleep(100 * time.Millisecond)
// 2. 注册任务函数
fmt.Println("\n2. 注册任务函数...")
RegisterFunction("string_task", testStringTask)
RegisterFunction("number_task", testNumberTask)
RegisterFunction("error_task", testErrorTask)
RegisterFunction("long_task", testLongTask)
RegisterFunction("memory_task", testMemoryIntensiveTask)
fmt.Println("✓ 任务函数注册完成")
// 3. 基础任务提交测试
fmt.Println("\n3. 基础任务提交测试...")
testBasicTasks(t)
//// 4. 并发测试
//fmt.Println("\n4. 并发任务测试...")
//testConcurrentTasks(t)
//
//// 5. 工作者控制测试
//fmt.Println("\n5. 工作者控制测试...")
//testWorkerControl(t)
//// 6. 池控制测试
//fmt.Println("\n6. 池控制测试...")
//testPoolControl(t)
//// 7. 优先级测试
//fmt.Println("\n7. 优先级任务测试...")
//testPriorityTasks(t)
//// 8. 错误处理测试
//fmt.Println("\n8. 错误处理测试...")
//testErrorHandling(t)
////9. 内存和性能测试
//fmt.Println("\n9. 内存和性能测试...")
//testMemoryAndPerformance(t)
// 10. 优雅关闭测试
fmt.Println("\n10. 优雅关闭测试...")
testGracefulShutdown(t)
//
//// 11. 重新初始化测试
//fmt.Println("\n11. 重新初始化测试...")
//testReinitialization(t)
fmt.Println("\n========== 所有测试完成 ==========")
// 确保最终清理
if IsInitialized() {
Shutdown()
}
}
// testBasicTasks 测试基础任务功能
func testBasicTasks(t *testing.T) {
// 提交单个任务
taskID, err := SubmitTask(0, "string_task", "Hello World", PriorityNormal)
if err != nil {
t.Errorf("提交任务失败: %v", err)
} else {
fmt.Printf(" ✓ 任务提交成功: ID=%d\n", taskID)
}
// 等待任务完成
time.Sleep(50 * time.Millisecond)
// 获取任务信息
info, err := GetTaskInfo(taskID)
if err != nil {
t.Errorf("获取任务信息失败: %v", err)
} else {
fmt.Printf(" 任务信息: ID=%v, 状态=%v\n", info["id"], info["status"])
}
// 批量提交任务
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
taskID, err := SubmitTask(0, "number_task", strconv.Itoa(index), PriorityNormal)
if err != nil {
fmt.Printf(" 批量任务%d提交失败: %v\n", index, err)
} else {
fmt.Printf(" ✓ 批量任务%d提交成功: ID=%d\n", index, taskID)
}
}(i)
}
wg.Wait()
// 等待所有任务完成
time.Sleep(200 * time.Millisecond)
// 获取统计信息
stats, err := GetPoolStats()
if err != nil {
t.Errorf("获取统计信息失败: %v", err)
} else {
fmt.Printf(" 池统计: 总任务=%v, 成功=%v, 失败=%v, 排队=%v\n",
stats["totalTasks"], stats["successTasks"], stats["failedTasks"], stats["queuedTasks"])
}
}
// testConcurrentTasks 测试并发任务
func testConcurrentTasks(t *testing.T) {
const concurrentCount = 50
var completed int32
var errors int32
var wg sync.WaitGroup
startTime := time.Now()
// 提交大量并发任务
for i := 0; i < concurrentCount; i++ {
wg.Add(1)
go func(taskNum int) {
defer wg.Done()
taskID, err := SubmitTask(0, "number_task", strconv.Itoa(taskNum), PriorityNormal)
if err != nil {
atomic.AddInt32(&errors, 1)
fmt.Printf(" 并发任务%d提交失败: %v\n", taskNum, err)
return
}
// 等待任务完成
for retry := 0; retry < 10; retry++ {
info, err := GetTaskInfo(taskID)
if err == nil {
status := info["status"].(int32)
if status == TaskStatusCompleted || status == TaskStatusFailed {
atomic.AddInt32(&completed, 1)
break
}
}
time.Sleep(50 * time.Millisecond)
}
}(i)
}
wg.Wait()
duration := time.Since(startTime)
fmt.Printf(" ✓ 并发测试完成: 总数=%d, 完成=%d, 错误=%d, 耗时=%v\n",
concurrentCount, completed, errors, duration)
fmt.Printf(" 平均吞吐量: %.2f 任务/秒\n", float64(completed)/duration.Seconds())
}
// testWorkerControl 测试工作者控制
func testWorkerControl(t *testing.T) {
fmt.Println(" 创建新工作者...")
workerID, err := CreateWorker()
if err != nil {
t.Errorf("创建工作者失败: %v", err)
return
}
fmt.Printf(" ✓ 工作者创建成功: ID=%d\n", workerID)
// 等待工作者完全启动
time.Sleep(500 * time.Millisecond)
// 先获取状态
status, err := GetWorkerStatus(workerID)
if err != nil {
t.Errorf("获取工作者状态失败: %v", err)
return
}
fmt.Printf(" ✓ 初始状态: %v\n", status)
// 暂停工作者
fmt.Println(" 暂停工作者...")
err = PauseWorker(workerID)
if err != nil {
// 如果是"worker not paused"错误,可能是因为状态已经是暂停
if strings.Contains(err.Error(), "not paused") {
fmt.Println(" ⚠ 工作者可能已经暂停")
} else {
t.Errorf("暂停工作者失败: %v", err)
return
}
} else {
fmt.Println(" ✓ 工作者已暂停")
}
// 再次检查状态
time.Sleep(200 * time.Millisecond)
status, err = GetWorkerStatus(workerID)
if err != nil {
t.Errorf("获取工作者状态失败: %v", err)
return
}
fmt.Printf(" ✓ 暂停后状态: %v\n", status)
// 恢复工作者
fmt.Println(" 恢复工作者...")
err = ResumeWorker(workerID)
if err != nil {
// 如果是"already running"错误,可以接受
if strings.Contains(err.Error(), "already running") {
fmt.Println(" ⚠ 工作者已经在运行状态")
} else {
t.Errorf("恢复工作者失败: %v", err)
return
}
} else {
fmt.Println(" ✓ 工作者已恢复")
}
// 等待恢复完成
time.Sleep(200 * time.Millisecond)
status, err = GetWorkerStatus(workerID)
if err != nil {
t.Errorf("获取工作者状态失败: %v", err)
return
}
fmt.Printf(" ✓ 恢复后状态: %v\n", status)
// 停止工作者
fmt.Println(" 停止工作者...")
err = StopWorker(workerID)
if err != nil {
// 检查是否为可接受的错误
acceptableErrors := []string{
"worker not found",
"worker is stopped",
"worker is already stopped",
"worker context cancelled",
"timeout",
"control channel",
"no response",
}
isAcceptable := false
errStr := strings.ToLower(err.Error())
for _, acceptable := range acceptableErrors {
if strings.Contains(errStr, acceptable) {
isAcceptable = true
break
}
}
if !isAcceptable {
t.Errorf("停止工作者失败: %v", err)
return
}
fmt.Printf(" ⚠ 工作者停止(可能已成功): %v\n", err)
} else {
fmt.Println(" ✓ 工作者已停止")
}
// 等待停止完成
time.Sleep(500 * time.Millisecond)
// 验证工作者确实已停止
status, err = GetWorkerStatus(workerID)
if err != nil {
// 如果工作者不存在了,也是正常情况
if strings.Contains(err.Error(), "worker not found") {
fmt.Println(" ✓ 工作者已从池中移除")
} else {
t.Errorf("获取工作者状态失败: %v", err)
}
} else {
// 如果工作者还存在,检查状态
// 注意status 已经是 map[string]interface{} 类型,不需要类型断言
if s, ok := status["status"].(float64); ok && int(s) == WorkerStatusStopped {
fmt.Println(" ✓ 工作者状态为已停止")
} else {
fmt.Printf(" ⚠ 工作者状态为: %v\n", status["status"])
}
}
}
// testPoolControl 测试池控制
func testPoolControl(t *testing.T) {
// 暂停所有工作者
fmt.Println(" 暂停所有工作者...")
count, err := PauseAllWorkers()
if err != nil {
t.Errorf("暂停所有工作者失败: %v", err)
} else {
fmt.Printf(" ✓ 已暂停 %d 个工作者\n", count)
}
// 等待确保所有工作者都已暂停
time.Sleep(100 * time.Millisecond)
// 检查工作者状态
for i := 1; i <= 3; i++ {
status, err := GetWorkerStatus(i)
if err == nil {
if s, ok := status["status"].(float64); ok && int(s) == WorkerStatusPaused {
fmt.Printf(" ✓ 工作者 %d 已暂停\n", i)
} else {
fmt.Printf(" ! 工作者 %d 状态异常: %v\n", i, status)
}
}
}
// 提交任务应该失败
taskID, err := SubmitTask(0, "string_task", "During Pause", PriorityNormal)
if err != nil {
fmt.Printf(" ✓ 提交任务失败(预期): %v\n", err)
} else {
t.Errorf("暂停状态下任务不应提交成功但得到了任务ID: %d", taskID)
}
// 恢复所有工作者
fmt.Println(" 恢复所有工作者...")
count, err = ResumeAllWorkers()
if err != nil {
t.Errorf("恢复所有工作者失败: %v", err)
} else {
fmt.Printf(" ✓ 已恢复 %d 个工作者\n", count)
}
// 等待工作者完全恢复
time.Sleep(100 * time.Millisecond)
// 检查工作者是否恢复
for i := 1; i <= 3; i++ {
status, err := GetWorkerStatus(i)
if err == nil {
if s, ok := status["status"].(float64); ok &&
(int(s) == WorkerStatusRunning || int(s) == WorkerStatusIdle) {
fmt.Printf(" ✓ 工作者 %d 已恢复\n", i)
} else {
fmt.Printf(" ! 工作者 %d 未正确恢复,状态: %v\n", i, status)
}
}
}
// 现在工作者恢复了,提交一个新任务
taskID, err = SubmitTask(0, "string_task", "After Resume", PriorityNormal)
if err != nil {
t.Errorf("恢复后提交任务失败: %v", err)
} else {
fmt.Printf(" ✓ 恢复后任务已提交: ID=%d\n", taskID)
// 等待任务执行
time.Sleep(500 * time.Millisecond)
info, err := GetTaskInfo(taskID)
if err != nil {
fmt.Printf(" ! 获取任务信息失败: %v\n", err)
} else if info != nil {
if status, ok := info["status"].(int32); ok {
if status == TaskStatusCompleted {
fmt.Println(" ✓ 任务已执行完成")
} else if status == TaskStatusRunning {
fmt.Println(" ! 任务仍在运行中")
} else if status == TaskStatusPending {
fmt.Println(" ! 任务仍在排队")
} else {
fmt.Printf(" ! 任务状态异常: %d\n", status)
}
}
}
}
}
// testPriorityTasks 测试优先级任务
func testPriorityTasks(t *testing.T) {
fmt.Println(" 提交不同优先级的任务...")
// 提交低优先级任务
for i := 1; i <= 3; i++ {
param := fmt.Sprintf("Low Priority %d", i)
taskID, err := SubmitTask(0, "string_task", param, PriorityLow)
if err != nil {
t.Errorf("低优先级任务%d提交失败: %v", i, err)
} else {
fmt.Printf(" ✓ 低优先级任务%d: ID=%d\n", i, taskID)
}
}
// 提交高优先级任务
for i := 1; i <= 2; i++ {
param := fmt.Sprintf("High Priority %d", i)
taskID, err := SubmitTask(0, "string_task", param, PriorityHigh)
if err != nil {
t.Errorf("高优先级任务%d提交失败: %v", i, err)
} else {
fmt.Printf(" ✓ 高优先级任务%d: ID=%d\n", i, taskID)
}
}
// 提交紧急优先级任务
urgentTaskID, err := SubmitTask(0, "string_task", "Urgent Task", PriorityUrgent)
if err != nil {
t.Errorf("紧急优先级任务提交失败: %v", err)
} else {
fmt.Printf(" ✓ 紧急优先级任务: ID=%d\n", urgentTaskID)
}
time.Sleep(300 * time.Millisecond)
fmt.Println(" 优先级验证完成(需检查执行时间戳)")
}
// testErrorHandling 测试错误处理
func testErrorHandling(t *testing.T) {
// 提交会失败的任务
taskID, err := SubmitTask(0, "error_task", "Test Error", PriorityNormal)
if err != nil {
t.Errorf("错误任务提交失败: %v", err)
} else {
fmt.Printf(" ✓ 错误任务已提交: ID=%d\n", taskID)
}
time.Sleep(50 * time.Millisecond)
// 检查错误任务状态
info, err := GetTaskInfo(taskID)
if err != nil {
t.Errorf("获取错误任务信息失败: %v", err)
} else {
status := info["status"].(int32)
if status == TaskStatusFailed {
fmt.Println(" ✓ 任务失败处理正确")
}
}
// 测试取消任务
taskID, err = SubmitTask(0, "long_task", "To be cancelled", PriorityNormal)
if err != nil {
t.Errorf("可取消任务提交失败: %v", err)
} else {
fmt.Printf(" ✓ 可取消任务已提交: ID=%d\n", taskID)
}
// 立即取消
err = CancelTask(taskID)
if err != nil {
t.Errorf("取消任务失败: %v", err)
} else {
fmt.Println(" ✓ 任务取消请求已发送")
}
time.Sleep(100 * time.Millisecond)
// 检查取消状态
info, err = GetTaskInfo(taskID)
if err == nil {
status := info["status"].(int32)
if status == TaskStatusCancelled {
fmt.Println(" ✓ 任务已成功取消")
}
}
}
// testMemoryAndPerformance 测试内存和性能
func testMemoryAndPerformance(t *testing.T) {
fmt.Println(" 开始内存密集型任务测试...")
startTime := time.Now()
const memoryTasks = 20
var wg sync.WaitGroup
for i := 0; i < memoryTasks; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
taskID, err := SubmitTask(0, "memory_task", fmt.Sprintf("Memory Test %d", index), PriorityNormal)
if err != nil {
fmt.Printf(" 内存任务%d提交失败: %v\n", index, err)
} else {
// 等待完成
for retry := 0; retry < 20; retry++ {
info, err := GetTaskInfo(taskID)
if err == nil && (info["status"].(int32) == TaskStatusCompleted ||
info["status"].(int32) == TaskStatusFailed) {
break
}
time.Sleep(50 * time.Millisecond)
}
}
}(i)
}
wg.Wait()
duration := time.Since(startTime)
fmt.Printf(" ✓ 内存测试完成: 耗时=%v\n", duration)
}
// testGracefulShutdown 测试优雅关闭
func testGracefulShutdown(t *testing.T) {
// 提交一些任务
fmt.Println(" 提交一些任务以测试优雅关闭...")
taskIDs := make([]int64, 0, 5)
for i := 1; i <= 5; i++ {
taskID, err := SubmitTask(0, "string_task", fmt.Sprintf("Shutdown Test %d", i), PriorityNormal)
if err != nil {
t.Errorf("关闭测试任务%d提交失败: %v", i, err)
} else {
fmt.Printf(" ✓ 关闭测试任务%d: ID=%d\n", i, taskID)
taskIDs = append(taskIDs, taskID)
}
}
// 等待一小会儿让任务开始执行
time.Sleep(200 * time.Millisecond)
// 检查任务状态
runningTasks := 0
for _, taskID := range taskIDs {
info, err := GetTaskInfo(taskID)
if err == nil && info != nil {
if status, ok := info["status"].(int32); ok {
if status == TaskStatusRunning || status == TaskStatusPending {
runningTasks++
}
}
}
}
fmt.Printf(" 当前有 %d 个任务正在运行或排队\n", runningTasks)
// 开始优雅关闭(同步等待)
fmt.Println(" 开始优雅关闭...")
startTime := time.Now()
err := GracefulShutdown()
shutdownDuration := time.Since(startTime)
if err != nil {
t.Errorf("优雅关闭失败: %v", err)
fmt.Printf(" 优雅关闭失败: %v (耗时: %v)\n", err, shutdownDuration)
} else {
fmt.Printf(" ✓ 优雅关闭成功 (耗时: %v)\n", shutdownDuration)
}
// 等待一小段时间确保状态更新
time.Sleep(100 * time.Millisecond)
// 检查池状态
if !IsInitialized() {
fmt.Println(" ✓ 池已正确关闭")
// 检查任务是否都执行完成
completedCount := 0
for _, taskID := range taskIDs {
info, err := GetTaskInfo(taskID)
if err == nil && info != nil {
if status, ok := info["status"].(int32); ok {
if status == TaskStatusCompleted || status == TaskStatusFailed {
completedCount++
}
}
}
}
fmt.Printf(" ✓ %d/%d 个任务执行完成\n", completedCount, len(taskIDs))
} else {
// 如果池未关闭,可能是还在处理中,等待更长时间
fmt.Println(" ! 池仍在运行,等待额外时间...")
time.Sleep(1 * time.Second)
if !IsInitialized() {
fmt.Println(" ✓ 池已正确关闭(等待后)")
} else {
t.Error("池未正确关闭")
// 强制关闭
Shutdown()
}
}
}
// testReinitialization 测试重新初始化
func testReinitialization(t *testing.T) {
fmt.Println(" 测试重新初始化...")
// 重新初始化
err := Initialize(2, 5, `{
"minWorkers": 2,
"maxWorkers": 5,
"queueSize": 20,
"name": "ReinitializedPool"
}`)
if err != nil {
t.Fatalf("重新初始化失败: %v", err)
}
fmt.Println(" ✓ 池重新初始化成功")
// 重新注册函数
RegisterFunction("string_task", testStringTask)
RegisterFunction("number_task", testNumberTask)
// 测试新池功能
taskID, err := SubmitTask(0, "string_task", "Reinitialization Test", PriorityNormal)
if err != nil {
t.Errorf("重新初始化后提交任务失败: %v", err)
} else {
fmt.Printf(" ✓ 重新初始化后任务提交成功: ID=%d\n", taskID)
}
time.Sleep(100 * time.Millisecond)
// 获取任务信息
info, err := GetTaskInfo(taskID)
if err == nil && info["status"].(int32) == TaskStatusCompleted {
fmt.Println(" ✓ 重新初始化后任务执行成功")
}
}
// ==================== 高级测试 ====================
// TestAdvancedFeatures2 测试高级功能(重命名避免冲突)
func TestAdvancedFeatures2(t *testing.T) {
fmt.Println("\n========== 开始高级功能测试 ==========")
// 初始化
err := Initialize(4, 8, `{
"minWorkers": 4,
"maxWorkers": 8,
"queueSize": 100,
"priorityQueue": true
}`)
if err != nil {
t.Fatalf("初始化失败: %v", err)
}
RegisterFunction("string_task", testStringTask)
RegisterFunction("number_task", testNumberTask)
// 1. 动态扩缩容测试
fmt.Println("\n1. 动态扩缩容测试...")
testAutoScaling2(t)
// 2. 故障恢复测试
fmt.Println("\n2. 故障恢复测试...")
testFaultRecovery2(t)
// 3. 配置热更新测试
fmt.Println("\n3. 配置热更新测试...")
testHotConfigUpdate2(t)
fmt.Println("\n========== 高级测试完成 ==========")
// 确保清理
if IsInitialized() {
Shutdown()
}
}
// testAutoScaling2 测试自动扩缩容
func testAutoScaling2(t *testing.T) {
// 获取初始统计
stats, err := GetPoolStats()
if err != nil {
t.Errorf("获取统计失败: %v", err)
return
}
initialWorkers := stats["totalWorkers"].(int32)
fmt.Printf(" 初始工作者数: %d\n", initialWorkers)
// 提交大量任务触发扩容
const burstTasks = 100
var wg sync.WaitGroup
fmt.Printf(" 提交 %d 个任务触发扩容...\n", burstTasks)
for i := 0; i < burstTasks; i++ {
wg.Add(1)
go func(index int) {
defer wg.Done()
SubmitTask(0, "number_task", strconv.Itoa(index), PriorityNormal)
}(i)
}
wg.Wait()
time.Sleep(500 * time.Millisecond)
// 检查扩容
stats, err = GetPoolStats()
if err != nil {
t.Errorf("获取统计失败: %v", err)
return
}
finalWorkers := stats["totalWorkers"].(int32)
fmt.Printf(" 扩容后工作者数: %d (增加: %d)\n", finalWorkers, finalWorkers-initialWorkers)
}
// testFaultRecovery2 测试故障恢复
func testFaultRecovery2(t *testing.T) {
// 创建专门用于测试的工作者
workerID, err := CreateWorker()
if err != nil {
t.Errorf("创建测试工作者失败: %v", err)
return
}
fmt.Printf(" 创建测试工作者: ID=%d\n", workerID)
// 注册一个会panic的任务函数
panicFunc := func(param string) (string, error) {
panic("simulated panic in task")
}
RegisterFunction("panic_task", panicFunc)
// 提交会panic的任务
taskID, err := SubmitTask(workerID, "panic_task", "Trigger Panic", PriorityNormal)
if err != nil {
t.Errorf("提交panic任务失败: %v", err)
return
}
fmt.Printf(" ✓ 提交panic任务: ID=%d\n", taskID)
// 等待恢复
time.Sleep(2 * time.Second)
// 提交正常任务测试恢复后的功能
taskID, err = SubmitTask(workerID, "string_task", "After Recovery", PriorityNormal)
if err == nil {
fmt.Printf(" ✓ 恢复后任务提交成功: ID=%d\n", taskID)
time.Sleep(100 * time.Millisecond)
}
}
// testHotConfigUpdate2 测试配置热更新
func testHotConfigUpdate2(t *testing.T) {
fmt.Println(" 更新池配置...")
newConfig := `{
"minWorkers": 6,
"maxWorkers": 12,
"queueSize": 200,
"taskTimeoutMs": 10000,
"workerIdleTimeoutMs": 60000,
"memoryLimitMB": 1024,
"name": "UpdatedPool"
}`
err := UpdatePoolConfig(newConfig)
if err != nil {
t.Errorf("配置热更新失败: %v", err)
} else {
fmt.Println(" ✓ 配置热更新成功")
}
}
// ==================== 集成测试示例 ====================
// TestExampleUsage 使用示例
func TestExampleUsage(t *testing.T) {
fmt.Println("========== 协程池使用示例 ==========")
// 1. 初始化
err := Initialize(2, 5, "")
if err != nil {
t.Fatalf("初始化失败: %v", err)
}
defer Shutdown()
// 2. 注册任务函数
RegisterFunction("process_data", func(param string) (string, error) {
// 模拟数据处理
time.Sleep(20 * time.Millisecond)
return "Processed: " + param, nil
})
// 3. 提交任务
taskIDs := make([]int64, 0, 10)
for i := 0; i < 10; i++ {
taskID, err := SubmitTask(0, "process_data",
fmt.Sprintf("Data item %d", i), PriorityNormal)
if err == nil {
taskIDs = append(taskIDs, taskID)
}
}
fmt.Printf("提交了 %d 个任务\n", len(taskIDs))
// 4. 等待任务完成
time.Sleep(1 * time.Second)
// 5. 获取统计
stats, _ := GetPoolStats()
fmt.Printf("池统计: 总任务=%v, 成功=%v, 失败=%v\n",
stats["totalTasks"], stats["successTasks"], stats["failedTasks"])
fmt.Println("示例完成")
}
// ==================== 主测试函数 ====================
// TestAll 运行所有测试
func TestAll(t *testing.T) {
// 设置恢复
defer func() {
if r := recover(); r != nil {
t.Errorf("测试发生panic: %v", r)
}
// 最终清理
if IsInitialized() {
Shutdown()
}
}()
fmt.Println("开始运行协程池完整测试套件")
fmt.Println("======================================")
// 运行基础功能测试
TestFullFunctionality(t)
// 运行高级功能测试
TestAdvancedFeatures2(t)
// 运行使用示例
TestExampleUsage(t)
fmt.Println("\n所有测试运行完成")
fmt.Println("======================================")
}
func TestDebugBasic(t *testing.T) {
fmt.Println("=== 调试测试开始 ===")
// 1. 简单初始化
err := Initialize(2, 2, `{
"minWorkers": 2,
"maxWorkers": 2,
"queueSize": 10,
"taskTimeoutMs": 1000,
"workerIdleTimeoutMs": 1000,
"enableMetrics": false
}`)
if err != nil {
t.Fatalf("初始化失败: %v", err)
}
defer Shutdown()
// 等待池启动
time.Sleep(100 * time.Millisecond)
// 2. 注册简单任务
RegisterFunction("echo", func(param string) (string, error) {
fmt.Printf("任务执行: %s\n", param)
return "Echo: " + param, nil
})
// 3. 提交单个任务
taskID, err := SubmitTask(0, "echo", "Test1", PriorityNormal)
if err != nil {
t.Fatalf("提交任务失败: %v", err)
}
fmt.Printf("任务提交成功: ID=%d\n", taskID)
// 4. 等待执行
time.Sleep(200 * time.Millisecond)
// 5. 检查状态
stats, err := GetPoolStats()
if err != nil {
t.Fatalf("获取统计失败: %v", err)
}
fmt.Printf("池统计: %+v\n", stats)
// 6. 获取任务信息
info, err := GetTaskInfo(taskID)
if err != nil {
fmt.Printf("获取任务信息失败: %v\n", err)
} else {
fmt.Printf("任务信息: %+v\n", info)
}
fmt.Println("=== 调试测试结束 ===")
}