daShangDao_kfzgw-info/goroutine-pool/goroutine_pool.go
2026-01-13 16:21:38 +08:00

3303 lines
81 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Package goroutine_pool 提供高性能、功能完整的无锁协程池实现
package goroutine_pool
import (
"context"
"encoding/json"
"fmt"
"os"
"runtime"
"runtime/debug"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
)
// ==================== Constants ====================

// Result codes returned by the pool API: zero on success, negative on error.
// The numeric values are part of the external contract, so they are spelled
// out explicitly rather than generated.
const (
	SuccessCode                = 0
	ErrorPoolNotInitialized    = -1
	ErrorWorkerNotFound        = -2
	ErrorWorkerNotIdle         = -3
	ErrorWorkerNotRunning      = -4
	ErrorWorkerNotPaused       = -5
	ErrorTaskSubmitTimeout     = -6
	ErrorQueueFull             = -7
	ErrorInvalidConfig         = -8
	ErrorMemoryExhausted       = -9
	ErrorMaxWorkersExceeded    = -10
	ErrorInvalidLanguage       = -11
	ErrorWorkerAlreadyRunning  = -12
	ErrorFunctionNotRegistered = -13
	ErrorTaskNotFound          = -14
	ErrorTaskAlreadyCompleted  = -15
	ErrorTaskCancelled         = -16
	ErrorTaskTimeout           = -17
	ErrorSystemError           = -18
	ErrorInvalidParameter      = -19
	ErrorResourceUnavailable   = -20
)

// Worker lifecycle states (0..4).
const (
	WorkerStatusIdle = iota
	WorkerStatusRunning
	WorkerStatusPaused
	WorkerStatusStopped
	WorkerStatusDraining
)

// Task lifecycle states (0..5).
const (
	TaskStatusPending = iota
	TaskStatusRunning
	TaskStatusCompleted
	TaskStatusFailed
	TaskStatusCancelled
	TaskStatusTimeout
)

// Log severities, from least to most severe (0..4).
const (
	LogLevelDebug = iota
	LogLevelInfo
	LogLevelWarn
	LogLevelError
	LogLevelFatal
)

// Default tuning values. Timeouts and delays are in milliseconds; the memory
// limit is in megabytes.
const (
	DefaultQueueSize        = 1000
	DefaultTaskTimeout      = 30 * 1000
	DefaultIdleTimeout      = 60 * 1000
	DefaultMaxMemoryMB      = 1024
	DefaultMinWorkers       = 2
	DefaultMaxWorkers       = 100
	DefaultShutdownTimeout  = 30 * 1000
	DefaultHealthCheckDelay = 5 * 1000
	DefaultMaxRetries       = 3
	DefaultResultBufferSize = 64 * 1024
)

// Worker execution modes.
const (
	WorkerTypeAsync = iota
	WorkerTypeSync
)

// Worker control message types (1..9).
const (
	ControlMsgStop = iota + 1
	ControlMsgPause
	ControlMsgResume
	ControlMsgDrain
	ControlMsgRestart
	ControlMsgUpdate
	ControlMsgHealth
	ControlMsgGraceful
	ControlMsgDelete
)

// Task priorities, spaced five apart (0, 5, 10, 15).
const (
	PriorityLow = iota * 5
	PriorityNormal
	PriorityHigh
	PriorityUrgent
)

// Supported message languages.
const (
	LanguageCN = "cn_zh"
	LanguageEN = "en_us"
)
// ==================== Atomic data structures ====================
// AtomicTask is a single unit of work submitted to a worker.
//
// Scalar fields are read and written with sync/atomic; string payloads are
// kept in atomic.Value. Task timestamps written in this file are milliseconds
// since the Unix epoch (UnixNano()/1e6), and durations are milliseconds.
// NOTE(review): several int64 fields are used with sync/atomic. On 32-bit
// platforms they must stay 64-bit aligned, so reorder fields with care.
type AtomicTask struct {
id int64 // unique id from atomic.AddInt64(&taskIDCounter, 1)
goroutineID int32 // id passed to NewAtomicTask (presumably the target worker id — confirm)
status int32 // one of the TaskStatus* constants
priority int32 // caller-supplied priority (see the Priority* constants)
retryCount int32 // retries performed so far; 0 at creation
maxRetries int32 // retry budget supplied by the caller
submitTime int64 // creation time, ms since epoch
queueTime int64 // equal to submitTime at creation
dequeueTime int64 // NOTE(review): not written in the visible code
startTime int64 // execution start, ms since epoch; NOTE(review): confirm the executing worker records it
endTime int64 // execution end, ms since epoch (set by processTask)
waitDuration int64 // NOTE(review): not written in the visible code
execDuration int64 // time spent inside the task function, ms
totalDuration int64 // endTime - submitTime, ms
funcName atomic.Value // string: name of the registered function to run
param atomic.Value // string: argument passed to that function
result atomic.Value // string: function result on success
errorMsg atomic.Value // string: failure description on error or panic
callback unsafe.Pointer // *TaskCallback
userData unsafe.Pointer // interface{}
ctx context.Context // per-task context created with the task timeout in NewAtomicTask
cancel context.CancelFunc // releases ctx; worker cleanup calls it for still-active tasks
}
// NewAtomicTask builds a pending task for the given goroutine/worker id.
//
// funcName names a registered task function and param is passed to it
// verbatim. A non-positive timeoutMs falls back to DefaultTaskTimeout. The
// task context's timeout clock starts now, at construction time, not when
// the task begins executing. Submit and queue timestamps are recorded in
// milliseconds since the Unix epoch.
func NewAtomicTask(goroutineID int, funcName, param string, priority, maxRetries, timeoutMs int) *AtomicTask {
	effectiveTimeout := timeoutMs
	if effectiveTimeout <= 0 {
		effectiveTimeout = DefaultTaskTimeout
	}
	taskCtx, cancelTask := context.WithTimeout(
		context.Background(),
		time.Millisecond*time.Duration(effectiveTimeout),
	)
	nowMs := time.Now().UnixNano() / 1e6

	t := new(AtomicTask)
	t.id = atomic.AddInt64(&taskIDCounter, 1)
	t.goroutineID = int32(goroutineID)
	t.status = int32(TaskStatusPending)
	t.priority = int32(priority)
	t.retryCount = 0
	t.maxRetries = int32(maxRetries)
	t.submitTime = nowMs
	t.queueTime = nowMs
	t.ctx = taskCtx
	t.cancel = cancelTask
	t.funcName.Store(funcName)
	t.param.Store(param)
	return t
}
// ==================== Bounded ring queue ====================

// EnhancedRingQueue is a bounded FIFO of *AtomicTask that supports pause and
// drain modes.
//
// All mutation of head, tail and items happens while holding cond.L. That
// lets Pop sleep until a task arrives without lost wake-ups, and it means a
// producer can never publish a slot before the task is stored in it. size,
// paused and draining are also accessed atomically, so Size() and the mode
// checks are cheap and lock-free for readers. One slot is always kept free to
// tell a full ring from an empty one, so at most capacity-1 tasks are queued.
type EnhancedRingQueue struct {
	items    []unsafe.Pointer // ring storage; each entry is a *AtomicTask or nil
	capacity int32            // power of two, so indices wrap with capacity-1
	head     int32            // index of the next slot to pop
	tail     int32            // index of the next slot to fill
	size     int32            // number of queued tasks
	paused   int32            // 1 while paused: Push rejects new tasks
	draining int32            // 1 while draining: Push rejects, Pop returns nil once empty
	cond     *sync.Cond       // cond.L guards head/tail/items; signals Pop waiters
}

// NewEnhancedRingQueue creates a queue whose capacity is rounded up to a power
// of two by roundUpToPowerOfTwo. Non-positive capacities default to 1000.
func NewEnhancedRingQueue(capacity int32) *EnhancedRingQueue {
	if capacity <= 0 {
		capacity = 1000
	}
	capacity = roundUpToPowerOfTwo(capacity)
	return &EnhancedRingQueue{
		items:    make([]unsafe.Pointer, capacity),
		capacity: capacity,
		cond:     sync.NewCond(&sync.Mutex{}),
	}
}

// Push appends task and wakes any Pop blocked on an empty queue.
//
// It returns false, without enqueuing, when task is nil, the queue is paused
// or draining, or the ring is full.
func (q *EnhancedRingQueue) Push(task *AtomicTask) bool {
	if task == nil {
		return false
	}
	// Cheap lock-free rejection first; re-checked under the lock below so a
	// concurrent Drain cannot slip a task in after it returns.
	if atomic.LoadInt32(&q.paused) == 1 || atomic.LoadInt32(&q.draining) == 1 {
		return false
	}

	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	if atomic.LoadInt32(&q.paused) == 1 || atomic.LoadInt32(&q.draining) == 1 {
		return false
	}
	tail := atomic.LoadInt32(&q.tail)
	next := (tail + 1) & (q.capacity - 1)
	if next == atomic.LoadInt32(&q.head) {
		return false // full
	}
	// Store the task before advancing tail so a consumer never sees an
	// empty slot inside the [head, tail) window.
	atomic.StorePointer(&q.items[tail], unsafe.Pointer(task))
	atomic.StoreInt32(&q.tail, next)
	atomic.AddInt32(&q.size, 1)
	q.cond.Broadcast()
	return true
}

// Pop removes and returns the oldest task, blocking while the queue is empty.
//
// Tasks that are already queued are still returned in draining mode. Once the
// queue is both draining and empty, Pop returns nil instead of blocking.
// Pause does not affect Pop.
func (q *EnhancedRingQueue) Pop() *AtomicTask {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for atomic.LoadInt32(&q.size) == 0 {
		if atomic.LoadInt32(&q.draining) == 1 {
			return nil
		}
		q.cond.Wait()
	}

	head := atomic.LoadInt32(&q.head)
	task := (*AtomicTask)(atomic.LoadPointer(&q.items[head]))
	// Clear the slot so the ring does not keep the task reachable.
	atomic.StorePointer(&q.items[head], nil)
	atomic.StoreInt32(&q.head, (head+1)&(q.capacity-1))
	atomic.AddInt32(&q.size, -1)
	return task
}

// Size returns the number of queued tasks.
func (q *EnhancedRingQueue) Size() int32 {
	return atomic.LoadInt32(&q.size)
}

// Capacity returns the ring size (a power of two). One slot is always kept
// free, so the queue holds at most Capacity()-1 tasks.
func (q *EnhancedRingQueue) Capacity() int32 {
	return q.capacity
}

// Pause makes Push reject new tasks until Resume is called.
func (q *EnhancedRingQueue) Pause() {
	atomic.StoreInt32(&q.paused, 1)
}

// Resume lifts a Pause and wakes any waiters.
func (q *EnhancedRingQueue) Resume() {
	q.cond.L.Lock()
	atomic.StoreInt32(&q.paused, 0)
	q.cond.L.Unlock()
	q.cond.Broadcast()
}

// Drain stops accepting new tasks and wakes blocked Pop callers. Queued tasks
// are still handed out, after which Pop returns nil.
//
// The flag is set while holding the lock so that a Pop that has just checked
// the flag cannot miss the wake-up.
func (q *EnhancedRingQueue) Drain() {
	q.cond.L.Lock()
	atomic.StoreInt32(&q.draining, 1)
	q.cond.L.Unlock()
	q.cond.Broadcast()
}

