// tokenBucketUtil/token_producer.go package tokenBucketUtil import ( "context" "log" "sync" "time" "github.com/redis/go-redis/v9" ) // TokenBucketConfig 令牌桶配置 type TokenBucketConfig struct { BucketKeyPrefix string `ini:"tokenBucket.BucketKeyPrefix"` // 令牌桶key前缀 TokensPerSecond int `ini:"tokenBucket.TokensPerSecond"` // 每秒生成令牌数 BucketSize int `ini:"tokenBucket.BucketSize"` // 令牌桶容量 Delay int `ini:"tokenBucket.Delay"` // 延迟(ms) } // TokenProducer 令牌生产者 type TokenProducer struct { ctx context.Context cancel context.CancelFunc client *redis.Client config TokenBucketConfig wg sync.WaitGroup stopFuncs map[string]func() // 存储不同key的停止函数 mu sync.RWMutex } // StartTokenProducer 启动令牌桶生产者 func StartTokenProducer(client *redis.Client, config TokenBucketConfig) *TokenProducer { log.Printf("启动令牌桶生产者[前缀: %s].....", config.BucketKeyPrefix) ctx, cancel := context.WithCancel(context.Background()) producer := &TokenProducer{ ctx: ctx, cancel: cancel, client: client, config: config, stopFuncs: make(map[string]func()), } log.Printf("令牌桶生产者[前缀: %s]已启动: %d tokens/s, 容量: %d", config.BucketKeyPrefix, config.TokensPerSecond, config.BucketSize) return producer } // AddBucket 为特定的appSecret添加令牌桶 func (p *TokenProducer) AddBucket(appSecret string) { p.mu.Lock() defer p.mu.Unlock() bucketKey := p.config.BucketKeyPrefix + appSecret // 如果已经存在,先停止旧的 if stopFunc, exists := p.stopFuncs[bucketKey]; exists { stopFunc() } // 初始化令牌桶 initBucket(p.client, bucketKey, p.config.BucketSize) // 启动单个令牌桶的生产者 ctx, cancel := context.WithCancel(p.ctx) p.wg.Add(1) go p.runSingleBucket(ctx, bucketKey, cancel) p.stopFuncs[bucketKey] = cancel log.Printf("添加令牌桶: %s", bucketKey) } // RemoveBucket 移除特定的令牌桶 func (p *TokenProducer) RemoveBucket(appSecret string) { p.mu.Lock() defer p.mu.Unlock() bucketKey := p.config.BucketKeyPrefix + appSecret if stopFunc, exists := p.stopFuncs[bucketKey]; exists { stopFunc() delete(p.stopFuncs, bucketKey) log.Printf("移除令牌桶: %s", bucketKey) } } // Stop 停止所有令牌桶生产者 func (p *TokenProducer) Stop() { log.Printf("停止所有令牌桶生产者...") p.cancel() p.mu.Lock() for key, stopFunc := range p.stopFuncs { stopFunc() delete(p.stopFuncs, key) } p.mu.Unlock() p.wg.Wait() log.Printf("所有令牌桶生产者已停止") } // runSingleBucket 运行单个令牌桶的生产者 func (p *TokenProducer) runSingleBucket(ctx context.Context, bucketKey string, cancel context.CancelFunc) { defer p.wg.Done() defer cancel() ticker := time.NewTicker(time.Duration(p.config.Delay) * time.Millisecond / time.Duration(p.config.TokensPerSecond)) defer ticker.Stop() // 设置键的过期时间为桶填满所需时间的2倍 expireTime := (p.config.BucketSize/p.config.TokensPerSecond + 1) * 2 log.Printf("令牌桶[%s]过期时间: %d秒", bucketKey, expireTime) for { select { case <-ctx.Done(): return case <-ticker.C: p.produceToken(bucketKey, expireTime) } } } // produceToken 生产令牌 func (p *TokenProducer) produceToken(bucketKey string, expireTime int) { luaScript := ` local bucket_key = KEYS[1] local bucket_size = tonumber(ARGV[1]) local expire_time = tonumber(ARGV[2]) local current_tokens = tonumber(redis.call('get', bucket_key)) or 0 if current_tokens < bucket_size then redis.call('setex', bucket_key, expire_time, current_tokens + 1) return 1 else redis.call('setex', bucket_key, expire_time, bucket_size) return 0 end ` result, err := p.client.Eval(p.ctx, luaScript, []string{bucketKey}, p.config.BucketSize, expireTime).Int() if err != nil { if p.ctx.Err() != nil { return } log.Printf("令牌桶[%s]生产失败: %v", bucketKey, err) return } // 获取当前令牌数用于日志记录 currentTokens, err := p.client.Get(p.ctx, bucketKey).Int() if err != nil { log.Printf("令牌桶[%s]获取当前令牌数失败: %v", bucketKey, err) return } // 记录令牌桶的进度 if result == 1 && (currentTokens%10 == 0 || currentTokens == p.config.BucketSize) { log.Printf("令牌桶[%s]当前令牌数: %d/%d", bucketKey, currentTokens, p.config.BucketSize) } } // 初始化令牌桶 func initBucket(client *redis.Client, key string, bucketSize int) { log.Printf("初始化令牌桶[%s].....", key) ctx := context.Background() // 删除现有键(如果有) client.Del(ctx, key) // 初始化空桶 err := client.Set(ctx, key, 0, 0).Err() if err != nil { log.Printf("初始化令牌桶[%s]失败: %v", key, err) return } log.Printf("令牌桶[%s]初始化成功,初始容量: 0/%d", key, bucketSize) }