package service import ( "encoding/json" "errors" "fmt" "planA/initialization/golabl" "planA/tool" toolPdd "planA/tool/pdd" _type "planA/type" "strconv" "time" "github.com/go-redis/redis/v8" ) // ============================================ // 任务头信息(Header)操作 // 数据结构: Hash // 键格式: {taskKey}:header // ============================================ // GetTaskHeader 获取任务头信息 // @param client Redis客户端 // @param taskKey 任务键 // @return _type.TaskHeader 任务头信息 // @return error 错误信息 func GetTaskHeader(taskKey string) (_type.TaskHeader, error) { var header _type.TaskHeader headerKey := getHeaderKey(taskKey) headerMap, err := golabl.RedisDbA.HGetAll(golabl.Ctx, headerKey).Result() if err != nil { return header, err } return parseHeaderMap(headerMap) } // UpdateTaskHeader 更新任务头信息 // @param taskKey 任务键 // @param header 任务头信息 // @return error 错误信息 func UpdateTaskHeader(taskKey string, header _type.TaskHeader) error { // 将结构体转为 map headerMap, err := tool.StructToMap(header) if err != nil { return fmt.Errorf("转换Header为map失败: %w", err) } // 特殊处理price_mod字段 priceModJSON, err := json.Marshal(headerMap["price_mod"]) if err != nil { return fmt.Errorf("转换price_mod为JSON失败: %w", err) } headerMap["price_mod"] = priceModJSON // 保存到 Redis headerKey := getHeaderKey(taskKey) if err := saveHashMap(headerKey, headerMap); err != nil { return err } return nil } // UpdateHeaderStatus 更新任务头信息中的状态 // @param client Redis客户端 // @param taskKey 任务键 // @param status 任务状态(1=运行中 2=已暂停 3=已停止) // @return error 错误信息 func UpdateHeaderStatus(taskKey string, status int64) error { headerKey := getHeaderKey(taskKey) if err := golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "status", status).Err(); err != nil { return err } golabl.RedisDbA.Expire(golabl.Ctx, headerKey, golabl.RedisExp) return nil } // ============================================ // 任务体信息(Body)操作 // 数据结构: List // 键格式: {taskKey}:body_wait - 等待处理的任务队列 // {taskKey}:body_over - 已完成处理的任务队列 // ============================================ // UpdateTaskBodyWait 添加任务到等待队列 // @param taskKey 任务键 // @param taskBody 任务体数据 // @return error 错误信息 func UpdateTaskBodyWait(taskKey string, taskBody _type.TaskBody) error { bodyWaitKey := getBodyWaitKey(taskKey) // 序列化任务数据 bodyWaitJSON, err := json.Marshal(taskBody) if err != nil { return fmt.Errorf("序列化任务数据失败: %w", err) } // 推送到列表尾部 return golabl.RedisDbA.RPush(golabl.Ctx, bodyWaitKey, string(bodyWaitJSON)).Err() } // GetListLength 获取等待队列长度 // @param taskKey 任务键 // @return int64 队列长度 // @return error 错误信息 func GetListLength(taskKey string) (int64, error) { bodyWaitKey := getBodyWaitKey(taskKey) return golabl.RedisDbA.LLen(golabl.Ctx, bodyWaitKey).Result() } // GetTaskBodyOver 分页获取已完成任务 // @param taskKey 任务键 // @param page 页码 // @param size 每页数量 // @return []_type.TaskBody 任务体列表 // @return error 错误信息 func GetTaskBodyOver(taskKey string, page int, size int) ([]_type.TaskBody, int64, error) { return GetBodyOverDataByBatch(taskKey, page, size) } // GetBodyOverCount 获取已完成任务总数 // @param taskKey 任务键 // @return int64 总数 // @return error 错误信息 func GetBodyOverCount(taskKey string) (int64, error) { bodyOverKey := getBodyOverKey(taskKey) return golabl.RedisDbA.LLen(golabl.Ctx, bodyOverKey).Result() } // GetBodyOverDataByBatch 批量获取已完成任务数据 // @param taskKey 任务键 // @param page 页 // @param size 页数量 // @return []_type.TaskBody 任务体列表 // @return int64 总数 // @return error 错误信息 func GetBodyOverDataByBatch(taskKey string, page, size int) ([]_type.TaskBody, int64, error) { var bodyOverArr []_type.TaskBody bodyOverKey := getBodyOverKey(taskKey) // 获取总数 total, err := golabl.RedisDbA.LLen(golabl.Ctx, bodyOverKey).Result() if err != nil { if errors.Is(err, redis.Nil) { return bodyOverArr, 0, nil } return bodyOverArr, 0, fmt.Errorf("获取body_over总数错误: %w", err) } // 计算起始索引(从0开始) start := (page - 1) * size end := start + size - 1 // 如果起始索引超出范围,直接返回空数据 if start >= int(total) { return bodyOverArr, total, nil } bodyOverStr, err := golabl.RedisDbA.LRange(golabl.Ctx, bodyOverKey, int64(start), int64(end)).Result() if err != nil { if errors.Is(err, redis.Nil) { return bodyOverArr, total, nil } return bodyOverArr, 0, fmt.Errorf("获取body_over数据错误: %w", err) } list, err := parseTaskBodyList(bodyOverStr) if err != nil { return nil, 0, err } return list, total, nil } // ClearBodyOver 清空已完成任务队列 // @param client Redis客户端 // @param taskKey 任务键 // @return error 错误信息 func ClearBodyOver(taskKey string) error { return golabl.RedisDbA.Del(golabl.Ctx, getBodyOverKey(taskKey)).Err() } // ============================================ // 任务尾信息(Footer)操作 // 数据结构: Hash // 键格式: {taskKey}:footer // ============================================ // GetTaskFooter 获取任务尾信息 // @param taskKey 任务键 // @return _type.TaskFooter 任务尾信息 // @return error 错误信息 func GetTaskFooter(taskKey string) (_type.TaskFooter, error) { var footer _type.TaskFooter footerKey := getFooterKey(taskKey) footerMap, err := golabl.RedisDbA.HGetAll(golabl.Ctx, footerKey).Result() if err != nil { return footer, fmt.Errorf("获取Footer失败: %w", err) } return footer, parseTaskFooter(footerMap, &footer) } // UpdateTaskFooter 更新任务尾信息 // @param taskKey 任务键 // @param footer 任务尾信息 // @return error 错误信息 func UpdateTaskFooter(taskKey string, footer *_type.TaskFooter) error { footerMap := map[string]interface{}{ "task_count": footer.TaskCount, "task_count_true": footer.TaskCountTrue, "task_count_wait": footer.TaskCountWait.Load(), "task_count_over": footer.TaskCountOver.Load(), "task_count_success": footer.TaskCountSuccess.Load(), "task_count_error": footer.TaskCountError.Load(), "task_qpm": footer.TaskQpm, "last_index": footer.LastIndex, } footerKey := getFooterKey(taskKey) if err := saveHashMap(footerKey, footerMap); err != nil { return err } return nil } // ============================================ // 导出文件(BodyFile)操作 // 数据结构: Hash // 键格式: {taskKey}:body_file // ============================================ // UpdateExportFileProgress 更新导出文件进度(每次自增+1) // @param taskKey 任务键 // @return error 错误信息 func UpdateExportFileProgress(taskKey string) error { // 使用 HIncrBy 对指定字段进行自增操作,每次增加1 return golabl.RedisDbA.HIncrBy(golabl.Ctx, getBodyFileKey(taskKey), "complete", 1).Err() } // GetExportFileProgress 获取导出文件进度 // @param taskKey 任务键 // @return int 完成进度 // @return error 错误信息 func GetExportFileProgress(taskKey string) (int, error) { return golabl.RedisDbA.HGet(golabl.Ctx, getBodyFileKey(taskKey), "complete").Int() } // ============================================ // 进程号管理操作 // 数据结构: Hash字段 // 键格式: {headerKey} // 字段: process_number // ============================================ // GetProcessId 获取进程号 // @param taskKey 键 // @return string 进程号 // @return error 错误信息 func GetProcessId(taskKey string) (string, error) { headerKey := getHeaderKey(taskKey) return golabl.RedisDbA.HGet(golabl.Ctx, headerKey, "process_number").Result() } // SetProcessId 设置进程号 // @param taskKey 键 // @param processId 进程号 // @return error 错误信息 func SetProcessId(taskKey string, processId string) error { headerKey := getHeaderKey(taskKey) golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err() golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err() return golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err() } // DeleteProcessId 删除进程号 // @param taskKey 头信息键 // @return error 错误信息 func DeleteProcessId(taskKey string) error { headerKey := getHeaderKey(taskKey) return golabl.RedisDbA.HDel(golabl.Ctx, headerKey, "process_number").Err() } // ============================================ // 复合操作(涉及多个数据结构) // ============================================ // UpdateTaskCountTrue 更新任务计数(原子操作) // 同时更新Header和Footer中的task_count_true,以及Footer中的task_count_wait // @param taskKey 任务键 // @param num 增减数量 // @return error 错误信息 func UpdateTaskCountTrue(taskKey string, num int64) error { // 使用Pipeline确保原子性 pipe := golabl.RedisDbA.Pipeline() // 更新Header headerKey := getHeaderKey(taskKey) pipe.HIncrBy(golabl.Ctx, headerKey, "task_count_true", num) // 更新Footer footerKey := getFooterKey(taskKey) pipe.HIncrBy(golabl.Ctx, footerKey, "task_count_true", num) pipe.HIncrBy(golabl.Ctx, footerKey, "task_count_wait", num) // 设置过期时间 pipe.Expire(golabl.Ctx, headerKey, golabl.RedisExp) pipe.Expire(golabl.Ctx, footerKey, golabl.RedisExp) return executePipeline(pipe) } // StopTask 停止任务 // 更新Header状态为已停止,并清空等待队列 // @param taskId 任务ID // @return error 错误信息 func StopTask(taskId string) error { // 开启事务 pipe := golabl.RedisDbA.TxPipeline() // 更新任务状态 headerKey := getHeaderKey(taskId) pipe.HSet(golabl.Ctx, headerKey, "status", int64(_type.TaskStatusStopped)) // 设置过期时间 pipe.Expire(golabl.Ctx, headerKey, golabl.RedisExp) // 清空等待队列 bodyWaitKey := getBodyWaitKey(taskId) pipe.Del(golabl.Ctx, bodyWaitKey) return executePipeline(pipe) } // DelTask 删除任务 // @param taskId 任务ID // @return error 错误信息 func DelTask(taskId string) error { // 开启事务 pipe := golabl.RedisDbA.TxPipeline() // 删除 header pipe.Del(golabl.Ctx, getHeaderKey(taskId)) // 删除 footer pipe.Del(golabl.Ctx, getFooterKey(taskId)) // 删除 body_wait pipe.Del(golabl.Ctx, getBodyWaitKey(taskId)) // 删除 body_over pipe.Del(golabl.Ctx, getBodyOverKey(taskId)) // 删除 body_file pipe.Del(golabl.Ctx, getBodyFileKey(taskId)) // 删除body_data pipe.Del(golabl.Ctx, getBodyDataKey(taskId)) //删除 body_backup pipe.Del(golabl.Ctx, getBodyBackupKey(taskId)) return executePipeline(pipe) } // GetBodyBackupLen 获取body_backup长度 // @param taskId 任务ID // @return int64 body_backup长度 // @return error 错误信息 func GetBodyBackupLen(taskId string) (int64, error) { return golabl.RedisDbA.LLen(golabl.Ctx, getBodyBackupKey(taskId)).Result() } // GetBodyBackupOne 读取一条body_backup数据 // @param taskId 任务ID // @return string body_backup数据 // @return error 错误信息 func GetBodyBackupOne(taskId string) (string, error) { return golabl.RedisDbA.LPop(golabl.Ctx, getBodyBackupKey(taskId)).Result() } //********************************************以下为是有方法*****************************************// // getHeaderKey 获取任务头信息的Redis键 // @param taskKey 任务键 // @return string 头信息键 func getHeaderKey(taskKey string) string { return taskKey + ":header" } // getFooterKey 获取任务尾信息的Redis键 // @param taskKey 任务键 // @return string 尾信息键 func getFooterKey(taskKey string) string { return taskKey + ":footer" } // getBodyWaitKey 获取等待任务体的Redis键 // @param taskKey 任务键 // @return string 等待任务体键 func getBodyWaitKey(taskKey string) string { return taskKey + ":body_wait" } // getBodyOverKey 获取已完成任务体的Redis键 // @param taskKey 任务键 // @return string 已完成任务体键 func getBodyOverKey(taskKey string) string { return taskKey + ":body_over" } // getBodyDataKey 获取已完成任务体的Redis键 // @param taskKey 任务键 // @return string 已完成任务体键 func getBodyDataKey(taskKey string) string { return taskKey + ":body_data" } // getBodyBackupKey 获取已完成任务体的Redis键 // @param taskKey 任务键 // @return string 已完成任务体键 func getBodyBackupKey(taskKey string) string { return taskKey + ":body_backup" } // getBodyFileKey 获取已完成任务体的Redis键 // @param taskKey 任务键 // @return string 已完成任务体键 func getBodyFileKey(taskKey string) string { return taskKey + ":body_file" } // parseHeaderMap 从map解析任务头信息 // @param headerMap 头信息map // @return _type.TaskHeader 解析后的头信息 // @return error 错误信息 func parseHeaderMap(headerMap map[string]string) (_type.TaskHeader, error) { info := _type.TaskHeader{} for key, value := range headerMap { switch key { case "last_index", "task_count", "task_count_error", "task_count_over", "task_count_success", "task_count_true", "task_count_wait", "task_create_at", "task_over_at", "task_qpm", "task_type", "img_type", "update_type": parseIntField(&info, key, value) case "price_mod": var priceMod []_type.PriceMod if err := json.Unmarshal([]byte(value), &priceMod); err == nil { info.PriceMod = priceMod } case "shop_msg": var shopMsg _type.ShopMsg if err := json.Unmarshal([]byte(value), &shopMsg); err == nil { info.ShopMsg = shopMsg } case "status": if v, err := strconv.ParseInt(value, 10, 64); err == nil { info.Status = _type.TaskStatus(v) } case "shop_id", "ship_price_mod", "shop_name", "shop_type", "task_id": setStringField(&info, key, value) } } return info, nil } // parseIntField 解析整数字段 func parseIntField(info *_type.TaskHeader, key, value string) { if v, err := strconv.ParseInt(value, 10, 64); err == nil { switch key { case "last_index": info.LastIndex = v case "task_count": info.TaskCount = v case "task_count_error": info.TaskCountError = v case "task_count_over": info.TaskCountOver = v case "task_count_success": info.TaskCountSuccess = v case "task_count_true": info.TaskCountTrue = v case "task_count_wait": info.TaskCountWait = v case "task_create_at": info.TaskCreateAt = v case "task_over_at": info.TaskOverAt = v case "task_qpm": info.TaskQpm = v case "task_type": info.TaskType = v case "img_type": info.ImgType = v case "update_type": info.UpdateType = v } } } // setStringField 设置字符串字段 func setStringField(info *_type.TaskHeader, key, value string) { switch key { case "ship_price_mod": info.ShipPriceMod = value case "shop_name": info.ShopName = value case "shop_type": info.ShopType = value case "task_id": info.TaskId = value case "shop_id": info.ShopId = value } } // saveHashMap 保存哈希映射到Redis // @param key Redis键 // @param data 数据映射 // @return error 错误信息 func saveHashMap(key string, data map[string]interface{}) error { for field, value := range data { if err := golabl.RedisDbA.HSet(golabl.Ctx, key, field, value).Err(); err != nil { return fmt.Errorf("保存字段 %s 失败: %w (值: %v)", field, err, value) } } golabl.RedisDbA.Expire(golabl.Ctx, key, golabl.RedisExp) return nil } // parseTaskBodyList 解析任务体列表 // @param bodyStrs 任务体字符串列表 // @return []_type.TaskBody 解析后的任务体列表 // @return error 错误信息 func parseTaskBodyList(bodyStrs []string) ([]_type.TaskBody, error) { var bodyList []_type.TaskBody for _, str := range bodyStrs { var body _type.TaskBody if err := json.Unmarshal([]byte(str), &body); err != nil { return bodyList, fmt.Errorf("JSON解析错误: %w, 数据: %s", err, str) } bodyList = append(bodyList, body) } return bodyList, nil } // parseTaskFooter 解析任务尾信息 // @param taskFooter 尾信息map // @param footer 目标尾信息结构体 // @return error 错误信息 func parseTaskFooter(taskFooter map[string]string, footer *_type.TaskFooter) error { var err error if footer.TaskCount, err = strconv.ParseInt(taskFooter["task_count"], 10, 64); err != nil { footer.TaskCount = 0 } if footer.TaskCountTrue, err = strconv.ParseInt(taskFooter["task_count_true"], 10, 64); err != nil { footer.TaskCountTrue = 0 } if taskCountWait, err := strconv.ParseInt(taskFooter["task_count_wait"], 10, 64); err == nil { footer.TaskCountWait.Store(taskCountWait) } if taskCountOver, err := strconv.ParseInt(taskFooter["task_count_over"], 10, 64); err == nil { footer.TaskCountOver.Store(taskCountOver) } if taskCountSuccess, err := strconv.ParseInt(taskFooter["task_count_success"], 10, 64); err == nil { footer.TaskCountSuccess.Store(taskCountSuccess) } if taskCountError, err := strconv.ParseInt(taskFooter["task_count_error"], 10, 64); err == nil { footer.TaskCountError.Store(taskCountError) } if footer.TaskQpm, err = strconv.ParseInt(taskFooter["task_qpm"], 10, 64); err != nil { footer.TaskQpm = 0 } if footer.LastIndex, err = strconv.ParseInt(taskFooter["last_index"], 10, 64); err != nil { footer.LastIndex = 0 } return nil } // executePipeline 执行Redis管道操作 // @param pipe Redis管道 // @return error 错误信息 func executePipeline(pipe redis.Pipeliner) error { _, err := pipe.Exec(golabl.Ctx) return err } // ============================================ // 其他 // ============================================ // GetPddTokenList 获取token列表 // @return []string token列表 // @return error 错误信息 func GetPddTokenList() ([]_type.Shop, error) { var shopList []_type.Shop //获取 店铺列表中所有的key iter := golabl.RedisDbC.Scan(golabl.Ctx, 0, "*", 0).Iterator() for iter.Next(golabl.Ctx) { key := iter.Val() //获取店铺信息 shopInfo, getTaskShopErr := GetTaskShop(key) if getTaskShopErr != nil { return shopList, fmt.Errorf("获取店铺信息失败:" + getTaskShopErr.Error()) } // 解析 json数据 shopData, err := toolPdd.ParseShopData(shopInfo) if err != nil { return shopList, fmt.Errorf("解析店铺数据失败:" + err.Error()) } if shopData.Shop == nil { // 没有店铺数据 continue } if shopData.Shop.ExpirationTime == "" { // 过期时间为空 continue } // 筛选出拼多多的店铺并且订阅未到期的 expirationTime, err := parseTime(shopData.Shop.ExpirationTime) if err != nil { return shopList, fmt.Errorf("时间解析失败: %s, 原始值: %s", err.Error(), shopData.Shop.ExpirationTime) } now := time.Now() if shopData.Shop.ShopType == "1" && expirationTime.After(now) && shopData.Shop.DelFlag == "0" { shopList = append(shopList, *shopData.Shop) } } return shopList, nil } // parseTime 解析时间字符串,支持多种格式 func parseTime(timeStr string) (time.Time, error) { // 定义支持的时间格式列表 layouts := []string{ time.RFC3339, // "2006-01-02T15:04:05Z07:00" "2006-01-02T15:04:05-07:00", // "2026-03-27T21:31:17+08:00" "2006-01-02 15:04:05", // "2026-04-05 23:59:59" "2006-01-02 15:04:05 -0700", // 带时区但空格分隔的格式 "2006-01-02T15:04:05", // 不带时区的T分隔格式 } for _, layout := range layouts { if t, err := time.Parse(layout, timeStr); err == nil { return t, nil } } return time.Time{}, fmt.Errorf("无法解析时间字符串") }