360 lines
10 KiB
Go
360 lines
10 KiB
Go
package utils
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"os"
|
||
"os/exec"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis/v8"
|
||
)
|
||
|
||
// RedisUtils Redis工具类
|
||
type RedisUtils struct {
|
||
client *redis.Client
|
||
ctx context.Context
|
||
processes map[string]*os.Process // 存储进程句柄的映射
|
||
processesLock sync.Mutex // 保护进程映射的锁
|
||
}
|
||
|
||
var Redis *RedisUtils // 默认Redis(用于任务存储)
|
||
var CacheRedis *RedisUtils // 缓存Redis(用于商品图片缓存)
|
||
|
||
// InitRedis 初始化默认Redis(用于任务存储)
|
||
func InitRedis(addr, password string, db int) {
|
||
Redis = &RedisUtils{
|
||
client: redis.NewClient(&redis.Options{
|
||
Addr: addr,
|
||
Password: password,
|
||
DB: db,
|
||
}),
|
||
ctx: context.Background(),
|
||
processes: make(map[string]*os.Process),
|
||
}
|
||
}
|
||
|
||
// InitCacheRedis 初始化缓存Redis(用于商品图片缓存)
|
||
func InitCacheRedis(addr, password string, db int) {
|
||
CacheRedis = &RedisUtils{
|
||
client: redis.NewClient(&redis.Options{
|
||
Addr: addr,
|
||
Password: password,
|
||
DB: db,
|
||
}),
|
||
ctx: context.Background(),
|
||
}
|
||
}
|
||
|
||
// GetSecondCacheObject 获取缓存对象(秒级缓存)
|
||
func (r *RedisUtils) GetSecondCacheObject(key string) (string, error) {
|
||
return r.client.Get(r.ctx, key).Result()
|
||
}
|
||
|
||
// SetSecondCacheObject 设置缓存对象(秒级缓存)
|
||
func (r *RedisUtils) SetSecondCacheObject(key, value string) error {
|
||
// 默认缓存24小时
|
||
return r.client.Set(r.ctx, key, value, 24*time.Hour).Err()
|
||
}
|
||
|
||
// DelCacheObject 删除缓存
|
||
func (r *RedisUtils) DelCacheObject(key string) error {
|
||
return r.client.Del(r.ctx, key).Err()
|
||
}
|
||
|
||
// 任务存储相关方法
|
||
|
||
// SaveTaskToRedis 保存任务到Redis
|
||
func (r *RedisUtils) SaveTaskToRedis(taskKey string, task interface{}) error {
|
||
taskJSON, err := json.Marshal(task)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化任务失败: %v", err)
|
||
}
|
||
|
||
// 使用Hash存储任务信息,设置24小时过期
|
||
err = r.client.HSet(r.ctx, "tasks", taskKey, taskJSON).Err()
|
||
if err != nil {
|
||
return fmt.Errorf("保存任务到Redis失败: %v", err)
|
||
}
|
||
|
||
// 同时将任务ID添加到待处理任务队列
|
||
err = r.client.LPush(r.ctx, "pending_tasks", taskKey).Err()
|
||
if err != nil {
|
||
return fmt.Errorf("添加到待处理队列失败: %v", err)
|
||
}
|
||
|
||
// 设置过期时间
|
||
r.client.Expire(r.ctx, "tasks", 24*time.Hour)
|
||
r.client.Expire(r.ctx, "pending_tasks", 24*time.Hour)
|
||
|
||
return nil
|
||
}
|
||
|
||
// SaveRunningTaskToRedis 保存运行任务到Redis
|
||
func (r *RedisUtils) SaveRunningTaskToRedis(runningTask interface{}) error {
|
||
taskJSON, err := json.Marshal(runningTask)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化运行任务失败: %v", err)
|
||
}
|
||
|
||
// 生成唯一键
|
||
taskKey := fmt.Sprintf("running_task_%d", time.Now().UnixNano())
|
||
|
||
// 保存到运行任务Hash中
|
||
err = r.client.HSet(r.ctx, "running_tasks", taskKey, taskJSON).Err()
|
||
if err != nil {
|
||
return fmt.Errorf("保存运行任务到Redis失败: %v", err)
|
||
}
|
||
|
||
// 同时将任务键添加到待执行队列
|
||
err = r.client.LPush(r.ctx, "pending_running_tasks", taskKey).Err()
|
||
if err != nil {
|
||
return fmt.Errorf("添加到待执行运行任务队列失败: %v", err)
|
||
}
|
||
|
||
// 设置过期时间
|
||
r.client.Expire(r.ctx, "running_tasks", 24*time.Hour)
|
||
r.client.Expire(r.ctx, "pending_running_tasks", 24*time.Hour)
|
||
|
||
return nil
|
||
}
|
||
|
||
// BatchSaveRunningTasksToRedis 批量保存运行任务到Redis(使用List类型队列)
|
||
func (r *RedisUtils) BatchSaveRunningTasksToRedis(tasks []interface{}) error {
|
||
if len(tasks) == 0 {
|
||
return nil
|
||
}
|
||
|
||
// 使用pipeline批量操作
|
||
pipe := r.client.Pipeline()
|
||
|
||
// 用于收集所有唯一的队列名
|
||
queueNames := make(map[string]bool)
|
||
|
||
for _, task := range tasks {
|
||
taskJSON, err := json.Marshal(task)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
// 默认队列名
|
||
queueName := "pending_running_tasks"
|
||
|
||
// 尝试从JSON中提取字段以生成唯一的队列名
|
||
var taskMap map[string]interface{}
|
||
if err := json.Unmarshal(taskJSON, &taskMap); err == nil {
|
||
|
||
// 先尝试从外层提取 data 字段
|
||
dataStr, hasData := taskMap["data"].(string)
|
||
if hasData && dataStr != "" {
|
||
// 解析 data 字段中的 JSON
|
||
var dataMap map[string]interface{}
|
||
if err := json.Unmarshal([]byte(dataStr), &dataMap); err == nil {
|
||
|
||
// 从 data 中提取字段
|
||
shopID, hasShopID := dataMap["shopId"].(string)
|
||
taskID, hasTaskID := dataMap["taskId"].(string)
|
||
taskType, hasTaskType := dataMap["taskType"].(string)
|
||
|
||
// 提取 csvFileNameTimestamp
|
||
timestamp := ""
|
||
if ts, hasTimestamp := dataMap["csvFileNameTimestamp"].(string); hasTimestamp {
|
||
timestamp = ts
|
||
} else {
|
||
// 生成时间戳:年月日时分秒
|
||
timestamp = time.Now().Format("20060102150405")
|
||
}
|
||
|
||
// 如果能从 data 中提取到所有必需字段,使用特定队列名
|
||
if hasShopID && hasTaskID && hasTaskType {
|
||
// 队列名格式: shopId_taskId_年月日时分秒_tasktype_1
|
||
queueName = fmt.Sprintf("%s_%s_%s_%s_1",
|
||
shopID,
|
||
taskID,
|
||
timestamp,
|
||
taskType)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
// 收集队列名
|
||
queueNames[queueName] = true
|
||
|
||
// 将任务JSON推入List队列左侧(LPUSH)
|
||
pipe.LPush(r.ctx, queueName, taskJSON)
|
||
|
||
// 设置队列过期时间为24小时
|
||
pipe.Expire(r.ctx, queueName, 24*time.Hour)
|
||
}
|
||
|
||
// 执行批量操作
|
||
_, err := pipe.Exec(r.ctx)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// 为每个队列调用SendPublishing程序处理任务
|
||
for queueName := range queueNames {
|
||
go func(qName string) {
|
||
if _, err := r.callSendPublishing(qName); err != nil {
|
||
log.Printf("调用SendPublishing程序失败: %v, 队列: %s", err, qName)
|
||
}
|
||
}(queueName)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// CallSendPublishing 调用SendPublishing程序处理任务(公开方法)
|
||
// 返回进程句柄和错误
|
||
func (r *RedisUtils) CallSendPublishing(qName string) (*os.Process, error) {
|
||
return r.callSendPublishing(qName)
|
||
}
|
||
|
||
// callSendPublishing 调用SendPublishing程序处理任务(内部方法)
|
||
// 返回进程句柄和错误
|
||
func (r *RedisUtils) callSendPublishing(qName string) (*os.Process, error) {
|
||
log.Printf("准备启动SendPublishing程序,队列: %s", qName)
|
||
|
||
// 为每个队列名启动一个SendPublishing程序实例
|
||
// 构建SendPublishing程序路径(相对于项目根目录的父目录)
|
||
programPath := "/www/wwwroot/GetErpSendPubishing/SendPublishing"
|
||
|
||
// 构建命令行参数:传递redisKey标志
|
||
args := []string{"-redisKey", qName}
|
||
|
||
// 启动SendPublishing程序
|
||
cmd := exec.Command(programPath, args...)
|
||
|
||
// 设置输出和错误输出
|
||
cmd.Stdout = os.Stdout
|
||
cmd.Stderr = os.Stderr
|
||
|
||
// 执行命令(不等待完成,异步执行)
|
||
if err := cmd.Start(); err != nil {
|
||
log.Printf("启动SendPublishing程序失败: %v, 队列: %s", err, qName)
|
||
return nil, fmt.Errorf("启动程序失败: %w", err)
|
||
}
|
||
|
||
// 获取进程句柄
|
||
process := cmd.Process
|
||
|
||
// 保存进程句柄到映射中
|
||
r.processesLock.Lock()
|
||
r.processes[qName] = process
|
||
r.processesLock.Unlock()
|
||
|
||
log.Printf("SendPublishing程序已启动,PID: %d, 队列: %s", process.Pid, qName)
|
||
|
||
// 启动一个goroutine等待进程结束,自动清理映射
|
||
go func() {
|
||
err := cmd.Wait()
|
||
r.processesLock.Lock()
|
||
delete(r.processes, qName)
|
||
r.processesLock.Unlock()
|
||
|
||
if err != nil {
|
||
log.Printf("SendPublishing程序异常退出,PID: %d, 队列: %s, 错误: %v", process.Pid, qName, err)
|
||
} else {
|
||
log.Printf("SendPublishing程序正常退出,PID: %d, 队列: %s", process.Pid, qName)
|
||
}
|
||
}()
|
||
|
||
return process, nil
|
||
}
|
||
|
||
// GetProcess 根据队列名称获取进程句柄
|
||
func (r *RedisUtils) GetProcess(qName string) *os.Process {
|
||
r.processesLock.Lock()
|
||
defer r.processesLock.Unlock()
|
||
return r.processes[qName]
|
||
}
|
||
|
||
// GetTaskFromRedis 从Redis获取任务
|
||
func (r *RedisUtils) GetTaskFromRedis(taskKey string) (string, error) {
|
||
return r.client.HGet(r.ctx, "tasks", taskKey).Result()
|
||
}
|
||
|
||
// GetRunningTaskFromRedis 从Redis获取运行任务
|
||
func (r *RedisUtils) GetRunningTaskFromRedis(taskKey string) (string, error) {
|
||
return r.client.HGet(r.ctx, "running_tasks", taskKey).Result()
|
||
}
|
||
|
||
// GetPendingRunningTask 从待执行队列获取一个运行任务
|
||
func (r *RedisUtils) GetPendingRunningTask() (string, error) {
|
||
return r.client.RPop(r.ctx, "pending_running_tasks").Result()
|
||
}
|
||
|
||
// GetTaskCount 获取任务数量
|
||
func (r *RedisUtils) GetTaskCount() (int64, error) {
|
||
return r.client.HLen(r.ctx, "tasks").Result()
|
||
}
|
||
|
||
// GetRunningTaskCount 获取运行任务数量
|
||
func (r *RedisUtils) GetRunningTaskCount() (int64, error) {
|
||
return r.client.HLen(r.ctx, "running_tasks").Result()
|
||
}
|
||
|
||
// GetClient 获取底层Redis客户端
|
||
func (r *RedisUtils) GetClient() *redis.Client {
|
||
return r.client
|
||
}
|
||
|
||
// GetContext 获取上下文
|
||
func (r *RedisUtils) GetContext() context.Context {
|
||
return r.ctx
|
||
}
|
||
|
||
// Get Redis Get方法
|
||
func (r *RedisUtils) Get(key string) *redis.StringCmd {
|
||
return r.client.Get(r.ctx, key)
|
||
}
|
||
|
||
// Set Redis Set方法
|
||
func (r *RedisUtils) Set(key string, value interface{}, expiration time.Duration) *redis.StatusCmd {
|
||
return r.client.Set(r.ctx, key, value, expiration)
|
||
}
|
||
|
||
// Keys Redis Keys方法
|
||
func (r *RedisUtils) Keys(pattern string) *redis.StringSliceCmd {
|
||
return r.client.Keys(r.ctx, pattern)
|
||
}
|
||
|
||
// LPush Redis LPush方法
|
||
func (r *RedisUtils) LPush(key string, values ...interface{}) *redis.IntCmd {
|
||
return r.client.LPush(r.ctx, key, values...)
|
||
}
|
||
|
||
// RPush Redis RPush方法
|
||
func (r *RedisUtils) RPush(key string, values ...interface{}) *redis.IntCmd {
|
||
return r.client.RPush(r.ctx, key, values...)
|
||
}
|
||
|
||
// LPop Redis LPop方法
|
||
func (r *RedisUtils) LPop(key string) *redis.StringCmd {
|
||
return r.client.LPop(r.ctx, key)
|
||
}
|
||
|
||
// BLPop Redis BLPop方法
|
||
func (r *RedisUtils) BLPop(timeout time.Duration, keys ...string) *redis.StringSliceCmd {
|
||
return r.client.BLPop(r.ctx, timeout, keys...)
|
||
}
|
||
|
||
// LLen Redis LLen方法
|
||
func (r *RedisUtils) LLen(key string) *redis.IntCmd {
|
||
return r.client.LLen(r.ctx, key)
|
||
}
|
||
|
||
// Expire Redis Expire方法
|
||
func (r *RedisUtils) Expire(key string, expiration time.Duration) *redis.BoolCmd {
|
||
return r.client.Expire(r.ctx, key, expiration)
|
||
}
|
||
|
||
// TTL Redis TTL方法
|
||
func (r *RedisUtils) TTL(key string) *redis.DurationCmd {
|
||
return r.client.TTL(r.ctx, key)
|
||
}
|