// ResetDrain leaves drain mode so Push accepts tasks again and Pop blocks on
// an empty queue.
func (q *EnhancedRingQueue) ResetDrain() {
	atomic.StoreInt32(&q.draining, 0)
}
// ==================== Worker implementation ====================
// CompleteWorker owns a task queue and executes its tasks on the goroutine
// that calls Run. It accepts control requests (pause, resume, stop, drain,
// restart, ...) on controlChan, and the Run loop handles them.
//
// NOTE(review): the int64 counters are used with sync/atomic; keep them
// 64-bit aligned on 32-bit platforms when reordering fields.
type CompleteWorker struct {
// Atomic state
id int32 // worker id; createWorker also uses it as the key in the pool's worker map
status int32 // one of the WorkerStatus* constants, accessed atomically
lastActive int64 // last-activity timestamp (Run, pause and resume store ms since epoch)
createdAt int64 // creation time, ms since epoch
// Queue and control
taskQueue *EnhancedRingQueue // pending tasks for this worker
controlChan chan ControlMessage // control requests read by the Run loop (buffer 32); closed and set to nil by cleanup
healthChan chan bool // non-blocking liveness signals (buffer 1); closed and set to nil by cleanup
// Statistics
taskCount int64 // tasks started
errorCount int64 // tasks that failed or panicked
successCount int64 // tasks that completed successfully
activeTasks int32 // tasks currently executing
queuedTasks int32 // NOTE(review): not updated in the visible code
restartCount int32 // restart attempts
// Configuration and context
config *WorkerConfig // guarded by configMu; read it through getConfig
ctx context.Context // cancelled by stop/cleanup to end Run and its helper goroutines
cancel context.CancelFunc
// Synchronization primitives
once sync.Once // makes cleanup run at most once
mu sync.RWMutex // NOTE(review): not used in the visible code
stopped int32 // explicit stop flag: set to 1 by stop or cleanup
// Task tracking
configMu sync.RWMutex // guards config
tasksMu sync.RWMutex // guards activeTasksMap
activeTasksMap map[int64]*AtomicTask // tasks currently executing, keyed by task id
}
// WorkerConfig holds per-worker settings. A worker reads it through getConfig,
// and updateConfig can replace it at runtime.
type WorkerConfig struct {
ID int32 // worker id
Name string // human-readable name, e.g. "worker_<id>" from createWorker
Type int32 // presumably one of the WorkerType* constants — confirm
QueueSize int32 // requested task-queue capacity (rounded up to a power of two by the queue)
TimeoutMs int32 // used by Run as the idle-timer period; checkHealth treats 2x this as "stuck"
MaxRetries int32 // NOTE(review): not consulted in the visible code
MemoryLimitKB int64 // checkHealth warns when process HeapInuse exceeds this; <= 0 disables the check
AutoRestart bool // restartWorker creates a replacement worker when true
EnableRecovery bool // handlePanic/checkHealth call restart when true
RecoveryDelayMs int32 // restart sleeps this long after requesting a restart
PriorityEnabled bool // NOTE(review): not consulted in the visible code
}
// NewCompleteWorker creates a worker in the idle state. It gets its own task
// queue, a control channel (buffer 32) and a health channel (buffer 1). The
// worker is not started; launch it with `go w.Run()`.
//
// When config is nil, a default configuration is built that records id and
// name, so later readers of the config see them (restartWorker reuses
// config.Name, for example). A non-nil config is used as given and is not
// modified.
func NewCompleteWorker(id int32, name string, config *WorkerConfig) *CompleteWorker {
	if config == nil {
		config = &WorkerConfig{
			ID:              id,
			Name:            name,
			QueueSize:       DefaultQueueSize,
			TimeoutMs:       DefaultTaskTimeout,
			MaxRetries:      DefaultMaxRetries,
			AutoRestart:     true,
			EnableRecovery:  true,
			RecoveryDelayMs: 1000,
			PriorityEnabled: false,
		}
	}
	ctx, cancel := context.WithCancel(context.Background())
	return &CompleteWorker{
		id:             id,
		status:         int32(WorkerStatusIdle),
		taskQueue:      NewEnhancedRingQueue(config.QueueSize),
		controlChan:    make(chan ControlMessage, 32),
		healthChan:     make(chan bool, 1),
		config:         config,
		ctx:            ctx,
		cancel:         cancel,
		createdAt:      time.Now().UnixNano() / 1e6,
		activeTasksMap: make(map[int64]*AtomicTask),
	}
}
// Run is the worker's main loop. It blocks until the worker stops.
//
// It marks the worker running, then starts two helper goroutines:
//   - a fetcher that moves tasks from taskQueue onto a one-slot channel,
//     sleeping while the worker is paused;
//   - healthCheckLoop.
//
// The loop executes tasks and handles control messages until one of these
// happens: the context is cancelled, the control or task channel is closed,
// a stop/delete message arrives, or the idle timer fires while the status is
// idle. On exit it recovers any panic, cancels the context, waits briefly for
// the health loop, and calls cleanup.
func (w *CompleteWorker) Run() {
	atomic.StoreInt32(&w.status, int32(WorkerStatusRunning))
	atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
	w.logInfo("Worker started")

	// The fetcher goroutine is the only sender on taskChan, so it is also the
	// only goroutine that closes it. Closing it from this goroutine could
	// make an in-flight send panic and take down the whole process.
	taskChan := make(chan *AtomicTask, 1)
	go func() {
		defer close(taskChan)
		for {
			select {
			case <-w.ctx.Done():
				return
			default:
			}

			status := atomic.LoadInt32(&w.status)
			if status == int32(WorkerStatusStopped) {
				return
			}
			if status == int32(WorkerStatusPaused) {
				time.Sleep(100 * time.Millisecond)
				continue
			}

			task := w.taskQueue.Pop()
			if task == nil {
				// Pop returns nil when the queue is draining and empty.
				if status == int32(WorkerStatusDraining) && w.taskQueue.Size() == 0 {
					return
				}
				time.Sleep(10 * time.Millisecond)
				continue
			}

			select {
			case taskChan <- task:
			case <-w.ctx.Done():
				return
			}
		}
	}()

	healthCheckDone := make(chan struct{})
	go func() {
		defer close(healthCheckDone)
		w.healthCheckLoop()
	}()

	defer func() {
		if r := recover(); r != nil {
			w.logError("Worker panic: %v\nStack: %s", r, string(debug.Stack()))
		}
		// Cancel before waiting so the helper goroutines see the shutdown
		// right away instead of the wait running out its timeout.
		if w.cancel != nil {
			w.cancel()
		}
		select {
		case <-healthCheckDone:
		case <-time.After(200 * time.Millisecond):
		}
		w.cleanup()
		w.logInfo("Worker stopped")
	}()

	idleTimer := time.NewTimer(time.Duration(w.getConfig().TimeoutMs) * time.Millisecond)
	defer idleTimer.Stop()

	for {
		select {
		case <-w.ctx.Done():
			return

		case msg, ok := <-w.controlChan:
			if !ok {
				return // control channel closed
			}
			w.handleControlMessage(msg)
			atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
			if msg.Type == ControlMsgStop || msg.Type == ControlMsgDelete {
				return
			}

		case task, ok := <-taskChan:
			if !ok {
				return // fetcher exited
			}
			if task == nil {
				continue
			}
			// Restart the idle timer, discarding a tick that already fired.
			if !idleTimer.Stop() {
				select {
				case <-idleTimer.C:
				default:
				}
			}
			idleTimer.Reset(time.Duration(w.getConfig().TimeoutMs) * time.Millisecond)

			w.executeTask(task)
			atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)

		case <-idleTimer.C:
			if atomic.LoadInt32(&w.status) == int32(WorkerStatusIdle) {
				w.logInfo("Worker idle timeout")
				return
			}
			idleTimer.Reset(time.Duration(w.getConfig().TimeoutMs) * time.Millisecond)
		}
	}
}
// healthCheckLoop emits a non-blocking health signal once per second until
// the worker's context is cancelled.
func (w *CompleteWorker) healthCheckLoop() {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	done := w.ctx.Done()
	for {
		select {
		case <-tick.C:
			w.safeSendHealthSignal()
		case <-done:
			return
		}
	}
}
// safeSendHealthSignal offers true on healthChan without blocking.
//
// A nil channel is a no-op and a full channel drops the signal. A send on a
// channel that cleanup already closed panics; that panic is recovered and
// logged as a warning.
func (w *CompleteWorker) safeSendHealthSignal() {
	defer func() {
		if p := recover(); p != nil {
			w.logWarn("Health signal send panic (channel closed): %v", p)
		}
	}()

	ch := w.healthChan
	if ch == nil {
		return
	}
	select {
	case ch <- true:
	default:
	}
}
// getTask returns a one-shot channel that receives the next task from the
// queue.
//
// A background goroutine polls the worker status. It sleeps while the worker
// is paused. It closes the channel when the worker is stopped, or when it is
// draining with an empty queue. Otherwise it delivers the first task Pop
// returns and exits.
// NOTE(review): not called in the visible part of this file; Run uses its own
// fetcher goroutine.
func (w *CompleteWorker) getTask() <-chan *AtomicTask {
	out := make(chan *AtomicTask, 1)
	go func() {
		for {
			st := atomic.LoadInt32(&w.status)
			if st == WorkerStatusPaused {
				time.Sleep(100 * time.Millisecond)
				continue
			}
			if st == WorkerStatusStopped ||
				(st == WorkerStatusDraining && w.taskQueue.Size() == 0) {
				close(out)
				return
			}
			if next := w.taskQueue.Pop(); next != nil {
				out <- next
				return
			}
			// Brief back-off to avoid a busy loop.
			time.Sleep(10 * time.Millisecond)
		}
	}()
	return out
}
// executeTask runs task synchronously on the calling goroutine.
//
// Steps:
//  1. Mark the task running.
//  2. Stamp the worker's lastActive and the task's startTime, both in
//     milliseconds since the epoch.
//  3. Register the task as active and bump the activeTasks/taskCount
//     counters.
//  4. Delegate to processTask.
//
// The deferred handler always unregisters the task and decrements
// activeTasks. A panic from the task is recovered and passed to handlePanic.
func (w *CompleteWorker) executeTask(task *AtomicTask) {
	startMs := time.Now().UnixNano() / 1e6
	// checkHealth compares lastActive against a millisecond clock, so the
	// value must be in milliseconds, not seconds.
	atomic.StoreInt64(&w.lastActive, startMs)
	atomic.StoreInt64(&task.startTime, startMs)
	atomic.StoreInt32(&task.status, int32(TaskStatusRunning))

	w.addActiveTask(task)
	atomic.AddInt32(&w.activeTasks, 1)
	atomic.AddInt64(&w.taskCount, 1)

	defer func() {
		w.removeActiveTask(task.id)
		atomic.AddInt32(&w.activeTasks, -1)
		atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)

		if r := recover(); r != nil {
			w.handlePanic(task, r)
		}
	}()

	w.processTask(task)
}
// processTask looks up the task's registered function and calls it with the
// task's parameter. It then records the outcome on the task and on the
// worker and pool counters. An unknown function name fails the task without
// recording any timing information.
func (w *CompleteWorker) processTask(task *AtomicTask) {
	name := task.funcName.Load().(string)
	arg := task.param.Load().(string)

	// fail records a failure on the task, the worker and the pool.
	fail := func(reason string) {
		task.errorMsg.Store(reason)
		atomic.StoreInt32(&task.status, int32(TaskStatusFailed))
		atomic.AddInt64(&w.errorCount, 1)
		w.updatePoolFailedStats()
	}

	fn := globalManager.getFunction(name)
	if fn == nil {
		fail(fmt.Sprintf("Function not found: %s", name))
		return
	}

	began := time.Now()
	result, err := fn(arg)
	elapsedMs := time.Since(began).Milliseconds()

	finishedMs := time.Now().UnixNano() / 1e6
	atomic.StoreInt64(&task.execDuration, elapsedMs)
	atomic.StoreInt64(&task.endTime, finishedMs)
	atomic.StoreInt64(&task.totalDuration, finishedMs-task.submitTime)

	if err != nil {
		fail(err.Error())
		return
	}
	task.result.Store(result)
	atomic.StoreInt32(&task.status, int32(TaskStatusCompleted))
	atomic.AddInt64(&w.successCount, 1)
	w.updatePoolSuccessStats()
}
// updatePoolSuccessStats increments the global pool's successful-task
// counter. It does nothing when globalManager or its pool is not set.
func (w *CompleteWorker) updatePoolSuccessStats() {
	if globalManager == nil || globalManager.pool == nil {
		return
	}
	atomic.AddInt64(&globalManager.pool.successTasks, 1)
}

// updatePoolFailedStats increments the global pool's failed-task counter. It
// does nothing when globalManager or its pool is not set.
func (w *CompleteWorker) updatePoolFailedStats() {
	if globalManager == nil || globalManager.pool == nil {
		return
	}
	atomic.AddInt64(&globalManager.pool.failedTasks, 1)
}
// handlePanic records a recovered task panic r:
//   - marks task as failed;
//   - bumps the worker and pool failure counters;
//   - logs the stack.
//
// If recovery is enabled and the worker is not stopped, it then asks the
// worker to restart.
func (w *CompleteWorker) handlePanic(task *AtomicTask, r interface{}) {
	task.errorMsg.Store(fmt.Sprintf("Panic: %v", r))
	atomic.StoreInt32(&task.status, int32(TaskStatusFailed))
	atomic.AddInt64(&w.errorCount, 1)
	w.updatePoolFailedStats()

	w.logError("Task panic: %v\nStack: %s", r, string(debug.Stack()))

	if !w.getConfig().EnableRecovery {
		return
	}
	if atomic.LoadInt32(&w.status) == int32(WorkerStatusStopped) {
		return
	}
	w.restart()
}
// restart asks the Run loop to restart this worker, then sleeps for
// RecoveryDelayMs milliseconds.
//
// The ControlMsgRestart request is queued on controlChan without blocking.
// handlePanic calls restart on the Run goroutine, which is the only reader of
// controlChan. A blocking send on a full channel would therefore deadlock. A
// send on the nil or closed channel that cleanup leaves behind would block
// forever or panic. In all of these cases the request is dropped with a
// warning. The attempt number comes from the atomic increment itself, so the
// log line does not race with other writers of restartCount.
func (w *CompleteWorker) restart() {
	attempt := atomic.AddInt32(&w.restartCount, 1)
	w.logInfo("Worker restarting (attempt %d)", attempt)

	func() {
		defer func() {
			if r := recover(); r != nil {
				w.logWarn("Restart request dropped, control channel closed: %v", r)
			}
		}()
		select {
		case w.controlChan <- ControlMessage{Type: ControlMsgRestart}:
		default:
			w.logWarn("Restart request dropped, control channel full or unavailable")
		}
	}()

	// Back off before the worker continues.
	time.Sleep(time.Duration(w.getConfig().RecoveryDelayMs) * time.Millisecond)
}
// handleControlMessage runs one control message on the Run goroutine and
// replies through sendResponse.
//
// Missing Response, Error and Done channels are replaced with fresh buffered
// (size 1) channels, so the replies cannot block on them. Every known
// message type replies true, except ControlMsgHealth, which replies with the
// health snapshot. Unknown types produce an error reply.
func (w *CompleteWorker) handleControlMessage(msg ControlMessage) {
	if msg.Response == nil {
		msg.Response = make(chan interface{}, 1)
	}
	if msg.Error == nil {
		msg.Error = make(chan error, 1)
	}
	if msg.Done == nil {
		msg.Done = make(chan bool, 1)
	}

	var (
		reply   interface{} = true
		failure error
	)
	switch msg.Type {
	case ControlMsgPause:
		w.pause()
	case ControlMsgResume:
		w.resume()
	case ControlMsgStop:
		w.stop()
	case ControlMsgDrain:
		w.drain()
	case ControlMsgRestart:
		w.restartWorker()
	case ControlMsgUpdate:
		w.updateConfig(msg.Data)
	case ControlMsgHealth:
		reply = w.getHealthStatus()
	case ControlMsgGraceful:
		w.gracefulStop()
	case ControlMsgDelete:
		w.delete()
	default:
		reply = nil
		failure = fmt.Errorf("unknown control message type: %d", msg.Type)
	}

	w.sendResponse(msg, reply, failure)
}
// sendResponse delivers the result of a control message to its requester.
//
// Each reply gets at most 100ms, in this order:
//  1. err, if non-nil, on msg.Error;
//  2. response, if non-nil, on msg.Response;
//  3. true on msg.Done.
//
// If Done cannot be delivered in time, the channel is closed instead so a
// waiter still wakes up. Replies are sent even if the worker has already
// stopped: Stop and Delete set the stopped flag before their reply goes out,
// and without the reply their callers would only see a timeout. Any panic
// (e.g. a requester channel that was already closed) is recovered and
// logged.
func (w *CompleteWorker) sendResponse(msg ControlMessage, response interface{}, err error) {
	defer func() {
		if r := recover(); r != nil {
			w.logWarn("Ignore panic when sending response: %v", r)
		}
	}()

	if err != nil && msg.Error != nil {
		select {
		case msg.Error <- err:
		case <-time.After(100 * time.Millisecond):
			w.logWarn("Error channel blocked or closed, message: %v", err)
		}
	}

	if response != nil && msg.Response != nil {
		select {
		case msg.Response <- response:
		case <-time.After(100 * time.Millisecond):
			w.logWarn("Response channel blocked or closed, response: %v", response)
		}
	}

	if msg.Done != nil {
		select {
		case msg.Done <- true:
		case <-time.After(100 * time.Millisecond):
			// Nobody is receiving. Close the channel so a later waiter still
			// wakes, and tolerate it being closed already.
			w.logWarn("Done channel blocked, closing safely")
			func() {
				defer func() { recover() }()
				close(msg.Done)
			}()
		}
	}
}
// GetStatus returns the worker's current WorkerStatus* value.
func (w *CompleteWorker) GetStatus() int32 {
	return atomic.LoadInt32(&w.status)
}

// IsRunning reports whether the worker is in the running or idle state.
func (w *CompleteWorker) IsRunning() bool {
	switch w.GetStatus() {
	case int32(WorkerStatusRunning), int32(WorkerStatusIdle):
		return true
	default:
		return false
	}
}

// CanAcceptControl reports whether control messages may still be sent: the
// status is not stopped and the stopped flag is still clear.
func (w *CompleteWorker) CanAcceptControl() bool {
	if w.GetStatus() == int32(WorkerStatusStopped) {
		return false
	}
	return atomic.LoadInt32(&w.stopped) == 0
}
// getHealthStatus returns a snapshot of the worker's counters and timing,
// keyed by field name. "uptime" is milliseconds since the worker was created.
func (w *CompleteWorker) getHealthStatus() map[string]interface{} {
	nowMs := time.Now().UnixNano() / 1e6

	snapshot := make(map[string]interface{}, 10)
	snapshot["id"] = w.id
	snapshot["status"] = atomic.LoadInt32(&w.status)
	snapshot["taskCount"] = atomic.LoadInt64(&w.taskCount)
	snapshot["errorCount"] = atomic.LoadInt64(&w.errorCount)
	snapshot["successCount"] = atomic.LoadInt64(&w.successCount)
	snapshot["activeTasks"] = atomic.LoadInt32(&w.activeTasks)
	snapshot["queuedTasks"] = w.taskQueue.Size()
	snapshot["lastActive"] = atomic.LoadInt64(&w.lastActive)
	snapshot["uptime"] = nowMs - w.createdAt
	snapshot["restartCount"] = atomic.LoadInt32(&w.restartCount)
	return snapshot
}
// pause moves a running, idle or draining worker into the paused state.
//
// The queue is paused first, so it stops accepting new tasks, and then the
// status is switched. Workers in any other state are left untouched.
func (w *CompleteWorker) pause() {
	prev := atomic.LoadInt32(&w.status)
	switch prev {
	case int32(WorkerStatusRunning), int32(WorkerStatusIdle), int32(WorkerStatusDraining):
		// eligible
	default:
		return
	}

	w.taskQueue.Pause()
	atomic.StoreInt32(&w.status, int32(WorkerStatusPaused))
	w.logInfo("Worker paused, old status: %d", prev)
	atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
}
// resume moves a paused worker back to running.
//
// A worker that is already running is left as it is. Any other non-paused
// state is rejected with a warning. pause also accepts a draining worker, so
// the queue may still be in drain mode here. That flag is cleared before
// resuming; otherwise the resumed worker would reject every Push. The health
// signal goes through safeSendHealthSignal, so a channel already closed by
// cleanup cannot panic the caller.
func (w *CompleteWorker) resume() {
	prev := atomic.LoadInt32(&w.status)
	if prev != int32(WorkerStatusPaused) {
		if prev == int32(WorkerStatusRunning) {
			w.logInfo("Worker already running, no need to resume")
			return
		}
		w.logWarn("Cannot resume worker from status %d, expected paused (%d)",
			prev, WorkerStatusPaused)
		return
	}

	// The worker status is Paused here, but the queue can still carry the
	// drain flag set before the pause.
	if atomic.LoadInt32(&w.taskQueue.draining) == 1 {
		w.taskQueue.ResetDrain()
		w.logInfo("Reset worker from draining to paused before resume")
	}

	w.taskQueue.Resume()

	if atomic.CompareAndSwapInt32(&w.status, int32(WorkerStatusPaused), int32(WorkerStatusRunning)) {
		atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
		w.logInfo("Worker resumed from paused state")
	} else {
		w.logWarn("Worker status changed during resume, current status: %d",
			atomic.LoadInt32(&w.status))
	}

	w.safeSendHealthSignal()
}
// stop shuts the worker down exactly once. It:
//   - sets the stopped flag;
//   - marks the status stopped;
//   - cancels the context, which ends Run's loop;
//   - puts the queue in drain mode;
//   - schedules cleanup 100ms later on a separate goroutine.
//
// Repeated calls only log a warning.
func (w *CompleteWorker) stop() {
	if swapped := atomic.CompareAndSwapInt32(&w.stopped, 0, 1); !swapped {
		w.logWarn("Worker already stopped, ignoring duplicate stop request")
		return
	}

	atomic.StoreInt32(&w.status, int32(WorkerStatusStopped))
	w.logInfo("Worker %d stopping...", w.id)

	if cancel := w.cancel; cancel != nil {
		cancel()
	}
	if q := w.taskQueue; q != nil {
		q.Drain()
	}

	// Give Run's deferred shutdown a moment to finish before cleaning up.
	go func() {
		time.Sleep(100 * time.Millisecond)
		w.cleanup()
	}()
}
// IsStopped reports whether stop or cleanup has set the stopped flag.
func (w *CompleteWorker) IsStopped() bool {
	flag := atomic.LoadInt32(&w.stopped)
	return flag == 1
}

