daShangDao_kfzgw-info/timer/timer.go
2026-02-27 11:46:40 +08:00

253 lines
5.1 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"container/heap"
"fmt"
"sync"
"time"
)
//func main() {
// // 创建一个每秒触发一次的定时器
// ticker := time.NewTicker(1 * time.Second)
// defer ticker.Stop() // 确保资源被释放
//
// fmt.Println("定时器启动,每秒执行一次...")
//
// for i := 1; i <= 5; i++ {
// <-ticker.C // 等待定时器触发
// fmt.Printf("执行第 %d 次,当前时间: %v\n", i, time.Now().Format("15:04:05"))
// }
//
// fmt.Println("定时器执行完成")
//}
//func main() {
// ticker := time.NewTicker(500 * time.Millisecond)
// done := make(chan bool)
//
// // 5秒后停止定时器
// go func() {
// time.Sleep(5 * time.Second)
// ticker.Stop()
// done <- true
// fmt.Println("定时器已停止")
// }()
//
// fmt.Println("定时器启动每500毫秒执行一次...")
//
// for {
// select {
// case t := <-ticker.C:
// fmt.Printf("定时任务执行,时间: %v\n", t.Format("15:04:05.000"))
// case <-done:
// return
// }
// }
//}
//func main() {
// fmt.Println("程序启动:", time.Now().Format("15:04:05"))
//
// // 创建一个3秒后触发的单次定时器
// timer := time.NewTimer(3 * time.Second)
//
// fmt.Println("等待3秒...")
//
// // 阻塞直到定时器触发
// <-timer.C
// fmt.Println("3秒时间到!", time.Now().Format("15:04:05"))
//
// // 定时器重置
// fmt.Println("\n重置定时器为2秒...")
// timer.Reset(2 * time.Second)
// <-timer.C
// fmt.Println("2秒时间到!", time.Now().Format("15:04:05"))
//
// // 停止定时器示例
// timer2 := time.NewTimer(5 * time.Second)
// go func() {
// time.Sleep(2 * time.Second)
// if timer2.Stop() {
// fmt.Println("定时器在触发前被停止了")
// }
// }()
//
// select {
// case <-timer2.C:
// fmt.Println("定时器正常触发")
// case <-time.After(3 * time.Second):
// fmt.Println("检查完成")
// }
//}
// 延迟任务
type DelayedTask struct {
ExecuteAt time.Time
Task func()
Index int // heap需要的索引
}
// 延迟队列
type DelayQueue struct {
tasks []*DelayedTask
mu sync.Mutex
cond *sync.Cond
done chan struct{}
}
func NewDelayQueue() *DelayQueue {
dq := &DelayQueue{
tasks: make([]*DelayedTask, 0),
done: make(chan struct{}),
}
dq.cond = sync.NewCond(&dq.mu)
return dq
}
// heap接口实现
func (dq *DelayQueue) Len() int { return len(dq.tasks) }
func (dq *DelayQueue) Less(i, j int) bool {
return dq.tasks[i].ExecuteAt.Before(dq.tasks[j].ExecuteAt)
}
func (dq *DelayQueue) Swap(i, j int) {
dq.tasks[i], dq.tasks[j] = dq.tasks[j], dq.tasks[i]
dq.tasks[i].Index = i
dq.tasks[j].Index = j
}
func (dq *DelayQueue) Push(x interface{}) {
task := x.(*DelayedTask)
task.Index = len(dq.tasks)
dq.tasks = append(dq.tasks, task)
}
func (dq *DelayQueue) Pop() interface{} {
n := len(dq.tasks)
task := dq.tasks[n-1]
dq.tasks[n-1] = nil // 避免内存泄漏
dq.tasks = dq.tasks[:n-1]
return task
}
// 添加延迟任务
func (dq *DelayQueue) AddTask(delay time.Duration, task func()) {
dq.mu.Lock()
defer dq.mu.Unlock()
heap.Push(dq, &DelayedTask{
ExecuteAt: time.Now().Add(delay),
Task: task,
})
// 通知等待的 goroutine 有新的任务
dq.cond.Signal()
}
// 停止队列处理器
func (dq *DelayQueue) Stop() {
close(dq.done)
dq.cond.Broadcast()
}
// 启动队列处理器
func (dq *DelayQueue) Start() {
go func() {
for {
select {
case <-dq.done:
return
default:
dq.processTasks()
}
}
}()
}
func (dq *DelayQueue) processTasks() {
dq.mu.Lock()
// 等待直到有任务
for len(dq.tasks) == 0 {
dq.cond.Wait()
// 检查是否停止
select {
case <-dq.done:
dq.mu.Unlock()
return
default:
}
}
now := time.Now()
task := dq.tasks[0] // 查看堆顶元素
if now.Before(task.ExecuteAt) {
// 还未到执行时间,等待
waitDuration := task.ExecuteAt.Sub(now)
// 使用定时器等待
timer := time.NewTimer(waitDuration)
// 释放锁并等待
dq.mu.Unlock()
select {
case <-timer.C:
// 时间到了,重新获取锁处理任务
dq.mu.Lock()
// 再次检查任务是否还在(可能被其他操作移除)
if len(dq.tasks) > 0 && dq.tasks[0] == task {
heap.Pop(dq)
dq.mu.Unlock()
task.Task()
} else {
dq.mu.Unlock()
}
return
case <-dq.done:
timer.Stop()
return
}
}
// 立即执行任务
heap.Pop(dq)
dq.mu.Unlock()
// 执行任务
task.Task()
}
func main() {
queue := NewDelayQueue()
queue.Start()
fmt.Println("延迟队列启动,当前时间:", time.Now().Format("15:04:05"))
// 添加延迟任务
queue.AddTask(2*time.Second, func() {
fmt.Printf("[延迟2秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
})
queue.AddTask(5*time.Second, func() {
fmt.Printf("[延迟5秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
})
queue.AddTask(1*time.Second, func() {
fmt.Printf("[延迟1秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
})
queue.AddTask(3*time.Second, func() {
fmt.Printf("[延迟3秒] 执行时间: %v\n", time.Now().Format("15:04:05"))
})
// 等待所有任务执行
time.Sleep(10 * time.Second)
// 停止队列
queue.Stop()
fmt.Println("程序结束")
}