253 lines
5.1 KiB
Go
253 lines
5.1 KiB
Go
package main
|
||
|
||
import (
|
||
"container/heap"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
)
|
||
|
||
//func main() {
|
||
// // 创建一个每秒触发一次的定时器
|
||
// ticker := time.NewTicker(1 * time.Second)
|
||
// defer ticker.Stop() // 确保资源被释放
|
||
//
|
||
// fmt.Println("定时器启动,每秒执行一次...")
|
||
//
|
||
// for i := 1; i <= 5; i++ {
|
||
// <-ticker.C // 等待定时器触发
|
||
// fmt.Printf("执行第 %d 次,当前时间: %v\n", i, time.Now().Format("15:04:05"))
|
||
// }
|
||
//
|
||
// fmt.Println("定时器执行完成")
|
||
//}
|
||
|
||
//func main() {
|
||
// ticker := time.NewTicker(500 * time.Millisecond)
|
||
// done := make(chan bool)
|
||
//
|
||
// // 5秒后停止定时器
|
||
// go func() {
|
||
// time.Sleep(5 * time.Second)
|
||
// ticker.Stop()
|
||
// done <- true
|
||
// fmt.Println("定时器已停止")
|
||
// }()
|
||
//
|
||
// fmt.Println("定时器启动,每500毫秒执行一次...")
|
||
//
|
||
// for {
|
||
// select {
|
||
// case t := <-ticker.C:
|
||
// fmt.Printf("定时任务执行,时间: %v\n", t.Format("15:04:05.000"))
|
||
// case <-done:
|
||
// return
|
||
// }
|
||
// }
|
||
//}
|
||
|
||
//func main() {
|
||
// fmt.Println("程序启动:", time.Now().Format("15:04:05"))
|
||
//
|
||
// // 创建一个3秒后触发的单次定时器
|
||
// timer := time.NewTimer(3 * time.Second)
|
||
//
|
||
// fmt.Println("等待3秒...")
|
||
//
|
||
// // 阻塞直到定时器触发
|
||
// <-timer.C
|
||
// fmt.Println("3秒时间到!", time.Now().Format("15:04:05"))
|
||
//
|
||
// // 定时器重置
|
||
// fmt.Println("\n重置定时器为2秒...")
|
||
// timer.Reset(2 * time.Second)
|
||
// <-timer.C
|
||
// fmt.Println("2秒时间到!", time.Now().Format("15:04:05"))
|
||
//
|
||
// // 停止定时器示例
|
||
// timer2 := time.NewTimer(5 * time.Second)
|
||
// go func() {
|
||
// time.Sleep(2 * time.Second)
|
||
// if timer2.Stop() {
|
||
// fmt.Println("定时器在触发前被停止了")
|
||
// }
|
||
// }()
|
||
//
|
||
// select {
|
||
// case <-timer2.C:
|
||
// fmt.Println("定时器正常触发")
|
||
// case <-time.After(3 * time.Second):
|
||
// fmt.Println("检查完成")
|
||
// }
|
||
//}
|
||
|
||
// 延迟任务
|
||
type DelayedTask struct {
|
||
ExecuteAt time.Time
|
||
Task func()
|
||
Index int // heap需要的索引
|
||
}
|
||
|
||
// 延迟队列
|
||
type DelayQueue struct {
|
||
tasks []*DelayedTask
|
||
mu sync.Mutex
|
||
cond *sync.Cond
|
||
done chan struct{}
|
||
}
|
||
|
||
func NewDelayQueue() *DelayQueue {
|
||
dq := &DelayQueue{
|
||
tasks: make([]*DelayedTask, 0),
|
||
done: make(chan struct{}),
|
||
}
|
||
dq.cond = sync.NewCond(&dq.mu)
|
||
return dq
|
||
}
|
||
|
||
// heap接口实现
|
||
func (dq *DelayQueue) Len() int { return len(dq.tasks) }
|
||
func (dq *DelayQueue) Less(i, j int) bool {
|
||
return dq.tasks[i].ExecuteAt.Before(dq.tasks[j].ExecuteAt)
|
||
}
|
||
func (dq *DelayQueue) Swap(i, j int) {
|
||
dq.tasks[i], dq.tasks[j] = dq.tasks[j], dq.tasks[i]
|
||
dq.tasks[i].Index = i
|
||
dq.tasks[j].Index = j
|
||
}
|
||
|
||
func (dq *DelayQueue) Push(x interface{}) {
|
||
task := x.(*DelayedTask)
|
||
task.Index = len(dq.tasks)
|
||
dq.tasks = append(dq.tasks, task)
|
||
}
|
||
|
||
func (dq *DelayQueue) Pop() interface{} {
|
||
n := len(dq.tasks)
|
||
task := dq.tasks[n-1]
|
||
dq.tasks[n-1] = nil // 避免内存泄漏
|
||
dq.tasks = dq.tasks[:n-1]
|
||
return task
|
||
}
|
||
|
||
// 添加延迟任务
|
||
func (dq *DelayQueue) AddTask(delay time.Duration, task func()) {
|
||
dq.mu.Lock()
|
||
defer dq.mu.Unlock()
|
||
|
||
heap.Push(dq, &DelayedTask{
|
||
ExecuteAt: time.Now().Add(delay),
|
||
Task: task,
|
||
})
|
||
|
||
// 通知等待的 goroutine 有新的任务
|
||
dq.cond.Signal()
|
||
}
|
||
|
||
// 停止队列处理器
|
||
func (dq *DelayQueue) Stop() {
|
||
close(dq.done)
|
||
dq.cond.Broadcast()
|
||
}
|
||
|
||
// 启动队列处理器
|
||
func (dq *DelayQueue) Start() {
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-dq.done:
|
||
return
|
||
default:
|
||
dq.processTasks()
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
func (dq *DelayQueue) processTasks() {
|
||
dq.mu.Lock()
|
||
|
||
// 等待直到有任务
|
||
for len(dq.tasks) == 0 {
|
||
dq.cond.Wait()
|
||
|
||
// 检查是否停止
|
||
select {
|
||
case <-dq.done:
|
||
dq.mu.Unlock()
|
||
return
|
||
default:
|
||
}
|
||
}
|
||
|
||
now := time.Now()
|
||
task := dq.tasks[0] // 查看堆顶元素
|
||
|
||
if now.Before(task.ExecuteAt) {
|
||
// 还未到执行时间,等待
|
||
waitDuration := task.ExecuteAt.Sub(now)
|
||
|
||
// 使用定时器等待
|
||
timer := time.NewTimer(waitDuration)
|
||
|
||
// 释放锁并等待
|
||
dq.mu.Unlock()
|
||
|
||
select {
|
||
case <-timer.C:
|
||
// 时间到了,重新获取锁处理任务
|
||
dq.mu.Lock()
|
||
// 再次检查任务是否还在(可能被其他操作移除)
|
||
if len(dq.tasks) > 0 && dq.tasks[0] == task {
|
||
heap.Pop(dq)
|
||
dq.mu.Unlock()
|
||
task.Task()
|
||
} else {
|
||
dq.mu.Unlock()
|
||
}
|
||
return
|
||
case <-dq.done:
|
||
timer.Stop()
|
||
return
|
||
}
|
||
}
|
||
|
||
// 立即执行任务
|
||
heap.Pop(dq)
|
||
dq.mu.Unlock()
|
||
|
||
// 执行任务
|
||
task.Task()
|
||
}
|
||
|
||
func main() {
|
||
queue := NewDelayQueue()
|
||
queue.Start()
|
||
|
||
fmt.Println("延迟队列启动,当前时间:", time.Now().Format("15:04:05"))
|
||
|
||
// 添加延迟任务
|
||
queue.AddTask(2*time.Second, func() {
|
||
fmt.Printf("[延迟2秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
|
||
})
|
||
|
||
queue.AddTask(5*time.Second, func() {
|
||
fmt.Printf("[延迟5秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
|
||
})
|
||
|
||
queue.AddTask(1*time.Second, func() {
|
||
fmt.Printf("[延迟1秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
|
||
})
|
||
|
||
queue.AddTask(3*time.Second, func() {
|
||
fmt.Printf("[延迟3秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
|
||
})
|
||
|
||
// 等待所有任务执行
|
||
time.Sleep(10 * time.Second)
|
||
|
||
// 停止队列
|
||
queue.Stop()
|
||
fmt.Println("程序结束")
|
||
}
|