// drain switches the worker to the draining state. Its queue stops accepting
// new tasks, while tasks already queued are still handed out.
func (w *CompleteWorker) drain() {
	atomic.StoreInt32(&w.status, int32(WorkerStatusDraining))
	q := w.taskQueue
	q.Drain()
	w.logInfo("Worker draining")
}
// restartWorker replaces this worker with a fresh one.
//
// Steps:
//  1. Drain the queue and wait up to about 500ms for in-flight tasks.
//  2. Stop the worker.
//  3. If AutoRestart is enabled and a pool is registered on globalManager,
//     create a new worker with the same id and configuration, swap it in via
//     replaceWorker, and start it.
//
// The configuration is read through getConfig, so it does not race with
// updateConfig. The manager lock is released with defer, so a panic in
// replaceWorker cannot leave it held.
func (w *CompleteWorker) restartWorker() {
	if atomic.LoadInt32(&w.status) == int32(WorkerStatusStopped) {
		return
	}

	w.logInfo("Worker restarting (attempt %d)", atomic.LoadInt32(&w.restartCount)+1)

	atomic.StoreInt32(&w.status, int32(WorkerStatusDraining))
	w.taskQueue.Drain()

	// Give in-flight tasks up to 10 x 50ms to finish.
	for i := 0; i < 10 && atomic.LoadInt32(&w.activeTasks) != 0; i++ {
		time.Sleep(50 * time.Millisecond)
	}

	w.stop()

	cfg := w.getConfig()
	if cfg == nil || !cfg.AutoRestart {
		return
	}

	newWorker := NewCompleteWorker(w.id, cfg.Name, cfg)
	newWorker.restartCount = atomic.AddInt32(&w.restartCount, 1)

	if globalManager == nil || globalManager.pool == nil {
		return
	}

	func() {
		globalManager.mu.Lock()
		defer globalManager.mu.Unlock()

		// Make sure a different worker registered under this id is stopped.
		if existing, ok := globalManager.pool.workers.Load(w.id); ok {
			if old, isWorker := existing.(*CompleteWorker); isWorker && old != w {
				old.stop()
			}
		}
		globalManager.replaceWorker(w.id, newWorker)
	}()

	go newWorker.Run()
	w.logInfo("Worker %d restarted successfully", w.id)
}
// updateConfig swaps in a new *WorkerConfig under configMu.
//
// Data of any other type, or a nil *WorkerConfig, is ignored with a warning.
// That keeps getConfig from ever returning nil, which would crash every
// caller that dereferences the config.
func (w *CompleteWorker) updateConfig(data interface{}) {
	cfg, ok := data.(*WorkerConfig)
	if !ok || cfg == nil {
		w.logWarn("Ignoring invalid worker config update: %T", data)
		return
	}

	w.configMu.Lock()
	w.config = cfg
	w.configMu.Unlock()

	w.logInfo("Worker config updated")
}
// reportHealth offers a health snapshot on msg.Response without blocking.
//
// If there is no Response channel, or it cannot accept a value right away,
// the snapshot is dropped. The snapshot has the same keys as getHealthStatus
// except "successCount".
func (w *CompleteWorker) reportHealth(msg ControlMessage) {
	out := msg.Response
	if out == nil {
		return
	}

	uptime := time.Now().UnixNano()/1e6 - w.createdAt
	health := map[string]interface{}{
		"id":           w.id,
		"status":       atomic.LoadInt32(&w.status),
		"taskCount":    atomic.LoadInt64(&w.taskCount),
		"errorCount":   atomic.LoadInt64(&w.errorCount),
		"activeTasks":  atomic.LoadInt32(&w.activeTasks),
		"queuedTasks":  w.taskQueue.Size(),
		"lastActive":   atomic.LoadInt64(&w.lastActive),
		"uptime":       uptime,
		"restartCount": atomic.LoadInt32(&w.restartCount),
	}

	select {
	case out <- health:
	default:
	}
}
// delete stops the worker and, when a global manager exists, unregisters the
// worker from it. Without the nil check, delete panicked when no manager was
// installed (the pool stats helpers in this file already guard the same way).
func (w *CompleteWorker) delete() {
	w.stop()
	if globalManager != nil {
		globalManager.removeWorker(w.id)
	}
	w.logInfo("Worker deleted")
}
// SendControlSafe sends msg to the worker's Run loop and waits for the reply.
//
// It fails fast if the worker is already stopped. Missing Response, Error
// and Done channels are replaced with buffered (size 1) ones. A single
// 3-second budget (the shared `timeout` channel) covers both steps:
// enqueueing on controlChan and waiting for Done. After Done it waits up to
// 500ms more for a value on Response or Error. A panic while sending (for
// example, controlChan already closed by cleanup) is recovered and returned
// through the named result err.
//
// Returns the worker's response value, or a non-nil error describing why no
// response was obtained.
// NOTE(review): controlChan is read without synchronization while cleanup
// may set it to nil; a send on a nil channel blocks, so that case ends in the
// send timeout.
func (w *CompleteWorker) SendControlSafe(msg ControlMessage) (response interface{}, err error) {
// Reject the request if the stopped flag is set.
if w.IsStopped() {
return nil, fmt.Errorf("worker is stopped")
}
// Reject the request if the status says the worker is stopped.
status := atomic.LoadInt32(&w.status)
if status == int32(WorkerStatusStopped) {
return nil, fmt.Errorf("worker is stopped, status: %d", status)
}
// Make sure every reply channel exists.
if msg.Response == nil {
msg.Response = make(chan interface{}, 1)
}
if msg.Error == nil {
msg.Error = make(chan error, 1)
}
if msg.Done == nil {
msg.Done = make(chan bool, 1)
}
// Turn a panic (e.g. send on a closed controlChan) into a returned error.
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("send control panic: %v", r)
}
}()
// Try to enqueue the message; this timeout is also reused for the wait below.
timeout := time.After(3 * time.Second)
select {
case w.controlChan <- msg:
// Message enqueued; wait for the handler to signal completion.
select {
case <-msg.Done:
// Handler finished; collect the reply value or error.
select {
case resp := <-msg.Response:
return resp, nil
case err := <-msg.Error:
return nil, err
case <-time.After(500 * time.Millisecond):
// No reply: report whether the worker stopped in the meantime.
if w.IsStopped() {
return nil, fmt.Errorf("worker stopped during operation")
}
return nil, fmt.Errorf("response timeout")
}
case <-timeout:
// Shared budget exhausted while waiting; check whether the worker stopped.
if w.IsStopped() {
return nil, fmt.Errorf("worker stopped during timeout")
}
return nil, fmt.Errorf("message processing timeout")
}
case <-timeout:
// Could not enqueue in time; check whether the worker stopped.
if w.IsStopped() {
return nil, fmt.Errorf("worker stopped, cannot send control")
}
return nil, fmt.Errorf("control channel send timeout")
case <-w.ctx.Done():
return nil, fmt.Errorf("worker context cancelled")
}
}
// gracefulStop drains the worker, finishes the work still queued, and then
// stops it.
//
// When a ControlMsgGraceful message triggers it, this method runs on the Run
// goroutine, so the Run loop cannot execute queued tasks while it waits.
// Waiting passively for the queue to empty could therefore spin forever.
// Instead, the remaining queued tasks are popped and executed here. The wait
// is capped at DefaultShutdownTimeout milliseconds, and it also ends if the
// worker's context is cancelled. The worker is stopped in every case.
// NOTE(review): a task that Run's fetcher goroutine has already taken off the
// queue, but not yet executed, is not covered here. Confirm that this loss
// is acceptable.
func (w *CompleteWorker) gracefulStop() {
	w.drain()

	deadline := time.Now().Add(time.Duration(DefaultShutdownTimeout) * time.Millisecond)
waitLoop:
	for atomic.LoadInt32(&w.activeTasks) != 0 || w.taskQueue.Size() != 0 {
		if !time.Now().Before(deadline) {
			w.logWarn("Graceful stop timed out, %d task(s) still queued", w.taskQueue.Size())
			break
		}

		// drain() put the queue in drain mode, so Pop returns nil instead of
		// blocking once the queue is empty (unless the flag is reset
		// concurrently).
		if w.taskQueue.Size() > 0 {
			if task := w.taskQueue.Pop(); task != nil {
				w.executeTask(task)
				continue
			}
		}

		select {
		case <-w.ctx.Done():
			break waitLoop
		case <-time.After(100 * time.Millisecond):
		}
	}

	w.stop()
	w.logInfo("Worker gracefully stopped")
}
// checkHealth warns when the worker looks stuck or memory use looks high.
//
// A lastActive timestamp of 0 means the worker has not been active yet, and
// the check is skipped. A worker whose lastActive (ms since epoch) is more
// than twice TimeoutMs old is reported and, if EnableRecovery is set,
// restarted. When MemoryLimitKB is positive, the process-wide HeapInuse from
// runtime.MemStats is compared against it; that figure is not per-worker.
//
// Implementation notes:
//   - The config is read once.
//   - The threshold is computed in int64, so a large TimeoutMs cannot
//     overflow int32.
//   - runtime.ReadMemStats is expensive (it stops the world), so it is only
//     called when a limit is configured.
func (w *CompleteWorker) checkHealth() {
	cfg := w.getConfig()

	lastActive := atomic.LoadInt64(&w.lastActive)
	if lastActive == 0 {
		return
	}

	idleTime := time.Now().UnixNano()/1e6 - lastActive
	if idleTime > int64(cfg.TimeoutMs)*2 {
		w.logWarn("Worker可能卡住最后活动时间: %dms前", idleTime)
		if cfg.EnableRecovery {
			w.restart()
		}
	}

	if cfg.MemoryLimitKB <= 0 {
		return
	}
	var memStats runtime.MemStats
	runtime.ReadMemStats(&memStats)
	if memoryUsageKB := int64(memStats.HeapInuse / 1024); memoryUsageKB > cfg.MemoryLimitKB {
		w.logWarn("Worker内存使用过高: %dKB (限制: %dKB)",
			memoryUsageKB, cfg.MemoryLimitKB)
	}
}
// addActiveTask records task in activeTasksMap, keyed by its id, under
// tasksMu.
func (w *CompleteWorker) addActiveTask(task *AtomicTask) {
	w.tasksMu.Lock()
	defer w.tasksMu.Unlock()
	w.activeTasksMap[task.id] = task
}

// removeActiveTask forgets the task with taskID. It is a no-op if the task is
// not tracked.
func (w *CompleteWorker) removeActiveTask(taskID int64) {
	w.tasksMu.Lock()
	defer w.tasksMu.Unlock()
	delete(w.activeTasksMap, taskID)
}

// getActiveTasks returns a snapshot slice of the tracked tasks, in no
// particular order.
func (w *CompleteWorker) getActiveTasks() []*AtomicTask {
	w.tasksMu.RLock()
	snapshot := make([]*AtomicTask, 0, len(w.activeTasksMap))
	for _, t := range w.activeTasksMap {
		snapshot = append(snapshot, t)
	}
	w.tasksMu.RUnlock()
	return snapshot
}
// getConfig returns the current configuration pointer. It reads under
// configMu, so it does not race with updateConfig.
func (w *CompleteWorker) getConfig() *WorkerConfig {
	w.configMu.RLock()
	cfg := w.config
	w.configMu.RUnlock()
	return cfg
}
// cleanup releases the worker's resources. w.once makes it run at most once,
// no matter how many callers reach it (e.g. Run's deferred shutdown and the
// delayed goroutine started by stop).
//
// Order of operations:
//  1. Mark the worker stopped.
//  2. Cancel the context, so healthCheckLoop and the Run loop exit.
//  3. Sleep 50ms.
//  4. Close healthChan and controlChan and set both fields to nil.
//  5. Cancel every active task's context and reset activeTasksMap.
//  6. Put the queue in drain mode, which wakes a Pop blocked on an empty
//     queue.
//
// NOTE(review): healthChan/controlChan are set to nil here without
// synchronization, while other goroutines (e.g. pool handlers sending on
// worker.controlChan) may read them concurrently — potential data race.
func (w *CompleteWorker) cleanup() {
w.once.Do(func() {
// First, mark the worker as stopped.
atomic.StoreInt32(&w.status, int32(WorkerStatusStopped))
atomic.StoreInt32(&w.stopped, 1)
w.logInfo("Worker %d cleanup started", w.id)
// Cancel the context; this makes healthCheckLoop exit.
if w.cancel != nil {
w.cancel()
}
// Wait briefly so the loops can observe the cancellation.
time.Sleep(50 * time.Millisecond)
// Close the health channel (recovers if already closed).
w.safeCloseHealthChan()
// Then close the control channel.
w.safeCloseControlChan()
// Cancel the contexts of all tasks still marked active.
w.tasksMu.Lock()
for _, task := range w.activeTasksMap {
if task.cancel != nil {
task.cancel()
}
}
w.activeTasksMap = make(map[int64]*AtomicTask)
w.tasksMu.Unlock()
// Put the queue in drain mode.
w.taskQueue.Drain()
w.logInfo("Worker %d cleanup completed", w.id)
})
}
// safeCloseHealthChan closes healthChan and sets the field to nil. A panic
// from closing an already-closed channel is recovered and logged.
func (w *CompleteWorker) safeCloseHealthChan() {
	defer func() {
		if p := recover(); p != nil {
			w.logWarn("Ignore panic when closing health channel: %v", p)
		}
	}()
	if ch := w.healthChan; ch != nil {
		close(ch)
		w.healthChan = nil
	}
}

