3303 lines
81 KiB
Go
3303 lines
81 KiB
Go
// Package goroutine_pool 提供高性能、功能完整的无锁协程池实现
|
||
package goroutine_pool
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"os"
|
||
"runtime"
|
||
"runtime/debug"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
"unsafe"
|
||
)
|
||
|
||
// ==================== 常量定义 ====================
|
||
|
||
// Result codes for pool operations. Zero signals success and every failure
// has its own negative code. The values are spelled out explicitly rather
// than derived from iota so each code is visible at a glance.
const (
	SuccessCode = 0

	ErrorPoolNotInitialized    = -1
	ErrorWorkerNotFound        = -2
	ErrorWorkerNotIdle         = -3
	ErrorWorkerNotRunning      = -4
	ErrorWorkerNotPaused       = -5
	ErrorTaskSubmitTimeout     = -6
	ErrorQueueFull             = -7
	ErrorInvalidConfig         = -8
	ErrorMemoryExhausted       = -9
	ErrorMaxWorkersExceeded    = -10
	ErrorInvalidLanguage       = -11
	ErrorWorkerAlreadyRunning  = -12
	ErrorFunctionNotRegistered = -13
	ErrorTaskNotFound          = -14
	ErrorTaskAlreadyCompleted  = -15
	ErrorTaskCancelled         = -16
	ErrorTaskTimeout           = -17
	ErrorSystemError           = -18
	ErrorInvalidParameter      = -19
	ErrorResourceUnavailable   = -20
)
|
||
|
||
// Worker lifecycle states, stored as int32 in CompleteWorker.status.
const (
	WorkerStatusIdle     = iota // 0
	WorkerStatusRunning         // 1
	WorkerStatusPaused          // 2
	WorkerStatusStopped         // 3
	WorkerStatusDraining        // 4
)
|
||
|
||
// Task lifecycle states, stored as int32 in AtomicTask.status.
const (
	TaskStatusPending   = iota // 0
	TaskStatusRunning          // 1
	TaskStatusCompleted        // 2
	TaskStatusFailed           // 3
	TaskStatusCancelled        // 4
	TaskStatusTimeout          // 5
)
|
||
|
||
// Log severities, ordered from most to least verbose.
const (
	LogLevelDebug = iota // 0
	LogLevelInfo         // 1
	LogLevelWarn         // 2
	LogLevelError        // 3
	LogLevelFatal        // 4
)
|
||
|
||
// Default tuning values.
const (
	DefaultQueueSize        = 1000  // slots in a worker's task queue
	DefaultTaskTimeout      = 30000 // milliseconds (NewAtomicTask applies it as ms)
	DefaultIdleTimeout      = 60000 // presumably milliseconds — confirm with callers
	DefaultMaxMemoryMB      = 1024  // megabytes
	DefaultMinWorkers       = 2
	DefaultMaxWorkers       = 100
	DefaultShutdownTimeout  = 30000 // presumably milliseconds — confirm with callers
	DefaultHealthCheckDelay = 5000  // presumably milliseconds — confirm with callers
	DefaultMaxRetries       = 3
	DefaultResultBufferSize = 65536 // unit not visible here (bytes?) — confirm
)
|
||
|
||
// Worker execution modes.
const (
	WorkerTypeAsync = iota // 0
	WorkerTypeSync         // 1
)
|
||
|
||
// Control message types dispatched by CompleteWorker.handleControlMessage.
// Numbering starts at 1.
const (
	ControlMsgStop     = iota + 1 // 1: stop the worker
	ControlMsgPause               // 2: pause the worker and its queue
	ControlMsgResume              // 3: resume a paused worker
	ControlMsgDrain               // 4: enter drain mode
	ControlMsgRestart             // 5: restart the worker
	ControlMsgUpdate              // 6: replace the worker config
	ControlMsgHealth              // 7: report health status
	ControlMsgGraceful            // 8: drain, then stop
	ControlMsgDelete              // 9: stop and remove from the manager
)
|
||
|
||
// Task priority levels, spaced five apart.
const (
	PriorityLow    = iota * 5 // 0
	PriorityNormal            // 5
	PriorityHigh              // 10
	PriorityUrgent            // 15
)
|
||
|
||
// Language identifiers.
const (
	LanguageCN = "cn_zh" // Chinese
	LanguageEN = "en_us" // English
)
|
||
|
||
// ==================== 原子数据结构 ====================
|
||
|
||
// AtomicTask is one unit of work submitted to a worker. Numeric fields are
// accessed with sync/atomic and string fields through atomic.Value.
//
// Field order matters. On 32-bit platforms (386, ARM, 32-bit MIPS) the 64-bit
// sync/atomic functions require 8-byte alignment, and Go only guarantees that
// for the first word of an allocated struct. All int64 fields are therefore
// declared first. Placing them after the five int32 fields left them at
// 4-byte offsets, so atomic.StoreInt64(&task.execDuration, ...) would panic there.
type AtomicTask struct {
	// 64-bit fields: keep these first (see above).
	id            int64 // unique ID taken from taskIDCounter
	submitTime    int64 // submission time, Unix milliseconds
	queueTime     int64 // enqueue time, Unix milliseconds
	dequeueTime   int64
	startTime     int64
	endTime       int64 // completion time, Unix milliseconds
	waitDuration  int64
	execDuration  int64 // function execution time, milliseconds
	totalDuration int64 // endTime - submitTime, milliseconds

	goroutineID int32
	status      int32 // one of the TaskStatus* constants
	priority    int32
	retryCount  int32
	maxRetries  int32

	funcName atomic.Value // string: name of the registered function to call
	param    atomic.Value // string: argument passed to that function
	result   atomic.Value // string: function result on success
	errorMsg atomic.Value // string: failure description

	callback unsafe.Pointer // *TaskCallback
	userData unsafe.Pointer // interface{}

	ctx    context.Context // carries the per-task timeout
	cancel context.CancelFunc
}
|
||
|
||
// NewAtomicTask 创建原子任务
|
||
func NewAtomicTask(goroutineID int, funcName, param string, priority, maxRetries, timeoutMs int) *AtomicTask {
|
||
if timeoutMs <= 0 {
|
||
timeoutMs = DefaultTaskTimeout
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeoutMs)*time.Millisecond)
|
||
now := time.Now().UnixNano() / 1e6
|
||
|
||
task := &AtomicTask{
|
||
id: atomic.AddInt64(&taskIDCounter, 1),
|
||
goroutineID: int32(goroutineID),
|
||
status: int32(TaskStatusPending),
|
||
priority: int32(priority),
|
||
retryCount: 0,
|
||
maxRetries: int32(maxRetries),
|
||
submitTime: now,
|
||
queueTime: now,
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
}
|
||
|
||
task.funcName.Store(funcName)
|
||
task.param.Store(param)
|
||
|
||
return task
|
||
}
|
||
|
||
// ==================== 增强的无锁环形队列 ====================
|
||
|
||
// EnhancedRingQueue is a fixed-capacity ring of *AtomicTask with pause and
// drain switches. Indices are wrapped with `& (capacity-1)`, so capacity must
// be a power of two. One slot always stays empty so that a full ring can be
// told apart from an empty one.
type EnhancedRingQueue struct {
	items    []unsafe.Pointer // slot storage; each entry is a *AtomicTask or nil
	capacity int32            // len(items)
	head     int32            // next slot to pop
	tail     int32            // next slot to push
	size     int32            // number of queued tasks
	paused   int32            // 1 while Push rejects new tasks because of Pause
	draining int32            // 1 while in drain mode
	cond     *sync.Cond       // wakes consumers blocked in Pop
}
|
||
|
||
// NewEnhancedRingQueue 创建增强环形队列
|
||
func NewEnhancedRingQueue(capacity int32) *EnhancedRingQueue {
|
||
if capacity <= 0 {
|
||
capacity = 1000
|
||
}
|
||
|
||
capacity = roundUpToPowerOfTwo(capacity)
|
||
|
||
queue := &EnhancedRingQueue{
|
||
items: make([]unsafe.Pointer, capacity),
|
||
capacity: capacity,
|
||
cond: sync.NewCond(&sync.Mutex{}),
|
||
}
|
||
|
||
return queue
|
||
}
|
||
|
||
// Push 入队(支持暂停检查)
|
||
func (q *EnhancedRingQueue) Push(task *AtomicTask) bool {
|
||
// 检查是否暂停(排空模式不接受新任务)
|
||
if atomic.LoadInt32(&q.paused) == 1 {
|
||
return false // 暂停时立即返回失败
|
||
}
|
||
|
||
// 排空模式只处理已有任务
|
||
if atomic.LoadInt32(&q.draining) == 1 {
|
||
return false
|
||
}
|
||
|
||
for {
|
||
tail := atomic.LoadInt32(&q.tail)
|
||
head := atomic.LoadInt32(&q.head)
|
||
next := (tail + 1) & (q.capacity - 1)
|
||
|
||
if next == head {
|
||
return false // 队列满
|
||
}
|
||
|
||
if atomic.CompareAndSwapInt32(&q.tail, tail, next) {
|
||
atomic.StorePointer(&q.items[tail], unsafe.Pointer(task))
|
||
atomic.AddInt32(&q.size, 1)
|
||
q.cond.Broadcast()
|
||
return true
|
||
}
|
||
}
|
||
}
|
||
|
||
// Pop 出队(支持排空模式)
|
||
func (q *EnhancedRingQueue) Pop() *AtomicTask {
|
||
for {
|
||
// 排空模式:只处理已有任务,不接受新任务
|
||
if atomic.LoadInt32(&q.draining) == 1 && atomic.LoadInt32(&q.size) == 0 {
|
||
return nil
|
||
}
|
||
|
||
head := atomic.LoadInt32(&q.head)
|
||
tail := atomic.LoadInt32(&q.tail)
|
||
|
||
if head == tail {
|
||
// 等待新任务
|
||
q.cond.L.Lock()
|
||
for atomic.LoadInt32(&q.size) == 0 {
|
||
if atomic.LoadInt32(&q.draining) == 1 {
|
||
q.cond.L.Unlock()
|
||
return nil
|
||
}
|
||
q.cond.Wait()
|
||
}
|
||
q.cond.L.Unlock()
|
||
continue
|
||
}
|
||
|
||
idx := head & (q.capacity - 1)
|
||
task := (*AtomicTask)(atomic.LoadPointer(&q.items[idx]))
|
||
|
||
if task == nil {
|
||
atomic.CompareAndSwapInt32(&q.head, head, (head+1)&(q.capacity-1))
|
||
atomic.AddInt32(&q.size, -1)
|
||
continue
|
||
}
|
||
|
||
if atomic.CompareAndSwapInt32(&q.head, head, (head+1)&(q.capacity-1)) {
|
||
atomic.StorePointer(&q.items[idx], nil)
|
||
atomic.AddInt32(&q.size, -1)
|
||
return task
|
||
}
|
||
}
|
||
}
|
||
|
||
// Size 获取队列大小
|
||
func (q *EnhancedRingQueue) Size() int32 {
|
||
return atomic.LoadInt32(&q.size)
|
||
}
|
||
|
||
// Capacity 获取队列容量
|
||
func (q *EnhancedRingQueue) Capacity() int32 {
|
||
return q.capacity
|
||
}
|
||
|
||
// Pause 暂停队列
|
||
func (q *EnhancedRingQueue) Pause() {
|
||
atomic.StoreInt32(&q.paused, 1)
|
||
}
|
||
|
||
// Resume 恢复队列
|
||
func (q *EnhancedRingQueue) Resume() {
|
||
atomic.StoreInt32(&q.paused, 0)
|
||
q.cond.Broadcast()
|
||
}
|
||
|
||
// Drain 排空队列
|
||
func (q *EnhancedRingQueue) Drain() {
|
||
atomic.StoreInt32(&q.draining, 1)
|
||
q.cond.Broadcast()
|
||
}
|
||
|
||
// ResetDrain 重置排空状态
|
||
func (q *EnhancedRingQueue) ResetDrain() {
|
||
atomic.StoreInt32(&q.draining, 0)
|
||
}
|
||
|
||
// ==================== 完整的工作者实现 ====================
|
||
|
||
// CompleteWorker 完整的工作者实现
|
||
type CompleteWorker struct {
|
||
// 原子状态
|
||
id int32
|
||
status int32
|
||
lastActive int64
|
||
createdAt int64
|
||
|
||
// 队列和控制
|
||
taskQueue *EnhancedRingQueue
|
||
controlChan chan ControlMessage
|
||
healthChan chan bool
|
||
|
||
// 统计
|
||
taskCount int64
|
||
errorCount int64
|
||
successCount int64
|
||
activeTasks int32
|
||
queuedTasks int32
|
||
restartCount int32
|
||
|
||
// 配置和上下文
|
||
config *WorkerConfig
|
||
ctx context.Context
|
||
cancel context.CancelFunc
|
||
|
||
// 同步原语
|
||
once sync.Once
|
||
mu sync.RWMutex
|
||
stopped int32 // 新增:明确的停止标志
|
||
|
||
// 任务管理
|
||
configMu sync.RWMutex
|
||
tasksMu sync.RWMutex
|
||
activeTasksMap map[int64]*AtomicTask
|
||
}
|
||
|
||
// WorkerConfig holds the settings of one worker. Workers share it by pointer,
// and updateConfig may swap it wholesale, so read it through getConfig.
type WorkerConfig struct {
	ID              int32
	Name            string
	Type            int32 // presumably one of the WorkerType* constants — confirm
	QueueSize       int32 // requested capacity for NewEnhancedRingQueue
	TimeoutMs       int32 // idle-timer period used by Run, milliseconds
	MaxRetries      int32
	MemoryLimitKB   int64 // checkHealth warns when heap in use exceeds this; 0 disables
	AutoRestart     bool  // restartWorker replaces the worker with a fresh one
	EnableRecovery  bool  // restart after a task panic or a stuck-worker detection
	RecoveryDelayMs int32 // sleep after requesting a restart, milliseconds
	PriorityEnabled bool
}
|
||
|
||
// NewCompleteWorker 创建完整的工作者
|
||
func NewCompleteWorker(id int32, name string, config *WorkerConfig) *CompleteWorker {
|
||
if config == nil {
|
||
config = &WorkerConfig{
|
||
QueueSize: DefaultQueueSize,
|
||
TimeoutMs: DefaultTaskTimeout,
|
||
MaxRetries: DefaultMaxRetries,
|
||
AutoRestart: true,
|
||
EnableRecovery: true,
|
||
RecoveryDelayMs: 1000,
|
||
PriorityEnabled: false,
|
||
}
|
||
}
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
return &CompleteWorker{
|
||
id: id,
|
||
status: int32(WorkerStatusIdle),
|
||
taskQueue: NewEnhancedRingQueue(config.QueueSize),
|
||
controlChan: make(chan ControlMessage, 32),
|
||
healthChan: make(chan bool, 1),
|
||
config: config,
|
||
ctx: ctx,
|
||
cancel: cancel,
|
||
createdAt: time.Now().UnixNano() / 1e6,
|
||
activeTasksMap: make(map[int64]*AtomicTask),
|
||
}
|
||
}
|
||
|
||
// Run 运行工作者
|
||
func (w *CompleteWorker) Run() {
|
||
// 设置运行状态
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusRunning))
|
||
atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
|
||
w.logInfo("Worker started")
|
||
|
||
// 使用 once 确保只关闭一次
|
||
var once sync.Once
|
||
taskChan := make(chan *AtomicTask, 1)
|
||
|
||
// 安全关闭函数
|
||
safeCloseTaskChan := func() {
|
||
once.Do(func() {
|
||
close(taskChan)
|
||
})
|
||
}
|
||
|
||
// 启动任务获取 goroutine
|
||
go func() {
|
||
defer safeCloseTaskChan()
|
||
|
||
for {
|
||
select {
|
||
case <-w.ctx.Done():
|
||
return
|
||
default:
|
||
}
|
||
|
||
status := atomic.LoadInt32(&w.status)
|
||
if status == int32(WorkerStatusStopped) {
|
||
return
|
||
}
|
||
|
||
// 检查是否暂停
|
||
if status == int32(WorkerStatusPaused) {
|
||
time.Sleep(100 * time.Millisecond)
|
||
continue
|
||
}
|
||
|
||
// 获取任务
|
||
task := w.taskQueue.Pop()
|
||
if task != nil {
|
||
select {
|
||
case taskChan <- task:
|
||
// 任务已发送
|
||
case <-w.ctx.Done():
|
||
return
|
||
}
|
||
} else {
|
||
// 检查是否正在排空且队列为空
|
||
if status == int32(WorkerStatusDraining) && w.taskQueue.Size() == 0 {
|
||
return
|
||
}
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 启动健康检查 goroutine
|
||
healthCheckDone := make(chan bool, 1)
|
||
go func() {
|
||
defer func() {
|
||
healthCheckDone <- true
|
||
}()
|
||
w.healthCheckLoop()
|
||
}()
|
||
|
||
defer func() {
|
||
safeCloseTaskChan()
|
||
|
||
// 等待健康检查循环退出
|
||
select {
|
||
case <-healthCheckDone:
|
||
// 健康检查已退出
|
||
case <-time.After(200 * time.Millisecond):
|
||
// 超时,继续执行
|
||
}
|
||
|
||
if r := recover(); r != nil {
|
||
w.logError("Worker panic: %v\nStack: %s", r, string(debug.Stack()))
|
||
}
|
||
|
||
// 清理资源
|
||
w.cleanup()
|
||
w.logInfo("Worker stopped")
|
||
}()
|
||
|
||
idleTimer := time.NewTimer(time.Duration(w.getConfig().TimeoutMs) * time.Millisecond)
|
||
defer idleTimer.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-w.ctx.Done():
|
||
return
|
||
|
||
case msg, ok := <-w.controlChan:
|
||
if !ok {
|
||
// 控制通道已关闭
|
||
return
|
||
}
|
||
w.handleControlMessage(msg)
|
||
atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
|
||
|
||
// 检查是否需要停止
|
||
if msg.Type == ControlMsgStop || msg.Type == ControlMsgDelete {
|
||
return
|
||
}
|
||
|
||
case task, ok := <-taskChan:
|
||
if !ok {
|
||
// 任务通道已关闭
|
||
return
|
||
}
|
||
if task == nil {
|
||
continue
|
||
}
|
||
|
||
// 重置空闲计时器
|
||
if !idleTimer.Stop() {
|
||
select {
|
||
case <-idleTimer.C:
|
||
default:
|
||
}
|
||
}
|
||
idleTimer.Reset(time.Duration(w.getConfig().TimeoutMs) * time.Millisecond)
|
||
|
||
// 处理任务
|
||
w.executeTask(task)
|
||
atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
|
||
|
||
case <-idleTimer.C:
|
||
if atomic.LoadInt32(&w.status) == int32(WorkerStatusIdle) {
|
||
w.logInfo("Worker idle timeout")
|
||
return
|
||
}
|
||
idleTimer.Reset(time.Duration(w.getConfig().TimeoutMs) * time.Millisecond)
|
||
}
|
||
}
|
||
}
|
||
|
||
// healthCheckLoop 健康检查循环
|
||
func (w *CompleteWorker) healthCheckLoop() {
|
||
ticker := time.NewTicker(1 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-w.ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
// 安全发送健康信号
|
||
w.safeSendHealthSignal()
|
||
}
|
||
}
|
||
}
|
||
|
||
// safeSendHealthSignal 安全发送健康信号
|
||
func (w *CompleteWorker) safeSendHealthSignal() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// 忽略通道已关闭的 panic
|
||
w.logWarn("Health signal send panic (channel closed): %v", r)
|
||
}
|
||
}()
|
||
|
||
// 检查通道是否可用
|
||
if w.healthChan == nil {
|
||
return
|
||
}
|
||
|
||
// 尝试发送健康信号(非阻塞)
|
||
select {
|
||
case w.healthChan <- true:
|
||
// 成功发送
|
||
default:
|
||
// 通道满,忽略
|
||
}
|
||
}
|
||
|
||
// getTask 获取任务(支持暂停和排空)
|
||
func (w *CompleteWorker) getTask() <-chan *AtomicTask {
|
||
taskChan := make(chan *AtomicTask, 1)
|
||
|
||
go func() {
|
||
for {
|
||
status := atomic.LoadInt32(&w.status)
|
||
|
||
// 检查状态
|
||
switch status {
|
||
case WorkerStatusPaused:
|
||
time.Sleep(100 * time.Millisecond)
|
||
continue
|
||
case WorkerStatusStopped:
|
||
close(taskChan)
|
||
return
|
||
case WorkerStatusDraining:
|
||
// 排空模式:只处理现有任务
|
||
if w.taskQueue.Size() == 0 {
|
||
close(taskChan)
|
||
return
|
||
}
|
||
}
|
||
|
||
// 获取任务
|
||
if task := w.taskQueue.Pop(); task != nil {
|
||
taskChan <- task
|
||
return
|
||
}
|
||
|
||
// 短暂休眠避免忙等待
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
}()
|
||
|
||
return taskChan
|
||
}
|
||
|
||
// executeTask 执行任务
|
||
func (w *CompleteWorker) executeTask(task *AtomicTask) {
|
||
// 记录开始时间
|
||
atomic.StoreInt64(&w.lastActive, time.Now().Unix())
|
||
//atomic.StoreInt64(&task.startTime, time.Now().UnixNano()/1e6)
|
||
atomic.StoreInt32(&task.status, int32(TaskStatusRunning))
|
||
|
||
// 添加到活动任务
|
||
w.addActiveTask(task)
|
||
|
||
// 更新统计
|
||
atomic.AddInt32(&w.activeTasks, 1)
|
||
atomic.AddInt64(&w.taskCount, 1)
|
||
|
||
defer func() {
|
||
// 移除活动任务
|
||
w.removeActiveTask(task.id)
|
||
|
||
// 更新统计
|
||
atomic.AddInt32(&w.activeTasks, -1)
|
||
atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
|
||
|
||
// 异常恢复
|
||
if r := recover(); r != nil {
|
||
w.handlePanic(task, r)
|
||
}
|
||
}()
|
||
|
||
// 执行任务
|
||
w.processTask(task)
|
||
}
|
||
|
||
// processTask 处理任务
|
||
func (w *CompleteWorker) processTask(task *AtomicTask) {
|
||
// 获取函数并执行
|
||
fnName := task.funcName.Load().(string)
|
||
param := task.param.Load().(string)
|
||
|
||
// 调用全局函数获取器
|
||
fn := globalManager.getFunction(fnName)
|
||
if fn == nil {
|
||
task.errorMsg.Store(fmt.Sprintf("Function not found: %s", fnName))
|
||
atomic.StoreInt32(&task.status, int32(TaskStatusFailed))
|
||
atomic.AddInt64(&w.errorCount, 1)
|
||
// 更新池的失败统计
|
||
w.updatePoolFailedStats()
|
||
return
|
||
}
|
||
|
||
// 执行函数
|
||
startTime := time.Now()
|
||
result, err := fn(param)
|
||
execDuration := time.Since(startTime).Milliseconds()
|
||
|
||
// 记录执行时间
|
||
atomic.StoreInt64(&task.execDuration, execDuration)
|
||
atomic.StoreInt64(&task.endTime, time.Now().UnixNano()/1e6)
|
||
atomic.StoreInt64(&task.totalDuration, task.endTime-task.submitTime)
|
||
|
||
if err != nil {
|
||
task.errorMsg.Store(err.Error())
|
||
atomic.StoreInt32(&task.status, int32(TaskStatusFailed))
|
||
atomic.AddInt64(&w.errorCount, 1)
|
||
// 更新池的失败统计
|
||
w.updatePoolFailedStats()
|
||
} else {
|
||
task.result.Store(result)
|
||
atomic.StoreInt32(&task.status, int32(TaskStatusCompleted))
|
||
atomic.AddInt64(&w.successCount, 1)
|
||
// 更新池的成功统计
|
||
w.updatePoolSuccessStats()
|
||
}
|
||
}
|
||
|
||
// 更新池的成功统计
|
||
func (w *CompleteWorker) updatePoolSuccessStats() {
|
||
if globalManager != nil && globalManager.pool != nil {
|
||
atomic.AddInt64(&globalManager.pool.successTasks, 1)
|
||
}
|
||
}
|
||
|
||
// 更新池的失败统计
|
||
func (w *CompleteWorker) updatePoolFailedStats() {
|
||
if globalManager != nil && globalManager.pool != nil {
|
||
atomic.AddInt64(&globalManager.pool.failedTasks, 1)
|
||
}
|
||
}
|
||
|
||
// handlePanic 处理panic
|
||
func (w *CompleteWorker) handlePanic(task *AtomicTask, r interface{}) {
|
||
task.errorMsg.Store(fmt.Sprintf("Panic: %v", r))
|
||
atomic.StoreInt32(&task.status, int32(TaskStatusFailed))
|
||
atomic.AddInt64(&w.errorCount, 1)
|
||
// 更新池的失败统计
|
||
w.updatePoolFailedStats()
|
||
|
||
// 记录堆栈
|
||
w.logError("Task panic: %v\nStack: %s", r, string(debug.Stack()))
|
||
|
||
// 自动恢复
|
||
if w.getConfig().EnableRecovery && atomic.LoadInt32(&w.status) != int32(WorkerStatusStopped) {
|
||
w.restart()
|
||
}
|
||
}
|
||
|
||
// restart 重启工作者
|
||
func (w *CompleteWorker) restart() {
|
||
atomic.AddInt32(&w.restartCount, 1)
|
||
w.logInfo("Worker restarting (attempt %d)", w.restartCount)
|
||
|
||
// 发送重启控制消息
|
||
w.controlChan <- ControlMessage{Type: ControlMsgRestart}
|
||
|
||
// 延迟重启
|
||
time.Sleep(time.Duration(w.getConfig().RecoveryDelayMs) * time.Millisecond)
|
||
}
|
||
|
||
// handleControlMessage 处理控制消息
|
||
func (w *CompleteWorker) handleControlMessage(msg ControlMessage) {
|
||
// 确保通道存在
|
||
ensureChannels := func() {
|
||
if msg.Response == nil {
|
||
msg.Response = make(chan interface{}, 1)
|
||
}
|
||
if msg.Error == nil {
|
||
msg.Error = make(chan error, 1)
|
||
}
|
||
if msg.Done == nil {
|
||
msg.Done = make(chan bool, 1)
|
||
}
|
||
}
|
||
|
||
ensureChannels()
|
||
|
||
// 处理消息
|
||
var response interface{}
|
||
var err error
|
||
|
||
switch msg.Type {
|
||
case ControlMsgPause:
|
||
w.pause()
|
||
response = true
|
||
case ControlMsgResume:
|
||
w.resume()
|
||
response = true
|
||
case ControlMsgStop:
|
||
// 停止工作者
|
||
w.stop()
|
||
response = true
|
||
case ControlMsgDrain:
|
||
w.drain()
|
||
response = true
|
||
case ControlMsgRestart:
|
||
w.restartWorker()
|
||
response = true
|
||
case ControlMsgUpdate:
|
||
w.updateConfig(msg.Data)
|
||
response = true
|
||
case ControlMsgHealth:
|
||
response = w.getHealthStatus()
|
||
case ControlMsgGraceful:
|
||
w.gracefulStop()
|
||
response = true
|
||
case ControlMsgDelete:
|
||
w.delete()
|
||
response = true
|
||
default:
|
||
err = fmt.Errorf("unknown control message type: %d", msg.Type)
|
||
}
|
||
|
||
// 安全发送响应
|
||
w.sendResponse(msg, response, err)
|
||
}
|
||
|
||
// sendResponse 安全发送响应
|
||
func (w *CompleteWorker) sendResponse(msg ControlMessage, response interface{}, err error) {
|
||
// 恢复可能发生的panic
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
w.logWarn("Ignore panic when sending response: %v", r)
|
||
}
|
||
}()
|
||
|
||
// 检查是否已停止
|
||
if atomic.LoadInt32(&w.stopped) == 1 {
|
||
return
|
||
}
|
||
|
||
// 发送错误(如果有)
|
||
if err != nil && msg.Error != nil {
|
||
select {
|
||
case msg.Error <- err:
|
||
// 成功发送错误
|
||
case <-time.After(100 * time.Millisecond):
|
||
// 超时不阻塞,记录日志但不panic
|
||
w.logWarn("Error channel blocked or closed, message: %v", err)
|
||
}
|
||
}
|
||
|
||
// 发送响应
|
||
if response != nil && msg.Response != nil {
|
||
select {
|
||
case msg.Response <- response:
|
||
// 成功发送响应
|
||
case <-time.After(100 * time.Millisecond):
|
||
// 超时不阻塞,记录日志但不panic
|
||
w.logWarn("Response channel blocked or closed, response: %v", response)
|
||
}
|
||
}
|
||
|
||
// 通知完成
|
||
if msg.Done != nil {
|
||
select {
|
||
case msg.Done <- true:
|
||
// 成功通知
|
||
case <-time.After(100 * time.Millisecond):
|
||
// 安全关闭通道
|
||
w.logWarn("Done channel blocked, closing safely")
|
||
func() {
|
||
defer func() { recover() }()
|
||
close(msg.Done)
|
||
}()
|
||
}
|
||
}
|
||
}
|
||
|
||
// GetStatus 获取工作者状态
|
||
func (w *CompleteWorker) GetStatus() int32 {
|
||
return atomic.LoadInt32(&w.status)
|
||
}
|
||
|
||
// IsRunning 检查工作者是否正在运行
|
||
func (w *CompleteWorker) IsRunning() bool {
|
||
status := atomic.LoadInt32(&w.status)
|
||
return status == int32(WorkerStatusRunning) ||
|
||
status == int32(WorkerStatusIdle)
|
||
}
|
||
|
||
// CanAcceptControl 检查工作者是否可以接收控制消息
|
||
func (w *CompleteWorker) CanAcceptControl() bool {
|
||
status := atomic.LoadInt32(&w.status)
|
||
return status != int32(WorkerStatusStopped) &&
|
||
atomic.LoadInt32(&w.stopped) == 0
|
||
}
|
||
|
||
// 添加健康状态获取方法
|
||
func (w *CompleteWorker) getHealthStatus() map[string]interface{} {
|
||
return map[string]interface{}{
|
||
"id": w.id,
|
||
"status": atomic.LoadInt32(&w.status),
|
||
"taskCount": atomic.LoadInt64(&w.taskCount),
|
||
"errorCount": atomic.LoadInt64(&w.errorCount),
|
||
"successCount": atomic.LoadInt64(&w.successCount),
|
||
"activeTasks": atomic.LoadInt32(&w.activeTasks),
|
||
"queuedTasks": w.taskQueue.Size(),
|
||
"lastActive": atomic.LoadInt64(&w.lastActive),
|
||
"uptime": time.Now().UnixNano()/1e6 - w.createdAt,
|
||
"restartCount": atomic.LoadInt32(&w.restartCount),
|
||
}
|
||
}
|
||
|
||
// pause 暂停工作者
|
||
func (w *CompleteWorker) pause() {
|
||
// 使用原子操作确保状态正确
|
||
oldStatus := atomic.LoadInt32(&w.status)
|
||
|
||
// 允许从运行或空闲状态暂停
|
||
if oldStatus != int32(WorkerStatusRunning) &&
|
||
oldStatus != int32(WorkerStatusIdle) &&
|
||
oldStatus != int32(WorkerStatusDraining) {
|
||
return
|
||
}
|
||
|
||
// 首先暂停队列(防止新任务进入)
|
||
w.taskQueue.Pause()
|
||
|
||
// 原子更新状态
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusPaused))
|
||
|
||
// 记录日志
|
||
w.logInfo("Worker paused, old status: %d", oldStatus)
|
||
|
||
// 更新活动时间
|
||
atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
|
||
}
|
||
|
||
// resume 恢复工作者(优化版本)
|
||
func (w *CompleteWorker) resume() {
|
||
oldStatus := atomic.LoadInt32(&w.status)
|
||
|
||
// 只允许从暂停状态恢复
|
||
if oldStatus != int32(WorkerStatusPaused) {
|
||
// 如果已经在运行状态,也视为成功
|
||
if oldStatus == int32(WorkerStatusRunning) {
|
||
w.logInfo("Worker already running, no need to resume")
|
||
return
|
||
}
|
||
w.logWarn("Cannot resume worker from status %d, expected paused (%d)",
|
||
oldStatus, WorkerStatusPaused)
|
||
return
|
||
}
|
||
|
||
// 重置排空状态(如果有)
|
||
if atomic.LoadInt32(&w.status) == int32(WorkerStatusDraining) {
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusPaused))
|
||
w.taskQueue.ResetDrain()
|
||
w.logInfo("Reset worker from draining to paused before resume")
|
||
}
|
||
|
||
// 恢复队列
|
||
w.taskQueue.Resume()
|
||
|
||
// 更新状态为运行中
|
||
if atomic.CompareAndSwapInt32(&w.status, int32(WorkerStatusPaused), int32(WorkerStatusRunning)) {
|
||
atomic.StoreInt64(&w.lastActive, time.Now().UnixNano()/1e6)
|
||
w.logInfo("Worker resumed from paused state")
|
||
} else {
|
||
w.logWarn("Worker status changed during resume, current status: %d",
|
||
atomic.LoadInt32(&w.status))
|
||
}
|
||
|
||
// 发送健康信号
|
||
select {
|
||
case w.healthChan <- true:
|
||
default:
|
||
// 通道满,忽略
|
||
}
|
||
}
|
||
|
||
// stop 停止工作者
|
||
func (w *CompleteWorker) stop() {
|
||
// 使用原子操作确保只停止一次
|
||
if !atomic.CompareAndSwapInt32(&w.stopped, 0, 1) {
|
||
w.logWarn("Worker already stopped, ignoring duplicate stop request")
|
||
return
|
||
}
|
||
|
||
// 更新状态
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusStopped))
|
||
w.logInfo("Worker %d stopping...", w.id)
|
||
|
||
// 先取消上下文,这会停止 Run 主循环
|
||
if w.cancel != nil {
|
||
w.cancel()
|
||
}
|
||
|
||
// 排空队列(如果队列不为nil)
|
||
if w.taskQueue != nil {
|
||
w.taskQueue.Drain()
|
||
}
|
||
|
||
// 安全清理资源(延迟执行,确保 Run 循环完全退出)
|
||
go func() {
|
||
// 等待 Run 循环完全退出
|
||
time.Sleep(100 * time.Millisecond)
|
||
w.cleanup()
|
||
}()
|
||
}
|
||
|
||
// IsStopped 检查工作者是否已停止
|
||
func (w *CompleteWorker) IsStopped() bool {
|
||
return atomic.LoadInt32(&w.stopped) == 1
|
||
}
|
||
|
||
// drain 排空工作者
|
||
func (w *CompleteWorker) drain() {
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusDraining))
|
||
w.taskQueue.Drain()
|
||
w.logInfo("Worker draining")
|
||
}
|
||
|
||
// restartWorker 重启工作者
|
||
func (w *CompleteWorker) restartWorker() {
|
||
currentStatus := atomic.LoadInt32(&w.status)
|
||
|
||
// 如果已经在停止状态,直接返回
|
||
if currentStatus == int32(WorkerStatusStopped) {
|
||
return
|
||
}
|
||
|
||
w.logInfo("Worker restarting (attempt %d)", atomic.LoadInt32(&w.restartCount)+1)
|
||
|
||
// 标记为排空状态
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusDraining))
|
||
w.taskQueue.Drain()
|
||
|
||
// 等待活动任务完成
|
||
for i := 0; i < 10; i++ {
|
||
if atomic.LoadInt32(&w.activeTasks) == 0 {
|
||
break
|
||
}
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
|
||
// 停止当前工作者
|
||
w.stop()
|
||
|
||
// 如果是自动重启,创建新的工作者
|
||
if w.getConfig().AutoRestart {
|
||
// 创建新的工作者
|
||
newWorker := NewCompleteWorker(w.id, w.config.Name, w.config)
|
||
newWorker.restartCount = atomic.AddInt32(&w.restartCount, 1)
|
||
|
||
// 替换全局管理器中的工作者
|
||
if globalManager != nil {
|
||
globalManager.mu.Lock()
|
||
if existing, ok := globalManager.pool.workers.Load(w.id); ok {
|
||
// 确保旧工作者完全停止
|
||
if existing.(*CompleteWorker) != w {
|
||
existing.(*CompleteWorker).stop()
|
||
}
|
||
}
|
||
globalManager.replaceWorker(w.id, newWorker)
|
||
globalManager.mu.Unlock()
|
||
|
||
// 启动新的工作者
|
||
go newWorker.Run()
|
||
w.logInfo("Worker %d restarted successfully", w.id)
|
||
}
|
||
}
|
||
}
|
||
|
||
// updateConfig 更新配置
|
||
func (w *CompleteWorker) updateConfig(data interface{}) {
|
||
if config, ok := data.(*WorkerConfig); ok {
|
||
w.configMu.Lock()
|
||
w.config = config
|
||
w.configMu.Unlock()
|
||
w.logInfo("Worker config updated")
|
||
}
|
||
}
|
||
|
||
// reportHealth 报告健康状态
|
||
func (w *CompleteWorker) reportHealth(msg ControlMessage) {
|
||
health := map[string]interface{}{
|
||
"id": w.id,
|
||
"status": atomic.LoadInt32(&w.status),
|
||
"taskCount": atomic.LoadInt64(&w.taskCount),
|
||
"errorCount": atomic.LoadInt64(&w.errorCount),
|
||
"activeTasks": atomic.LoadInt32(&w.activeTasks),
|
||
"queuedTasks": w.taskQueue.Size(),
|
||
"lastActive": atomic.LoadInt64(&w.lastActive),
|
||
"uptime": time.Now().UnixNano()/1e6 - w.createdAt,
|
||
"restartCount": atomic.LoadInt32(&w.restartCount),
|
||
}
|
||
|
||
if msg.Response != nil {
|
||
select {
|
||
case msg.Response <- health:
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
// delete 删除工作者
|
||
func (w *CompleteWorker) delete() {
|
||
w.stop()
|
||
globalManager.removeWorker(w.id)
|
||
w.logInfo("Worker deleted")
|
||
}
|
||
|
||
// SendControlSafe 安全地发送控制消息
|
||
func (w *CompleteWorker) SendControlSafe(msg ControlMessage) (response interface{}, err error) {
|
||
// 检查是否已停止
|
||
if w.IsStopped() {
|
||
return nil, fmt.Errorf("worker is stopped")
|
||
}
|
||
|
||
// 检查状态是否允许接收控制消息
|
||
status := atomic.LoadInt32(&w.status)
|
||
if status == int32(WorkerStatusStopped) {
|
||
return nil, fmt.Errorf("worker is stopped, status: %d", status)
|
||
}
|
||
|
||
// 确保响应通道
|
||
if msg.Response == nil {
|
||
msg.Response = make(chan interface{}, 1)
|
||
}
|
||
if msg.Error == nil {
|
||
msg.Error = make(chan error, 1)
|
||
}
|
||
if msg.Done == nil {
|
||
msg.Done = make(chan bool, 1)
|
||
}
|
||
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
err = fmt.Errorf("send control panic: %v", r)
|
||
}
|
||
}()
|
||
|
||
// 尝试发送消息(带超时)
|
||
timeout := time.After(3 * time.Second)
|
||
|
||
select {
|
||
case w.controlChan <- msg:
|
||
// 消息已发送,等待响应
|
||
select {
|
||
case <-msg.Done:
|
||
// 尝试从响应通道获取数据
|
||
select {
|
||
case resp := <-msg.Response:
|
||
return resp, nil
|
||
case err := <-msg.Error:
|
||
return nil, err
|
||
case <-time.After(500 * time.Millisecond):
|
||
// 如果没有响应,检查工作者是否已停止
|
||
if w.IsStopped() {
|
||
return nil, fmt.Errorf("worker stopped during operation")
|
||
}
|
||
return nil, fmt.Errorf("response timeout")
|
||
}
|
||
case <-timeout:
|
||
// 检查工作者是否已停止
|
||
if w.IsStopped() {
|
||
return nil, fmt.Errorf("worker stopped during timeout")
|
||
}
|
||
return nil, fmt.Errorf("message processing timeout")
|
||
}
|
||
case <-timeout:
|
||
// 检查工作者是否已停止
|
||
if w.IsStopped() {
|
||
return nil, fmt.Errorf("worker stopped, cannot send control")
|
||
}
|
||
return nil, fmt.Errorf("control channel send timeout")
|
||
case <-w.ctx.Done():
|
||
return nil, fmt.Errorf("worker context cancelled")
|
||
}
|
||
}
|
||
|
||
// gracefulStop 优雅停止
|
||
func (w *CompleteWorker) gracefulStop() {
|
||
w.drain()
|
||
|
||
// 等待所有任务完成
|
||
for {
|
||
if atomic.LoadInt32(&w.activeTasks) == 0 && w.taskQueue.Size() == 0 {
|
||
break
|
||
}
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
w.stop()
|
||
w.logInfo("Worker gracefully stopped")
|
||
}
|
||
|
||
// checkHealth 检查健康状态
|
||
func (w *CompleteWorker) checkHealth() {
|
||
// 检查是否卡住
|
||
lastActive := atomic.LoadInt64(&w.lastActive)
|
||
now := time.Now().UnixNano() / 1e6
|
||
|
||
// 如果 lastActive 是0,说明刚创建,跳过检查
|
||
if lastActive == 0 {
|
||
return
|
||
}
|
||
|
||
idleTime := now - lastActive
|
||
|
||
if idleTime > int64(w.getConfig().TimeoutMs*2) {
|
||
w.logWarn("Worker可能卡住,最后活动时间: %dms前", idleTime)
|
||
|
||
// 自动恢复
|
||
if w.getConfig().EnableRecovery {
|
||
w.restart()
|
||
}
|
||
}
|
||
|
||
// 检查内存使用
|
||
var memStats runtime.MemStats
|
||
runtime.ReadMemStats(&memStats)
|
||
|
||
if w.getConfig().MemoryLimitKB > 0 {
|
||
memoryUsageKB := int64(memStats.HeapInuse / 1024)
|
||
if memoryUsageKB > w.getConfig().MemoryLimitKB {
|
||
w.logWarn("Worker内存使用过高: %dKB (限制: %dKB)",
|
||
memoryUsageKB, w.getConfig().MemoryLimitKB)
|
||
}
|
||
}
|
||
}
|
||
|
||
// addActiveTask 添加活动任务
|
||
func (w *CompleteWorker) addActiveTask(task *AtomicTask) {
|
||
w.tasksMu.Lock()
|
||
w.activeTasksMap[task.id] = task
|
||
w.tasksMu.Unlock()
|
||
}
|
||
|
||
// removeActiveTask 移除活动任务
|
||
func (w *CompleteWorker) removeActiveTask(taskID int64) {
|
||
w.tasksMu.Lock()
|
||
delete(w.activeTasksMap, taskID)
|
||
w.tasksMu.Unlock()
|
||
}
|
||
|
||
// getActiveTasks 获取活动任务
|
||
func (w *CompleteWorker) getActiveTasks() []*AtomicTask {
|
||
w.tasksMu.RLock()
|
||
defer w.tasksMu.RUnlock()
|
||
|
||
tasks := make([]*AtomicTask, 0, len(w.activeTasksMap))
|
||
for _, task := range w.activeTasksMap {
|
||
tasks = append(tasks, task)
|
||
}
|
||
return tasks
|
||
}
|
||
|
||
// getConfig 获取配置(线程安全)
|
||
func (w *CompleteWorker) getConfig() *WorkerConfig {
|
||
w.configMu.RLock()
|
||
defer w.configMu.RUnlock()
|
||
return w.config
|
||
}
|
||
|
||
// cleanup 清理资源
|
||
func (w *CompleteWorker) cleanup() {
|
||
w.once.Do(func() {
|
||
// 首先标记为已停止
|
||
atomic.StoreInt32(&w.status, int32(WorkerStatusStopped))
|
||
atomic.StoreInt32(&w.stopped, 1)
|
||
|
||
w.logInfo("Worker %d cleanup started", w.id)
|
||
|
||
// 首先取消上下文,这会触发 healthCheckLoop 退出
|
||
if w.cancel != nil {
|
||
w.cancel()
|
||
}
|
||
|
||
// 等待一小段时间,确保循环退出
|
||
time.Sleep(50 * time.Millisecond)
|
||
|
||
// 安全关闭健康通道
|
||
w.safeCloseHealthChan()
|
||
|
||
// 然后关闭控制通道
|
||
w.safeCloseControlChan()
|
||
|
||
// 取消所有活动任务
|
||
w.tasksMu.Lock()
|
||
for _, task := range w.activeTasksMap {
|
||
if task.cancel != nil {
|
||
task.cancel()
|
||
}
|
||
}
|
||
w.activeTasksMap = make(map[int64]*AtomicTask)
|
||
w.tasksMu.Unlock()
|
||
|
||
// 排空队列
|
||
w.taskQueue.Drain()
|
||
|
||
w.logInfo("Worker %d cleanup completed", w.id)
|
||
})
|
||
}
|
||
|
||
// safeCloseHealthChan 安全关闭健康通道
|
||
func (w *CompleteWorker) safeCloseHealthChan() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// 忽略通道已关闭的 panic
|
||
w.logWarn("Ignore panic when closing health channel: %v", r)
|
||
}
|
||
}()
|
||
|
||
if w.healthChan != nil {
|
||
close(w.healthChan)
|
||
w.healthChan = nil
|
||
}
|
||
}
|
||
|
||
// safeCloseControlChan 安全关闭控制通道
|
||
func (w *CompleteWorker) safeCloseControlChan() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// 忽略通道已关闭的 panic
|
||
w.logWarn("Ignore panic when closing control channel: %v", r)
|
||
}
|
||
}()
|
||
|
||
if w.controlChan != nil {
|
||
close(w.controlChan)
|
||
w.controlChan = nil
|
||
}
|
||
}
|
||
|
||
// waitForStatus 状态验证
|
||
func (w *CompleteWorker) waitForStatus(targetStatus int32, timeoutMs int) bool {
|
||
deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
|
||
|
||
for time.Now().Before(deadline) {
|
||
currentStatus := atomic.LoadInt32(&w.status)
|
||
if currentStatus == targetStatus {
|
||
return true
|
||
}
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
|
||
return false
|
||
}
|
||
|
||
// logInfo 记录信息日志
|
||
func (w *CompleteWorker) logInfo(format string, args ...interface{}) {
|
||
logMessage(LogLevelInfo, "[Worker %d] %s", w.id, fmt.Sprintf(format, args...))
|
||
}
|
||
|
||
// logError 记录错误日志
|
||
func (w *CompleteWorker) logError(format string, args ...interface{}) {
|
||
logMessage(LogLevelError, "[Worker %d] %s", w.id, fmt.Sprintf(format, args...))
|
||
}
|
||
|
||
// logWarn 记录警告日志
|
||
func (w *CompleteWorker) logWarn(format string, args ...interface{}) {
|
||
logMessage(LogLevelWarn, "[Worker %d] %s", w.id, fmt.Sprintf(format, args...))
|
||
}
|
||
|
||
// ==================== 完整的协程池实现 ====================
|
||
|
||
// CompletePool 完整的协程池
|
||
type CompletePool struct {
|
||
// 原子状态
|
||
status int32
|
||
startTime int64
|
||
workerSeq int32
|
||
|
||
// 工作者管理
|
||
workers sync.Map // map[int32]*CompleteWorker
|
||
workerCount int32
|
||
workerStatus map[int32][]int32 // 按状态分组
|
||
|
||
// 配置
|
||
config *PoolConfig
|
||
configMu sync.RWMutex
|
||
|
||
// 原子统计
|
||
totalTasks int64
|
||
successTasks int64
|
||
failedTasks int64
|
||
cancelledTasks int64
|
||
totalWorkers int32
|
||
runningWorkers int32
|
||
idleWorkers int32
|
||
pausedWorkers int32
|
||
stoppedWorkers int32
|
||
drainingWorkers int32
|
||
queuedTasks int32
|
||
activeTasks int32
|
||
|
||
// 函数注册
|
||
functions sync.Map // map[string]TaskFunc
|
||
|
||
// 控制
|
||
controlChan chan PoolControlMessage
|
||
shutdownCtx context.Context
|
||
shutdownCancel context.CancelFunc
|
||
}
|
||
|
||
// PoolControlMessage is a request sent to CompletePool.controlChan and
// dispatched by handlePoolControl.
type PoolControlMessage struct {
	Type     int              // operation code (1-14) switched on by handlePoolControl
	WorkerID int32            // target worker for per-worker operations
	Data     interface{}      // operation payload, e.g. *WorkerConfig or *PoolConfig
	Response chan interface{} // receives the handler's result, if any
	Error    chan error       // receives the handler's error, if any
	Done     chan bool        // closed when the handler returns
}
|
||
|
||
// NewCompletePool 创建完整的协程池
|
||
func NewCompletePool(config *PoolConfig) *CompletePool {
|
||
if config == nil {
|
||
config = &PoolConfig{
|
||
MinWorkers: DefaultMinWorkers,
|
||
MaxWorkers: DefaultMaxWorkers,
|
||
QueueSize: DefaultQueueSize,
|
||
TaskTimeoutMs: DefaultTaskTimeout,
|
||
WorkerIdleTimeoutMs: DefaultIdleTimeout,
|
||
MemoryLimitMB: DefaultMaxMemoryMB,
|
||
ShutdownTimeoutMs: DefaultShutdownTimeout,
|
||
HealthCheckDelayMs: DefaultHealthCheckDelay,
|
||
MaxRetries: DefaultMaxRetries,
|
||
DefaultLanguage: LanguageEN,
|
||
EnableMetrics: true,
|
||
EnableAutoScaling: true,
|
||
PriorityQueue: false,
|
||
}
|
||
}
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
|
||
pool := &CompletePool{
|
||
status: int32(WorkerStatusRunning),
|
||
startTime: time.Now().UnixNano() / 1e6,
|
||
config: config,
|
||
controlChan: make(chan PoolControlMessage, 100),
|
||
shutdownCtx: ctx,
|
||
shutdownCancel: cancel,
|
||
workerStatus: make(map[int32][]int32),
|
||
}
|
||
|
||
// 初始化工作者
|
||
pool.initializeWorkers()
|
||
|
||
// 启动控制循环
|
||
go pool.controlLoop()
|
||
|
||
// 启动统计收集
|
||
if config.EnableMetrics {
|
||
go pool.collectMetrics()
|
||
}
|
||
|
||
pool.logInfo("Pool initialized with %d workers", config.MinWorkers)
|
||
return pool
|
||
}
|
||
|
||
// initializeWorkers 初始化工作者
|
||
func (p *CompletePool) initializeWorkers() {
|
||
for i := 0; i < p.config.MinWorkers; i++ {
|
||
p.createWorker()
|
||
}
|
||
|
||
atomic.StoreInt32(&p.totalWorkers, int32(p.config.MinWorkers))
|
||
atomic.StoreInt32(&p.runningWorkers, int32(p.config.MinWorkers))
|
||
}
|
||
|
||
// createWorker 创建工作者
|
||
func (p *CompletePool) createWorker() int32 {
|
||
// 检查最大工作者数
|
||
if atomic.LoadInt32(&p.workerCount) >= int32(p.config.MaxWorkers) {
|
||
return -1
|
||
}
|
||
|
||
workerID := atomic.AddInt32(&p.workerSeq, 1)
|
||
config := &WorkerConfig{
|
||
ID: workerID,
|
||
Name: fmt.Sprintf("worker_%d", workerID),
|
||
QueueSize: int32(p.config.QueueSize),
|
||
TimeoutMs: int32(p.config.TaskTimeoutMs),
|
||
MaxRetries: int32(p.config.MaxRetries),
|
||
AutoRestart: true, // 改为 true
|
||
EnableRecovery: true, // 改为 true
|
||
RecoveryDelayMs: 1000,
|
||
PriorityEnabled: p.config.PriorityQueue,
|
||
}
|
||
|
||
worker := NewCompleteWorker(workerID, config.Name, config)
|
||
|
||
// 保存工作者
|
||
p.workers.Store(workerID, worker)
|
||
atomic.AddInt32(&p.workerCount, 1)
|
||
|
||
// 更新状态分组
|
||
p.addWorkerToStatus(workerID, int32(WorkerStatusRunning))
|
||
|
||
// 启动工作者
|
||
go worker.Run()
|
||
|
||
// 等待工作者完全启动(减少等待时间)
|
||
time.Sleep(10 * time.Millisecond)
|
||
|
||
p.logInfo("Worker %d created", workerID)
|
||
return workerID
|
||
}
|
||
|
||
// controlLoop 控制循环
|
||
func (p *CompletePool) controlLoop() {
|
||
for {
|
||
select {
|
||
case <-p.shutdownCtx.Done():
|
||
return
|
||
case msg := <-p.controlChan:
|
||
p.handlePoolControl(msg)
|
||
}
|
||
}
|
||
}
|
||
|
||
// handlePoolControl 处理池控制消息
|
||
func (p *CompletePool) handlePoolControl(msg PoolControlMessage) {
|
||
// 创建响应和错误通道(如果未提供)
|
||
var responseChan chan interface{}
|
||
var errorChan chan error
|
||
var doneChan chan bool
|
||
|
||
if msg.Response == nil {
|
||
responseChan = make(chan interface{}, 1)
|
||
msg.Response = responseChan
|
||
}
|
||
|
||
if msg.Error == nil {
|
||
errorChan = make(chan error, 1)
|
||
msg.Error = errorChan
|
||
}
|
||
|
||
if msg.Done == nil {
|
||
doneChan = make(chan bool, 1)
|
||
msg.Done = doneChan
|
||
}
|
||
|
||
// 在goroutine中处理,避免阻塞控制循环
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
if msg.Error != nil {
|
||
select {
|
||
case msg.Error <- fmt.Errorf("panic in control handler: %v", r):
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
if msg.Done != nil {
|
||
close(msg.Done)
|
||
}
|
||
}()
|
||
|
||
switch msg.Type {
|
||
case 1: // 暂停工作者
|
||
p.pauseWorker(msg.WorkerID, msg)
|
||
case 2: // 恢复工作者
|
||
p.resumeWorker(msg.WorkerID, msg)
|
||
case 3: // 停止工作者
|
||
p.stopWorker(msg.WorkerID, msg)
|
||
case 4: // 排空工作者
|
||
p.drainWorker(msg.WorkerID, msg)
|
||
case 5: // 重启工作者
|
||
p.restartWorker(msg.WorkerID, msg)
|
||
case 6: // 更新工作者配置
|
||
p.updateWorkerConfig(msg.WorkerID, msg.Data, msg)
|
||
case 7: // 删除工作者
|
||
p.deleteWorker(msg.WorkerID, msg)
|
||
case 8: // 获取工作者状态
|
||
p.getWorkerStatus(msg.WorkerID, msg)
|
||
case 9: // 创建工作者
|
||
p.createNewWorker(msg)
|
||
case 10: // 暂停所有工作者
|
||
p.pauseAllWorkers(msg)
|
||
case 11: // 恢复所有工作者
|
||
p.resumeAllWorkers(msg)
|
||
case 12: // 停止所有工作者
|
||
p.stopAllWorkers(msg)
|
||
case 13: // 优雅关闭
|
||
p.gracefulShutdown(msg)
|
||
case 14: // 更新池配置
|
||
p.updatePoolConfig(msg.Data, msg)
|
||
default:
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("unknown control type: %d", msg.Type)
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// pauseWorker 暂停工作者
|
||
func (p *CompletePool) pauseWorker(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
// 创建响应通道
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
doneChan := make(chan bool, 1)
|
||
|
||
workerMsg := ControlMessage{
|
||
Type: ControlMsgPause,
|
||
Response: responseChan,
|
||
Error: errorChan,
|
||
Done: doneChan,
|
||
}
|
||
|
||
// 发送控制消息
|
||
select {
|
||
case worker.(*CompleteWorker).controlChan <- workerMsg:
|
||
// 等待响应
|
||
select {
|
||
case <-doneChan:
|
||
// 更新状态分组
|
||
p.removeWorkerFromStatus(workerID, int32(WorkerStatusRunning))
|
||
p.addWorkerToStatus(workerID, int32(WorkerStatusPaused))
|
||
|
||
atomic.AddInt32(&p.runningWorkers, -1)
|
||
atomic.AddInt32(&p.pausedWorkers, 1)
|
||
|
||
// 发送成功响应
|
||
if msg.Response != nil {
|
||
select {
|
||
case msg.Response <- true:
|
||
default:
|
||
}
|
||
}
|
||
case <-time.After(2 * time.Second):
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("pause timeout")
|
||
}
|
||
}
|
||
case <-time.After(1 * time.Second):
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("failed to send control message")
|
||
}
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// resumeWorker 恢复工作者
|
||
func (p *CompletePool) resumeWorker(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
// 简化逻辑,直接发送控制消息
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
doneChan := make(chan bool, 1)
|
||
|
||
// 使用goroutine发送控制消息,避免阻塞
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
if errorChan != nil {
|
||
errorChan <- fmt.Errorf("panic in resume worker: %v", r)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 使用工作者提供的安全发送方法
|
||
resp, err := worker.(*CompleteWorker).SendControlSafe(ControlMessage{
|
||
Type: ControlMsgResume,
|
||
Response: responseChan,
|
||
Error: errorChan,
|
||
Done: doneChan,
|
||
})
|
||
|
||
if err != nil {
|
||
if errorChan != nil {
|
||
errorChan <- err
|
||
}
|
||
} else if responseChan != nil {
|
||
responseChan <- resp
|
||
}
|
||
}()
|
||
|
||
// 等待响应
|
||
timeout := time.After(5 * time.Second)
|
||
|
||
select {
|
||
case resp := <-responseChan:
|
||
// 更新状态分组和统计
|
||
p.removeWorkerFromStatus(workerID, int32(WorkerStatusPaused))
|
||
p.addWorkerToStatus(workerID, int32(WorkerStatusRunning))
|
||
|
||
// 更新统计
|
||
if atomic.AddInt32(&p.pausedWorkers, -1) < 0 {
|
||
atomic.StoreInt32(&p.pausedWorkers, 0)
|
||
}
|
||
atomic.AddInt32(&p.runningWorkers, 1)
|
||
|
||
// 发送响应给调用者
|
||
if msg.Response != nil {
|
||
select {
|
||
case msg.Response <- resp:
|
||
default:
|
||
}
|
||
}
|
||
|
||
case err := <-errorChan:
|
||
if msg.Error != nil {
|
||
msg.Error <- err
|
||
}
|
||
|
||
case <-doneChan:
|
||
// 操作完成,发送默认响应
|
||
if msg.Response != nil {
|
||
msg.Response <- true
|
||
}
|
||
|
||
case <-timeout:
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("resume worker timeout")
|
||
}
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// stopWorker 停止工作者
|
||
func (p *CompletePool) stopWorker(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
workerInst := worker.(*CompleteWorker)
|
||
|
||
// 检查是否已停止
|
||
if workerInst.IsStopped() {
|
||
if msg.Error != nil {
|
||
select {
|
||
case msg.Error <- fmt.Errorf("worker already stopped"):
|
||
default:
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// 创建响应通道
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
doneChan := make(chan bool, 1)
|
||
|
||
workerMsg := ControlMessage{
|
||
Type: ControlMsgStop,
|
||
Response: responseChan,
|
||
Error: errorChan,
|
||
Done: doneChan,
|
||
}
|
||
|
||
// 使用goroutine发送停止消息,避免阻塞
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
// 发送错误响应
|
||
if msg.Error != nil {
|
||
select {
|
||
case msg.Error <- fmt.Errorf("worker stop panic: %v", r):
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 使用安全的发送方法
|
||
_, err := workerInst.SendControlSafe(workerMsg)
|
||
if err != nil {
|
||
if msg.Error != nil {
|
||
select {
|
||
case msg.Error <- err:
|
||
default:
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// 等待停止完成
|
||
select {
|
||
case <-doneChan:
|
||
// 从池中移除工作者
|
||
p.workers.Delete(workerID)
|
||
atomic.AddInt32(&p.workerCount, -1)
|
||
|
||
// 更新状态分组和统计
|
||
p.removeWorkerFromStatus(workerID, int32(WorkerStatusStopped))
|
||
atomic.AddInt32(&p.stoppedWorkers, 1)
|
||
|
||
// 发送成功响应
|
||
if msg.Response != nil {
|
||
select {
|
||
case msg.Response <- map[string]interface{}{
|
||
"success": true,
|
||
"workerID": workerID,
|
||
}:
|
||
default:
|
||
}
|
||
}
|
||
case <-time.After(5 * time.Second):
|
||
// 超时,强制移除
|
||
p.workers.Delete(workerID)
|
||
if msg.Error != nil {
|
||
select {
|
||
case msg.Error <- fmt.Errorf("worker stop timeout, forced removal"):
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
}()
|
||
} else if msg.Error != nil {
|
||
select {
|
||
case msg.Error <- fmt.Errorf("worker not found"):
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
// drainWorker 排空工作者
|
||
func (p *CompletePool) drainWorker(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
worker.(*CompleteWorker).controlChan <- ControlMessage{
|
||
Type: ControlMsgDrain,
|
||
Response: msg.Response,
|
||
Error: msg.Error,
|
||
}
|
||
|
||
// 更新状态分组
|
||
oldStatus := worker.(*CompleteWorker).status
|
||
p.removeWorkerFromStatus(workerID, oldStatus)
|
||
p.addWorkerToStatus(workerID, int32(WorkerStatusDraining))
|
||
|
||
// 更新统计
|
||
switch oldStatus {
|
||
case WorkerStatusRunning:
|
||
atomic.AddInt32(&p.runningWorkers, -1)
|
||
case WorkerStatusPaused:
|
||
atomic.AddInt32(&p.pausedWorkers, -1)
|
||
}
|
||
|
||
atomic.AddInt32(&p.drainingWorkers, 1)
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// restartWorker 重启工作者
|
||
func (p *CompletePool) restartWorker(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
worker.(*CompleteWorker).controlChan <- ControlMessage{
|
||
Type: ControlMsgRestart,
|
||
Response: msg.Response,
|
||
Error: msg.Error,
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// deleteWorker 删除工作者
|
||
func (p *CompletePool) deleteWorker(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
worker.(*CompleteWorker).controlChan <- ControlMessage{
|
||
Type: ControlMsgDelete,
|
||
Response: msg.Response,
|
||
Error: msg.Error,
|
||
}
|
||
|
||
// 从池中移除
|
||
p.workers.Delete(workerID)
|
||
atomic.AddInt32(&p.workerCount, -1)
|
||
|
||
// 更新统计
|
||
atomic.AddInt32(&p.totalWorkers, -1)
|
||
status := atomic.LoadInt32(&worker.(*CompleteWorker).status)
|
||
switch status {
|
||
case WorkerStatusRunning:
|
||
atomic.AddInt32(&p.runningWorkers, -1)
|
||
case WorkerStatusPaused:
|
||
atomic.AddInt32(&p.pausedWorkers, -1)
|
||
case WorkerStatusIdle:
|
||
atomic.AddInt32(&p.idleWorkers, -1)
|
||
case WorkerStatusStopped:
|
||
atomic.AddInt32(&p.stoppedWorkers, -1)
|
||
case WorkerStatusDraining:
|
||
atomic.AddInt32(&p.drainingWorkers, -1)
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// updateWorkerConfig 更新工作者配置
|
||
func (p *CompletePool) updateWorkerConfig(workerID int32, data interface{}, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
if config, ok := data.(*WorkerConfig); ok {
|
||
worker.(*CompleteWorker).controlChan <- ControlMessage{
|
||
Type: ControlMsgUpdate,
|
||
Data: config,
|
||
Response: msg.Response,
|
||
Error: msg.Error,
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("invalid config data")
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// getWorkerStatus 获取工作者状态
|
||
func (p *CompletePool) getWorkerStatus(workerID int32, msg PoolControlMessage) {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
// 使用带缓冲的通道
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
doneChan := make(chan bool, 1)
|
||
|
||
workerMsg := ControlMessage{
|
||
Type: ControlMsgHealth,
|
||
Response: responseChan,
|
||
Error: errorChan,
|
||
Done: doneChan,
|
||
}
|
||
|
||
// 发送控制消息
|
||
select {
|
||
case worker.(*CompleteWorker).controlChan <- workerMsg:
|
||
// 等待响应
|
||
select {
|
||
case <-doneChan:
|
||
select {
|
||
case resp := <-responseChan:
|
||
if msg.Response != nil {
|
||
msg.Response <- resp
|
||
}
|
||
case err := <-errorChan:
|
||
if msg.Error != nil {
|
||
msg.Error <- err
|
||
}
|
||
default:
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("no response from worker")
|
||
}
|
||
}
|
||
case <-time.After(2 * time.Second):
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("response timeout")
|
||
}
|
||
}
|
||
case <-time.After(1 * time.Second):
|
||
if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("failed to send control message")
|
||
}
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("worker not found")
|
||
}
|
||
}
|
||
|
||
// createNewWorker 创建新工作者
|
||
func (p *CompletePool) createNewWorker(msg PoolControlMessage) {
|
||
workerID := p.createWorker()
|
||
if workerID > 0 && msg.Response != nil {
|
||
msg.Response <- map[string]interface{}{
|
||
"workerID": workerID,
|
||
"success": true,
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("failed to create worker")
|
||
}
|
||
}
|
||
|
||
// pauseAllWorkers 暂停所有工作者
|
||
func (p *CompletePool) pauseAllWorkers(msg PoolControlMessage) {
|
||
p.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
worker.controlChan <- ControlMessage{
|
||
Type: ControlMsgPause,
|
||
}
|
||
return true
|
||
})
|
||
|
||
// 更新统计
|
||
atomic.StoreInt32(&p.pausedWorkers, atomic.LoadInt32(&p.workerCount))
|
||
atomic.StoreInt32(&p.runningWorkers, 0)
|
||
atomic.StoreInt32(&p.idleWorkers, 0)
|
||
atomic.StoreInt32(&p.drainingWorkers, 0)
|
||
|
||
if msg.Response != nil {
|
||
msg.Response <- map[string]interface{}{
|
||
"success": true,
|
||
"workers": atomic.LoadInt32(&p.workerCount),
|
||
}
|
||
}
|
||
}
|
||
|
||
// resumeAllWorkers 恢复所有工作者
|
||
func (p *CompletePool) resumeAllWorkers(msg PoolControlMessage) {
|
||
p.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
if atomic.LoadInt32(&worker.status) == int32(WorkerStatusPaused) {
|
||
worker.controlChan <- ControlMessage{
|
||
Type: ControlMsgResume,
|
||
}
|
||
}
|
||
return true
|
||
})
|
||
|
||
// 更新统计
|
||
atomic.StoreInt32(&p.runningWorkers, atomic.LoadInt32(&p.pausedWorkers))
|
||
atomic.StoreInt32(&p.pausedWorkers, 0)
|
||
|
||
if msg.Response != nil {
|
||
msg.Response <- map[string]interface{}{
|
||
"success": true,
|
||
"workers": atomic.LoadInt32(&p.runningWorkers),
|
||
}
|
||
}
|
||
}
|
||
|
||
// stopAllWorkers 停止所有工作者
|
||
func (p *CompletePool) stopAllWorkers(msg PoolControlMessage) {
|
||
p.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
worker.controlChan <- ControlMessage{
|
||
Type: ControlMsgStop,
|
||
}
|
||
return true
|
||
})
|
||
|
||
// 更新统计
|
||
atomic.StoreInt32(&p.stoppedWorkers, atomic.LoadInt32(&p.workerCount))
|
||
atomic.StoreInt32(&p.runningWorkers, 0)
|
||
atomic.StoreInt32(&p.pausedWorkers, 0)
|
||
atomic.StoreInt32(&p.idleWorkers, 0)
|
||
atomic.StoreInt32(&p.drainingWorkers, 0)
|
||
|
||
if msg.Response != nil {
|
||
msg.Response <- map[string]interface{}{
|
||
"success": true,
|
||
"workers": atomic.LoadInt32(&p.workerCount),
|
||
}
|
||
}
|
||
}
|
||
|
||
// gracefulShutdown 优雅关闭
|
||
func (p *CompletePool) gracefulShutdown(msg PoolControlMessage) {
|
||
// 先标记为排空状态,防止新任务提交
|
||
atomic.StoreInt32(&p.status, int32(WorkerStatusDraining))
|
||
|
||
logMessage(LogLevelInfo, "Pool graceful shutdown initiated")
|
||
|
||
// 收集所有工作者
|
||
workers := make([]*CompleteWorker, 0)
|
||
p.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
workers = append(workers, worker)
|
||
return true
|
||
})
|
||
|
||
// 排空所有工作者
|
||
for _, worker := range workers {
|
||
p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgDrain})
|
||
}
|
||
|
||
// 等待所有任务完成
|
||
timeout := time.Duration(p.config.ShutdownTimeoutMs) * time.Millisecond
|
||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||
defer cancel()
|
||
|
||
ticker := time.NewTicker(100 * time.Millisecond)
|
||
defer ticker.Stop()
|
||
|
||
allTasksCompleted := false
|
||
attempts := 0
|
||
|
||
for !allTasksCompleted && attempts < 100 { // 最多等待10秒
|
||
select {
|
||
case <-ctx.Done():
|
||
logMessage(LogLevelWarn, "Graceful shutdown timeout, forcing stop")
|
||
break
|
||
case <-ticker.C:
|
||
// 检查所有任务是否完成
|
||
allDone := true
|
||
for _, worker := range workers {
|
||
if atomic.LoadInt32(&worker.activeTasks) > 0 || worker.taskQueue.Size() > 0 {
|
||
allDone = false
|
||
break
|
||
}
|
||
}
|
||
|
||
if allDone {
|
||
allTasksCompleted = true
|
||
logMessage(LogLevelInfo, "All tasks completed, stopping workers")
|
||
break
|
||
}
|
||
|
||
attempts++
|
||
|
||
// 每1秒记录一次进度
|
||
if attempts%10 == 0 {
|
||
totalActive := int32(0)
|
||
totalQueued := int32(0)
|
||
for _, worker := range workers {
|
||
totalActive += atomic.LoadInt32(&worker.activeTasks)
|
||
totalQueued += worker.taskQueue.Size()
|
||
}
|
||
logMessage(LogLevelInfo, "Graceful shutdown in progress: %d active tasks, %d queued tasks",
|
||
totalActive, totalQueued)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 停止所有工作者
|
||
for _, worker := range workers {
|
||
p.safeSendControlToWorker(worker.id, ControlMessage{Type: ControlMsgStop})
|
||
}
|
||
|
||
// 等待工作者停止
|
||
time.Sleep(200 * time.Millisecond)
|
||
|
||
// 关闭池
|
||
p.shutdownCancel()
|
||
|
||
// 更新状态
|
||
atomic.StoreInt32(&p.status, int32(WorkerStatusStopped))
|
||
|
||
// 清空工作者列表
|
||
p.workers = sync.Map{}
|
||
atomic.StoreInt32(&p.workerCount, 0)
|
||
|
||
logMessage(LogLevelInfo, "Pool graceful shutdown completed")
|
||
|
||
// 发送响应
|
||
if msg.Response != nil {
|
||
select {
|
||
case msg.Response <- map[string]interface{}{
|
||
"success": true,
|
||
"message": "pool gracefully shutdown",
|
||
}:
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
// updatePoolConfig 更新池配置
|
||
func (p *CompletePool) updatePoolConfig(data interface{}, msg PoolControlMessage) {
|
||
if config, ok := data.(*PoolConfig); ok {
|
||
p.configMu.Lock()
|
||
p.config = config
|
||
p.configMu.Unlock()
|
||
|
||
if msg.Response != nil {
|
||
msg.Response <- map[string]interface{}{
|
||
"success": true,
|
||
"message": "pool config updated",
|
||
}
|
||
}
|
||
} else if msg.Error != nil {
|
||
msg.Error <- fmt.Errorf("invalid config data")
|
||
}
|
||
}
|
||
|
||
// addWorkerToStatus 添加工作者到状态分组
|
||
func (p *CompletePool) addWorkerToStatus(workerID, status int32) {
|
||
// 这里需要锁来更新map
|
||
// 由于访问不频繁,使用锁是可以接受的
|
||
if _, ok := p.workerStatus[status]; !ok {
|
||
p.workerStatus[status] = make([]int32, 0)
|
||
}
|
||
p.workerStatus[status] = append(p.workerStatus[status], workerID)
|
||
}
|
||
|
||
// removeWorkerFromStatus 从状态分组移除工作者
|
||
func (p *CompletePool) removeWorkerFromStatus(workerID, status int32) {
|
||
if workers, ok := p.workerStatus[status]; ok {
|
||
for i, id := range workers {
|
||
if id == workerID {
|
||
p.workerStatus[status] = append(workers[:i], workers[i+1:]...)
|
||
break
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// collectMetrics 收集指标
|
||
func (p *CompletePool) collectMetrics() {
|
||
ticker := time.NewTicker(5 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
p.updateStatistics()
|
||
case <-p.shutdownCtx.Done():
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// updateStatistics 更新统计
|
||
func (p *CompletePool) updateStatistics() {
|
||
// 收集工作者统计
|
||
var totalQueued, totalActive int32
|
||
p.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
totalQueued += worker.taskQueue.Size()
|
||
totalActive += atomic.LoadInt32(&worker.activeTasks)
|
||
return true
|
||
})
|
||
|
||
atomic.StoreInt32(&p.queuedTasks, totalQueued)
|
||
atomic.StoreInt32(&p.activeTasks, totalActive)
|
||
}
|
||
|
||
// SubmitTask 提交任务
|
||
func (p *CompletePool) SubmitTask(workerID int32, funcName, param string, priority int32) (int64, error) {
|
||
if atomic.LoadInt32(&p.status) != int32(WorkerStatusRunning) {
|
||
return -1, fmt.Errorf("pool not running")
|
||
}
|
||
|
||
// 检查函数是否注册
|
||
if _, ok := p.functions.Load(funcName); !ok {
|
||
return -1, fmt.Errorf("function not registered")
|
||
}
|
||
|
||
// 选择工作者
|
||
var worker *CompleteWorker
|
||
if workerID > 0 {
|
||
if w, ok := p.workers.Load(workerID); ok {
|
||
worker = w.(*CompleteWorker)
|
||
// 检查工作者状态 - 增强检查
|
||
workerStatus := atomic.LoadInt32(&worker.status)
|
||
if workerStatus != int32(WorkerStatusRunning) &&
|
||
workerStatus != int32(WorkerStatusIdle) &&
|
||
workerStatus != int32(WorkerStatusDraining) {
|
||
return -1, fmt.Errorf("worker not available, status: %d", workerStatus)
|
||
}
|
||
// 额外检查:如果工作者暂停了,队列也应该拒绝任务
|
||
if workerStatus == int32(WorkerStatusPaused) {
|
||
return -1, fmt.Errorf("worker paused")
|
||
}
|
||
} else {
|
||
return -1, fmt.Errorf("worker not found")
|
||
}
|
||
} else {
|
||
// 负载均衡选择 - 排除暂停的工作者
|
||
worker = p.selectWorker()
|
||
if worker == nil {
|
||
return -1, fmt.Errorf("no available worker")
|
||
}
|
||
// 确保选中的工作者不是暂停状态
|
||
if atomic.LoadInt32(&worker.status) == int32(WorkerStatusPaused) {
|
||
return -1, fmt.Errorf("selected worker is paused")
|
||
}
|
||
}
|
||
|
||
// 创建任务
|
||
task := NewAtomicTask(int(worker.id), funcName, param,
|
||
int(priority), int(p.config.MaxRetries), p.config.TaskTimeoutMs)
|
||
|
||
// 提交到任务队列
|
||
if !worker.taskQueue.Push(task) {
|
||
return -1, fmt.Errorf("queue full or worker paused")
|
||
}
|
||
|
||
// 注册任务到全局注册表
|
||
if globalManager != nil && globalManager.registry != nil {
|
||
globalManager.registry.Register(task.id, task)
|
||
}
|
||
|
||
// 更新统计
|
||
atomic.AddInt64(&p.totalTasks, 1)
|
||
|
||
return task.id, nil
|
||
}
|
||
|
||
// selectWorker 选择工作者(负载均衡)
|
||
func (p *CompletePool) selectWorker() *CompleteWorker {
|
||
var selected *CompleteWorker
|
||
var minLoad int32 = 1<<31 - 1
|
||
|
||
p.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
status := atomic.LoadInt32(&worker.status)
|
||
|
||
// 只选择运行中的工作者
|
||
if status != int32(WorkerStatusRunning) && status != int32(WorkerStatusIdle) {
|
||
return true
|
||
}
|
||
|
||
load := worker.taskQueue.Size()
|
||
if load < minLoad {
|
||
minLoad = load
|
||
selected = worker
|
||
}
|
||
|
||
return true
|
||
})
|
||
|
||
return selected
|
||
}
|
||
|
||
// RegisterFunction 注册函数
|
||
func (p *CompletePool) RegisterFunction(name string, fn TaskFunc) error {
|
||
p.functions.Store(name, fn)
|
||
return nil
|
||
}
|
||
|
||
// GetFunction 获取函数
|
||
func (p *CompletePool) GetFunction(name string) TaskFunc {
|
||
if fn, ok := p.functions.Load(name); ok {
|
||
return fn.(TaskFunc)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// SendControl 发送控制消息
|
||
func (p *CompletePool) SendControl(msg PoolControlMessage) (interface{}, error) {
|
||
// 检查池状态
|
||
if atomic.LoadInt32(&p.status) != int32(WorkerStatusRunning) {
|
||
return nil, fmt.Errorf("pool not running")
|
||
}
|
||
|
||
done := make(chan bool, 1)
|
||
response := make(chan interface{}, 1)
|
||
errChan := make(chan error, 1)
|
||
|
||
msg.Done = done
|
||
msg.Response = response
|
||
msg.Error = errChan
|
||
|
||
// 发送控制消息
|
||
select {
|
||
case p.controlChan <- msg:
|
||
// 等待响应
|
||
select {
|
||
case <-done:
|
||
select {
|
||
case resp := <-response:
|
||
return resp, nil
|
||
case err := <-errChan:
|
||
return nil, err
|
||
default:
|
||
return nil, fmt.Errorf("no response from worker")
|
||
}
|
||
case <-time.After(5 * time.Second):
|
||
return nil, fmt.Errorf("control message timeout")
|
||
}
|
||
case <-time.After(2 * time.Second):
|
||
return nil, fmt.Errorf("control channel full or pool stopped")
|
||
}
|
||
}
|
||
|
||
func (p *CompletePool) safeSendControlToWorker(workerID int32, msg ControlMessage) bool {
|
||
if worker, ok := p.workers.Load(workerID); ok {
|
||
workerInst := worker.(*CompleteWorker)
|
||
status := atomic.LoadInt32(&workerInst.status)
|
||
|
||
// 检查工作者状态
|
||
if status == int32(WorkerStatusStopped) {
|
||
return false
|
||
}
|
||
|
||
// 尝试发送消息
|
||
select {
|
||
case workerInst.controlChan <- msg:
|
||
return true
|
||
case <-time.After(1 * time.Second):
|
||
return false
|
||
case <-p.shutdownCtx.Done():
|
||
return false
|
||
}
|
||
}
|
||
return false
|
||
}
|
||
|
||
// Shutdown 关闭池
|
||
func (p *CompletePool) Shutdown() {
|
||
p.gracefulShutdown(PoolControlMessage{
|
||
Type: ControlMsgGraceful,
|
||
})
|
||
}
|
||
|
||
// GetStats 获取统计信息
|
||
func (p *CompletePool) GetStats() map[string]interface{} {
|
||
stats := make(map[string]interface{})
|
||
stats["totalWorkers"] = atomic.LoadInt32(&p.totalWorkers)
|
||
stats["runningWorkers"] = atomic.LoadInt32(&p.runningWorkers)
|
||
stats["idleWorkers"] = atomic.LoadInt32(&p.idleWorkers)
|
||
stats["pausedWorkers"] = atomic.LoadInt32(&p.pausedWorkers)
|
||
stats["stoppedWorkers"] = atomic.LoadInt32(&p.stoppedWorkers)
|
||
stats["drainingWorkers"] = atomic.LoadInt32(&p.drainingWorkers)
|
||
stats["totalTasks"] = atomic.LoadInt64(&p.totalTasks)
|
||
stats["successTasks"] = atomic.LoadInt64(&p.successTasks)
|
||
stats["failedTasks"] = atomic.LoadInt64(&p.failedTasks)
|
||
stats["cancelledTasks"] = atomic.LoadInt64(&p.cancelledTasks)
|
||
stats["queuedTasks"] = atomic.LoadInt32(&p.queuedTasks)
|
||
stats["activeTasks"] = atomic.LoadInt32(&p.activeTasks)
|
||
|
||
// 收集内存使用
|
||
var memStats runtime.MemStats
|
||
runtime.ReadMemStats(&memStats)
|
||
stats["memoryUsageMB"] = float64(memStats.Alloc) / 1024 / 1024
|
||
|
||
// 计算运行时间
|
||
uptimeSeconds := float64(time.Now().UnixNano()/1e6-p.startTime) / 1000.0
|
||
stats["uptimeSeconds"] = uptimeSeconds
|
||
|
||
// 计算吞吐量
|
||
if uptimeSeconds > 0 {
|
||
stats["throughput"] = float64(atomic.LoadInt64(&p.totalTasks)) / uptimeSeconds
|
||
} else {
|
||
stats["throughput"] = 0.0
|
||
}
|
||
|
||
return stats
|
||
}
|
||
|
||
// logInfo 记录信息日志
|
||
func (p *CompletePool) logInfo(format string, args ...interface{}) {
|
||
logMessage(LogLevelInfo, "[Pool] %s", fmt.Sprintf(format, args...))
|
||
}
|
||
|
||
// ==================== 任务注册表 ====================
|
||
|
||
// TaskRegistry maps task IDs to their *AtomicTask so tasks can be looked up
// after submission.
type TaskRegistry struct {
	tasks     sync.Map     // int64 task ID -> *AtomicTask
	cleanupMu sync.RWMutex // serializes Cleanup passes
}
|
||
|
||
// NewTaskRegistry 创建任务注册表
|
||
func NewTaskRegistry() *TaskRegistry {
|
||
return &TaskRegistry{}
|
||
}
|
||
|
||
// Register 注册任务
|
||
func (tr *TaskRegistry) Register(taskID int64, task *AtomicTask) {
|
||
tr.tasks.Store(taskID, task)
|
||
}
|
||
|
||
// Get 获取任务
|
||
func (tr *TaskRegistry) Get(taskID int64) (*AtomicTask, bool) {
|
||
if task, ok := tr.tasks.Load(taskID); ok {
|
||
return task.(*AtomicTask), true
|
||
}
|
||
return nil, false
|
||
}
|
||
|
||
// Remove 移除任务
|
||
func (tr *TaskRegistry) Remove(taskID int64) {
|
||
tr.tasks.Delete(taskID)
|
||
}
|
||
|
||
// Cleanup 清理过期任务
|
||
func (tr *TaskRegistry) Cleanup(maxAge time.Duration) int {
|
||
tr.cleanupMu.Lock()
|
||
defer tr.cleanupMu.Unlock()
|
||
|
||
now := time.Now().UnixNano() / 1e6
|
||
removed := 0
|
||
|
||
tr.tasks.Range(func(key, value interface{}) bool {
|
||
task := value.(*AtomicTask)
|
||
taskAge := now - atomic.LoadInt64(&task.endTime)
|
||
|
||
// 如果任务已完成且超过最大年龄,或者任务创建时间超过24小时,则删除
|
||
if (atomic.LoadInt32(&task.status) >= TaskStatusCompleted && taskAge > maxAge.Milliseconds()) ||
|
||
(now-task.submitTime > 24*60*60*1000) {
|
||
tr.tasks.Delete(key)
|
||
removed++
|
||
}
|
||
return true
|
||
})
|
||
|
||
return removed
|
||
}
|
||
|
||
// ==================== 全局管理器 ====================
|
||
|
||
// GlobalManager 全局管理器
|
||
type GlobalManager struct {
|
||
pool *CompletePool
|
||
registry *TaskRegistry
|
||
|
||
mu sync.RWMutex
|
||
startTime int64
|
||
cleanupTicker *time.Ticker
|
||
cleanupDone chan bool
|
||
}
|
||
|
||
var (
|
||
globalManager *GlobalManager
|
||
taskIDCounter int64
|
||
initialized int32
|
||
shuttingDown int32
|
||
)
|
||
|
||
// InitGlobalManager 初始化全局管理器
|
||
func InitGlobalManager(config *PoolConfig) error {
|
||
if atomic.LoadInt32(&shuttingDown) == 1 {
|
||
return fmt.Errorf("system shutting down")
|
||
}
|
||
|
||
manager := &GlobalManager{
|
||
registry: NewTaskRegistry(),
|
||
startTime: time.Now().UnixNano() / 1e6,
|
||
cleanupDone: make(chan bool),
|
||
}
|
||
|
||
pool := NewCompletePool(config)
|
||
manager.pool = pool
|
||
|
||
// 启动任务清理器
|
||
manager.startCleanup()
|
||
|
||
// 原子交换
|
||
atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&globalManager)),
|
||
unsafe.Pointer(manager))
|
||
|
||
atomic.StoreInt32(&initialized, 1)
|
||
|
||
logMessage(LogLevelInfo, "Global manager initialized")
|
||
return nil
|
||
}
|
||
|
||
// startCleanup 启动任务清理器
|
||
func (gm *GlobalManager) startCleanup() {
|
||
gm.cleanupTicker = time.NewTicker(30 * time.Minute) // 每30分钟清理一次
|
||
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-gm.cleanupTicker.C:
|
||
removed := gm.registry.Cleanup(1 * time.Hour) // 清理超过1小时的已完成任务
|
||
if removed > 0 {
|
||
logMessage(LogLevelInfo, "Task cleanup: removed %d expired tasks", removed)
|
||
}
|
||
case <-gm.cleanupDone:
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// stopCleanup 停止任务清理器
|
||
func (gm *GlobalManager) stopCleanup() {
|
||
if gm.cleanupTicker != nil {
|
||
gm.cleanupTicker.Stop()
|
||
}
|
||
close(gm.cleanupDone)
|
||
}
|
||
|
||
// getFunction 获取函数(线程安全)
|
||
func (gm *GlobalManager) getFunction(name string) TaskFunc {
|
||
if gm == nil || gm.pool == nil {
|
||
return nil
|
||
}
|
||
|
||
gm.mu.RLock()
|
||
defer gm.mu.RUnlock()
|
||
|
||
return gm.pool.GetFunction(name)
|
||
}
|
||
|
||
// replaceWorker 替换工作者
|
||
func (gm *GlobalManager) replaceWorker(workerID int32, worker *CompleteWorker) {
|
||
if gm == nil || gm.pool == nil {
|
||
return
|
||
}
|
||
|
||
gm.mu.Lock()
|
||
defer gm.mu.Unlock()
|
||
|
||
gm.pool.workers.Store(workerID, worker)
|
||
}
|
||
|
||
// removeWorker 移除工作者
|
||
func (gm *GlobalManager) removeWorker(workerID int32) {
|
||
if gm == nil || gm.pool == nil {
|
||
return
|
||
}
|
||
|
||
gm.mu.Lock()
|
||
defer gm.mu.Unlock()
|
||
|
||
gm.pool.workers.Delete(workerID)
|
||
atomic.AddInt32(&gm.pool.workerCount, -1)
|
||
}
|
||
|
||
// sendControlToWorker 发送控制消息给工作者
|
||
func (gm *GlobalManager) sendControlToWorkerWithRetry(workerID int32, msgType int, data interface{}, maxRetries int) (interface{}, error) {
|
||
var lastErr error
|
||
|
||
for attempt := 0; attempt <= maxRetries; attempt++ {
|
||
if attempt > 0 {
|
||
// 指数退避等待
|
||
waitTime := time.Duration(200*attempt) * time.Millisecond
|
||
logMessage(LogLevelInfo, "Retrying control message in %v (attempt %d/%d)",
|
||
waitTime, attempt, maxRetries+1)
|
||
time.Sleep(waitTime)
|
||
}
|
||
|
||
// 检查工作者是否存在
|
||
worker, ok := gm.pool.workers.Load(workerID)
|
||
if !ok {
|
||
return nil, fmt.Errorf("worker not found")
|
||
}
|
||
|
||
// 创建响应通道
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
doneChan := make(chan bool, 1)
|
||
|
||
// 发送控制消息
|
||
select {
|
||
case worker.(*CompleteWorker).controlChan <- ControlMessage{
|
||
Type: msgType,
|
||
Data: data,
|
||
Response: responseChan,
|
||
Error: errorChan,
|
||
Done: doneChan,
|
||
}:
|
||
// 等待响应
|
||
select {
|
||
case <-doneChan:
|
||
select {
|
||
case resp := <-responseChan:
|
||
return resp, nil
|
||
case err := <-errorChan:
|
||
lastErr = err
|
||
// 如果是致命错误,不重试
|
||
if strings.Contains(err.Error(), "worker not found") ||
|
||
strings.Contains(err.Error(), "worker is stopped") {
|
||
return nil, err
|
||
}
|
||
continue
|
||
}
|
||
case <-time.After(2 * time.Second):
|
||
lastErr = fmt.Errorf("response timeout")
|
||
continue
|
||
}
|
||
case <-time.After(1 * time.Second):
|
||
lastErr = fmt.Errorf("control channel busy")
|
||
continue
|
||
}
|
||
}
|
||
|
||
return nil, fmt.Errorf("control message failed after %d retries: %v", maxRetries, lastErr)
|
||
}
|
||
|
||
// ==================== 公共API ====================
|
||
|
||
// TaskFunc is the signature of a function that can be registered with the pool
// and run as a task. It receives the task's string parameter and returns a
// string result or an error.
type TaskFunc func(input string) (output string, err error)
|
||
|
||
// PoolConfig holds the settings used to build the pool. Initialize fills it
// with package defaults and then overlays caller-supplied JSON.
// UpdatePoolConfig decodes one from JSON and sends it to the running pool.
// The JSON field names are given by the struct tags.
type PoolConfig struct {
	// MinWorkers is the base number of workers the pool starts with.
	MinWorkers int `json:"minWorkers"`

	// MaxWorkers is the upper bound on the number of workers.
	MaxWorkers int `json:"maxWorkers"`

	// QueueSize is how many tasks may wait while all workers are busy.
	QueueSize int `json:"queueSize"`

	// TaskTimeoutMs is the maximum run time of a single task, in milliseconds.
	TaskTimeoutMs int `json:"taskTimeoutMs"`

	// WorkerIdleTimeoutMs is how long an idle worker is kept before it is
	// reclaimed, in milliseconds.
	WorkerIdleTimeoutMs int `json:"workerIdleTimeoutMs"`

	// MemoryLimitMB is the memory budget for the pool, in megabytes.
	MemoryLimitMB int `json:"memoryLimitMB"`

	// ShutdownTimeoutMs bounds a graceful shutdown, in milliseconds.
	ShutdownTimeoutMs int `json:"shutdownTimeoutMs"`

	// HealthCheckDelayMs is the interval between health checks, in milliseconds.
	HealthCheckDelayMs int `json:"healthCheckDelayMs"`

	// MaxRetries is how often a failed task is retried.
	MaxRetries int `json:"maxRetries"`

	// DefaultLanguage selects the language of log and error messages.
	DefaultLanguage string `json:"defaultLanguage"`

	// EnableMetrics turns on performance-metric collection.
	EnableMetrics bool `json:"enableMetrics"`

	// EnableAutoScaling lets the pool grow/shrink its worker count with load.
	EnableAutoScaling bool `json:"enableAutoScaling"`

	// PriorityQueue makes tasks be processed by priority.
	PriorityQueue bool `json:"priorityQueue"`

	// Name identifies the pool for monitoring.
	Name string `json:"name"`
}
|
||
|
||
// ControlMessage is a control request delivered to a worker (used by the
// public API, for example to stop a worker). The sender creates the reply
// channels and waits on them.
type ControlMessage struct {
	// Type identifies the requested operation (e.g. ControlMsgStop).
	Type int

	// Data is an optional, operation-specific payload.
	Data interface{}

	// Done is waited on by sendControlToWorkerWithRetry and StopWorker.
	// Presumably the receiving worker signals it once the request has been
	// handled (the worker side is not in this section — confirm).
	Done chan bool

	// Error carries a failure reported for the request.
	Error chan error

	// Response carries the result of a successful request.
	Response chan interface{}
}
|
||
|
||
// Initialize 初始化
|
||
func Initialize(minWorkers, maxWorkers int, configJSON string) error {
|
||
config := &PoolConfig{
|
||
MinWorkers: minWorkers, // 最小工作线程数 - 池启动时的基础线程数量
|
||
MaxWorkers: maxWorkers, // 最大工作线程数 - 池允许创建的最大线程数
|
||
QueueSize: DefaultQueueSize, // 任务队列大小 - 当所有工作线程忙时,可排队的任务数量
|
||
TaskTimeoutMs: DefaultTaskTimeout, // 任务超时时间(毫秒) - 单个任务执行的最大时间
|
||
WorkerIdleTimeoutMs: DefaultIdleTimeout, // 工作线程空闲超时(毫秒) - 空闲线程被回收的时间
|
||
MemoryLimitMB: DefaultMaxMemoryMB, // 内存限制(MB) - 池允许使用的最大内存
|
||
ShutdownTimeoutMs: DefaultShutdownTimeout, // 关闭超时(毫秒) - 优雅关闭的最大等待时间
|
||
HealthCheckDelayMs: DefaultHealthCheckDelay, // 健康检查延迟(毫秒) - 健康检查的时间间隔
|
||
MaxRetries: DefaultMaxRetries, // 最大重试次数 - 任务失败时的重试次数
|
||
DefaultLanguage: LanguageEN, // 默认语言 - 用于日志/错误消息
|
||
EnableMetrics: true, // 启用指标收集 - 是否收集性能指标
|
||
EnableAutoScaling: true, // 启用自动伸缩 - 是否根据负载自动调整线程数
|
||
PriorityQueue: false, // 优先级队列 - 是否按优先级处理任务
|
||
Name: "default", // 池名称 - 用于标识和监控
|
||
}
|
||
|
||
if configJSON != "" {
|
||
if err := json.Unmarshal([]byte(configJSON), config); err != nil {
|
||
return fmt.Errorf("invalid config: %v", err)
|
||
}
|
||
}
|
||
|
||
return InitGlobalManager(config)
|
||
}
|
||
|
||
// SubmitTask 提交任务
|
||
func SubmitTask(workerID int, funcName, param string, priority int) (int64, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return -1, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return -1, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
// 检查函数是否注册
|
||
if manager.pool.GetFunction(funcName) == nil {
|
||
return -1, fmt.Errorf("function not registered")
|
||
}
|
||
|
||
// 如果指定了workerID,检查工作者状态
|
||
if workerID > 0 {
|
||
if worker, ok := manager.pool.workers.Load(int32(workerID)); ok {
|
||
workerStatus := atomic.LoadInt32(&worker.(*CompleteWorker).status)
|
||
// 如果工作者暂停,不允许提交任务
|
||
if workerStatus == int32(WorkerStatusPaused) {
|
||
return -1, fmt.Errorf("worker %d is paused", workerID)
|
||
}
|
||
}
|
||
}
|
||
|
||
return manager.pool.SubmitTask(int32(workerID), funcName, param, int32(priority))
|
||
}
|
||
|
||
// RegisterFunction 注册函数
|
||
func RegisterFunction(name string, fn TaskFunc) error {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
return manager.pool.RegisterFunction(name, fn)
|
||
}
|
||
|
||
// PauseWorker 暂停工作者
|
||
func PauseWorker(workerID int) error {
|
||
return sendControlToWorker(workerID, 1, nil)
|
||
}
|
||
|
||
// ResumeWorker 恢复工作者
|
||
func ResumeWorker(workerID int) error {
|
||
return sendControlToWorker(workerID, 2, nil)
|
||
}
|
||
|
||
// StopWorker 停止工作者
|
||
func StopWorker(workerID int) error {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
// 检查工作者是否存在
|
||
worker, ok := manager.pool.workers.Load(int32(workerID))
|
||
if !ok {
|
||
// 工作者不存在,视为成功停止
|
||
return nil
|
||
}
|
||
|
||
workerInst := worker.(*CompleteWorker)
|
||
|
||
// 先检查工作者状态
|
||
if workerInst.IsStopped() {
|
||
// 工作者已经停止,视为成功
|
||
return nil
|
||
}
|
||
|
||
// 使用简单的直接调用,不使用重试循环
|
||
responseChan := make(chan interface{}, 1)
|
||
errorChan := make(chan error, 1)
|
||
doneChan := make(chan bool, 1)
|
||
|
||
// 创建一个简单的控制消息
|
||
msg := ControlMessage{
|
||
Type: ControlMsgStop,
|
||
Response: responseChan,
|
||
Error: errorChan,
|
||
Done: doneChan,
|
||
}
|
||
|
||
// 直接发送控制消息
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
select {
|
||
case errorChan <- fmt.Errorf("panic in stop worker: %v", r):
|
||
default:
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 使用工作者的安全发送方法
|
||
resp, err := workerInst.SendControlSafe(msg)
|
||
if err != nil {
|
||
select {
|
||
case errorChan <- err:
|
||
default:
|
||
}
|
||
} else if responseChan != nil {
|
||
responseChan <- resp
|
||
}
|
||
}()
|
||
|
||
// 等待响应,超时设为3秒
|
||
timeout := time.After(3 * time.Second)
|
||
|
||
select {
|
||
case <-doneChan:
|
||
// 成功停止
|
||
return nil
|
||
case err := <-errorChan:
|
||
// 如果是"worker is stopped"或"worker not found",视为成功
|
||
if strings.Contains(strings.ToLower(err.Error()), "worker is stopped") ||
|
||
strings.Contains(strings.ToLower(err.Error()), "worker not found") {
|
||
return nil
|
||
}
|
||
return err
|
||
case <-timeout:
|
||
// 超时,检查工作者是否已经停止
|
||
if workerInst.IsStopped() {
|
||
return nil
|
||
}
|
||
return fmt.Errorf("stop worker timeout")
|
||
}
|
||
}
|
||
|
||
// DrainWorker 排空工作者
|
||
func DrainWorker(workerID int) error {
|
||
return sendControlToWorker(workerID, 4, nil)
|
||
}
|
||
|
||
// RestartWorker 重启工作者
|
||
func RestartWorker(workerID int) error {
|
||
return sendControlToWorker(workerID, 5, nil)
|
||
}
|
||
|
||
// DeleteWorker 删除工作者
|
||
func DeleteWorker(workerID int) error {
|
||
return sendControlToWorker(workerID, 7, nil)
|
||
}
|
||
|
||
// GetWorkerStatus 获取工作者状态
|
||
func GetWorkerStatus(workerID int) (map[string]interface{}, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
// 直接访问工作者状态
|
||
if worker, ok := manager.pool.workers.Load(int32(workerID)); ok {
|
||
return worker.(*CompleteWorker).getHealthStatus(), nil
|
||
}
|
||
|
||
return nil, fmt.Errorf("worker not found")
|
||
}
|
||
|
||
// CreateWorker 创建工作者
|
||
func CreateWorker() (int, error) {
|
||
resp, err := sendControlToPool(9, nil)
|
||
if err != nil {
|
||
return -1, err
|
||
}
|
||
|
||
if data, ok := resp.(map[string]interface{}); ok {
|
||
if workerID, ok := data["workerID"].(int32); ok {
|
||
return int(workerID), nil
|
||
}
|
||
}
|
||
|
||
return -1, fmt.Errorf("invalid response")
|
||
}
|
||
|
||
// PauseAllWorkers 暂停所有工作者
|
||
func PauseAllWorkers() (int, error) {
|
||
resp, err := sendControlToPool(10, nil)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
if data, ok := resp.(map[string]interface{}); ok {
|
||
if workers, ok := data["workers"].(int32); ok {
|
||
return int(workers), nil
|
||
}
|
||
}
|
||
|
||
return 0, fmt.Errorf("invalid response")
|
||
}
|
||
|
||
// ResumeAllWorkers 恢复所有工作者
|
||
func ResumeAllWorkers() (int, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return 0, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return 0, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
// 收集所有暂停的工作者
|
||
pausedWorkers := make([]int32, 0)
|
||
manager.pool.workers.Range(func(key, value interface{}) bool {
|
||
worker := value.(*CompleteWorker)
|
||
if atomic.LoadInt32(&worker.status) == int32(WorkerStatusPaused) {
|
||
pausedWorkers = append(pausedWorkers, worker.id)
|
||
}
|
||
return true
|
||
})
|
||
|
||
// 恢复每个暂停的工作者
|
||
resumedCount := 0
|
||
for _, workerID := range pausedWorkers {
|
||
if worker, ok := manager.pool.workers.Load(workerID); ok {
|
||
// 直接调用工作者的恢复方法,而不是通过控制通道
|
||
worker.(*CompleteWorker).resume()
|
||
resumedCount++
|
||
|
||
// 等待一小段时间确保状态更新
|
||
time.Sleep(10 * time.Millisecond)
|
||
}
|
||
}
|
||
|
||
// 更新池统计
|
||
if resumedCount > 0 {
|
||
atomic.StoreInt32(&manager.pool.runningWorkers,
|
||
atomic.LoadInt32(&manager.pool.runningWorkers)+int32(resumedCount))
|
||
atomic.StoreInt32(&manager.pool.pausedWorkers,
|
||
atomic.LoadInt32(&manager.pool.pausedWorkers)-int32(resumedCount))
|
||
}
|
||
|
||
return resumedCount, nil
|
||
}
|
||
|
||
// StopAllWorkers 停止所有工作者
|
||
func StopAllWorkers() (int, error) {
|
||
resp, err := sendControlToPool(12, nil)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
if data, ok := resp.(map[string]interface{}); ok {
|
||
if workers, ok := data["workers"].(int32); ok {
|
||
return int(workers), nil
|
||
}
|
||
}
|
||
|
||
return 0, fmt.Errorf("invalid response")
|
||
}
|
||
|
||
// GracefulShutdown 优雅关闭
|
||
func GracefulShutdown() error {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
// 发送优雅关闭控制消息(同步等待)
|
||
_, err := sendControlToPool(13, nil)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 等待池完全关闭(最多30秒)
|
||
timeout := time.After(30 * time.Second)
|
||
ticker := time.NewTicker(100 * time.Millisecond)
|
||
defer ticker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-timeout:
|
||
return fmt.Errorf("graceful shutdown timeout after 30 seconds")
|
||
case <-ticker.C:
|
||
// 检查池是否已完全关闭
|
||
if !IsInitialized() {
|
||
logMessage(LogLevelInfo, "Graceful shutdown completed successfully")
|
||
return nil
|
||
}
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// UpdatePoolConfig 更新池配置
|
||
func UpdatePoolConfig(configJSON string) error {
|
||
var config PoolConfig
|
||
if err := json.Unmarshal([]byte(configJSON), &config); err != nil {
|
||
return fmt.Errorf("invalid config: %v", err)
|
||
}
|
||
|
||
_, err := sendControlToPool(14, &config)
|
||
return err
|
||
}
|
||
|
||
// GetPoolStats 获取池统计
|
||
func GetPoolStats() (map[string]interface{}, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
return manager.pool.GetStats(), nil
|
||
}
|
||
|
||
// GetTaskInfo 获取任务信息
|
||
func GetTaskInfo(taskID int64) (map[string]interface{}, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.registry == nil {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
task, ok := manager.registry.Get(taskID)
|
||
if !ok {
|
||
return nil, fmt.Errorf("task not found")
|
||
}
|
||
|
||
result := make(map[string]interface{})
|
||
result["id"] = task.id
|
||
result["goroutineID"] = task.goroutineID
|
||
result["status"] = atomic.LoadInt32(&task.status)
|
||
|
||
// 安全获取原子值
|
||
if funcName := task.funcName.Load(); funcName != nil {
|
||
result["funcName"] = funcName
|
||
} else {
|
||
result["funcName"] = ""
|
||
}
|
||
|
||
if param := task.param.Load(); param != nil {
|
||
result["param"] = param
|
||
} else {
|
||
result["param"] = ""
|
||
}
|
||
|
||
if resultVal := task.result.Load(); resultVal != nil {
|
||
result["result"] = resultVal
|
||
} else {
|
||
result["result"] = ""
|
||
}
|
||
|
||
if errorMsg := task.errorMsg.Load(); errorMsg != nil {
|
||
result["error"] = errorMsg
|
||
} else {
|
||
result["error"] = ""
|
||
}
|
||
|
||
result["priority"] = atomic.LoadInt32(&task.priority)
|
||
result["submitTime"] = task.submitTime
|
||
result["queueTime"] = task.queueTime
|
||
result["dequeueTime"] = task.dequeueTime
|
||
result["startTime"] = atomic.LoadInt64(&task.startTime)
|
||
result["endTime"] = atomic.LoadInt64(&task.endTime)
|
||
result["waitDuration"] = atomic.LoadInt64(&task.waitDuration)
|
||
result["execDuration"] = atomic.LoadInt64(&task.execDuration)
|
||
result["totalDuration"] = atomic.LoadInt64(&task.totalDuration)
|
||
|
||
return result, nil
|
||
}
|
||
|
||
// CancelTask 取消任务
|
||
func CancelTask(taskID int64) error {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.registry == nil {
|
||
return fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
task, ok := manager.registry.Get(taskID)
|
||
if !ok {
|
||
return fmt.Errorf("task not found")
|
||
}
|
||
|
||
// 取消任务
|
||
if task.cancel != nil {
|
||
task.cancel()
|
||
}
|
||
|
||
// 更新状态
|
||
if atomic.CompareAndSwapInt32(&task.status,
|
||
int32(TaskStatusPending), int32(TaskStatusCancelled)) {
|
||
task.errorMsg.Store("Task cancelled")
|
||
// 更新池的取消统计
|
||
if globalManager != nil && globalManager.pool != nil {
|
||
atomic.AddInt64(&globalManager.pool.cancelledTasks, 1)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
if atomic.CompareAndSwapInt32(&task.status,
|
||
int32(TaskStatusRunning), int32(TaskStatusCancelled)) {
|
||
task.errorMsg.Store("Task cancelled")
|
||
// 更新池的取消统计
|
||
if globalManager != nil && globalManager.pool != nil {
|
||
atomic.AddInt64(&globalManager.pool.cancelledTasks, 1)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
return fmt.Errorf("task cannot be cancelled")
|
||
}
|
||
|
||
// Shutdown 关闭池
|
||
func Shutdown() {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager != nil {
|
||
if manager.pool != nil {
|
||
manager.pool.Shutdown()
|
||
}
|
||
manager.stopCleanup()
|
||
}
|
||
|
||
// 清空全局变量
|
||
globalManager = nil
|
||
atomic.StoreInt32(&initialized, 0)
|
||
|
||
logMessage(LogLevelInfo, "Pool shutdown completed")
|
||
}
|
||
|
||
// IsInitialized 检查是否初始化
|
||
func IsInitialized() bool {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return false
|
||
}
|
||
|
||
// 额外的检查:确保池仍然有效
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
atomic.StoreInt32(&initialized, 0) // 自动修复状态
|
||
return false
|
||
}
|
||
|
||
// 检查池的状态
|
||
poolStatus := atomic.LoadInt32(&manager.pool.status)
|
||
if poolStatus == int32(WorkerStatusStopped) {
|
||
// 池已停止,但全局状态还未更新
|
||
atomic.StoreInt32(&initialized, 0)
|
||
return false
|
||
}
|
||
|
||
return true
|
||
}
|
||
|
||
// 辅助函数
|
||
func sendControlToWorker(workerID, msgType int, data interface{}) error {
|
||
_, err := sendControlToWorkerWithResponse(workerID, msgType, data)
|
||
return err
|
||
}
|
||
|
||
// sendControlToWorkerWithResponse 发送控制消息到工作者并获取响应
|
||
func sendControlToWorkerWithResponse(workerID, msgType int, data interface{}) (interface{}, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
// 直接发送,不重试
|
||
resp, err := manager.pool.SendControl(PoolControlMessage{
|
||
Type: msgType,
|
||
WorkerID: int32(workerID),
|
||
Data: data,
|
||
})
|
||
|
||
if err != nil {
|
||
// 如果是超时错误,可以重试一次
|
||
if strings.Contains(err.Error(), "timeout") {
|
||
time.Sleep(100 * time.Millisecond)
|
||
return manager.pool.SendControl(PoolControlMessage{
|
||
Type: msgType,
|
||
WorkerID: int32(workerID),
|
||
Data: data,
|
||
})
|
||
}
|
||
}
|
||
|
||
return resp, err
|
||
}
|
||
|
||
// WaitForWorkerState 等待工作者达到指定状态
|
||
func WaitForWorkerState(workerID int, targetState int, timeoutMs int) bool {
|
||
deadline := time.Now().Add(time.Duration(timeoutMs) * time.Millisecond)
|
||
|
||
for time.Now().Before(deadline) {
|
||
status, err := GetWorkerStatus(workerID)
|
||
if err != nil {
|
||
// 如果工作者不存在,检查是否目标状态是停止
|
||
if strings.Contains(err.Error(), "not found") && targetState == WorkerStatusStopped {
|
||
return true
|
||
}
|
||
time.Sleep(50 * time.Millisecond)
|
||
continue
|
||
}
|
||
|
||
if s, ok := status["status"].(float64); ok && int(s) == targetState {
|
||
return true
|
||
}
|
||
|
||
time.Sleep(50 * time.Millisecond)
|
||
}
|
||
|
||
return false
|
||
}
|
||
|
||
func sendControlToPool(msgType int, data interface{}) (interface{}, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return nil, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
return manager.pool.SendControl(PoolControlMessage{
|
||
Type: msgType,
|
||
Data: data,
|
||
})
|
||
}
|
||
|
||
// IsWorkerAvailable 检查工作者是否可接受任务
|
||
func IsWorkerAvailable(workerID int) (bool, error) {
|
||
if atomic.LoadInt32(&initialized) == 0 {
|
||
return false, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
manager := globalManager
|
||
if manager == nil || manager.pool == nil {
|
||
return false, fmt.Errorf("pool not initialized")
|
||
}
|
||
|
||
if worker, ok := manager.pool.workers.Load(int32(workerID)); ok {
|
||
status := atomic.LoadInt32(&worker.(*CompleteWorker).status)
|
||
return status == int32(WorkerStatusRunning) || status == int32(WorkerStatusIdle), nil
|
||
}
|
||
|
||
return false, fmt.Errorf("worker not found")
|
||
}
|
||
|
||
// ==================== 辅助函数 ====================
|
||
|
||
func logMessage(level int, format string, args ...interface{}) {
|
||
if atomic.LoadInt32(&shuttingDown) == 1 {
|
||
return
|
||
}
|
||
|
||
levelStr := "INFO"
|
||
switch level {
|
||
case LogLevelDebug:
|
||
levelStr = "DEBUG"
|
||
case LogLevelWarn:
|
||
levelStr = "WARN"
|
||
case LogLevelError:
|
||
levelStr = "ERROR"
|
||
case LogLevelFatal:
|
||
levelStr = "FATAL"
|
||
}
|
||
|
||
timestamp := time.Now().Format("2006-01-02 15:04:05.000")
|
||
message := fmt.Sprintf(format, args...)
|
||
|
||
fmt.Printf("[%s] [%s] %s\n", timestamp, levelStr, message)
|
||
}
|
||
|
||
// roundUpToPowerOfTwo returns the smallest power of two that is >= n, using
// the classic bit-smearing trick.
//
// Edge cases:
//   - n <= 1 yields 1 (2^0). Previously 0 yielded 0 and negative inputs
//     yielded 0 or a negative number. None of those is a power of two, and
//     all of them break callers that use the result as a size or to derive a
//     mask.
//   - n > 1<<30 saturates at 1<<30, the largest power of two that fits in an
//     int32. Previously such inputs overflowed to math.MinInt32.
func roundUpToPowerOfTwo(n int32) int32 {
	const maxPow2 int32 = 1 << 30

	if n <= 1 {
		return 1
	}
	if n > maxPow2 {
		return maxPow2
	}

	// Set every bit below the highest set bit of n-1, then add one.
	n--
	n |= n >> 1
	n |= n >> 2
	n |= n >> 4
	n |= n >> 8
	n |= n >> 16
	n++
	return n
}
|
||
|
||
// init resets the package flags, applies GC settings and logs that the module
// is ready.
//
// NOTE(review): debug.SetGCPercent and debug.SetMemoryLimit change settings
// for the whole process, not just this package, and they run merely because
// the package is imported. Consider moving them behind an explicit call.
func init() {
	atomic.StoreInt32(&initialized, 0)
	atomic.StoreInt32(&shuttingDown, 0)
	debug.SetGCPercent(200)
	// Soft memory limit of DefaultMaxMemoryMB MiB.
	debug.SetMemoryLimit(int64(DefaultMaxMemoryMB) * 1024 * 1024)

	// Panic handler: logs a panic with a stack trace and exits. A deferred
	// recover only catches panics raised later in this same init function on
	// this goroutine. It is not a process-wide panic handler.
	defer func() {
		if r := recover(); r != nil {
			logMessage(LogLevelFatal, "Global panic: %v\nStack: %s",
				r, string(debug.Stack()))
			os.Exit(1)
		}
	}()

	logMessage(LogLevelInfo, "Goroutine pool module initialized")
}
|