daShangDao_planA/service/task.go

665 lines
19 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"encoding/json"
"errors"
"fmt"
"planA/initialization/golabl"
"planA/tool"
toolPdd "planA/tool/pdd"
_type "planA/type"
"strconv"
"time"
"github.com/go-redis/redis/v8"
)
// ============================================
// 任务头信息(Header)操作
// 数据结构: Hash
// 键格式: {taskKey}:header
// ============================================
// GetTaskHeader 获取任务头信息
// @param client Redis客户端
// @param taskKey 任务键
// @return _type.TaskHeader 任务头信息
// @return error 错误信息
func GetTaskHeader(taskKey string) (_type.TaskHeader, error) {
var header _type.TaskHeader
headerKey := getHeaderKey(taskKey)
headerMap, err := golabl.RedisDbA.HGetAll(golabl.Ctx, headerKey).Result()
if err != nil {
return header, err
}
return parseHeaderMap(headerMap)
}
// UpdateTaskHeader 更新任务头信息
// @param taskKey 任务键
// @param header 任务头信息
// @return error 错误信息
func UpdateTaskHeader(taskKey string, header _type.TaskHeader) error {
// 将结构体转为 map
headerMap, err := tool.StructToMap(header)
if err != nil {
return fmt.Errorf("转换Header为map失败: %w", err)
}
// 特殊处理price_mod字段
priceModJSON, err := json.Marshal(headerMap["price_mod"])
if err != nil {
return fmt.Errorf("转换price_mod为JSON失败: %w", err)
}
headerMap["price_mod"] = priceModJSON
// 保存到 Redis
headerKey := getHeaderKey(taskKey)
if err := saveHashMap(headerKey, headerMap); err != nil {
return err
}
return nil
}
// UpdateHeaderStatus 更新任务头信息中的状态
// @param client Redis客户端
// @param taskKey 任务键
// @param status 任务状态1=运行中 2=已暂停 3=已停止)
// @return error 错误信息
func UpdateHeaderStatus(taskKey string, status int64) error {
headerKey := getHeaderKey(taskKey)
if err := golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "status", status).Err(); err != nil {
return err
}
golabl.RedisDbA.Expire(golabl.Ctx, headerKey, golabl.RedisExp)
return nil
}
// ============================================
// 任务体信息(Body)操作
// 数据结构: List
// 键格式: {taskKey}:body_wait - 等待处理的任务队列
// {taskKey}:body_over - 已完成处理的任务队列
// ============================================
// UpdateTaskBodyWait 添加任务到等待队列
// @param taskKey 任务键
// @param taskBody 任务体数据
// @return error 错误信息
func UpdateTaskBodyWait(taskKey string, taskBody _type.TaskBody) error {
bodyWaitKey := getBodyWaitKey(taskKey)
// 序列化任务数据
bodyWaitJSON, err := json.Marshal(taskBody)
if err != nil {
return fmt.Errorf("序列化任务数据失败: %w", err)
}
// 推送到列表尾部
return golabl.RedisDbA.RPush(golabl.Ctx, bodyWaitKey, string(bodyWaitJSON)).Err()
}
// GetListLength 获取等待队列长度
// @param taskKey 任务键
// @return int64 队列长度
// @return error 错误信息
func GetListLength(taskKey string) (int64, error) {
bodyWaitKey := getBodyWaitKey(taskKey)
return golabl.RedisDbA.LLen(golabl.Ctx, bodyWaitKey).Result()
}
// GetTaskBodyOver 分页获取已完成任务
// @param taskKey 任务键
// @param page 页码
// @param size 每页数量
// @return []_type.TaskBody 任务体列表
// @return error 错误信息
func GetTaskBodyOver(taskKey string, page int, size int) ([]_type.TaskBody, int64, error) {
return GetBodyOverDataByBatch(taskKey, page, size)
}
// GetBodyOverCount 获取已完成任务总数
// @param taskKey 任务键
// @return int64 总数
// @return error 错误信息
func GetBodyOverCount(taskKey string) (int64, error) {
bodyOverKey := getBodyOverKey(taskKey)
return golabl.RedisDbA.LLen(golabl.Ctx, bodyOverKey).Result()
}
// GetBodyOverDataByBatch 批量获取已完成任务数据
// @param taskKey 任务键
// @param page 页
// @param size 页数量
// @return []_type.TaskBody 任务体列表
// @return int64 总数
// @return error 错误信息
func GetBodyOverDataByBatch(taskKey string, page, size int) ([]_type.TaskBody, int64, error) {
var bodyOverArr []_type.TaskBody
bodyOverKey := getBodyOverKey(taskKey)
// 获取总数
total, err := golabl.RedisDbA.LLen(golabl.Ctx, bodyOverKey).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return bodyOverArr, 0, nil
}
return bodyOverArr, 0, fmt.Errorf("获取body_over总数错误: %w", err)
}
// 计算起始索引从0开始
start := (page - 1) * size
end := start + size - 1
// 如果起始索引超出范围,直接返回空数据
if start >= int(total) {
return bodyOverArr, total, nil
}
bodyOverStr, err := golabl.RedisDbA.LRange(golabl.Ctx, bodyOverKey, int64(start), int64(end)).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return bodyOverArr, total, nil
}
return bodyOverArr, 0, fmt.Errorf("获取body_over数据错误: %w", err)
}
list, err := parseTaskBodyList(bodyOverStr)
if err != nil {
return nil, 0, err
}
return list, total, nil
}
// ClearBodyOver 清空已完成任务队列
// @param client Redis客户端
// @param taskKey 任务键
// @return error 错误信息
func ClearBodyOver(taskKey string) error {
return golabl.RedisDbA.Del(golabl.Ctx, getBodyOverKey(taskKey)).Err()
}
// ============================================
// 任务尾信息(Footer)操作
// 数据结构: Hash
// 键格式: {taskKey}:footer
// ============================================
// GetTaskFooter 获取任务尾信息
// @param taskKey 任务键
// @return _type.TaskFooter 任务尾信息
// @return error 错误信息
func GetTaskFooter(taskKey string) (_type.TaskFooter, error) {
var footer _type.TaskFooter
footerKey := getFooterKey(taskKey)
footerMap, err := golabl.RedisDbA.HGetAll(golabl.Ctx, footerKey).Result()
if err != nil {
return footer, fmt.Errorf("获取Footer失败: %w", err)
}
return footer, parseTaskFooter(footerMap, &footer)
}
// UpdateTaskFooter 更新任务尾信息
// @param taskKey 任务键
// @param footer 任务尾信息
// @return error 错误信息
func UpdateTaskFooter(taskKey string, footer *_type.TaskFooter) error {
footerMap := map[string]interface{}{
"task_count": footer.TaskCount,
"task_count_true": footer.TaskCountTrue,
"task_count_wait": footer.TaskCountWait.Load(),
"task_count_over": footer.TaskCountOver.Load(),
"task_count_success": footer.TaskCountSuccess.Load(),
"task_count_error": footer.TaskCountError.Load(),
"task_qpm": footer.TaskQpm,
"last_index": footer.LastIndex,
}
footerKey := getFooterKey(taskKey)
if err := saveHashMap(footerKey, footerMap); err != nil {
return err
}
return nil
}
// ============================================
// 导出文件(BodyFile)操作
// 数据结构: Hash
// 键格式: {taskKey}:body_file
// ============================================
// UpdateExportFileProgress 更新导出文件进度(每次自增+1
// @param taskKey 任务键
// @return error 错误信息
func UpdateExportFileProgress(taskKey string) error {
// 使用 HIncrBy 对指定字段进行自增操作每次增加1
return golabl.RedisDbA.HIncrBy(golabl.Ctx, getBodyFileKey(taskKey), "complete", 1).Err()
}
// GetExportFileProgress 获取导出文件进度
// @param taskKey 任务键
// @return int 完成进度
// @return error 错误信息
func GetExportFileProgress(taskKey string) (int, error) {
return golabl.RedisDbA.HGet(golabl.Ctx, getBodyFileKey(taskKey), "complete").Int()
}
// ============================================
// 进程号管理操作
// 数据结构: Hash字段
// 键格式: {headerKey}
// 字段: process_number
// ============================================
// GetProcessId 获取进程号
// @param taskKey 键
// @return string 进程号
// @return error 错误信息
func GetProcessId(taskKey string) (string, error) {
headerKey := getHeaderKey(taskKey)
return golabl.RedisDbA.HGet(golabl.Ctx, headerKey, "process_number").Result()
}
// SetProcessId 设置进程号
// @param taskKey 键
// @param processId 进程号
// @return error 错误信息
func SetProcessId(taskKey string, processId string) error {
headerKey := getHeaderKey(taskKey)
golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err()
golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err()
return golabl.RedisDbA.HSet(golabl.Ctx, headerKey, "process_number", processId).Err()
}
// DeleteProcessId 删除进程号
// @param taskKey 头信息键
// @return error 错误信息
func DeleteProcessId(taskKey string) error {
headerKey := getHeaderKey(taskKey)
return golabl.RedisDbA.HDel(golabl.Ctx, headerKey, "process_number").Err()
}
// ============================================
// 复合操作(涉及多个数据结构)
// ============================================
// UpdateTaskCountTrue 更新任务计数(原子操作)
// 同时更新Header和Footer中的task_count_true以及Footer中的task_count_wait
// @param taskKey 任务键
// @param num 增减数量
// @return error 错误信息
func UpdateTaskCountTrue(taskKey string, num int64) error {
// 使用Pipeline确保原子性
pipe := golabl.RedisDbA.Pipeline()
// 更新Header
headerKey := getHeaderKey(taskKey)
pipe.HIncrBy(golabl.Ctx, headerKey, "task_count_true", num)
// 更新Footer
footerKey := getFooterKey(taskKey)
pipe.HIncrBy(golabl.Ctx, footerKey, "task_count_true", num)
pipe.HIncrBy(golabl.Ctx, footerKey, "task_count_wait", num)
// 设置过期时间
pipe.Expire(golabl.Ctx, headerKey, golabl.RedisExp)
pipe.Expire(golabl.Ctx, footerKey, golabl.RedisExp)
return executePipeline(pipe)
}
// StopTask 停止任务
// 更新Header状态为已停止并清空等待队列
// @param taskId 任务ID
// @return error 错误信息
func StopTask(taskId string) error {
// 开启事务
pipe := golabl.RedisDbA.TxPipeline()
// 更新任务状态
headerKey := getHeaderKey(taskId)
pipe.HSet(golabl.Ctx, headerKey, "status", int64(_type.TaskStatusStopped))
// 设置过期时间
pipe.Expire(golabl.Ctx, headerKey, golabl.RedisExp)
// 清空等待队列
bodyWaitKey := getBodyWaitKey(taskId)
pipe.Del(golabl.Ctx, bodyWaitKey)
return executePipeline(pipe)
}
// DelTask 删除任务
// @param taskId 任务ID
// @return error 错误信息
func DelTask(taskId string) error {
// 开启事务
pipe := golabl.RedisDbA.TxPipeline()
// 删除 header
pipe.Del(golabl.Ctx, getHeaderKey(taskId))
// 删除 footer
pipe.Del(golabl.Ctx, getFooterKey(taskId))
// 删除 body_wait
pipe.Del(golabl.Ctx, getBodyWaitKey(taskId))
// 删除 body_over
pipe.Del(golabl.Ctx, getBodyOverKey(taskId))
// 删除 body_file
pipe.Del(golabl.Ctx, getBodyFileKey(taskId))
// 删除body_data
pipe.Del(golabl.Ctx, getBodyDataKey(taskId))
//删除 body_backup
pipe.Del(golabl.Ctx, getBodyBackupKey(taskId))
return executePipeline(pipe)
}
// GetBodyBackupLen 获取body_backup长度
// @param taskId 任务ID
// @return int64 body_backup长度
// @return error 错误信息
func GetBodyBackupLen(taskId string) (int64, error) {
return golabl.RedisDbA.LLen(golabl.Ctx, getBodyBackupKey(taskId)).Result()
}
// GetBodyBackupOne 读取一条body_backup数据
// @param taskId 任务ID
// @return string body_backup数据
// @return error 错误信息
func GetBodyBackupOne(taskId string) (string, error) {
return golabl.RedisDbA.LPop(golabl.Ctx, getBodyBackupKey(taskId)).Result()
}
//********************************************以下为是有方法*****************************************//
// getHeaderKey 获取任务头信息的Redis键
// @param taskKey 任务键
// @return string 头信息键
func getHeaderKey(taskKey string) string {
return taskKey + ":header"
}
// getFooterKey 获取任务尾信息的Redis键
// @param taskKey 任务键
// @return string 尾信息键
func getFooterKey(taskKey string) string {
return taskKey + ":footer"
}
// getBodyWaitKey 获取等待任务体的Redis键
// @param taskKey 任务键
// @return string 等待任务体键
func getBodyWaitKey(taskKey string) string {
return taskKey + ":body_wait"
}
// getBodyOverKey 获取已完成任务体的Redis键
// @param taskKey 任务键
// @return string 已完成任务体键
func getBodyOverKey(taskKey string) string {
return taskKey + ":body_over"
}
// getBodyDataKey 获取已完成任务体的Redis键
// @param taskKey 任务键
// @return string 已完成任务体键
func getBodyDataKey(taskKey string) string {
return taskKey + ":body_data"
}
// getBodyBackupKey 获取已完成任务体的Redis键
// @param taskKey 任务键
// @return string 已完成任务体键
func getBodyBackupKey(taskKey string) string {
return taskKey + ":body_backup"
}
// getBodyFileKey 获取已完成任务体的Redis键
// @param taskKey 任务键
// @return string 已完成任务体键
func getBodyFileKey(taskKey string) string {
return taskKey + ":body_file"
}
// parseHeaderMap 从map解析任务头信息
// @param headerMap 头信息map
// @return _type.TaskHeader 解析后的头信息
// @return error 错误信息
func parseHeaderMap(headerMap map[string]string) (_type.TaskHeader, error) {
info := _type.TaskHeader{}
for key, value := range headerMap {
switch key {
case "last_index", "task_count", "task_count_error",
"task_count_over", "task_count_success", "task_count_true",
"task_count_wait", "task_create_at", "task_over_at", "task_qpm", "task_type", "img_type", "update_type":
parseIntField(&info, key, value)
case "price_mod":
var priceMod []_type.PriceMod
if err := json.Unmarshal([]byte(value), &priceMod); err == nil {
info.PriceMod = priceMod
}
case "shop_msg":
var shopMsg _type.ShopMsg
if err := json.Unmarshal([]byte(value), &shopMsg); err == nil {
info.ShopMsg = shopMsg
}
case "status":
if v, err := strconv.ParseInt(value, 10, 64); err == nil {
info.Status = _type.TaskStatus(v)
}
case "shop_id", "ship_price_mod", "shop_name", "shop_type", "task_id":
setStringField(&info, key, value)
}
}
return info, nil
}
// parseIntField 解析整数字段
func parseIntField(info *_type.TaskHeader, key, value string) {
if v, err := strconv.ParseInt(value, 10, 64); err == nil {
switch key {
case "last_index":
info.LastIndex = v
case "task_count":
info.TaskCount = v
case "task_count_error":
info.TaskCountError = v
case "task_count_over":
info.TaskCountOver = v
case "task_count_success":
info.TaskCountSuccess = v
case "task_count_true":
info.TaskCountTrue = v
case "task_count_wait":
info.TaskCountWait = v
case "task_create_at":
info.TaskCreateAt = v
case "task_over_at":
info.TaskOverAt = v
case "task_qpm":
info.TaskQpm = v
case "task_type":
info.TaskType = v
case "img_type":
info.ImgType = v
case "update_type":
info.UpdateType = v
}
}
}
// setStringField 设置字符串字段
func setStringField(info *_type.TaskHeader, key, value string) {
switch key {
case "ship_price_mod":
info.ShipPriceMod = value
case "shop_name":
info.ShopName = value
case "shop_type":
info.ShopType = value
case "task_id":
info.TaskId = value
case "shop_id":
info.ShopId = value
}
}
// saveHashMap 保存哈希映射到Redis
// @param key Redis键
// @param data 数据映射
// @return error 错误信息
func saveHashMap(key string, data map[string]interface{}) error {
for field, value := range data {
if err := golabl.RedisDbA.HSet(golabl.Ctx, key, field, value).Err(); err != nil {
return fmt.Errorf("保存字段 %s 失败: %w (值: %v)", field, err, value)
}
}
golabl.RedisDbA.Expire(golabl.Ctx, key, golabl.RedisExp)
return nil
}
// parseTaskBodyList 解析任务体列表
// @param bodyStrs 任务体字符串列表
// @return []_type.TaskBody 解析后的任务体列表
// @return error 错误信息
func parseTaskBodyList(bodyStrs []string) ([]_type.TaskBody, error) {
var bodyList []_type.TaskBody
for _, str := range bodyStrs {
var body _type.TaskBody
if err := json.Unmarshal([]byte(str), &body); err != nil {
return bodyList, fmt.Errorf("JSON解析错误: %w, 数据: %s", err, str)
}
bodyList = append(bodyList, body)
}
return bodyList, nil
}
// parseTaskFooter 解析任务尾信息
// @param taskFooter 尾信息map
// @param footer 目标尾信息结构体
// @return error 错误信息
func parseTaskFooter(taskFooter map[string]string, footer *_type.TaskFooter) error {
var err error
if footer.TaskCount, err = strconv.ParseInt(taskFooter["task_count"], 10, 64); err != nil {
footer.TaskCount = 0
}
if footer.TaskCountTrue, err = strconv.ParseInt(taskFooter["task_count_true"], 10, 64); err != nil {
footer.TaskCountTrue = 0
}
if taskCountWait, err := strconv.ParseInt(taskFooter["task_count_wait"], 10, 64); err == nil {
footer.TaskCountWait.Store(taskCountWait)
}
if taskCountOver, err := strconv.ParseInt(taskFooter["task_count_over"], 10, 64); err == nil {
footer.TaskCountOver.Store(taskCountOver)
}
if taskCountSuccess, err := strconv.ParseInt(taskFooter["task_count_success"], 10, 64); err == nil {
footer.TaskCountSuccess.Store(taskCountSuccess)
}
if taskCountError, err := strconv.ParseInt(taskFooter["task_count_error"], 10, 64); err == nil {
footer.TaskCountError.Store(taskCountError)
}
if footer.TaskQpm, err = strconv.ParseInt(taskFooter["task_qpm"], 10, 64); err != nil {
footer.TaskQpm = 0
}
if footer.LastIndex, err = strconv.ParseInt(taskFooter["last_index"], 10, 64); err != nil {
footer.LastIndex = 0
}
return nil
}
// executePipeline 执行Redis管道操作
// @param pipe Redis管道
// @return error 错误信息
func executePipeline(pipe redis.Pipeliner) error {
_, err := pipe.Exec(golabl.Ctx)
return err
}
// ============================================
// 其他
// ============================================
// GetPddTokenList 获取token列表
// @return []string token列表
// @return error 错误信息
func GetPddTokenList() ([]_type.Shop, error) {
var shopList []_type.Shop
//获取 店铺列表中所有的key
iter := golabl.RedisDbC.Scan(golabl.Ctx, 0, "*", 0).Iterator()
for iter.Next(golabl.Ctx) {
key := iter.Val()
//获取店铺信息
shopInfo, getTaskShopErr := GetTaskShop(key)
if getTaskShopErr != nil {
return shopList, fmt.Errorf("获取店铺信息失败:" + getTaskShopErr.Error())
}
// 解析 json数据
shopData, err := toolPdd.ParseShopData(shopInfo)
if err != nil {
return shopList, fmt.Errorf("解析店铺数据失败:" + err.Error())
}
if shopData.Shop == nil {
// 没有店铺数据
continue
}
if shopData.Shop.ExpirationTime == "" {
// 过期时间为空
continue
}
// 筛选出拼多多的店铺并且订阅未到期的
expirationTime, err := parseTime(shopData.Shop.ExpirationTime)
if err != nil {
return shopList, fmt.Errorf("时间解析失败: %s, 原始值: %s", err.Error(), shopData.Shop.ExpirationTime)
}
now := time.Now()
if shopData.Shop.ShopType == "1" && expirationTime.After(now) && shopData.Shop.DelFlag == "0" {
shopList = append(shopList, *shopData.Shop)
}
}
return shopList, nil
}
// parseTime 解析时间字符串,支持多种格式
func parseTime(timeStr string) (time.Time, error) {
// 定义支持的时间格式列表
layouts := []string{
time.RFC3339, // "2006-01-02T15:04:05Z07:00"
"2006-01-02T15:04:05-07:00", // "2026-03-27T21:31:17+08:00"
"2006-01-02 15:04:05", // "2026-04-05 23:59:59"
"2006-01-02 15:04:05 -0700", // 带时区但空格分隔的格式
"2006-01-02T15:04:05", // 不带时区的T分隔格式
}
for _, layout := range layouts {
if t, err := time.Parse(layout, timeStr); err == nil {
return t, nil
}
}
return time.Time{}, fmt.Errorf("无法解析时间字符串")
}