// safeCloseControlChan closes controlChan and sets the field to nil. A panic
// from closing an already-closed channel is recovered and logged.
func (w *CompleteWorker) safeCloseControlChan() {
	defer func() {
		if p := recover(); p != nil {
			w.logWarn("Ignore panic when closing control channel: %v", p)
		}
	}()
	if ch := w.controlChan; ch != nil {
		close(ch)
		w.controlChan = nil
	}
}
// waitForStatus polls the worker status every 10ms until it equals
// targetStatus or timeoutMs elapses. It reports whether the target status
// was observed; a non-positive timeout returns false immediately.
func (w *CompleteWorker) waitForStatus(targetStatus int32, timeoutMs int) bool {
	limit := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
	for {
		if !time.Now().Before(limit) {
			return false
		}
		if atomic.LoadInt32(&w.status) == targetStatus {
			return true
		}
		time.Sleep(10 * time.Millisecond)
	}
}
// logInfo logs at LogLevelInfo, prefixed with "[Worker <id>]".
func (w *CompleteWorker) logInfo(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	logMessage(LogLevelInfo, "[Worker %d] %s", w.id, text)
}

// logError logs at LogLevelError, prefixed with "[Worker <id>]".
func (w *CompleteWorker) logError(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	logMessage(LogLevelError, "[Worker %d] %s", w.id, text)
}

// logWarn logs at LogLevelWarn, prefixed with "[Worker <id>]".
func (w *CompleteWorker) logWarn(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	logMessage(LogLevelWarn, "[Worker %d] %s", w.id, text)
}
// ==================== Pool implementation ====================
// CompletePool manages a set of CompleteWorkers, the registered task
// functions and aggregate statistics. It processes pool-level control
// messages received on controlChan.
type CompletePool struct {
// Atomic state
status int32 // set to WorkerStatusRunning by NewCompletePool
startTime int64 // creation time, ms since epoch
workerSeq int32 // last worker id handed out by createWorker
// Worker management
workers sync.Map // map[int32]*CompleteWorker
workerCount int32 // workers created so far; compared against MaxWorkers in createWorker
workerStatus map[int32][]int32 // worker ids grouped by status; maintained via addWorkerToStatus/removeWorkerFromStatus
// Configuration
config *PoolConfig
configMu sync.RWMutex // presumably guards config — confirm against its users
// Atomic statistics
totalTasks int64
successTasks int64 // incremented by workers on task success
failedTasks int64 // incremented by workers on task failure or panic
cancelledTasks int64
totalWorkers int32
runningWorkers int32
idleWorkers int32
pausedWorkers int32
stoppedWorkers int32
drainingWorkers int32
queuedTasks int32
activeTasks int32
// Function registry
functions sync.Map // map[string]TaskFunc
// Control
controlChan chan PoolControlMessage // buffer 100; consumed by controlLoop
shutdownCtx context.Context // cancelling it ends controlLoop
shutdownCancel context.CancelFunc
}
// PoolControlMessage is a request handled by CompletePool.handlePoolControl.
//
// Type uses the pool's own numbering (1 pause, 2 resume, 3 stop, 4 drain,
// 5 restart, 6 update worker config, 7 delete, 8 worker status, 9 create
// worker, 10 pause all, 11 resume all, 12 stop all, 13 graceful shutdown,
// 14 update pool config). These codes do NOT match the ControlMsg*
// constants. handlePoolControl fills in nil reply channels and closes Done
// when handling finishes.
type PoolControlMessage struct {
Type int
WorkerID int32 // target worker for per-worker operations
Data interface{} // operation-specific payload (e.g. new config)
Response chan interface{}
Error chan error
Done chan bool
}
// NewCompletePool creates a pool from config, or from the package defaults
// when config is nil. It starts config.MinWorkers workers, launches the
// control loop, and launches metrics collection when EnableMetrics is set.
func NewCompletePool(config *PoolConfig) *CompletePool {
	if config == nil {
		config = &PoolConfig{
			MinWorkers:          DefaultMinWorkers,
			MaxWorkers:          DefaultMaxWorkers,
			QueueSize:           DefaultQueueSize,
			TaskTimeoutMs:       DefaultTaskTimeout,
			WorkerIdleTimeoutMs: DefaultIdleTimeout,
			MemoryLimitMB:       DefaultMaxMemoryMB,
			ShutdownTimeoutMs:   DefaultShutdownTimeout,
			HealthCheckDelayMs:  DefaultHealthCheckDelay,
			MaxRetries:          DefaultMaxRetries,
			DefaultLanguage:     LanguageEN,
			EnableMetrics:       true,
			EnableAutoScaling:   true,
			PriorityQueue:       false,
		}
	}

	shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
	p := &CompletePool{
		config:         config,
		status:         int32(WorkerStatusRunning),
		startTime:      time.Now().UnixNano() / 1e6,
		workerStatus:   make(map[int32][]int32),
		controlChan:    make(chan PoolControlMessage, 100),
		shutdownCtx:    shutdownCtx,
		shutdownCancel: shutdownCancel,
	}

	p.initializeWorkers()
	go p.controlLoop()
	if config.EnableMetrics {
		go p.collectMetrics()
	}

	p.logInfo("Pool initialized with %d workers", config.MinWorkers)
	return p
}
// initializeWorkers starts up to config.MinWorkers workers. It records in
// totalWorkers and runningWorkers how many were actually created.
// createWorker returns -1 once MaxWorkers is reached (and keeps failing after
// that), so the counters reflect successful creations, not MinWorkers.
func (p *CompletePool) initializeWorkers() {
	var created int32
	for i := 0; i < p.config.MinWorkers; i++ {
		if p.createWorker() < 0 {
			break
		}
		created++
	}
	atomic.StoreInt32(&p.totalWorkers, created)
	atomic.StoreInt32(&p.runningWorkers, created)
}
// createWorker creates, registers and starts one worker, and returns its id.
// It returns -1 when the pool already has MaxWorkers workers.
//
// The slot is reserved with a compare-and-swap loop. A separate load followed
// by an add would let concurrent callers all pass the limit check and
// overshoot MaxWorkers.
func (p *CompletePool) createWorker() int32 {
	maxWorkers := int32(p.config.MaxWorkers)
	for {
		current := atomic.LoadInt32(&p.workerCount)
		if current >= maxWorkers {
			return -1
		}
		if atomic.CompareAndSwapInt32(&p.workerCount, current, current+1) {
			break
		}
	}

	workerID := atomic.AddInt32(&p.workerSeq, 1)
	config := &WorkerConfig{
		ID:              workerID,
		Name:            fmt.Sprintf("worker_%d", workerID),
		QueueSize:       int32(p.config.QueueSize),
		TimeoutMs:       int32(p.config.TaskTimeoutMs),
		MaxRetries:      int32(p.config.MaxRetries),
		AutoRestart:     true,
		EnableRecovery:  true,
		RecoveryDelayMs: 1000,
		PriorityEnabled: p.config.PriorityQueue,
	}
	worker := NewCompleteWorker(workerID, config.Name, config)

	p.workers.Store(workerID, worker)
	p.addWorkerToStatus(workerID, int32(WorkerStatusRunning))

	go worker.Run()
	// Give the worker a moment to start.
	time.Sleep(10 * time.Millisecond)

	p.logInfo("Worker %d created", workerID)
	return workerID
}
// controlLoop dispatches pool control messages to handlePoolControl until
// shutdownCtx is cancelled.
func (p *CompletePool) controlLoop() {
	done := p.shutdownCtx.Done()
	for {
		select {
		case <-done:
			return
		case msg := <-p.controlChan:
			p.handlePoolControl(msg)
		}
	}
}
// handlePoolControl processes one pool control message on its own goroutine,
// so the control loop is never blocked.
//
// Missing Response, Error and Done channels are replaced with buffered
// (size 1) channels. A panic in a handler is reported on msg.Error without
// blocking. msg.Done is always closed when handling finishes.
func (p *CompletePool) handlePoolControl(msg PoolControlMessage) {
	if msg.Response == nil {
		msg.Response = make(chan interface{}, 1)
	}
	if msg.Error == nil {
		msg.Error = make(chan error, 1)
	}
	if msg.Done == nil {
		msg.Done = make(chan bool, 1)
	}

	go func() {
		defer func() {
			if r := recover(); r != nil {
				select {
				case msg.Error <- fmt.Errorf("panic in control handler: %v", r):
				default:
				}
			}
			close(msg.Done)
		}()

		id := msg.WorkerID
		switch msg.Type {
		case 1: // pause one worker
			p.pauseWorker(id, msg)
		case 2: // resume one worker
			p.resumeWorker(id, msg)
		case 3: // stop one worker
			p.stopWorker(id, msg)
		case 4: // drain one worker
			p.drainWorker(id, msg)
		case 5: // restart one worker
			p.restartWorker(id, msg)
		case 6: // update one worker's config
			p.updateWorkerConfig(id, msg.Data, msg)
		case 7: // delete one worker
			p.deleteWorker(id, msg)
		case 8: // report one worker's status
			p.getWorkerStatus(id, msg)
		case 9: // create a worker
			p.createNewWorker(msg)
		case 10: // pause all workers
			p.pauseAllWorkers(msg)
		case 11: // resume all workers
			p.resumeAllWorkers(msg)
		case 12: // stop all workers
			p.stopAllWorkers(msg)
		case 13: // graceful shutdown
			p.gracefulShutdown(msg)
		case 14: // update pool config
			p.updatePoolConfig(msg.Data, msg)
		default:
			msg.Error <- fmt.Errorf("unknown control type: %d", msg.Type)
		}
	}()
}
// pauseWorker sends a pause request to workerID and reports the outcome on
// msg.
//
// It allows 1s to enqueue the request and 2s for the worker to finish
// handling it. The worker's pause() silently does nothing for workers that
// are already paused or stopped. The pool's status groups and its
// running/paused counters are therefore adjusted only when the worker
// actually moved into the paused state; otherwise the counters drift.
func (p *CompletePool) pauseWorker(workerID int32, msg PoolControlMessage) {
	value, ok := p.workers.Load(workerID)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("worker not found")
		}
		return
	}
	worker := value.(*CompleteWorker)
	before := worker.GetStatus()

	doneChan := make(chan bool, 1)
	workerMsg := ControlMessage{
		Type:     ControlMsgPause,
		Response: make(chan interface{}, 1),
		Error:    make(chan error, 1),
		Done:     doneChan,
	}

	select {
	case worker.controlChan <- workerMsg:
	case <-time.After(1 * time.Second):
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("failed to send control message")
		}
		return
	}

	select {
	case <-doneChan:
	case <-time.After(2 * time.Second):
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("pause timeout")
		}
		return
	}

	if before != int32(WorkerStatusPaused) && worker.GetStatus() == int32(WorkerStatusPaused) {
		p.removeWorkerFromStatus(workerID, int32(WorkerStatusRunning))
		p.addWorkerToStatus(workerID, int32(WorkerStatusPaused))
		atomic.AddInt32(&p.runningWorkers, -1)
		atomic.AddInt32(&p.pausedWorkers, 1)
	}

	if msg.Response != nil {
		select {
		case msg.Response <- true:
		default:
		}
	}
}
// resumeWorker 恢复工作者
// resumeWorker asks a paused worker to resume. On success it moves the worker from the
// Paused status group to the Running group and adjusts the counters. The paused counter
// is clamped at 0.
//
// The control message is sent from a helper goroutine via SendControlSafe. This function
// waits up to 5s for a response, an error, or the worker's Done signal. Every send into
// the 1-slot reply channels is non-blocking. The worker may already have filled them, and
// this function may have stopped listening. A blocking send would leak the helper goroutine.
//
// Results go to msg.Response (the worker's response, or true) or msg.Error.
func (p *CompletePool) resumeWorker(workerID int32, msg PoolControlMessage) {
	worker, ok := p.workers.Load(workerID)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("worker not found")
		}
		return
	}

	responseChan := make(chan interface{}, 1)
	errorChan := make(chan error, 1)
	doneChan := make(chan bool, 1)

	go func() {
		defer func() {
			if r := recover(); r != nil {
				select {
				case errorChan <- fmt.Errorf("panic in resume worker: %v", r):
				default:
				}
			}
		}()
		resp, err := worker.(*CompleteWorker).SendControlSafe(ControlMessage{
			Type:     ControlMsgResume,
			Response: responseChan,
			Error:    errorChan,
			Done:     doneChan,
		})
		if err != nil {
			select {
			case errorChan <- err:
			default:
			}
			return
		}
		select {
		case responseChan <- resp:
		default:
		}
	}()

	// succeed performs the bookkeeping for a completed resume and reports it.
	succeed := func(resp interface{}) {
		p.removeWorkerFromStatus(workerID, int32(WorkerStatusPaused))
		p.addWorkerToStatus(workerID, int32(WorkerStatusRunning))
		if atomic.AddInt32(&p.pausedWorkers, -1) < 0 {
			atomic.StoreInt32(&p.pausedWorkers, 0)
		}
		atomic.AddInt32(&p.runningWorkers, 1)
		if msg.Response != nil {
			select {
			case msg.Response <- resp:
			default:
			}
		}
	}
	// fail reports an error without blocking.
	fail := func(err error) {
		if msg.Error != nil {
			select {
			case msg.Error <- err:
			default:
			}
		}
	}

	select {
	case resp := <-responseChan:
		succeed(resp)
	case err := <-errorChan:
		fail(err)
	case <-doneChan:
		// Done can win the race against a queued response or error; prefer whatever the
		// worker already reported so the outcome does not depend on select ordering.
		select {
		case err := <-errorChan:
			fail(err)
		case resp := <-responseChan:
			succeed(resp)
		default:
			succeed(true)
		}
	case <-time.After(5 * time.Second):
		fail(fmt.Errorf("resume worker timeout"))
	}
}
// stopWorker 停止工作者
// stopWorker asks a single worker to stop and, once it confirms, removes it from the pool.
//
// The work is done synchronously. The control dispatcher closes msg.Done as soon as its
// handler returns. The old version ran the stop in a detached goroutine, so callers saw
// "done" before anything had happened and usually got "no response from worker".
//
// Outcomes are reported non-blockingly:
//   - msg.Response gets {"success": true, "workerID": id} once the worker signals Done.
//   - msg.Error gets "worker not found", "worker already stopped", the SendControlSafe
//     error, a recovered panic, or "worker stop timeout, forced removal" (after 5s; the
//     worker is still removed).
//
// Removal uses LoadAndDelete, so workerCount is decremented exactly once on both the
// success and the forced-removal paths.
func (p *CompletePool) stopWorker(workerID int32, msg PoolControlMessage) {
	reportErr := func(err error) {
		if msg.Error == nil {
			return
		}
		select {
		case msg.Error <- err:
		default:
		}
	}
	defer func() {
		if r := recover(); r != nil {
			reportErr(fmt.Errorf("worker stop panic: %v", r))
		}
	}()

	value, ok := p.workers.Load(workerID)
	if !ok {
		reportErr(fmt.Errorf("worker not found"))
		return
	}
	workerInst := value.(*CompleteWorker)
	if workerInst.IsStopped() {
		reportErr(fmt.Errorf("worker already stopped"))
		return
	}
	// Remember the status group the worker is in before the stop changes its status.
	prevStatus := atomic.LoadInt32(&workerInst.status)

	doneChan := make(chan bool, 1)
	workerMsg := ControlMessage{
		Type:     ControlMsgStop,
		Response: make(chan interface{}, 1),
		Error:    make(chan error, 1),
		Done:     doneChan,
	}
	if _, err := workerInst.SendControlSafe(workerMsg); err != nil {
		reportErr(err)
		return
	}

	// detach removes the worker from the pool and its status group exactly once.
	detach := func() {
		if _, loaded := p.workers.LoadAndDelete(workerID); loaded {
			atomic.AddInt32(&p.workerCount, -1)
		}
		p.removeWorkerFromStatus(workerID, prevStatus)
	}

	select {
	case <-doneChan:
		detach()
		atomic.AddInt32(&p.stoppedWorkers, 1)
		if msg.Response != nil {
			select {
			case msg.Response <- map[string]interface{}{
				"success":  true,
				"workerID": workerID,
			}:
			default:
			}
		}
	case <-time.After(5 * time.Second):
		// Timed out: remove the worker anyway.
		detach()
		reportErr(fmt.Errorf("worker stop timeout, forced removal"))
	}
}
// drainWorker 排空工作者
// drainWorker asks a worker to drain and moves it into the Draining status group.
//
// The worker's previous status is loaded atomically *before* the message is sent. The
// worker goroutine may change it as soon as it receives the request, and a plain field
// read is a data race.
//
// The send is bounded by 1s ("failed to send control message"), so an unresponsive
// worker cannot block the control goroutine forever. msg.Response and msg.Error are
// forwarded to the worker for its own reply. A worker that is already Draining is not
// counted twice.
func (p *CompletePool) drainWorker(workerID int32, msg PoolControlMessage) {
	value, ok := p.workers.Load(workerID)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("worker not found")
		}
		return
	}
	worker := value.(*CompleteWorker)
	oldStatus := atomic.LoadInt32(&worker.status)

	select {
	case worker.controlChan <- ControlMessage{
		Type:     ControlMsgDrain,
		Response: msg.Response,
		Error:    msg.Error,
	}:
	case <-time.After(1 * time.Second):
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("failed to send control message")
		}
		return
	}

	if oldStatus == WorkerStatusDraining {
		// Already accounted for as draining.
		return
	}

	// Move the worker between status groups.
	p.removeWorkerFromStatus(workerID, oldStatus)
	p.addWorkerToStatus(workerID, int32(WorkerStatusDraining))

	// Move the worker between counters.
	switch oldStatus {
	case WorkerStatusRunning:
		atomic.AddInt32(&p.runningWorkers, -1)
	case WorkerStatusPaused:
		atomic.AddInt32(&p.pausedWorkers, -1)
	case WorkerStatusIdle:
		atomic.AddInt32(&p.idleWorkers, -1)
	}
	atomic.AddInt32(&p.drainingWorkers, 1)
}
// restartWorker 重启工作者
// restartWorker forwards a restart request to a worker. The worker replies on
// msg.Response / msg.Error.
//
// The send is bounded by 1s, like pauseWorker. A worker that has stopped reading its
// control channel produces "failed to send control message" instead of blocking the
// control goroutine forever.
func (p *CompletePool) restartWorker(workerID int32, msg PoolControlMessage) {
	value, ok := p.workers.Load(workerID)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("worker not found")
		}
		return
	}
	select {
	case value.(*CompleteWorker).controlChan <- ControlMessage{
		Type:     ControlMsgRestart,
		Response: msg.Response,
		Error:    msg.Error,
	}:
	case <-time.After(1 * time.Second):
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("failed to send control message")
		}
	}
}
// deleteWorker 删除工作者
// deleteWorker sends a delete request to a worker and removes it from the pool, its
// status group and the per-status counters.
//
// Fixes relative to the previous version:
//   - The status is read before the delete message is sent. Afterwards the worker may
//     already have changed it, which would decrement the wrong counter.
//   - Removal uses LoadAndDelete, so two racing deletes cannot double-decrement the counts.
//   - The send is bounded by 1s ("failed to send control message"; the worker is kept).
//   - The worker is also removed from its status group.
func (p *CompletePool) deleteWorker(workerID int32, msg PoolControlMessage) {
	value, ok := p.workers.Load(workerID)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("worker not found")
		}
		return
	}
	worker := value.(*CompleteWorker)
	status := atomic.LoadInt32(&worker.status)

	select {
	case worker.controlChan <- ControlMessage{
		Type:     ControlMsgDelete,
		Response: msg.Response,
		Error:    msg.Error,
	}:
	case <-time.After(1 * time.Second):
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("failed to send control message")
		}
		return
	}

	if _, loaded := p.workers.LoadAndDelete(workerID); !loaded {
		// Someone else already removed it and did the bookkeeping.
		return
	}
	atomic.AddInt32(&p.workerCount, -1)
	atomic.AddInt32(&p.totalWorkers, -1)
	p.removeWorkerFromStatus(workerID, status)

	switch status {
	case WorkerStatusRunning:
		atomic.AddInt32(&p.runningWorkers, -1)
	case WorkerStatusPaused:
		atomic.AddInt32(&p.pausedWorkers, -1)
	case WorkerStatusIdle:
		atomic.AddInt32(&p.idleWorkers, -1)
	case WorkerStatusStopped:
		atomic.AddInt32(&p.stoppedWorkers, -1)
	case WorkerStatusDraining:
		atomic.AddInt32(&p.drainingWorkers, -1)
	}
}
// updateWorkerConfig 更新工作者配置
// updateWorkerConfig forwards a *WorkerConfig to a worker as a ControlMsgUpdate message.
// The worker replies on msg.Response / msg.Error.
//
// Errors: "worker not found", "invalid config data" (data is not a *WorkerConfig), or
// "failed to send control message" if the worker does not accept the message within 1s.
// The bounded send keeps an unresponsive worker from blocking the control goroutine.
func (p *CompletePool) updateWorkerConfig(workerID int32, data interface{}, msg PoolControlMessage) {
	value, ok := p.workers.Load(workerID)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("worker not found")
		}
		return
	}
	config, ok := data.(*WorkerConfig)
	if !ok {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("invalid config data")
		}
		return
	}
	select {
	case value.(*CompleteWorker).controlChan <- ControlMessage{
		Type:     ControlMsgUpdate,
		Data:     config,
		Response: msg.Response,
		Error:    msg.Error,
	}:
	case <-time.After(1 * time.Second):
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("failed to send control message")
		}
	}
}
// getWorkerStatus 获取工作者状态
// getWorkerStatus sends a ControlMsgHealth message to one worker and relays its reply.
//
// It waits up to 1s to hand the message over, then up to 2s for the worker's Done signal.
// After Done it forwards whatever the worker queued: the response goes to msg.Response,
// an error goes to msg.Error, and if nothing was queued msg.Error gets
// "no response from worker".
//
// NOTE(review): the forwarding sends to msg.Response/msg.Error are blocking; they rely on
// the caller providing buffered channels (the control dispatcher creates 1-slot ones).
func (p *CompletePool) getWorkerStatus(workerID int32, msg PoolControlMessage) {
	if worker, ok := p.workers.Load(workerID); ok {
		// Private 1-slot reply channels for the worker.
		responseChan := make(chan interface{}, 1)
		errorChan := make(chan error, 1)
		doneChan := make(chan bool, 1)
		workerMsg := ControlMessage{
			Type:     ControlMsgHealth,
			Response: responseChan,
			Error:    errorChan,
			Done:     doneChan,
		}
		// Hand the control message to the worker (bounded by 1s).
		select {
		case worker.(*CompleteWorker).controlChan <- workerMsg:
			// Wait for the worker to finish (bounded by 2s).
			select {
			case <-doneChan:
				// Done has fired; take whichever reply was queued, without waiting.
				select {
				case resp := <-responseChan:
					if msg.Response != nil {
						msg.Response <- resp
					}
				case err := <-errorChan:
					if msg.Error != nil {
						msg.Error <- err
					}
				default:
					if msg.Error != nil {
						msg.Error <- fmt.Errorf("no response from worker")
					}
				}
			case <-time.After(2 * time.Second):
				if msg.Error != nil {
					msg.Error <- fmt.Errorf("response timeout")
				}
			}
		case <-time.After(1 * time.Second):
			if msg.Error != nil {
				msg.Error <- fmt.Errorf("failed to send control message")
			}
		}
	} else if msg.Error != nil {
		msg.Error <- fmt.Errorf("worker not found")
	}
}
// createNewWorker 创建新工作者
// createNewWorker creates a worker via p.createWorker. A positive ID means success.
//
// On success it replies {"workerID": id, "success": true} on msg.Response when one is
// provided. On failure it sends "failed to create worker" on msg.Error. Previously a
// successful creation with a nil msg.Response fell through to the error branch and
// reported a failure that did not happen.
func (p *CompletePool) createNewWorker(msg PoolControlMessage) {
	workerID := p.createWorker()
	if workerID <= 0 {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("failed to create worker")
		}
		return
	}
	if msg.Response != nil {
		msg.Response <- map[string]interface{}{
			"workerID": workerID,
			"success":  true,
		}
	}
}
// pauseAllWorkers 暂停所有工作者
// pauseAllWorkers sends a pause request to every worker and then sets the pool counters
// as if every worker were paused.
//
// Each request goes through safeSendControlToWorker. That call skips stopped workers and
// gives up after a bounded wait. The previous bare channel send inside Range could block
// the control goroutine forever on a worker that had stopped reading its control channel.
//
// The reply on msg.Response is {"success": true, "workers": workerCount}.
func (p *CompletePool) pauseAllWorkers(msg PoolControlMessage) {
	p.workers.Range(func(_, value interface{}) bool {
		worker := value.(*CompleteWorker)
		p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgPause})
		return true
	})

	// Update statistics.
	atomic.StoreInt32(&p.pausedWorkers, atomic.LoadInt32(&p.workerCount))
	atomic.StoreInt32(&p.runningWorkers, 0)
	atomic.StoreInt32(&p.idleWorkers, 0)
	atomic.StoreInt32(&p.drainingWorkers, 0)

	if msg.Response != nil {
		msg.Response <- map[string]interface{}{
			"success": true,
			"workers": atomic.LoadInt32(&p.workerCount),
		}
	}
}
// resumeAllWorkers 恢复所有工作者
// resumeAllWorkers sends a resume request to every paused worker. It then moves the
// paused count onto the running count.
//
// The previous version overwrote runningWorkers with the paused count, which dropped
// every worker that was already running. The paused count is now swapped to 0 and added
// to runningWorkers. Sends use safeSendControlToWorker so an unresponsive worker cannot
// block the control goroutine forever.
//
// The reply on msg.Response is {"success": true, "workers": <number moved to running>}.
func (p *CompletePool) resumeAllWorkers(msg PoolControlMessage) {
	p.workers.Range(func(_, value interface{}) bool {
		worker := value.(*CompleteWorker)
		if atomic.LoadInt32(&worker.status) == int32(WorkerStatusPaused) {
			p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgResume})
		}
		return true
	})

	resumed := atomic.SwapInt32(&p.pausedWorkers, 0)
	if resumed < 0 {
		resumed = 0
	}
	atomic.AddInt32(&p.runningWorkers, resumed)

	if msg.Response != nil {
		msg.Response <- map[string]interface{}{
			"success": true,
			"workers": resumed,
		}
	}
}
// stopAllWorkers 停止所有工作者
// stopAllWorkers sends a stop request to every worker and then sets the pool counters as
// if every worker were stopped.
//
// Requests go through safeSendControlToWorker. That call skips already-stopped workers
// and gives up after a bounded wait, instead of a bare channel send that could block the
// control goroutine forever.
//
// The reply on msg.Response is {"success": true, "workers": workerCount}.
func (p *CompletePool) stopAllWorkers(msg PoolControlMessage) {
	p.workers.Range(func(_, value interface{}) bool {
		worker := value.(*CompleteWorker)
		p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgStop})
		return true
	})

	// Update statistics.
	atomic.StoreInt32(&p.stoppedWorkers, atomic.LoadInt32(&p.workerCount))
	atomic.StoreInt32(&p.runningWorkers, 0)
	atomic.StoreInt32(&p.pausedWorkers, 0)
	atomic.StoreInt32(&p.idleWorkers, 0)
	atomic.StoreInt32(&p.drainingWorkers, 0)

	if msg.Response != nil {
		msg.Response <- map[string]interface{}{
			"success": true,
			"workers": atomic.LoadInt32(&p.workerCount),
		}
	}
}
// gracefulShutdown 优雅关闭
// gracefulShutdown drains and then stops every worker, cancels the pool context and marks
// the pool Stopped.
//
// The sequence is:
//  1. Set the pool status to Draining, so SubmitTask (which requires Running) rejects work.
//  2. Send a drain request to every worker.
//  3. Poll every 100ms until no worker has active or queued tasks, ShutdownTimeoutMs
//     elapses, or 100 polls (~10s) have passed.
//  4. Send a stop request to every worker, wait 200ms, cancel the pool context, and empty
//     the worker map.
//
// Fixes relative to the previous version:
//   - The timeout branch used a bare `break`, which only leaves the select. The loop then
//     spun on the already-closed ctx.Done(), flooding the log until the attempt cap was
//     reached. A labeled break now leaves the loop.
//   - The worker map is emptied with Range+Delete rather than by overwriting p.workers.
//     Overwriting races with concurrent readers of the map.
//
// NOTE(review): p.config is read here without configMu; updatePoolConfig swaps it under that lock.
func (p *CompletePool) gracefulShutdown(msg PoolControlMessage) {
	atomic.StoreInt32(&p.status, int32(WorkerStatusDraining))
	logMessage(LogLevelInfo, "Pool graceful shutdown initiated")

	// Snapshot the current workers.
	workers := make([]*CompleteWorker, 0)
	p.workers.Range(func(_, value interface{}) bool {
		workers = append(workers, value.(*CompleteWorker))
		return true
	})

	for _, worker := range workers {
		p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgDrain})
	}

	timeout := time.Duration(p.config.ShutdownTimeoutMs) * time.Millisecond
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()

	attempts := 0
