daShangDao_xy_dll/utils/tokenBucketUtil/tokenBucketUtil.go

184 lines
4.8 KiB
Go

// tokenBucketUtil/token_producer.go
package tokenBucketUtil
import (
"context"
"log"
"sync"
"time"
"github.com/redis/go-redis/v9"
)
// TokenBucketConfig holds the settings shared by every bucket a TokenProducer
// manages. Each field carries an `ini` struct tag naming its configuration key.
type TokenBucketConfig struct {
// BucketKeyPrefix is prepended to an appSecret to build the bucket's Redis key.
BucketKeyPrefix string `ini:"tokenBucket.BucketKeyPrefix"` // token-bucket key prefix
// TokensPerSecond divides the refill period to get the tick interval and is
// also used to estimate the key's expiry time.
TokensPerSecond int `ini:"tokenBucket.TokensPerSecond"` // tokens generated per second
// BucketSize is the upper bound the refill script enforces on the stored count.
BucketSize int `ini:"tokenBucket.BucketSize"` // token-bucket capacity
// Delay is in milliseconds; the refill ticker fires every Delay/TokensPerSecond.
Delay int `ini:"tokenBucket.Delay"` // delay (ms)
}
// TokenProducer refills token buckets stored in Redis, running one goroutine
// per registered bucket key.
type TokenProducer struct {
// ctx is the root context created by StartTokenProducer; every per-bucket
// context is derived from it, and cancel cancels it.
ctx context.Context
cancel context.CancelFunc
// client is the Redis connection used for all bucket reads and writes.
client *redis.Client
config TokenBucketConfig
// wg counts the running per-bucket goroutines so Stop can wait for them.
wg sync.WaitGroup
stopFuncs map[string]func() // stop function for each bucket key
// mu is held by AddBucket, RemoveBucket and Stop while they touch stopFuncs.
mu sync.RWMutex
}
// StartTokenProducer creates a TokenProducer bound to the given Redis client
// and configuration. It sets up a cancellable root context and an empty bucket
// registry; no refill goroutine runs until AddBucket is called.
func StartTokenProducer(client *redis.Client, config TokenBucketConfig) *TokenProducer {
	prefix := config.BucketKeyPrefix
	log.Printf("启动令牌桶生产者[前缀: %s].....", prefix)

	// Every per-bucket context is derived from this root, so cancelling it
	// stops all buckets at once.
	rootCtx, stopAll := context.WithCancel(context.Background())

	p := new(TokenProducer)
	p.ctx = rootCtx
	p.cancel = stopAll
	p.client = client
	p.config = config
	p.stopFuncs = map[string]func(){}

	log.Printf("令牌桶生产者[前缀: %s]已启动: %d tokens/s, 容量: %d",
		prefix, config.TokensPerSecond, config.BucketSize)
	return p
}
// AddBucket registers a token bucket for appSecret and starts its refill
// goroutine. The Redis key is BucketKeyPrefix + appSecret. If a bucket with
// the same key is already registered, its goroutine is cancelled and the
// bucket is re-initialised to zero tokens.
//
// Calling AddBucket after Stop is a no-op apart from a log line.
func (p *TokenProducer) AddBucket(appSecret string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	bucketKey := p.config.BucketKeyPrefix + appSecret

	// Once the root context is cancelled, a new goroutine would exit
	// immediately and leave a stale registry entry behind. Its wg.Add could
	// also run concurrently with the wg.Wait in Stop, which WaitGroup forbids.
	// Checking under the lock makes the decision race-free with Stop's cleanup.
	if p.ctx.Err() != nil {
		log.Printf("令牌桶生产者已停止, 忽略添加令牌桶: %s", bucketKey)
		return
	}

	// Replace any existing producer for this key.
	if stopFunc, exists := p.stopFuncs[bucketKey]; exists {
		stopFunc()
	}

	// Reset the bucket in Redis before the refill loop starts.
	initBucket(p.client, bucketKey, p.config.BucketSize)

	// The bucket's context is a child of the root, so either RemoveBucket
	// (via cancel) or Stop (via the root) ends the goroutine.
	ctx, cancel := context.WithCancel(p.ctx)
	p.wg.Add(1)
	go p.runSingleBucket(ctx, bucketKey, cancel)
	p.stopFuncs[bucketKey] = cancel

	log.Printf("添加令牌桶: %s", bucketKey)
}
// RemoveBucket cancels the refill goroutine registered for appSecret and
// drops it from the registry. It does nothing when no such bucket is
// registered. The Redis key itself is not touched here.
func (p *TokenProducer) RemoveBucket(appSecret string) {
	p.mu.Lock()
	defer p.mu.Unlock()

	key := p.config.BucketKeyPrefix + appSecret
	stop, found := p.stopFuncs[key]
	if !found {
		return
	}

	stop()
	delete(p.stopFuncs, key)
	log.Printf("移除令牌桶: %s", key)
}
// Stop cancels the root context, cancels and forgets every registered bucket,
// and then blocks until all bucket goroutines have returned.
func (p *TokenProducer) Stop() {
	log.Printf("停止所有令牌桶生产者...")
	p.cancel()

	// Empty the registry under the lock; the lock is released before waiting
	// so the wait itself never holds it.
	func() {
		p.mu.Lock()
		defer p.mu.Unlock()
		for bucketKey, stop := range p.stopFuncs {
			stop()
			delete(p.stopFuncs, bucketKey)
		}
	}()

	p.wg.Wait()
	log.Printf("所有令牌桶生产者已停止")
}
// runSingleBucket is the refill loop for one bucket. It adds one token per
// tick, where the tick interval is Delay milliseconds divided by
// TokensPerSecond, and returns when ctx is cancelled.
//
// It marks the WaitGroup entry done and calls cancel on exit. If the
// configuration would yield a non-positive rate or interval, it logs the
// problem and returns without producing instead of panicking.
func (p *TokenProducer) runSingleBucket(ctx context.Context, bucketKey string, cancel context.CancelFunc) {
	defer p.wg.Done()
	defer cancel()

	// A zero rate would be an integer division by zero below, and a panic in
	// this goroutine would take the whole process down.
	rate := p.config.TokensPerSecond
	if rate <= 0 {
		log.Printf("令牌桶[%s]配置无效: TokensPerSecond=%d, 生产者未启动", bucketKey, rate)
		return
	}

	// time.NewTicker panics on a non-positive interval.
	interval := time.Duration(p.config.Delay) * time.Millisecond / time.Duration(rate)
	if interval <= 0 {
		log.Printf("令牌桶[%s]配置无效: Delay=%d, TokensPerSecond=%d, 生产者未启动",
			bucketKey, p.config.Delay, rate)
		return
	}

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	// Key TTL in seconds: twice the estimated time to fill the bucket.
	// NOTE(review): the estimate uses TokensPerSecond only and ignores Delay;
	// confirm the TTL still outlasts the tick interval when Delay is not 1000.
	expireTime := (p.config.BucketSize/rate + 1) * 2
	log.Printf("令牌桶[%s]过期时间: %d秒", bucketKey, expireTime)

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			p.produceToken(bucketKey, expireTime)
		}
	}
}
// produceToken adds at most one token to the bucket stored at bucketKey and
// refreshes the key's TTL to expireTime seconds.
//
// The update runs as a single Lua script, so it is atomic with respect to
// other Redis commands. The script returns {produced, tokens}: produced is 1
// when a token was added and 0 when the bucket was already full, and tokens
// is the count stored by that same script run. Using the returned count for
// the progress log avoids a second, non-atomic GET on every tick.
//
// Errors are logged, except those caused by the producer's root context
// being cancelled, which are silently dropped.
func (p *TokenProducer) produceToken(bucketKey string, expireTime int) {
	const luaScript = `
local bucket_key = KEYS[1]
local bucket_size = tonumber(ARGV[1])
local expire_time = tonumber(ARGV[2])
local current_tokens = tonumber(redis.call('get', bucket_key)) or 0
if current_tokens < bucket_size then
    current_tokens = current_tokens + 1
    redis.call('setex', bucket_key, expire_time, current_tokens)
    return {1, current_tokens}
end
redis.call('setex', bucket_key, expire_time, bucket_size)
return {0, bucket_size}
`
	reply, err := p.client.Eval(p.ctx, luaScript, []string{bucketKey}, p.config.BucketSize, expireTime).Int64Slice()
	if err != nil {
		// A cancelled root context means shutdown, not a failure worth logging.
		if p.ctx.Err() != nil {
			return
		}
		log.Printf("令牌桶[%s]生产失败: %v", bucketKey, err)
		return
	}
	if len(reply) != 2 {
		log.Printf("令牌桶[%s]生产失败: 脚本返回值异常: %v", bucketKey, reply)
		return
	}

	produced, currentTokens := reply[0] == 1, reply[1]
	if !produced {
		return
	}

	// Log progress every tenth token and when the bucket becomes full.
	if currentTokens%10 == 0 || currentTokens == int64(p.config.BucketSize) {
		log.Printf("令牌桶[%s]当前令牌数: %d/%d", bucketKey, currentTokens, p.config.BucketSize)
	}
}
// initBucket resets the bucket stored at key to zero tokens with no expiry.
// bucketSize is used only in the success log line. Failures are logged and
// not returned.
func initBucket(client *redis.Client, key string, bucketSize int) {
	log.Printf("初始化令牌桶[%s].....", key)

	// A plain SET overwrites any existing value regardless of its type and
	// discards its TTL, so no DEL is needed first. One command also means the
	// key is never briefly missing and no error result goes unchecked.
	if err := client.Set(context.Background(), key, 0, 0).Err(); err != nil {
		log.Printf("初始化令牌桶[%s]失败: %v", key, err)
		return
	}

	log.Printf("令牌桶[%s]初始化成功,初始容量: 0/%d", key, bucketSize)
}