665 lines
19 KiB
Go
665 lines
19 KiB
Go
package service
|
||
|
||
import (
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"planA/initialization/golabl"
|
||
"planA/tool"
|
||
toolPdd "planA/tool/pdd"
|
||
_type "planA/type"
|
||
"strconv"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis/v8"
|
||
)
|
||
|
||
// ============================================
|
||
// 任务头信息(Header)操作
|
||
// 数据结构: Hash
|
||
// 键格式: {taskKey}:header
|
||
// ============================================
|
||
|
||
// GetTaskHeader 获取任务头信息
|
||
// @param client Redis客户端
|
||
// @param taskKey 任务键
|
||
// @return _type.TaskHeader 任务头信息
|
||
// @return error 错误信息
|
||
func GetTaskHeader(taskKey string) (_type.TaskHeader, error) {
|
||
var header _type.TaskHeader
|
||
headerKey := getHeaderKey(taskKey)
|
||
headerMap, err := golabl.RedisDbA.HGetAll(golabl.Ctx, headerKey).Result()
|
||
if err != nil {
|
||
return header, err
|
||
}
|
||
|
||
return parseHeaderMap(headerMap)
|
||
}
|
||
|
||
// UpdateTaskHeader 更新任务头信息
|
||
// @param taskKey 任务键
|
||
// @param header 任务头信息
|
||
// @return error 错误信息
|
||
func UpdateTaskHeader(taskKey string, header _type.TaskHeader) error {
|
||
// 将结构体转为 map
|
||
headerMap, err := tool.StructToMap(header)
|
||
if err != nil {
|
||
return fmt.Errorf("转换Header为map失败: %w", err)
|
||
}
|
||
|
||
// 特殊处理price_mod字段
|
||
priceModJSON, err := json.Marshal(headerMap["price_mod"])
|
||
if err != nil {
|
||
return fmt.Errorf("转换price_mod为JSON失败: %w", err)
|
||
}
|
||
headerMap["price_mod"] = priceModJSON
|
||
// 保存到 Redis
|
||
headerKey := getHeaderKey(taskKey)
|
||
|
||
if err := saveHashMap(headerKey, headerMap); err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// UpdateHeaderStatus 更新任务头信息中的状态
|
||
// @param client Redis客户端
|
||
// @param taskKey 任务键
|
||
// @param status 任务状态(1=运行中 2=已暂停 3=已停止)
|
||
// @return error 错误信息
|
||
func UpdateHeaderStatus(taskKey string, status int64) error {
|
||
headerKey := getHeaderKey(taskKey)
|
||
if err := golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "status", status).Err(); err != nil {
|
||
return err
|
||
}
|
||
golabl.RedisDbA.Expire(golabl.Ctx, headerKey, golabl.RedisExp)
|
||
return nil
|
||
}
|
||
|
||
// ============================================
|
||
// 任务体信息(Body)操作
|
||
// 数据结构: List
|
||
// 键格式: {taskKey}:body_wait - 等待处理的任务队列
|
||
// {taskKey}:body_over - 已完成处理的任务队列
|
||
// ============================================
|
||
|
||
// UpdateTaskBodyWait 添加任务到等待队列
|
||
// @param taskKey 任务键
|
||
// @param taskBody 任务体数据
|
||
// @return error 错误信息
|
||
func UpdateTaskBodyWait(taskKey string, taskBody _type.TaskBody) error {
|
||
bodyWaitKey := getBodyWaitKey(taskKey)
|
||
|
||
// 序列化任务数据
|
||
bodyWaitJSON, err := json.Marshal(taskBody)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化任务数据失败: %w", err)
|
||
}
|
||
|
||
// 推送到列表尾部
|
||
return golabl.RedisDbA.RPush(golabl.Ctx, bodyWaitKey, string(bodyWaitJSON)).Err()
|
||
}
|
||
|
||
// GetListLength 获取等待队列长度
|
||
// @param taskKey 任务键
|
||
// @return int64 队列长度
|
||
// @return error 错误信息
|
||
func GetListLength(taskKey string) (int64, error) {
|
||
bodyWaitKey := getBodyWaitKey(taskKey)
|
||
return golabl.RedisDbA.LLen(golabl.Ctx, bodyWaitKey).Result()
|
||
}
|
||
|
||
// GetTaskBodyOver 分页获取已完成任务
|
||
// @param taskKey 任务键
|
||
// @param page 页码
|
||
// @param size 每页数量
|
||
// @return []_type.TaskBody 任务体列表
|
||
// @return error 错误信息
|
||
func GetTaskBodyOver(taskKey string, page int, size int) ([]_type.TaskBody, int64, error) {
|
||
return GetBodyOverDataByBatch(taskKey, page, size)
|
||
}
|
||
|
||
// GetBodyOverCount 获取已完成任务总数
|
||
// @param taskKey 任务键
|
||
// @return int64 总数
|
||
// @return error 错误信息
|
||
func GetBodyOverCount(taskKey string) (int64, error) {
|
||
bodyOverKey := getBodyOverKey(taskKey)
|
||
return golabl.RedisDbA.LLen(golabl.Ctx, bodyOverKey).Result()
|
||
}
|
||
|
||
// GetBodyOverDataByBatch 批量获取已完成任务数据
|
||
// @param taskKey 任务键
|
||
// @param page 页
|
||
// @param size 页数量
|
||
// @return []_type.TaskBody 任务体列表
|
||
// @return int64 总数
|
||
// @return error 错误信息
|
||
func GetBodyOverDataByBatch(taskKey string, page, size int) ([]_type.TaskBody, int64, error) {
|
||
var bodyOverArr []_type.TaskBody
|
||
|
||
bodyOverKey := getBodyOverKey(taskKey)
|
||
|
||
// 获取总数
|
||
total, err := golabl.RedisDbA.LLen(golabl.Ctx, bodyOverKey).Result()
|
||
if err != nil {
|
||
if errors.Is(err, redis.Nil) {
|
||
return bodyOverArr, 0, nil
|
||
}
|
||
return bodyOverArr, 0, fmt.Errorf("获取body_over总数错误: %w", err)
|
||
}
|
||
|
||
// 计算起始索引(从0开始)
|
||
start := (page - 1) * size
|
||
end := start + size - 1
|
||
|
||
// 如果起始索引超出范围,直接返回空数据
|
||
if start >= int(total) {
|
||
return bodyOverArr, total, nil
|
||
}
|
||
|
||
bodyOverStr, err := golabl.RedisDbA.LRange(golabl.Ctx, bodyOverKey, int64(start), int64(end)).Result()
|
||
if err != nil {
|
||
if errors.Is(err, redis.Nil) {
|
||
return bodyOverArr, total, nil
|
||
}
|
||
return bodyOverArr, 0, fmt.Errorf("获取body_over数据错误: %w", err)
|
||
}
|
||
|
||
list, err := parseTaskBodyList(bodyOverStr)
|
||
if err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
return list, total, nil
|
||
}
|
||
|
||
// ClearBodyOver 清空已完成任务队列
|
||
// @param client Redis客户端
|
||
// @param taskKey 任务键
|
||
// @return error 错误信息
|
||
func ClearBodyOver(taskKey string) error {
|
||
return golabl.RedisDbA.Del(golabl.Ctx, getBodyOverKey(taskKey)).Err()
|
||
}
|
||
|
||
// ============================================
|
||
// 任务尾信息(Footer)操作
|
||
// 数据结构: Hash
|
||
// 键格式: {taskKey}:footer
|
||
// ============================================
|
||
|
||
// GetTaskFooter 获取任务尾信息
|
||
// @param taskKey 任务键
|
||
// @return _type.TaskFooter 任务尾信息
|
||
// @return error 错误信息
|
||
func GetTaskFooter(taskKey string) (_type.TaskFooter, error) {
|
||
var footer _type.TaskFooter
|
||
footerKey := getFooterKey(taskKey)
|
||
footerMap, err := golabl.RedisDbA.HGetAll(golabl.Ctx, footerKey).Result()
|
||
if err != nil {
|
||
return footer, fmt.Errorf("获取Footer失败: %w", err)
|
||
}
|
||
|
||
return footer, parseTaskFooter(footerMap, &footer)
|
||
}
|
||
|
||
// UpdateTaskFooter 更新任务尾信息
|
||
// @param taskKey 任务键
|
||
// @param footer 任务尾信息
|
||
// @return error 错误信息
|
||
func UpdateTaskFooter(taskKey string, footer *_type.TaskFooter) error {
|
||
footerMap := map[string]interface{}{
|
||
"task_count": footer.TaskCount,
|
||
"task_count_true": footer.TaskCountTrue,
|
||
"task_count_wait": footer.TaskCountWait.Load(),
|
||
"task_count_over": footer.TaskCountOver.Load(),
|
||
"task_count_success": footer.TaskCountSuccess.Load(),
|
||
"task_count_error": footer.TaskCountError.Load(),
|
||
"task_qpm": footer.TaskQpm,
|
||
"last_index": footer.LastIndex,
|
||
}
|
||
|
||
footerKey := getFooterKey(taskKey)
|
||
if err := saveHashMap(footerKey, footerMap); err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// ============================================
|
||
// 导出文件(BodyFile)操作
|
||
// 数据结构: Hash
|
||
// 键格式: {taskKey}:body_file
|
||
// ============================================
|
||
|
||
// UpdateExportFileProgress 更新导出文件进度(每次自增+1)
|
||
// @param taskKey 任务键
|
||
// @return error 错误信息
|
||
func UpdateExportFileProgress(taskKey string) error {
|
||
// 使用 HIncrBy 对指定字段进行自增操作,每次增加1
|
||
return golabl.RedisDbA.HIncrBy(golabl.Ctx, getBodyFileKey(taskKey), "complete", 1).Err()
|
||
}
|
||
|
||
// GetExportFileProgress 获取导出文件进度
|
||
// @param taskKey 任务键
|
||
// @return int 完成进度
|
||
// @return error 错误信息
|
||
func GetExportFileProgress(taskKey string) (int, error) {
|
||
return golabl.RedisDbA.HGet(golabl.Ctx, getBodyFileKey(taskKey), "complete").Int()
|
||
}
|
||
|
||
// ============================================
|
||
// 进程号管理操作
|
||
// 数据结构: Hash字段
|
||
// 键格式: {headerKey}
|
||
// 字段: process_number
|
||
// ============================================
|
||
|
||
// GetProcessId 获取进程号
|
||
// @param taskKey 键
|
||
// @return string 进程号
|
||
// @return error 错误信息
|
||
func GetProcessId(taskKey string) (string, error) {
|
||
headerKey := getHeaderKey(taskKey)
|
||
return golabl.RedisDbA.HGet(golabl.Ctx, headerKey, "process_number").Result()
|
||
}
|
||
|
||
// SetProcessId 设置进程号
|
||
// @param taskKey 键
|
||
// @param processId 进程号
|
||
// @return error 错误信息
|
||
func SetProcessId(taskKey string, processId string) error {
|
||
headerKey := getHeaderKey(taskKey)
|
||
golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err()
|
||
golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err()
|
||
return golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err()
|
||
}
|
||
|
||
// DeleteProcessId 删除进程号
|
||
// @param taskKey 头信息键
|
||
// @return error 错误信息
|
||
func DeleteProcessId(taskKey string) error {
|
||
headerKey := getHeaderKey(taskKey)
|
||
return golabl.RedisDbA.HDel(golabl.Ctx, headerKey, "process_number").Err()
|
||
}
|
||
|
||
// ============================================
|
||
// 复合操作(涉及多个数据结构)
|
||
// ============================================
|
||
|
||
// UpdateTaskCountTrue 更新任务计数(原子操作)
|
||
// 同时更新Header和Footer中的task_count_true,以及Footer中的task_count_wait
|
||
// @param taskKey 任务键
|
||
// @param num 增减数量
|
||
// @return error 错误信息
|
||
func UpdateTaskCountTrue(taskKey string, num int64) error {
|
||
// 使用Pipeline确保原子性
|
||
pipe := golabl.RedisDbA.Pipeline()
|
||
|
||
// 更新Header
|
||
headerKey := getHeaderKey(taskKey)
|
||
pipe.HIncrBy(golabl.Ctx, headerKey, "task_count_true", num)
|
||
|
||
// 更新Footer
|
||
footerKey := getFooterKey(taskKey)
|
||
pipe.HIncrBy(golabl.Ctx, footerKey, "task_count_true", num)
|
||
pipe.HIncrBy(golabl.Ctx, footerKey, "task_count_wait", num)
|
||
|
||
// 设置过期时间
|
||
pipe.Expire(golabl.Ctx, headerKey, golabl.RedisExp)
|
||
pipe.Expire(golabl.Ctx, footerKey, golabl.RedisExp)
|
||
|
||
return executePipeline(pipe)
|
||
}
|
||
|
||
// StopTask 停止任务
|
||
// 更新Header状态为已停止,并清空等待队列
|
||
// @param taskId 任务ID
|
||
// @return error 错误信息
|
||
func StopTask(taskId string) error {
|
||
// 开启事务
|
||
pipe := golabl.RedisDbA.TxPipeline()
|
||
|
||
// 更新任务状态
|
||
headerKey := getHeaderKey(taskId)
|
||
pipe.HSet(golabl.Ctx, headerKey, "status", int64(_type.TaskStatusStopped))
|
||
|
||
// 设置过期时间
|
||
pipe.Expire(golabl.Ctx, headerKey, golabl.RedisExp)
|
||
|
||
// 清空等待队列
|
||
bodyWaitKey := getBodyWaitKey(taskId)
|
||
pipe.Del(golabl.Ctx, bodyWaitKey)
|
||
|
||
return executePipeline(pipe)
|
||
}
|
||
|
||
// DelTask 删除任务
|
||
// @param taskId 任务ID
|
||
// @return error 错误信息
|
||
func DelTask(taskId string) error {
|
||
// 开启事务
|
||
pipe := golabl.RedisDbA.TxPipeline()
|
||
|
||
// 删除 header
|
||
pipe.Del(golabl.Ctx, getHeaderKey(taskId))
|
||
// 删除 footer
|
||
pipe.Del(golabl.Ctx, getFooterKey(taskId))
|
||
// 删除 body_wait
|
||
pipe.Del(golabl.Ctx, getBodyWaitKey(taskId))
|
||
// 删除 body_over
|
||
pipe.Del(golabl.Ctx, getBodyOverKey(taskId))
|
||
// 删除 body_file
|
||
pipe.Del(golabl.Ctx, getBodyFileKey(taskId))
|
||
// 删除body_data
|
||
pipe.Del(golabl.Ctx, getBodyDataKey(taskId))
|
||
//删除 body_backup
|
||
pipe.Del(golabl.Ctx, getBodyBackupKey(taskId))
|
||
|
||
return executePipeline(pipe)
|
||
}
|
||
|
||
// GetBodyBackupLen 获取body_backup长度
|
||
// @param taskId 任务ID
|
||
// @return int64 body_backup长度
|
||
// @return error 错误信息
|
||
func GetBodyBackupLen(taskId string) (int64, error) {
|
||
return golabl.RedisDbA.LLen(golabl.Ctx, getBodyBackupKey(taskId)).Result()
|
||
}
|
||
|
||
// GetBodyBackupOne 读取一条body_backup数据
|
||
// @param taskId 任务ID
|
||
// @return string body_backup数据
|
||
// @return error 错误信息
|
||
func GetBodyBackupOne(taskId string) (string, error) {
|
||
return golabl.RedisDbA.LPop(golabl.Ctx, getBodyBackupKey(taskId)).Result()
|
||
}
|
||
|
||
//********************************************以下为是有方法*****************************************//
|
||
|
||
// getHeaderKey 获取任务头信息的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 头信息键
|
||
func getHeaderKey(taskKey string) string {
|
||
return taskKey + ":header"
|
||
}
|
||
|
||
// getFooterKey 获取任务尾信息的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 尾信息键
|
||
func getFooterKey(taskKey string) string {
|
||
return taskKey + ":footer"
|
||
}
|
||
|
||
// getBodyWaitKey 获取等待任务体的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 等待任务体键
|
||
func getBodyWaitKey(taskKey string) string {
|
||
return taskKey + ":body_wait"
|
||
}
|
||
|
||
// getBodyOverKey 获取已完成任务体的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 已完成任务体键
|
||
func getBodyOverKey(taskKey string) string {
|
||
return taskKey + ":body_over"
|
||
}
|
||
|
||
// getBodyDataKey 获取已完成任务体的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 已完成任务体键
|
||
func getBodyDataKey(taskKey string) string {
|
||
return taskKey + ":body_data"
|
||
}
|
||
|
||
// getBodyBackupKey 获取已完成任务体的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 已完成任务体键
|
||
func getBodyBackupKey(taskKey string) string {
|
||
return taskKey + ":body_backup"
|
||
}
|
||
|
||
// getBodyFileKey 获取已完成任务体的Redis键
|
||
// @param taskKey 任务键
|
||
// @return string 已完成任务体键
|
||
func getBodyFileKey(taskKey string) string {
|
||
return taskKey + ":body_file"
|
||
}
|
||
|
||
// parseHeaderMap 从map解析任务头信息
|
||
// @param headerMap 头信息map
|
||
// @return _type.TaskHeader 解析后的头信息
|
||
// @return error 错误信息
|
||
func parseHeaderMap(headerMap map[string]string) (_type.TaskHeader, error) {
|
||
info := _type.TaskHeader{}
|
||
|
||
for key, value := range headerMap {
|
||
switch key {
|
||
case "last_index", "task_count", "task_count_error",
|
||
"task_count_over", "task_count_success", "task_count_true",
|
||
"task_count_wait", "task_create_at", "task_over_at", "task_qpm", "task_type", "img_type", "update_type":
|
||
parseIntField(&info, key, value)
|
||
|
||
case "price_mod":
|
||
var priceMod []_type.PriceMod
|
||
if err := json.Unmarshal([]byte(value), &priceMod); err == nil {
|
||
info.PriceMod = priceMod
|
||
}
|
||
|
||
case "shop_msg":
|
||
var shopMsg _type.ShopMsg
|
||
if err := json.Unmarshal([]byte(value), &shopMsg); err == nil {
|
||
info.ShopMsg = shopMsg
|
||
}
|
||
|
||
case "status":
|
||
if v, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||
info.Status = _type.TaskStatus(v)
|
||
}
|
||
|
||
case "shop_id", "ship_price_mod", "shop_name", "shop_type", "task_id":
|
||
setStringField(&info, key, value)
|
||
}
|
||
}
|
||
return info, nil
|
||
}
|
||
|
||
// parseIntField 解析整数字段
|
||
func parseIntField(info *_type.TaskHeader, key, value string) {
|
||
if v, err := strconv.ParseInt(value, 10, 64); err == nil {
|
||
switch key {
|
||
case "last_index":
|
||
info.LastIndex = v
|
||
case "task_count":
|
||
info.TaskCount = v
|
||
case "task_count_error":
|
||
info.TaskCountError = v
|
||
case "task_count_over":
|
||
info.TaskCountOver = v
|
||
case "task_count_success":
|
||
info.TaskCountSuccess = v
|
||
case "task_count_true":
|
||
info.TaskCountTrue = v
|
||
case "task_count_wait":
|
||
info.TaskCountWait = v
|
||
case "task_create_at":
|
||
info.TaskCreateAt = v
|
||
case "task_over_at":
|
||
info.TaskOverAt = v
|
||
case "task_qpm":
|
||
info.TaskQpm = v
|
||
case "task_type":
|
||
info.TaskType = v
|
||
case "img_type":
|
||
info.ImgType = v
|
||
case "update_type":
|
||
info.UpdateType = v
|
||
|
||
}
|
||
}
|
||
}
|
||
|
||
// setStringField 设置字符串字段
|
||
func setStringField(info *_type.TaskHeader, key, value string) {
|
||
switch key {
|
||
case "ship_price_mod":
|
||
info.ShipPriceMod = value
|
||
case "shop_name":
|
||
info.ShopName = value
|
||
case "shop_type":
|
||
info.ShopType = value
|
||
case "task_id":
|
||
info.TaskId = value
|
||
case "shop_id":
|
||
info.ShopId = value
|
||
}
|
||
}
|
||
|
||
// saveHashMap 保存哈希映射到Redis
|
||
// @param key Redis键
|
||
// @param data 数据映射
|
||
// @return error 错误信息
|
||
func saveHashMap(key string, data map[string]interface{}) error {
|
||
for field, value := range data {
|
||
if err := golabl.RedisDbA.HSet(golabl.Ctx, key, field, value).Err(); err != nil {
|
||
return fmt.Errorf("保存字段 %s 失败: %w (值: %v)", field, err, value)
|
||
}
|
||
}
|
||
golabl.RedisDbA.Expire(golabl.Ctx, key, golabl.RedisExp)
|
||
return nil
|
||
}
|
||
|
||
// parseTaskBodyList 解析任务体列表
|
||
// @param bodyStrs 任务体字符串列表
|
||
// @return []_type.TaskBody 解析后的任务体列表
|
||
// @return error 错误信息
|
||
func parseTaskBodyList(bodyStrs []string) ([]_type.TaskBody, error) {
|
||
var bodyList []_type.TaskBody
|
||
|
||
for _, str := range bodyStrs {
|
||
var body _type.TaskBody
|
||
if err := json.Unmarshal([]byte(str), &body); err != nil {
|
||
return bodyList, fmt.Errorf("JSON解析错误: %w, 数据: %s", err, str)
|
||
}
|
||
bodyList = append(bodyList, body)
|
||
}
|
||
|
||
return bodyList, nil
|
||
}
|
||
|
||
// parseTaskFooter 解析任务尾信息
|
||
// @param taskFooter 尾信息map
|
||
// @param footer 目标尾信息结构体
|
||
// @return error 错误信息
|
||
func parseTaskFooter(taskFooter map[string]string, footer *_type.TaskFooter) error {
|
||
var err error
|
||
|
||
if footer.TaskCount, err = strconv.ParseInt(taskFooter["task_count"], 10, 64); err != nil {
|
||
footer.TaskCount = 0
|
||
}
|
||
|
||
if footer.TaskCountTrue, err = strconv.ParseInt(taskFooter["task_count_true"], 10, 64); err != nil {
|
||
footer.TaskCountTrue = 0
|
||
}
|
||
|
||
if taskCountWait, err := strconv.ParseInt(taskFooter["task_count_wait"], 10, 64); err == nil {
|
||
footer.TaskCountWait.Store(taskCountWait)
|
||
}
|
||
|
||
if taskCountOver, err := strconv.ParseInt(taskFooter["task_count_over"], 10, 64); err == nil {
|
||
footer.TaskCountOver.Store(taskCountOver)
|
||
}
|
||
|
||
if taskCountSuccess, err := strconv.ParseInt(taskFooter["task_count_success"], 10, 64); err == nil {
|
||
footer.TaskCountSuccess.Store(taskCountSuccess)
|
||
}
|
||
|
||
if taskCountError, err := strconv.ParseInt(taskFooter["task_count_error"], 10, 64); err == nil {
|
||
footer.TaskCountError.Store(taskCountError)
|
||
}
|
||
|
||
if footer.TaskQpm, err = strconv.ParseInt(taskFooter["task_qpm"], 10, 64); err != nil {
|
||
footer.TaskQpm = 0
|
||
}
|
||
|
||
if footer.LastIndex, err = strconv.ParseInt(taskFooter["last_index"], 10, 64); err != nil {
|
||
footer.LastIndex = 0
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// executePipeline 执行Redis管道操作
|
||
// @param pipe Redis管道
|
||
// @return error 错误信息
|
||
func executePipeline(pipe redis.Pipeliner) error {
|
||
_, err := pipe.Exec(golabl.Ctx)
|
||
return err
|
||
}
|
||
|
||
// ============================================
|
||
// 其他
|
||
// ============================================
|
||
|
||
// GetPddTokenList 获取token列表
|
||
// @return []string token列表
|
||
// @return error 错误信息
|
||
func GetPddTokenList() ([]_type.Shop, error) {
|
||
var shopList []_type.Shop
|
||
//获取 店铺列表中所有的key
|
||
iter := golabl.RedisDbC.Scan(golabl.Ctx, 0, "*", 0).Iterator()
|
||
for iter.Next(golabl.Ctx) {
|
||
key := iter.Val()
|
||
//获取店铺信息
|
||
shopInfo, getTaskShopErr := GetTaskShop(key)
|
||
if getTaskShopErr != nil {
|
||
return shopList, fmt.Errorf("获取店铺信息失败:" + getTaskShopErr.Error())
|
||
}
|
||
// 解析 json数据
|
||
shopData, err := toolPdd.ParseShopData(shopInfo)
|
||
if err != nil {
|
||
return shopList, fmt.Errorf("解析店铺数据失败:" + err.Error())
|
||
}
|
||
if shopData.Shop == nil {
|
||
// 没有店铺数据
|
||
continue
|
||
}
|
||
if shopData.Shop.ExpirationTime == "" {
|
||
// 过期时间为空
|
||
continue
|
||
}
|
||
// 筛选出拼多多的店铺并且订阅未到期的
|
||
expirationTime, err := parseTime(shopData.Shop.ExpirationTime)
|
||
if err != nil {
|
||
return shopList, fmt.Errorf("时间解析失败: %s, 原始值: %s", err.Error(), shopData.Shop.ExpirationTime)
|
||
}
|
||
|
||
now := time.Now()
|
||
if shopData.Shop.ShopType == "1" && expirationTime.After(now) && shopData.Shop.DelFlag == "0" {
|
||
shopList = append(shopList, *shopData.Shop)
|
||
}
|
||
}
|
||
return shopList, nil
|
||
}
|
||
|
||
// parseTime 解析时间字符串,支持多种格式
|
||
func parseTime(timeStr string) (time.Time, error) {
|
||
// 定义支持的时间格式列表
|
||
layouts := []string{
|
||
time.RFC3339, // "2006-01-02T15:04:05Z07:00"
|
||
"2006-01-02T15:04:05-07:00", // "2026-03-27T21:31:17+08:00"
|
||
"2006-01-02 15:04:05", // "2026-04-05 23:59:59"
|
||
"2006-01-02 15:04:05 -0700", // 带时区但空格分隔的格式
|
||
"2006-01-02T15:04:05", // 不带时区的T分隔格式
|
||
}
|
||
|
||
for _, layout := range layouts {
|
||
if t, err := time.Parse(layout, timeStr); err == nil {
|
||
return t, nil
|
||
}
|
||
}
|
||
|
||
return time.Time{}, fmt.Errorf("无法解析时间字符串")
|
||
}
|