waitLoop:
	for attempts < 100 { // at most ~10s of polling
		select {
		case <-ctx.Done():
			logMessage(LogLevelWarn, "Graceful shutdown timeout, forcing stop")
			break waitLoop
		case <-ticker.C:
			allDone := true
			for _, worker := range workers {
				if atomic.LoadInt32(&worker.activeTasks) > 0 || worker.taskQueue.Size() > 0 {
					allDone = false
					break
				}
			}
			if allDone {
				logMessage(LogLevelInfo, "All tasks completed, stopping workers")
				break waitLoop
			}
			attempts++
			// Log progress roughly once per second.
			if attempts%10 == 0 {
				totalActive := int32(0)
				totalQueued := int32(0)
				for _, worker := range workers {
					totalActive += atomic.LoadInt32(&worker.activeTasks)
					totalQueued += worker.taskQueue.Size()
				}
				logMessage(LogLevelInfo, "Graceful shutdown in progress: %d active tasks, %d queued tasks",
					totalActive, totalQueued)
			}
		}
	}

	for _, worker := range workers {
		p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgStop})
	}
	// Give workers a moment to act on the stop request.
	time.Sleep(200 * time.Millisecond)

	p.shutdownCancel()
	atomic.StoreInt32(&p.status, int32(WorkerStatusStopped))

	// Empty the worker map in place; concurrent readers keep seeing a valid sync.Map.
	p.workers.Range(func(key, _ interface{}) bool {
		p.workers.Delete(key)
		return true
	})
	atomic.StoreInt32(&p.workerCount, 0)
	logMessage(LogLevelInfo, "Pool graceful shutdown completed")

	if msg.Response != nil {
		select {
		case msg.Response <- map[string]interface{}{
			"success": true,
			"message": "pool gracefully shutdown",
		}:
		default:
		}
	}
}
// updatePoolConfig 更新池配置
// updatePoolConfig replaces the pool configuration with data, which must be a *PoolConfig.
// The pointer is swapped under configMu. Success is acknowledged on msg.Response. A
// payload of the wrong type is reported as "invalid config data" on msg.Error.
func (p *CompletePool) updatePoolConfig(data interface{}, msg PoolControlMessage) {
	newCfg, valid := data.(*PoolConfig)
	if !valid {
		if msg.Error != nil {
			msg.Error <- fmt.Errorf("invalid config data")
		}
		return
	}

	p.configMu.Lock()
	p.config = newCfg
	p.configMu.Unlock()

	if msg.Response == nil {
		return
	}
	msg.Response <- map[string]interface{}{
		"success": true,
		"message": "pool config updated",
	}
}
// addWorkerToStatus 添加工作者到状态分组
// workerStatusGroupsMu guards CompletePool.workerStatus. The control dispatcher handles
// every control message in its own goroutine, so the helpers below can run concurrently.
// Unguarded writes to the map are a fatal "concurrent map writes" error.
// NOTE(review): this lock is package-level because the pool struct (declared elsewhere)
// has no lock for this map. Any other code that touches p.workerStatus should take it too.
var workerStatusGroupsMu sync.Mutex

