daShangDao_getErpSendPublis.../utils/redis_util.go
2026-06-15 16:18:50 +08:00

360 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package utils
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// RedisUtils Redis工具类
type RedisUtils struct {
client *redis.Client
ctx context.Context
processes map[string]*os.Process // 存储进程句柄的映射
processesLock sync.Mutex // 保护进程映射的锁
}
var Redis *RedisUtils // 默认Redis用于任务存储
var CacheRedis *RedisUtils // 缓存Redis用于商品图片缓存
// InitRedis 初始化默认Redis用于任务存储
func InitRedis(addr, password string, db int) {
Redis = &RedisUtils{
client: redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
}),
ctx: context.Background(),
processes: make(map[string]*os.Process),
}
}
// InitCacheRedis 初始化缓存Redis用于商品图片缓存
func InitCacheRedis(addr, password string, db int) {
CacheRedis = &RedisUtils{
client: redis.NewClient(&redis.Options{
Addr: addr,
Password: password,
DB: db,
}),
ctx: context.Background(),
}
}
// GetSecondCacheObject 获取缓存对象(秒级缓存)
func (r *RedisUtils) GetSecondCacheObject(key string) (string, error) {
return r.client.Get(r.ctx, key).Result()
}
// SetSecondCacheObject 设置缓存对象(秒级缓存)
func (r *RedisUtils) SetSecondCacheObject(key, value string) error {
// 默认缓存24小时
return r.client.Set(r.ctx, key, value, 24*time.Hour).Err()
}
// DelCacheObject 删除缓存
func (r *RedisUtils) DelCacheObject(key string) error {
return r.client.Del(r.ctx, key).Err()
}
// 任务存储相关方法
// SaveTaskToRedis 保存任务到Redis
func (r *RedisUtils) SaveTaskToRedis(taskKey string, task interface{}) error {
taskJSON, err := json.Marshal(task)
if err != nil {
return fmt.Errorf("序列化任务失败: %v", err)
}
// 使用Hash存储任务信息设置24小时过期
err = r.client.HSet(r.ctx, "tasks", taskKey, taskJSON).Err()
if err != nil {
return fmt.Errorf("保存任务到Redis失败: %v", err)
}
// 同时将任务ID添加到待处理任务队列
err = r.client.LPush(r.ctx, "pending_tasks", taskKey).Err()
if err != nil {
return fmt.Errorf("添加到待处理队列失败: %v", err)
}
// 设置过期时间
r.client.Expire(r.ctx, "tasks", 24*time.Hour)
r.client.Expire(r.ctx, "pending_tasks", 24*time.Hour)
return nil
}
// SaveRunningTaskToRedis 保存运行任务到Redis
func (r *RedisUtils) SaveRunningTaskToRedis(runningTask interface{}) error {
taskJSON, err := json.Marshal(runningTask)
if err != nil {
return fmt.Errorf("序列化运行任务失败: %v", err)
}
// 生成唯一键
taskKey := fmt.Sprintf("running_task_%d", time.Now().UnixNano())
// 保存到运行任务Hash中
err = r.client.HSet(r.ctx, "running_tasks", taskKey, taskJSON).Err()
if err != nil {
return fmt.Errorf("保存运行任务到Redis失败: %v", err)
}
// 同时将任务键添加到待执行队列
err = r.client.LPush(r.ctx, "pending_running_tasks", taskKey).Err()
if err != nil {
return fmt.Errorf("添加到待执行运行任务队列失败: %v", err)
}
// 设置过期时间
r.client.Expire(r.ctx, "running_tasks", 24*time.Hour)
r.client.Expire(r.ctx, "pending_running_tasks", 24*time.Hour)
return nil
}
// BatchSaveRunningTasksToRedis 批量保存运行任务到Redis使用List类型队列
func (r *RedisUtils) BatchSaveRunningTasksToRedis(tasks []interface{}) error {
if len(tasks) == 0 {
return nil
}
// 使用pipeline批量操作
pipe := r.client.Pipeline()
// 用于收集所有唯一的队列名
queueNames := make(map[string]bool)
for _, task := range tasks {
taskJSON, err := json.Marshal(task)
if err != nil {
continue
}
// 默认队列名
queueName := "pending_running_tasks"
// 尝试从JSON中提取字段以生成唯一的队列名
var taskMap map[string]interface{}
if err := json.Unmarshal(taskJSON, &taskMap); err == nil {
// 先尝试从外层提取 data 字段
dataStr, hasData := taskMap["data"].(string)
if hasData && dataStr != "" {
// 解析 data 字段中的 JSON
var dataMap map[string]interface{}
if err := json.Unmarshal([]byte(dataStr), &dataMap); err == nil {
// 从 data 中提取字段
shopID, hasShopID := dataMap["shopId"].(string)
taskID, hasTaskID := dataMap["taskId"].(string)
taskType, hasTaskType := dataMap["taskType"].(string)
// 提取 csvFileNameTimestamp
timestamp := ""
if ts, hasTimestamp := dataMap["csvFileNameTimestamp"].(string); hasTimestamp {
timestamp = ts
} else {
// 生成时间戳:年月日时分秒
timestamp = time.Now().Format("20060102150405")
}
// 如果能从 data 中提取到所有必需字段,使用特定队列名
if hasShopID && hasTaskID && hasTaskType {
// 队列名格式: shopId_taskId_年月日时分秒_tasktype_1
queueName = fmt.Sprintf("%s_%s_%s_%s_1",
shopID,
taskID,
timestamp,
taskType)
}
}
}
}
// 收集队列名
queueNames[queueName] = true
// 将任务JSON推入List队列左侧LPUSH
pipe.LPush(r.ctx, queueName, taskJSON)
// 设置队列过期时间为24小时
pipe.Expire(r.ctx, queueName, 24*time.Hour)
}
// 执行批量操作
_, err := pipe.Exec(r.ctx)
if err != nil {
return err
}
// 为每个队列调用SendPublishing程序处理任务
for queueName := range queueNames {
go func(qName string) {
if _, err := r.callSendPublishing(qName); err != nil {
log.Printf("调用SendPublishing程序失败: %v, 队列: %s", err, qName)
}
}(queueName)
}
return nil
}
// CallSendPublishing 调用SendPublishing程序处理任务公开方法
// 返回进程句柄和错误
func (r *RedisUtils) CallSendPublishing(qName string) (*os.Process, error) {
return r.callSendPublishing(qName)
}
// callSendPublishing 调用SendPublishing程序处理任务内部方法
// 返回进程句柄和错误
func (r *RedisUtils) callSendPublishing(qName string) (*os.Process, error) {
log.Printf("准备启动SendPublishing程序队列: %s", qName)
// 为每个队列名启动一个SendPublishing程序实例
// 构建SendPublishing程序路径相对于项目根目录的父目录
programPath := "/www/wwwroot/GetErpSendPubishing/SendPublishing"
// 构建命令行参数传递redisKey标志
args := []string{"-redisKey", qName}
// 启动SendPublishing程序
cmd := exec.Command(programPath, args...)
// 设置输出和错误输出
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
// 执行命令(不等待完成,异步执行)
if err := cmd.Start(); err != nil {
log.Printf("启动SendPublishing程序失败: %v, 队列: %s", err, qName)
return nil, fmt.Errorf("启动程序失败: %w", err)
}
// 获取进程句柄
process := cmd.Process
// 保存进程句柄到映射中
r.processesLock.Lock()
r.processes[qName] = process
r.processesLock.Unlock()
log.Printf("SendPublishing程序已启动PID: %d, 队列: %s", process.Pid, qName)
// 启动一个goroutine等待进程结束自动清理映射
go func() {
err := cmd.Wait()
r.processesLock.Lock()
delete(r.processes, qName)
r.processesLock.Unlock()
if err != nil {
log.Printf("SendPublishing程序异常退出PID: %d, 队列: %s, 错误: %v", process.Pid, qName, err)
} else {
log.Printf("SendPublishing程序正常退出PID: %d, 队列: %s", process.Pid, qName)
}
}()
return process, nil
}
// GetProcess 根据队列名称获取进程句柄
func (r *RedisUtils) GetProcess(qName string) *os.Process {
r.processesLock.Lock()
defer r.processesLock.Unlock()
return r.processes[qName]
}
// GetTaskFromRedis 从Redis获取任务
func (r *RedisUtils) GetTaskFromRedis(taskKey string) (string, error) {
return r.client.HGet(r.ctx, "tasks", taskKey).Result()
}
// GetRunningTaskFromRedis 从Redis获取运行任务
func (r *RedisUtils) GetRunningTaskFromRedis(taskKey string) (string, error) {
return r.client.HGet(r.ctx, "running_tasks", taskKey).Result()
}
// GetPendingRunningTask 从待执行队列获取一个运行任务
func (r *RedisUtils) GetPendingRunningTask() (string, error) {
return r.client.RPop(r.ctx, "pending_running_tasks").Result()
}
// GetTaskCount 获取任务数量
func (r *RedisUtils) GetTaskCount() (int64, error) {
return r.client.HLen(r.ctx, "tasks").Result()
}
// GetRunningTaskCount 获取运行任务数量
func (r *RedisUtils) GetRunningTaskCount() (int64, error) {
return r.client.HLen(r.ctx, "running_tasks").Result()
}
// GetClient 获取底层Redis客户端
func (r *RedisUtils) GetClient() *redis.Client {
return r.client
}
// GetContext 获取上下文
func (r *RedisUtils) GetContext() context.Context {
return r.ctx
}
// Get Redis Get方法
func (r *RedisUtils) Get(key string) *redis.StringCmd {
return r.client.Get(r.ctx, key)
}
// Set Redis Set方法
func (r *RedisUtils) Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
return r.client.Set(r.ctx, key, value, expiration)
}
// Keys Redis Keys方法
func (r *RedisUtils) Keys(pattern string) *redis.StringSliceCmd {
return r.client.Keys(r.ctx, pattern)
}
// LPush Redis LPush方法
func (r *RedisUtils) LPush(key string, values ...interface{}) *redis.IntCmd {
return r.client.LPush(r.ctx, key, values...)
}
// RPush Redis RPush方法
func (r *RedisUtils) RPush(key string, values ...interface{}) *redis.IntCmd {
return r.client.RPush(r.ctx, key, values...)
}
// LPop Redis LPop方法
func (r *RedisUtils) LPop(key string) *redis.StringCmd {
return r.client.LPop(r.ctx, key)
}
// BLPop Redis BLPop方法
func (r *RedisUtils) BLPop(timeout time.Duration, keys ...string) *redis.StringSliceCmd {
return r.client.BLPop(r.ctx, timeout, keys...)
}
// LLen Redis LLen方法
func (r *RedisUtils) LLen(key string) *redis.IntCmd {
return r.client.LLen(r.ctx, key)
}
// Expire Redis Expire方法
func (r *RedisUtils) Expire(key string, expiration time.Duration) *redis.BoolCmd {
return r.client.Expire(r.ctx, key, expiration)
}
// TTL Redis TTL方法
func (r *RedisUtils) TTL(key string) *redis.DurationCmd {
return r.client.TTL(r.ctx, key)
}