package main import ( "container/heap" "fmt" "sync" "time" ) //func main() { // // 创建一个每秒触发一次的定时器 // ticker := time.NewTicker(1 * time.Second) // defer ticker.Stop() // 确保资源被释放 // // fmt.Println("定时器启动,每秒执行一次...") // // for i := 1; i <= 5; i++ { // <-ticker.C // 等待定时器触发 // fmt.Printf("执行第 %d 次,当前时间: %v\n", i, time.Now().Format("15:04:05")) // } // // fmt.Println("定时器执行完成") //} //func main() { // ticker := time.NewTicker(500 * time.Millisecond) // done := make(chan bool) // // // 5秒后停止定时器 // go func() { // time.Sleep(5 * time.Second) // ticker.Stop() // done <- true // fmt.Println("定时器已停止") // }() // // fmt.Println("定时器启动,每500毫秒执行一次...") // // for { // select { // case t := <-ticker.C: // fmt.Printf("定时任务执行,时间: %v\n", t.Format("15:04:05.000")) // case <-done: // return // } // } //} //func main() { // fmt.Println("程序启动:", time.Now().Format("15:04:05")) // // // 创建一个3秒后触发的单次定时器 // timer := time.NewTimer(3 * time.Second) // // fmt.Println("等待3秒...") // // // 阻塞直到定时器触发 // <-timer.C // fmt.Println("3秒时间到!", time.Now().Format("15:04:05")) // // // 定时器重置 // fmt.Println("\n重置定时器为2秒...") // timer.Reset(2 * time.Second) // <-timer.C // fmt.Println("2秒时间到!", time.Now().Format("15:04:05")) // // // 停止定时器示例 // timer2 := time.NewTimer(5 * time.Second) // go func() { // time.Sleep(2 * time.Second) // if timer2.Stop() { // fmt.Println("定时器在触发前被停止了") // } // }() // // select { // case <-timer2.C: // fmt.Println("定时器正常触发") // case <-time.After(3 * time.Second): // fmt.Println("检查完成") // } //} // 延迟任务 type DelayedTask struct { ExecuteAt time.Time Task func() Index int // heap需要的索引 } // 延迟队列 type DelayQueue struct { tasks []*DelayedTask mu sync.Mutex cond *sync.Cond done chan struct{} } func NewDelayQueue() *DelayQueue { dq := &DelayQueue{ tasks: make([]*DelayedTask, 0), done: make(chan struct{}), } dq.cond = sync.NewCond(&dq.mu) return dq } // heap接口实现 func (dq *DelayQueue) Len() int { return len(dq.tasks) } func (dq *DelayQueue) Less(i, j int) bool { return dq.tasks[i].ExecuteAt.Before(dq.tasks[j].ExecuteAt) } func (dq *DelayQueue) Swap(i, j int) { dq.tasks[i], dq.tasks[j] = dq.tasks[j], dq.tasks[i] dq.tasks[i].Index = i dq.tasks[j].Index = j } func (dq *DelayQueue) Push(x interface{}) { task := x.(*DelayedTask) task.Index = len(dq.tasks) dq.tasks = append(dq.tasks, task) } func (dq *DelayQueue) Pop() interface{} { n := len(dq.tasks) task := dq.tasks[n-1] dq.tasks[n-1] = nil // 避免内存泄漏 dq.tasks = dq.tasks[:n-1] return task } // 添加延迟任务 func (dq *DelayQueue) AddTask(delay time.Duration, task func()) { dq.mu.Lock() defer dq.mu.Unlock() heap.Push(dq, &DelayedTask{ ExecuteAt: time.Now().Add(delay), Task: task, }) // 通知等待的 goroutine 有新的任务 dq.cond.Signal() } // 停止队列处理器 func (dq *DelayQueue) Stop() { close(dq.done) dq.cond.Broadcast() } // 启动队列处理器 func (dq *DelayQueue) Start() { go func() { for { select { case <-dq.done: return default: dq.processTasks() } } }() } func (dq *DelayQueue) processTasks() { dq.mu.Lock() // 等待直到有任务 for len(dq.tasks) == 0 { dq.cond.Wait() // 检查是否停止 select { case <-dq.done: dq.mu.Unlock() return default: } } now := time.Now() task := dq.tasks[0] // 查看堆顶元素 if now.Before(task.ExecuteAt) { // 还未到执行时间,等待 waitDuration := task.ExecuteAt.Sub(now) // 使用定时器等待 timer := time.NewTimer(waitDuration) // 释放锁并等待 dq.mu.Unlock() select { case <-timer.C: // 时间到了,重新获取锁处理任务 dq.mu.Lock() // 再次检查任务是否还在(可能被其他操作移除) if len(dq.tasks) > 0 && dq.tasks[0] == task { heap.Pop(dq) dq.mu.Unlock() task.Task() } else { dq.mu.Unlock() } return case <-dq.done: timer.Stop() return } } // 立即执行任务 heap.Pop(dq) dq.mu.Unlock() // 执行任务 task.Task() } func main() { queue := NewDelayQueue() queue.Start() fmt.Println("延迟队列启动,当前时间:", time.Now().Format("15:04:05")) // 添加延迟任务 queue.AddTask(2*time.Second, func() { fmt.Printf("[延迟2秒] 执行时间: %v\n", time.Now().Format("15:04:05")) }) queue.AddTask(5*time.Second, func() { fmt.Printf("[延迟5秒] 执行时间: %v\n", time.Now().Format("15:04:05")) }) queue.AddTask(1*time.Second, func() { fmt.Printf("[延迟1秒] 执行时间: %v\n", time.Now().Format("15:04:05")) }) queue.AddTask(3*time.Second, func() { fmt.Printf("[延迟3秒] 执行时间: %v\n", time.Now().Format("15:04:05")) }) // 等待所有任务执行 time.Sleep(10 * time.Second) // 停止队列 queue.Stop() fmt.Println("程序结束") }