// addWorkerToStatus records workerID in the group for status. Adding an ID that is
// already in the group is a no-op, so repeated transitions cannot create duplicates.
func (p *CompletePool) addWorkerToStatus(workerID, status int32) {
	workerStatusGroupsMu.Lock()
	defer workerStatusGroupsMu.Unlock()
	for _, id := range p.workerStatus[status] {
		if id == workerID {
			return
		}
	}
	p.workerStatus[status] = append(p.workerStatus[status], workerID)
}

// removeWorkerFromStatus deletes the first occurrence of workerID from the group for
// status. Missing groups or IDs are ignored.
func (p *CompletePool) removeWorkerFromStatus(workerID, status int32) {
	workerStatusGroupsMu.Lock()
	defer workerStatusGroupsMu.Unlock()
	workers, ok := p.workerStatus[status]
	if !ok {
		return
	}
	for i, id := range workers {
		if id == workerID {
			p.workerStatus[status] = append(workers[:i], workers[i+1:]...)
			return
		}
	}
}
// collectMetrics 收集指标
// collectMetrics refreshes the aggregate queue/activity statistics every 5 seconds.
// It returns when the pool's shutdown context is cancelled.
func (p *CompletePool) collectMetrics() {
	tick := time.NewTicker(5 * time.Second)
	defer tick.Stop()

	for {
		select {
		case <-p.shutdownCtx.Done():
			return
		case <-tick.C:
			p.updateStatistics()
		}
	}
}
// updateStatistics 更新统计
// updateStatistics sums every worker's queue length and active-task count and stores the
// totals in the pool's queuedTasks / activeTasks counters.
func (p *CompletePool) updateStatistics() {
	var queued, active int32
	p.workers.Range(func(_, v interface{}) bool {
		w := v.(*CompleteWorker)
		queued += w.taskQueue.Size()
		active += atomic.LoadInt32(&w.activeTasks)
		return true
	})
	atomic.StoreInt32(&p.queuedTasks, queued)
	atomic.StoreInt32(&p.activeTasks, active)
}
// SubmitTask 提交任务
// SubmitTask enqueues a call to the registered function funcName with param on a worker.
//
// If workerID > 0, that worker is used; it must be Running, Idle or Draining. Otherwise
// the least-loaded Running/Idle worker is chosen by selectWorker. It returns the new task
// ID, or -1 and an error. Possible errors are: pool not running, function not registered,
// worker not found/unavailable, no available worker, or a full or refusing queue.
//
// p.config is read under configMu because updatePoolConfig swaps the pointer under that
// lock; reading it unguarded was a data race. The old explicit Paused check after the
// status filter was unreachable (Paused is already rejected by it) and has been removed.
func (p *CompletePool) SubmitTask(workerID int32, funcName, param string, priority int32) (int64, error) {
	if atomic.LoadInt32(&p.status) != int32(WorkerStatusRunning) {
		return -1, fmt.Errorf("pool not running")
	}
	if _, ok := p.functions.Load(funcName); !ok {
		return -1, fmt.Errorf("function not registered")
	}

	var worker *CompleteWorker
	if workerID > 0 {
		w, ok := p.workers.Load(workerID)
		if !ok {
			return -1, fmt.Errorf("worker not found")
		}
		worker = w.(*CompleteWorker)
		// Only Running, Idle and Draining workers accept tasks; this also rejects Paused.
		workerStatus := atomic.LoadInt32(&worker.status)
		if workerStatus != int32(WorkerStatusRunning) &&
			workerStatus != int32(WorkerStatusIdle) &&
			workerStatus != int32(WorkerStatusDraining) {
			return -1, fmt.Errorf("worker not available, status: %d", workerStatus)
		}
	} else {
		worker = p.selectWorker()
		if worker == nil {
			return -1, fmt.Errorf("no available worker")
		}
		// The worker may have been paused between selection and now.
		if atomic.LoadInt32(&worker.status) == int32(WorkerStatusPaused) {
			return -1, fmt.Errorf("selected worker is paused")
		}
	}

	p.configMu.Lock()
	cfg := p.config
	p.configMu.Unlock()

	task := NewAtomicTask(int(worker.id), funcName, param,
		int(priority), int(cfg.MaxRetries), cfg.TaskTimeoutMs)

	if !worker.taskQueue.Push(task) {
		return -1, fmt.Errorf("queue full or worker paused")
	}

	// Make the task visible to GetTaskInfo / CancelTask.
	if globalManager != nil && globalManager.registry != nil {
		globalManager.registry.Register(task.id, task)
	}

	atomic.AddInt64(&p.totalTasks, 1)
	return task.id, nil
}
// selectWorker 选择工作者(负载均衡)
// selectWorker returns the Running or Idle worker with the shortest task queue, or nil if
// none is available. Ties keep the first worker encountered by Range.
func (p *CompletePool) selectWorker() *CompleteWorker {
	var best *CompleteWorker
	bestLoad := int32(1<<31 - 1)

	p.workers.Range(func(_, v interface{}) bool {
		w := v.(*CompleteWorker)
		switch atomic.LoadInt32(&w.status) {
		case int32(WorkerStatusRunning), int32(WorkerStatusIdle):
			if l := w.taskQueue.Size(); l < bestLoad {
				bestLoad, best = l, w
			}
		}
		return true
	})
	return best
}
// RegisterFunction 注册函数
// RegisterFunction makes fn callable by name for tasks submitted to this pool,
// replacing any previous function with the same name.
//
// An empty name or a nil fn is rejected. Previously a nil fn was stored, so SubmitTask
// accepted tasks for it (its check only tests presence) that would panic when executed.
func (p *CompletePool) RegisterFunction(name string, fn TaskFunc) error {
	if name == "" {
		return fmt.Errorf("function name is empty")
	}
	if fn == nil {
		return fmt.Errorf("function %q is nil", name)
	}
	p.functions.Store(name, fn)
	return nil
}
// GetFunction 获取函数
// GetFunction returns the function registered under name, or nil if there is none.
func (p *CompletePool) GetFunction(name string) TaskFunc {
	v, found := p.functions.Load(name)
	if !found {
		return nil
	}
	return v.(TaskFunc)
}
// SendControl 发送控制消息
// SendControl submits a pool-level control message and waits for the result.
//
// It requires the pool to be Running. It installs fresh 1-slot Done/Response/Error
// channels on msg, overwriting any the caller set. It waits up to 2s to enqueue msg on
// p.controlChan and then up to 5s for Done. After Done it returns the queued response or
// error, or "no response from worker" if neither was queued.
func (p *CompletePool) SendControl(msg PoolControlMessage) (interface{}, error) {
	// Control messages are only accepted while the pool is running.
	if atomic.LoadInt32(&p.status) != int32(WorkerStatusRunning) {
		return nil, fmt.Errorf("pool not running")
	}
	done := make(chan bool, 1)
	response := make(chan interface{}, 1)
	errChan := make(chan error, 1)
	msg.Done = done
	msg.Response = response
	msg.Error = errChan
	// Enqueue the message (bounded by 2s).
	select {
	case p.controlChan <- msg:
		// Wait for the handler to finish (bounded by 5s).
		select {
		case <-done:
			// Done has fired; take whichever reply was queued, without waiting.
			select {
			case resp := <-response:
				return resp, nil
			case err := <-errChan:
				return nil, err
			default:
				return nil, fmt.Errorf("no response from worker")
			}
		case <-time.After(5 * time.Second):
			return nil, fmt.Errorf("control message timeout")
		}
	case <-time.After(2 * time.Second):
		return nil, fmt.Errorf("control channel full or pool stopped")
	}
}
// safeSendControlToWorker tries to deliver msg to a worker's control channel.
// It returns false, without sending, if the worker does not exist or is Stopped. It also
// returns false if the send does not complete within 1s or the pool is shutting down.
func (p *CompletePool) safeSendControlToWorker(workerID int32, msg ControlMessage) bool {
	v, found := p.workers.Load(workerID)
	if !found {
		return false
	}
	w := v.(*CompleteWorker)
	if atomic.LoadInt32(&w.status) == int32(WorkerStatusStopped) {
		return false
	}

	select {
	case w.controlChan <- msg:
		return true
	case <-p.shutdownCtx.Done():
		return false
	case <-time.After(1 * time.Second):
		return false
	}
}
// Shutdown 关闭池
// Shutdown runs a graceful shutdown of the pool synchronously, without a reply channel.
func (p *CompletePool) Shutdown() {
	req := PoolControlMessage{Type: ControlMsgGraceful}
	p.gracefulShutdown(req)
}
// GetStats 获取统计信息
// GetStats returns a snapshot of the pool's counters. It also reports heap usage in MB
// (runtime.MemStats.Alloc), uptime in seconds, and throughput (total submitted tasks per
// second of uptime; 0 when uptime is not positive).
func (p *CompletePool) GetStats() map[string]interface{} {
	stats := map[string]interface{}{
		"totalWorkers":    atomic.LoadInt32(&p.totalWorkers),
		"runningWorkers":  atomic.LoadInt32(&p.runningWorkers),
		"idleWorkers":     atomic.LoadInt32(&p.idleWorkers),
		"pausedWorkers":   atomic.LoadInt32(&p.pausedWorkers),
		"stoppedWorkers":  atomic.LoadInt32(&p.stoppedWorkers),
		"drainingWorkers": atomic.LoadInt32(&p.drainingWorkers),
		"totalTasks":      atomic.LoadInt64(&p.totalTasks),
		"successTasks":    atomic.LoadInt64(&p.successTasks),
		"failedTasks":     atomic.LoadInt64(&p.failedTasks),
		"cancelledTasks":  atomic.LoadInt64(&p.cancelledTasks),
		"queuedTasks":     atomic.LoadInt32(&p.queuedTasks),
		"activeTasks":     atomic.LoadInt32(&p.activeTasks),
	}

	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	stats["memoryUsageMB"] = float64(ms.Alloc) / 1024 / 1024

	elapsed := float64(time.Now().UnixNano()/1e6-p.startTime) / 1000.0
	stats["uptimeSeconds"] = elapsed

	throughput := 0.0
	if elapsed > 0 {
		throughput = float64(atomic.LoadInt64(&p.totalTasks)) / elapsed
	}
	stats["throughput"] = throughput
	return stats
}
// logInfo 记录信息日志
// logInfo logs an info-level message prefixed with "[Pool]".
func (p *CompletePool) logInfo(format string, args ...interface{}) {
	text := fmt.Sprintf(format, args...)
	logMessage(LogLevelInfo, "[Pool] %s", text)
}
// ==================== 任务注册表 ====================
// TaskRegistry 任务注册表
// TaskRegistry maps task IDs (int64) to *AtomicTask so tasks can be looked up after
// submission, for example by GetTaskInfo and CancelTask.
type TaskRegistry struct {
	tasks sync.Map // int64 task ID -> *AtomicTask
	// cleanupMu is held exclusively by Cleanup; Register/Get/Remove do not take it.
	cleanupMu sync.RWMutex
}
// NewTaskRegistry 创建任务注册表
// NewTaskRegistry returns an empty, ready-to-use task registry.
func NewTaskRegistry() *TaskRegistry {
	return new(TaskRegistry)
}
// Register 注册任务
// Register stores task under taskID, replacing any task already stored under that ID.
func (tr *TaskRegistry) Register(taskID int64, task *AtomicTask) {
	tr.tasks.Store(taskID, task)
}
// Get 获取任务
// Get returns the task registered under taskID and whether it was found.
func (tr *TaskRegistry) Get(taskID int64) (*AtomicTask, bool) {
	v, found := tr.tasks.Load(taskID)
	if !found {
		return nil, false
	}
	return v.(*AtomicTask), true
}
// Remove 移除任务
// Remove deletes the task registered under taskID; removing an unknown ID is a no-op.
func (tr *TaskRegistry) Remove(taskID int64) {
	tr.tasks.Delete(taskID)
}
// Cleanup 清理过期任务
// Cleanup removes tasks and returns how many it removed. A task is removed when it is
// either of the following:
//   - in a terminal status (>= TaskStatusCompleted) and ended more than maxAge ago, or
//   - submitted more than 24 hours ago, regardless of status.
//
// All times are in milliseconds since the Unix epoch.
func (tr *TaskRegistry) Cleanup(maxAge time.Duration) int {
	tr.cleanupMu.Lock()
	defer tr.cleanupMu.Unlock()

	const dayMs = 24 * 60 * 60 * 1000
	nowMs := time.Now().UnixNano() / 1e6
	maxAgeMs := maxAge.Milliseconds()
	count := 0

	tr.tasks.Range(func(k, v interface{}) bool {
		t := v.(*AtomicTask)
		age := nowMs - atomic.LoadInt64(&t.endTime)
		finished := atomic.LoadInt32(&t.status) >= TaskStatusCompleted
		if (finished && age > maxAgeMs) || nowMs-t.submitTime > dayMs {
			tr.tasks.Delete(k)
			count++
		}
		return true
	})
	return count
}
// ==================== 全局管理器 ====================
// GlobalManager 全局管理器
// GlobalManager owns the process-wide pool and task registry, plus the background
// goroutine that periodically prunes old tasks from the registry.
type GlobalManager struct {
	pool     *CompletePool
	registry *TaskRegistry
	// mu is taken by getFunction (read) and replaceWorker/removeWorker (write).
	mu sync.RWMutex
	// startTime is the creation time in milliseconds since the Unix epoch.
	startTime int64
	// cleanupTicker drives the registry cleanup loop started by startCleanup.
	cleanupTicker *time.Ticker
	// cleanupDone is closed by stopCleanup to end the cleanup loop.
	cleanupDone chan bool
}
var (
	// globalManager is the process-wide manager. InitGlobalManager publishes it with
	// atomic.StorePointer.
	// NOTE(review): most readers load it with a plain read, which is not synchronized
	// with that atomic store.
	globalManager *GlobalManager
	// taskIDCounter is not used in this part of the file; presumably it backs task ID
	// allocation elsewhere — confirm.
	taskIDCounter int64
	// initialized is set to 1 by InitGlobalManager; public API functions refuse to run
	// while it is 0.
	initialized int32
	// shuttingDown, when 1, makes InitGlobalManager fail with "system shutting down".
	shuttingDown int32
)
// InitGlobalManager 初始化全局管理器
// InitGlobalManager builds a new pool from config, starts the registry cleanup loop, and
// publishes the manager as the process-wide globalManager. It fails only if the system is
// shutting down.
func InitGlobalManager(config *PoolConfig) error {
	if atomic.LoadInt32(&shuttingDown) == 1 {
		return fmt.Errorf("system shutting down")
	}

	gm := &GlobalManager{
		registry:    NewTaskRegistry(),
		startTime:   time.Now().UnixNano() / 1e6,
		cleanupDone: make(chan bool),
		pool:        NewCompletePool(config),
	}
	gm.startCleanup()

	// Publish the fully constructed manager before flagging initialization.
	atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&globalManager)), unsafe.Pointer(gm))
	atomic.StoreInt32(&initialized, 1)

	logMessage(LogLevelInfo, "Global manager initialized")
	return nil
}
// startCleanup 启动任务清理器
// startCleanup starts a goroutine that prunes the task registry every 30 minutes. Each
// pass removes completed tasks older than 1 hour. The goroutine exits when cleanupDone
// is closed.
func (gm *GlobalManager) startCleanup() {
	gm.cleanupTicker = time.NewTicker(30 * time.Minute)

	go func() {
		for {
			select {
			case <-gm.cleanupDone:
				return
			case <-gm.cleanupTicker.C:
				if n := gm.registry.Cleanup(1 * time.Hour); n > 0 {
					logMessage(LogLevelInfo, "Task cleanup: removed %d expired tasks", n)
				}
			}
		}
	}()
}
// stopCleanup 停止任务清理器
// stopCleanup stops the cleanup ticker and signals the cleanup goroutine to exit.
//
// It is safe to call more than once from the same goroutine. Closing an already-closed
// channel panics, so the close is skipped when cleanupDone is already closed. Nothing
// ever sends on cleanupDone, so a receive only succeeds after a close.
// NOTE(review): two truly concurrent callers could still race; a sync.Once field on
// GlobalManager would make this airtight.
func (gm *GlobalManager) stopCleanup() {
	if gm.cleanupTicker != nil {
		gm.cleanupTicker.Stop()
	}
	if gm.cleanupDone == nil {
		return
	}
	select {
	case <-gm.cleanupDone:
		// Already closed.
	default:
		close(gm.cleanupDone)
	}
}
// getFunction 获取函数(线程安全)
// getFunction looks up a registered task function under the manager's read lock.
// It returns nil for a nil manager, a nil pool, or an unknown name.
func (gm *GlobalManager) getFunction(name string) TaskFunc {
	if gm != nil && gm.pool != nil {
		gm.mu.RLock()
		defer gm.mu.RUnlock()
		return gm.pool.GetFunction(name)
	}
	return nil
}
// replaceWorker 替换工作者
// replaceWorker stores worker under workerID in the pool, replacing any existing entry,
// while holding the manager's write lock. It does nothing for a nil manager or pool.
func (gm *GlobalManager) replaceWorker(workerID int32, worker *CompleteWorker) {
	if gm != nil && gm.pool != nil {
		gm.mu.Lock()
		defer gm.mu.Unlock()
		gm.pool.workers.Store(workerID, worker)
	}
}
// removeWorker 移除工作者
// removeWorker deletes workerID from the pool under the manager's write lock.
//
// workerCount is decremented only if an entry was actually removed. The previous
// Delete-then-decrement version drove the count down for unknown or already-removed IDs.
func (gm *GlobalManager) removeWorker(workerID int32) {
	if gm == nil || gm.pool == nil {
		return
	}
	gm.mu.Lock()
	defer gm.mu.Unlock()
	if _, loaded := gm.pool.workers.LoadAndDelete(workerID); loaded {
		atomic.AddInt32(&gm.pool.workerCount, -1)
	}
}
// sendControlToWorker 发送控制消息给工作者
// sendControlToWorkerWithRetry sends a control message to a worker and waits for its
// reply. It retries on transient failures, up to maxRetries times after the first try,
// with a linear backoff of 200ms, 400ms, 600ms, and so on.
//
// Each attempt works as follows:
//   - It waits up to 1s to enqueue the message, then up to 2s for Done.
//   - After Done it returns the worker's response, or the worker's error.
//   - "worker not found" and "worker is stopped" errors are returned immediately. Other
//     errors trigger a retry.
//   - If Done fires with no response or error queued, it returns "no response from
//     worker" without retrying. The command was processed, and repeating it may not be
//     safe. (Previously the inner select had no default and blocked forever in this case.)
//
// A missing worker fails immediately with "worker not found".
func (gm *GlobalManager) sendControlToWorkerWithRetry(workerID int32, msgType int, data interface{}, maxRetries int) (interface{}, error) {
	var lastErr error
	for attempt := 0; attempt <= maxRetries; attempt++ {
		if attempt > 0 {
			waitTime := time.Duration(200*attempt) * time.Millisecond
			logMessage(LogLevelInfo, "Retrying control message in %v (attempt %d/%d)",
				waitTime, attempt, maxRetries+1)
			time.Sleep(waitTime)
		}

		worker, ok := gm.pool.workers.Load(workerID)
		if !ok {
			return nil, fmt.Errorf("worker not found")
		}

		responseChan := make(chan interface{}, 1)
		errorChan := make(chan error, 1)
		doneChan := make(chan bool, 1)

		select {
		case worker.(*CompleteWorker).controlChan <- ControlMessage{
			Type:     msgType,
			Data:     data,
			Response: responseChan,
			Error:    errorChan,
			Done:     doneChan,
		}:
			select {
			case <-doneChan:
				select {
				case resp := <-responseChan:
					return resp, nil
				case err := <-errorChan:
					lastErr = err
					// Fatal errors are not worth retrying.
					if strings.Contains(err.Error(), "worker not found") ||
						strings.Contains(err.Error(), "worker is stopped") {
						return nil, err
					}
					continue
				default:
					return nil, fmt.Errorf("no response from worker")
				}
			case <-time.After(2 * time.Second):
				lastErr = fmt.Errorf("response timeout")
				continue
			}
		case <-time.After(1 * time.Second):
			lastErr = fmt.Errorf("control channel busy")
			continue
		}
	}
	return nil, fmt.Errorf("control message failed after %d retries: %v", maxRetries, lastErr)
}
// ==================== 公共API ====================
// TaskFunc 任务函数类型
// A TaskFunc receives the task's string parameter and returns a string result or an
// error. Functions are registered by name with RegisterFunction.
type TaskFunc func(param string) (string, error)
// PoolConfig 池配置
// PoolConfig holds pool settings. It can be supplied as JSON via Initialize and
// UpdatePoolConfig; the JSON keys are the field tags. Defaults are set in Initialize.
type PoolConfig struct {
	MinWorkers int `json:"minWorkers"` // workers started with the pool
	MaxWorkers int `json:"maxWorkers"` // upper bound on the number of workers
	QueueSize  int `json:"queueSize"`  // per-queue capacity (default DefaultQueueSize)
	// TaskTimeoutMs is the per-task timeout in ms, passed to NewAtomicTask by SubmitTask.
	TaskTimeoutMs       int `json:"taskTimeoutMs"`
	WorkerIdleTimeoutMs int `json:"workerIdleTimeoutMs"` // idle time before a worker may be reclaimed, in ms
	MemoryLimitMB       int `json:"memoryLimitMB"`       // memory budget in MB
	// ShutdownTimeoutMs bounds how long gracefulShutdown waits for tasks to finish.
	ShutdownTimeoutMs  int    `json:"shutdownTimeoutMs"`
	HealthCheckDelayMs int    `json:"healthCheckDelayMs"` // health-check interval in ms
	MaxRetries         int    `json:"maxRetries"`         // retries per task, passed to NewAtomicTask
	DefaultLanguage    string `json:"defaultLanguage"`    // language for logs/error messages
	EnableMetrics      bool   `json:"enableMetrics"`      // collect performance metrics
	EnableAutoScaling  bool   `json:"enableAutoScaling"`  // adjust worker count with load
	PriorityQueue      bool   `json:"priorityQueue"`      // process tasks by priority
	Name               string `json:"name"`               // pool name for identification/monitoring
}
// ControlMessage 控制消息公共API使用
// ControlMessage is a command delivered on a worker's control channel.
type ControlMessage struct {
	Type     int              // one of the ControlMsg* constants (stop, pause, resume, drain, ...)
	Data     interface{}      // optional payload, e.g. *WorkerConfig for ControlMsgUpdate
	Done     chan bool        // signalled when the worker has finished handling the message
	Error    chan error       // receives an error from the worker, if any
	Response chan interface{} // receives the worker's result, if any
}
// Initialize 初始化
// Initialize creates the global pool with minWorkers/maxWorkers and default settings.
// If configJSON is non-empty, it is decoded on top of those defaults; keys present in the
// JSON override the defaults, including the worker bounds. A malformed JSON document
// yields "invalid config: ...".
func Initialize(minWorkers, maxWorkers int, configJSON string) error {
	cfg := &PoolConfig{
		MinWorkers:          minWorkers,
		MaxWorkers:          maxWorkers,
		QueueSize:           DefaultQueueSize,
		TaskTimeoutMs:       DefaultTaskTimeout,
		WorkerIdleTimeoutMs: DefaultIdleTimeout,
		MemoryLimitMB:       DefaultMaxMemoryMB,
		ShutdownTimeoutMs:   DefaultShutdownTimeout,
		HealthCheckDelayMs:  DefaultHealthCheckDelay,
		MaxRetries:          DefaultMaxRetries,
		DefaultLanguage:     LanguageEN,
		EnableMetrics:       true,
		EnableAutoScaling:   true,
		PriorityQueue:       false,
		Name:                "default",
	}

	if configJSON == "" {
		return InitGlobalManager(cfg)
	}
	if err := json.Unmarshal([]byte(configJSON), cfg); err != nil {
		return fmt.Errorf("invalid config: %v", err)
	}
	return InitGlobalManager(cfg)
}
// SubmitTask 提交任务
// SubmitTask submits a task to the global pool and returns its ID, or -1 and an error.
// A workerID > 0 targets that worker, and a paused target is rejected up front; other
// values let the pool pick the least-loaded worker.
func SubmitTask(workerID int, funcName, param string, priority int) (int64, error) {
	if atomic.LoadInt32(&initialized) == 0 {
		return -1, fmt.Errorf("pool not initialized")
	}
	gm := globalManager
	if gm == nil || gm.pool == nil {
		return -1, fmt.Errorf("pool not initialized")
	}
	if gm.pool.GetFunction(funcName) == nil {
		return -1, fmt.Errorf("function not registered")
	}

	if workerID > 0 {
		v, found := gm.pool.workers.Load(int32(workerID))
		if found && atomic.LoadInt32(&v.(*CompleteWorker).status) == int32(WorkerStatusPaused) {
			return -1, fmt.Errorf("worker %d is paused", workerID)
		}
	}
	return gm.pool.SubmitTask(int32(workerID), funcName, param, int32(priority))
}
// RegisterFunction 注册函数
// RegisterFunction registers fn under name on the global pool. The pool must be
// initialized.
func RegisterFunction(name string, fn TaskFunc) error {
	gm := globalManager
	if atomic.LoadInt32(&initialized) == 0 || gm == nil || gm.pool == nil {
		return fmt.Errorf("pool not initialized")
	}
	return gm.pool.RegisterFunction(name, fn)
}
// PauseWorker 暂停工作者
// PauseWorker asks the pool to pause the given worker.
func PauseWorker(workerID int) error {
	const pauseWorkerOp = 1 // pool control type "pause worker"
	return sendControlToWorker(workerID, pauseWorkerOp, nil)
}
// ResumeWorker 恢复工作者
// ResumeWorker asks the pool to resume the given (paused) worker.
func ResumeWorker(workerID int) error {
	const resumeWorkerOp = 2 // pool control type "resume worker"
	return sendControlToWorker(workerID, resumeWorkerOp, nil)
}
// StopWorker 停止工作者
// StopWorker stops the given worker and waits up to 3s for it to confirm.
//
// A worker that does not exist, is already stopped, or reports "worker is stopped" /
// "worker not found" counts as successfully stopped (nil). If the 3s wait expires, it
// still succeeds when the worker reports itself stopped; otherwise it returns
// "stop worker timeout".
//
// All sends from the helper goroutine into the 1-slot reply channels are non-blocking.
// Nothing in this function ever reads responseChan. If the worker had already put a
// reply there, the old blocking send leaked the helper goroutine forever.
func StopWorker(workerID int) error {
	if atomic.LoadInt32(&initialized) == 0 {
		return fmt.Errorf("pool not initialized")
	}
	manager := globalManager
	if manager == nil || manager.pool == nil {
		return fmt.Errorf("pool not initialized")
	}

	worker, ok := manager.pool.workers.Load(int32(workerID))
	if !ok {
		return nil
	}
	workerInst := worker.(*CompleteWorker)
	if workerInst.IsStopped() {
		return nil
	}

	responseChan := make(chan interface{}, 1)
	errorChan := make(chan error, 1)
	doneChan := make(chan bool, 1)
	msg := ControlMessage{
		Type:     ControlMsgStop,
		Response: responseChan,
		Error:    errorChan,
		Done:     doneChan,
	}

	go func() {
		defer func() {
			if r := recover(); r != nil {
				select {
				case errorChan <- fmt.Errorf("panic in stop worker: %v", r):
				default:
				}
			}
		}()
		resp, err := workerInst.SendControlSafe(msg)
		if err != nil {
			select {
			case errorChan <- err:
			default:
			}
			return
		}
		select {
		case responseChan <- resp:
		default:
		}
	}()

	select {
	case <-doneChan:
		return nil
	case err := <-errorChan:
		// These errors mean the worker is already gone, which is the goal.
		lower := strings.ToLower(err.Error())
		if strings.Contains(lower, "worker is stopped") ||
			strings.Contains(lower, "worker not found") {
			return nil
		}
		return err
	case <-time.After(3 * time.Second):
		if workerInst.IsStopped() {
			return nil
		}
		return fmt.Errorf("stop worker timeout")
	}
}
// DrainWorker 排空工作者
// DrainWorker asks the pool to drain the given worker.
func DrainWorker(workerID int) error {
	const drainWorkerOp = 4 // pool control type "drain worker"
	return sendControlToWorker(workerID, drainWorkerOp, nil)
}
// RestartWorker 重启工作者
// RestartWorker asks the pool to restart the given worker.
func RestartWorker(workerID int) error {
	const restartWorkerOp = 5 // pool control type "restart worker"
	return sendControlToWorker(workerID, restartWorkerOp, nil)
}
// DeleteWorker 删除工作者
// DeleteWorker asks the pool to delete the given worker.
func DeleteWorker(workerID int) error {
	const deleteWorkerOp = 7 // pool control type "delete worker"
	return sendControlToWorker(workerID, deleteWorkerOp, nil)
}
// GetWorkerStatus 获取工作者状态
// GetWorkerStatus returns the worker's health map, read directly from the worker without
// going through the control channel.
func GetWorkerStatus(workerID int) (map[string]interface{}, error) {
	if atomic.LoadInt32(&initialized) == 0 {
		return nil, fmt.Errorf("pool not initialized")
	}
	gm := globalManager
	if gm == nil || gm.pool == nil {
		return nil, fmt.Errorf("pool not initialized")
	}

	v, found := gm.pool.workers.Load(int32(workerID))
	if !found {
		return nil, fmt.Errorf("worker not found")
	}
	return v.(*CompleteWorker).getHealthStatus(), nil
}
// CreateWorker 创建工作者
// CreateWorker asks the pool to create a worker and returns its ID, or -1 on failure.
func CreateWorker() (int, error) {
	const createWorkerOp = 9 // pool control type "create worker"
	resp, err := sendControlToPool(createWorkerOp, nil)
	if err != nil {
		return -1, err
	}
	data, _ := resp.(map[string]interface{})
	id, ok := data["workerID"].(int32)
	if !ok {
		return -1, fmt.Errorf("invalid response")
	}
	return int(id), nil
}
// PauseAllWorkers 暂停所有工作者
// PauseAllWorkers asks the pool to pause every worker and returns the reported count.
func PauseAllWorkers() (int, error) {
	const pauseAllOp = 10 // pool control type "pause all workers"
	resp, err := sendControlToPool(pauseAllOp, nil)
	if err != nil {
		return 0, err
	}
	data, _ := resp.(map[string]interface{})
	n, ok := data["workers"].(int32)
	if !ok {
		return 0, fmt.Errorf("invalid response")
	}
	return int(n), nil
}
// ResumeAllWorkers 恢复所有工作者
// ResumeAllWorkers resumes every currently paused worker by calling its resume method
// directly, and returns how many were resumed.
//
// The pool counters are adjusted with atomic adds. The previous Load-then-Store sequence
// could lose concurrent updates. The paused count is clamped at 0, as in resumeWorker.
// NOTE(review): the pool's status groups (workerStatus) are not updated here, and
// resume() is assumed to succeed — confirm against CompleteWorker.resume.
func ResumeAllWorkers() (int, error) {
	if atomic.LoadInt32(&initialized) == 0 {
		return 0, fmt.Errorf("pool not initialized")
	}
	manager := globalManager
	if manager == nil || manager.pool == nil {
		return 0, fmt.Errorf("pool not initialized")
	}

	// Snapshot the IDs of paused workers first.
	pausedWorkers := make([]int32, 0)
	manager.pool.workers.Range(func(_, value interface{}) bool {
		worker := value.(*CompleteWorker)
		if atomic.LoadInt32(&worker.status) == int32(WorkerStatusPaused) {
			pausedWorkers = append(pausedWorkers, worker.id)
		}
		return true
	})

	resumedCount := 0
	for _, workerID := range pausedWorkers {
		if worker, ok := manager.pool.workers.Load(workerID); ok {
			// Resume directly rather than through the control channel.
			worker.(*CompleteWorker).resume()
			resumedCount++
			// Give the worker a moment to update its status.
			time.Sleep(10 * time.Millisecond)
		}
	}

	if resumedCount > 0 {
		n := int32(resumedCount)
		atomic.AddInt32(&manager.pool.runningWorkers, n)
		if atomic.AddInt32(&manager.pool.pausedWorkers, -n) < 0 {
			atomic.StoreInt32(&manager.pool.pausedWorkers, 0)
		}
	}
	return resumedCount, nil
}
// StopAllWorkers 停止所有工作者
// StopAllWorkers asks the pool to stop every worker and returns the reported count.
func StopAllWorkers() (int, error) {
	const stopAllOp = 12 // pool control type "stop all workers"
	resp, err := sendControlToPool(stopAllOp, nil)
	if err != nil {
		return 0, err
	}
	data, _ := resp.(map[string]interface{})
	n, ok := data["workers"].(int32)
	if !ok {
		return 0, fmt.Errorf("invalid response")
	}
	return int(n), nil
}
// GracefulShutdown 优雅关闭
// GracefulShutdown sends the graceful-shutdown control message to the pool. It then
// polls every 100ms, for up to 30s, until shutdown has completed.
//
// Completion means either of the following:
//   - IsInitialized() reports false, or
//   - the pool status is Stopped. The pool's shutdown handler sets this as its final
//     step; without this check the wait could only end by timing out.
//
// The unreachable trailing `return nil` after the infinite loop (flagged by go vet) is
// removed.
func GracefulShutdown() error {
	if atomic.LoadInt32(&initialized) == 0 {
		return fmt.Errorf("pool not initialized")
	}
	manager := globalManager
	if manager == nil || manager.pool == nil {
		return fmt.Errorf("pool not initialized")
	}

	if _, err := sendControlToPool(13, nil); err != nil {
		return err
	}

	timeout := time.After(30 * time.Second)
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-timeout:
			return fmt.Errorf("graceful shutdown timeout after 30 seconds")
		case <-ticker.C:
			if !IsInitialized() ||
				atomic.LoadInt32(&manager.pool.status) == int32(WorkerStatusStopped) {
				logMessage(LogLevelInfo, "Graceful shutdown completed successfully")
				return nil
			}
		}
	}
}
// UpdatePoolConfig 更新池配置
// UpdatePoolConfig applies a JSON configuration update to the global pool.
//
// The JSON is decoded on top of a copy of the current configuration (read under
// configMu), so keys absent from configJSON keep their current values. Decoding into a
// zero PoolConfig, as before, silently reset every unspecified field to 0 (for example
// MaxWorkers, QueueSize and TaskTimeoutMs) whenever a caller sent a partial update. If no
// pool exists yet, decoding starts from a zero PoolConfig, as before.
func UpdatePoolConfig(configJSON string) error {
	var config PoolConfig
	if manager := globalManager; manager != nil && manager.pool != nil {
		manager.pool.configMu.Lock()
		if manager.pool.config != nil {
			config = *manager.pool.config
		}
		manager.pool.configMu.Unlock()
	}
	if err := json.Unmarshal([]byte(configJSON), &config); err != nil {
		return fmt.Errorf("invalid config: %v", err)
	}
	_, err := sendControlToPool(14, &config)
	return err
}
// GetPoolStats 获取池统计
func GetPoolStats() (map[string]interface{}, error) {
if atomic.LoadInt32(&initialized) == 0 {
return nil, fmt.Errorf("pool not initialized")
}
manager := globalManager
if manager == nil || manager.pool == nil {
return nil, fmt.Errorf("pool not initialized")
}
return manager.pool.GetStats(), nil
}
// GetTaskInfo 获取任务信息
// GetTaskInfo returns a snapshot of a registered task's fields. The funcName, param,
// result and error fields default to "" when unset. Timestamps and durations come from
// the task's own fields.
func GetTaskInfo(taskID int64) (map[string]interface{}, error) {
	if atomic.LoadInt32(&initialized) == 0 {
		return nil, fmt.Errorf("pool not initialized")
	}
	gm := globalManager
	if gm == nil || gm.registry == nil {
		return nil, fmt.Errorf("pool not initialized")
	}
	task, found := gm.registry.Get(taskID)
	if !found {
		return nil, fmt.Errorf("task not found")
	}

	info := map[string]interface{}{
		"id":          task.id,
		"goroutineID": task.goroutineID,
		"status":      atomic.LoadInt32(&task.status),
	}

	if v := task.funcName.Load(); v == nil {
		info["funcName"] = ""
	} else {
		info["funcName"] = v
	}
	if v := task.param.Load(); v == nil {
		info["param"] = ""
	} else {
		info["param"] = v
	}
	if v := task.result.Load(); v == nil {
		info["result"] = ""
	} else {
		info["result"] = v
	}
	if v := task.errorMsg.Load(); v == nil {
		info["error"] = ""
	} else {
		info["error"] = v
	}

	info["priority"] = atomic.LoadInt32(&task.priority)
	info["submitTime"] = task.submitTime
	info["queueTime"] = task.queueTime
	info["dequeueTime"] = task.dequeueTime
	info["startTime"] = atomic.LoadInt64(&task.startTime)
	info["endTime"] = atomic.LoadInt64(&task.endTime)
	info["waitDuration"] = atomic.LoadInt64(&task.waitDuration)
	info["execDuration"] = atomic.LoadInt64(&task.execDuration)
	info["totalDuration"] = atomic.LoadInt64(&task.totalDuration)
	return info, nil
}
// CancelTask cancels the task registered under taskID.
//
// The task's cancel function (if set) is invoked first. Then the status is
// moved to Cancelled by compare-and-swap from Pending, or failing that from
// Running. On a successful transition, the task's error message is set to
// "Task cancelled" and the pool's cancelledTasks counter is incremented.
//
// Returns an error if the pool is not initialized, the task is unknown, or the
// task is in any other state (e.g. already finished).
func CancelTask(taskID int64) error {
	if atomic.LoadInt32(&initialized) == 0 {
		return fmt.Errorf("pool not initialized")
	}
	manager := globalManager
	if manager == nil || manager.registry == nil {
		return fmt.Errorf("pool not initialized")
	}
	task, ok := manager.registry.Get(taskID)
	if !ok {
		return fmt.Errorf("task not found")
	}
	// Signal cancellation before the status transition.
	// NOTE(review): this runs even when the task turns out not to be cancellable.
	if task.cancel != nil {
		task.cancel()
	}
	for _, from := range [...]int32{TaskStatusPending, TaskStatusRunning} {
		if atomic.CompareAndSwapInt32(&task.status, from, TaskStatusCancelled) {
			task.errorMsg.Store("Task cancelled")
			// Use the manager snapshot taken above. Re-reading globalManager
			// could observe nil if Shutdown runs concurrently, or a different
			// pool after re-initialization.
			if manager.pool != nil {
				atomic.AddInt64(&manager.pool.cancelledTasks, 1)
			}
			return nil
		}
	}
	return fmt.Errorf("task cannot be cancelled")
}
// Shutdown stops the pool immediately.
//
// It is a no-op when the pool is not initialized. Otherwise it shuts down the
// pool (if present), stops the manager's cleanup, clears globalManager, resets
// the initialized flag, and logs completion.
func Shutdown() {
	if atomic.LoadInt32(&initialized) == 0 {
		return
	}
	if m := globalManager; m != nil {
		if p := m.pool; p != nil {
			p.Shutdown()
		}
		m.stopCleanup()
	}
	// Drop the global references so later calls see an uninitialized pool.
	globalManager = nil
	atomic.StoreInt32(&initialized, 0)
	logMessage(LogLevelInfo, "Pool shutdown completed")
}
// IsInitialized reports whether the pool is initialized and usable.
//
// Besides the initialized flag, it requires a non-nil manager and pool, and a
// pool status other than WorkerStatusStopped. When the flag is set but either
// of those checks fails, the flag is reset to 0 so later checks short-circuit.
func IsInitialized() bool {
	if atomic.LoadInt32(&initialized) == 0 {
		return false
	}
	m := globalManager
	usable := m != nil && m.pool != nil &&
		atomic.LoadInt32(&m.pool.status) != int32(WorkerStatusStopped)
	if !usable {
		// Self-heal the global flag: the pool is gone or has stopped.
		atomic.StoreInt32(&initialized, 0)
	}
	return usable
}
// sendControlToWorker sends a control message to one worker and returns only
// the error, discarding the response from sendControlToWorkerWithResponse.
func sendControlToWorker(workerID, msgType int, data interface{}) error {
	if _, err := sendControlToWorkerWithResponse(workerID, msgType, data); err != nil {
		return err
	}
	return nil
}
// sendControlToWorkerWithResponse sends a control message to one worker and
// returns the pool's response.
//
// If the first attempt fails with an error whose text contains "timeout", it
// waits 100ms and retries exactly once, returning the retry's result. Any
// other error, or success, is returned as-is.
//
// workerID must fit in an int32 (the WorkerID field type). Out-of-range IDs are
// rejected rather than silently truncated onto a different worker's ID.
func sendControlToWorkerWithResponse(workerID, msgType int, data interface{}) (interface{}, error) {
	if atomic.LoadInt32(&initialized) == 0 {
		return nil, fmt.Errorf("pool not initialized")
	}
	manager := globalManager
	if manager == nil || manager.pool == nil {
		return nil, fmt.Errorf("pool not initialized")
	}
	id := int32(workerID)
	if int(id) != workerID {
		return nil, fmt.Errorf("invalid worker ID %d: out of int32 range", workerID)
	}
	msg := PoolControlMessage{
		Type:     msgType,
		WorkerID: id,
		Data:     data,
	}
	resp, err := manager.pool.SendControl(msg)
	// Retry once, but only on timeouts.
	// NOTE(review): matching on error text is fragile; a sentinel error
	// checked with errors.Is would be sturdier.
	if err != nil && strings.Contains(err.Error(), "timeout") {
		time.Sleep(100 * time.Millisecond)
		return manager.pool.SendControl(msg)
	}
	return resp, err
}
// WaitForWorkerState polls GetWorkerStatus every 50ms until the worker's
// "status" entry equals targetState or timeoutMs milliseconds have elapsed.
// Returns true if the state was observed before the deadline.
//
// A lookup error containing "not found" counts as success when targetState is
// WorkerStatusStopped, since a removed worker is treated as stopped. Other
// errors are retried until the deadline.
func WaitForWorkerState(workerID int, targetState int, timeoutMs int) bool {
	const pollInterval = 50 * time.Millisecond
	deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
	for time.Now().Before(deadline) {
		status, err := GetWorkerStatus(workerID)
		if err != nil {
			if targetState == WorkerStatusStopped && strings.Contains(err.Error(), "not found") {
				return true
			}
		} else {
			// The dynamic type of the status value isn't fixed here. The original
			// only accepted float64 (what JSON decoding produces), while other
			// maps in this file store int32 from atomic loads. Accept the
			// common numeric encodings so a match is not missed.
			var current int
			known := true
			switch s := status["status"].(type) {
			case float64:
				current = int(s)
			case int32:
				current = int(s)
			case int64:
				current = int(s)
			case int:
				current = s
			default:
				known = false
			}
			if known && current == targetState {
				return true
			}
		}
		time.Sleep(pollInterval)
	}
	return false
}
// sendControlToPool sends a pool-wide control message (WorkerID left unset)
// and returns the pool's response. It fails with "pool not initialized" when
// the pool or its manager is unavailable.
func sendControlToPool(msgType int, data interface{}) (interface{}, error) {
	if atomic.LoadInt32(&initialized) != 0 {
		if m := globalManager; m != nil && m.pool != nil {
			msg := PoolControlMessage{Type: msgType, Data: data}
			return m.pool.SendControl(msg)
		}
	}
	return nil, fmt.Errorf("pool not initialized")
}
// IsWorkerAvailable reports whether the worker can accept tasks, i.e. whether
// its status is WorkerStatusRunning or WorkerStatusIdle.
//
// Returns an error if the pool is not initialized or no worker is registered
// under workerID. Worker IDs are int32 keys, so IDs outside the int32 range
// are reported as not found instead of being truncated onto another ID.
func IsWorkerAvailable(workerID int) (bool, error) {
	if atomic.LoadInt32(&initialized) == 0 {
		return false, fmt.Errorf("pool not initialized")
	}
	manager := globalManager
	if manager == nil || manager.pool == nil {
		return false, fmt.Errorf("pool not initialized")
	}
	id := int32(workerID)
	if int(id) != workerID {
		return false, fmt.Errorf("worker not found")
	}
	entry, ok := manager.pool.workers.Load(id)
	if !ok {
		return false, fmt.Errorf("worker not found")
	}
	// Checked assertion: an unexpected entry type must not panic the caller.
	worker, ok := entry.(*CompleteWorker)
	if !ok || worker == nil {
		return false, fmt.Errorf("worker not found")
	}
	status := atomic.LoadInt32(&worker.status)
	return status == int32(WorkerStatusRunning) || status == int32(WorkerStatusIdle), nil
}
// ==================== Helpers ====================
// logMessage prints one line to stdout of the form
// "[2006-01-02 15:04:05.000] [LEVEL] message", where message is format
// expanded with args via fmt.Sprintf. Nothing is printed while shuttingDown
// is 1. Levels outside LogLevelDebug..LogLevelFatal are labelled INFO.
func logMessage(level int, format string, args ...interface{}) {
	if atomic.LoadInt32(&shuttingDown) == 1 {
		return
	}
	labels := [...]string{
		LogLevelDebug: "DEBUG",
		LogLevelInfo:  "INFO",
		LogLevelWarn:  "WARN",
		LogLevelError: "ERROR",
		LogLevelFatal: "FATAL",
	}
	label := "INFO"
	if level >= 0 && level < len(labels) {
		label = labels[level]
	}
	stamp := time.Now().Format("2006-01-02 15:04:05.000")
	text := fmt.Sprintf(format, args...)
	fmt.Printf("[%s] [%s] %s\n", stamp, label, text)
}
// roundUpToPowerOfTwo returns the smallest power of two that is >= n.
//
// Edge cases are explicit:
//   - n <= 0 returns 0.
//   - n > 1<<30 saturates at 1<<30, the largest power of two an int32 can hold.
//     The plain bit trick would overflow to math.MinInt32 (a negative size)
//     here. In this case the result is smaller than n, so callers needing
//     such sizes must validate them upstream.
func roundUpToPowerOfTwo(n int32) int32 {
	const maxPow2 int32 = 1 << 30
	switch {
	case n <= 0:
		return 0
	case n > maxPow2:
		return maxPow2
	}
	// Smear the highest set bit of n-1 into every lower bit, then add one.
	// Subtracting first keeps exact powers of two unchanged.
	n--
	n |= n >> 1
	n |= n >> 2
	n |= n >> 4
	n |= n >> 8
	n |= n >> 16
	n++
	return n
}
// init resets the package state flags, applies the GC tuning, and logs that the
// module is loaded.
func init() {
	atomic.StoreInt32(&initialized, 0)
	atomic.StoreInt32(&shuttingDown, 0)
	// These runtime/debug settings are process-wide: they affect the whole
	// program importing this package, not only the pool.
	const bytesPerMB = 1024 * 1024
	debug.SetGCPercent(200)
	debug.SetMemoryLimit(int64(DefaultMaxMemoryMB) * bytesPerMB)
	// This recover only covers panics from the statements after it in init
	// (the log call below). It is not a global panic handler for the pool.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		logMessage(LogLevelFatal, "Global panic: %v\nStack: %s",
			r, string(debug.Stack()))
		os.Exit(1)
	}()
	logMessage(LogLevelInfo, "Goroutine pool module initialized")
}