411 lines
14 KiB
Go
411 lines
14 KiB
Go
package logic
|
||
|
||
import (
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"planA/planB/dispatcher"
|
||
"planA/planB/initialization/config"
|
||
"planA/planB/initialization/golabl"
|
||
"planA/planB/initialization/task"
|
||
"planA/planB/modules/logs"
|
||
"planA/planB/service"
|
||
"planA/planB/tool"
|
||
planAType "planA/type"
|
||
planATypeMysql "planA/type/mysql"
|
||
redisType "planA/type/redis"
|
||
"strings"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis/v8"
|
||
)
|
||
|
||
var Goto bool = false
|
||
|
||
// Logic 执行任务
|
||
func Logic() {
|
||
//loop:
|
||
// 开始读取待处理任务 等待任务数必须大于0
|
||
for golabl.Task.Footer.TaskCountWait.Load() > 0 {
|
||
// 任务索引
|
||
atomic.AddInt64(&golabl.Logic.TaskIndex, 1)
|
||
|
||
//TODO 在更新config方法出去后应该去除该代码 每次重新获取配置文件
|
||
if configErr := config.GetConfigSetToG(); configErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, configErr.Error())
|
||
return
|
||
}
|
||
|
||
// 使用令牌桶进行速率控制(每秒20个)
|
||
if err := golabl.Speed.Wait(golabl.Ctx); err != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("令牌桶等待失败-原因来自于:%v", err))
|
||
continue
|
||
}
|
||
|
||
//TODO 重新获取任务头尾
|
||
if taskErr := task.GetTaskHeaderAndFooterSetToG(); taskErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, taskErr.Error())
|
||
continue
|
||
}
|
||
|
||
// 如果连续读出 redisNil 的次数大于100
|
||
if atomic.LoadInt64(&golabl.Logic.RedisNilCon) > 100 {
|
||
//Goto = true
|
||
|
||
// 等待所有任务完成 暂停 5 秒
|
||
golabl.Pool.Wg.Wait()
|
||
fmt.Println("等待当前所有协程完成后 暂停5秒,如果等待的任务真的是0的话,则通知A完成任务!")
|
||
time.Sleep(5 * time.Second)
|
||
|
||
//获取任务真实的 wait数量
|
||
count, getTaskBodyWaitCountErr := service.GetTaskBodyWaitCount()
|
||
if getTaskBodyWaitCountErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取任务任务真实的 wait数量失败-原因来自:%v", getTaskBodyWaitCountErr))
|
||
return
|
||
}
|
||
// 如果数量真的是0,则完成任务
|
||
if count == 0 {
|
||
break
|
||
}
|
||
|
||
atomic.StoreInt64(&golabl.Logic.RedisNilCon, 0)
|
||
}
|
||
|
||
// 创建等待
|
||
golabl.Pool.Wg.Add(1)
|
||
|
||
//协程池 提交
|
||
if golabl.Task.Header.TaskType == 7 {
|
||
// 单线程执行
|
||
taskExecute()
|
||
if taskPoolErr := golabl.Pool.Pool.Submit(taskExecute); taskPoolErr != nil {
|
||
golabl.Pool.Wg.Done()
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("协程池意外-原因来自:%d", taskPoolErr))
|
||
} else {
|
||
golabl.Pool.Wg.Done()
|
||
}
|
||
} else {
|
||
// 多线程执行
|
||
if taskPoolErr := golabl.Pool.Pool.Submit(func() {
|
||
defer golabl.Pool.Wg.Done()
|
||
taskExecute()
|
||
}); taskPoolErr != nil {
|
||
golabl.Pool.Wg.Done()
|
||
}
|
||
}
|
||
|
||
// 判断 任务数是否超过1000 并且 判断是否执行到了1000的倍数
|
||
if golabl.Task.Header.TaskCountTrue > 1000 && golabl.Logic.TaskIndex%1000 == 0 {
|
||
// 更新任务头部信息
|
||
updateTaskHeaderErr := tool.UpdateTaskHeader()
|
||
if updateTaskHeaderErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("更新任务头信息失败-原因来自:%v", updateTaskHeaderErr))
|
||
}
|
||
}
|
||
}
|
||
|
||
// 等待所有任务完成
|
||
golabl.Pool.Wg.Wait()
|
||
|
||
//等待指定时间后重新执行循环
|
||
//if Goto == true {
|
||
// golabl.Logic.RedisNilCon = 0
|
||
// golabl.Logic.LastIndex = golabl.LastIndexRedisNil
|
||
// fmt.Printf("连续读出 redisNil 的次数 %v 暂停%v毫秒", golabl.Logic.RedisNilCon, golabl.Config.Server.ErrPauseTime)
|
||
// time.Sleep(time.Duration(golabl.Config.Server.ErrPauseTime) * time.Millisecond)
|
||
// goto loop
|
||
//}
|
||
|
||
// 更新任务头部信息
|
||
if updateTaskHeaderErr := tool.UpdateTaskHeader(); updateTaskHeaderErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("更新任务头信息失败-原因来自:%v", updateTaskHeaderErr))
|
||
}
|
||
|
||
// 通知 A程序任务完成
|
||
httpTaskStatusOverErr := tool.NotifyA()
|
||
if httpTaskStatusOverErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, httpTaskStatusOverErr.Error())
|
||
}
|
||
|
||
// 延迟2分钟
|
||
time.Sleep(2 * time.Minute)
|
||
}
|
||
|
||
// 任务执行
|
||
func taskExecute() {
|
||
//初始化 变量
|
||
status := golabl.BodyStatusSuccess //默认的书籍执行状态·
|
||
errorStr := "执行成功" //默认的书籍执行描述
|
||
|
||
// 获取任务信息
|
||
taskMsg, taskMsgErr := service.GetTaskToPopFromBodyWait()
|
||
|
||
if errors.Is(taskMsgErr, redis.Nil) {
|
||
//redis 读nil空+1
|
||
fmt.Printf("第 %v 次读出 Redis Nil \n", atomic.LoadInt64(&golabl.Logic.RedisNilCon))
|
||
atomic.AddInt64(&golabl.Logic.RedisNilCon, 1)
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取任务信息失败-原因来自:%v", taskMsgErr))
|
||
return
|
||
} else if taskMsgErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取任务信息失败-原因来自:%v", taskMsgErr))
|
||
return
|
||
}
|
||
|
||
//设置混合任务成功状态
|
||
if golabl.Task.Header.TaskType == 5 || golabl.Task.Header.TaskType == 9 {
|
||
switch taskMsg.Detail.Status {
|
||
case 1:
|
||
errorStr = "设置商品上架 " + errorStr
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
case 2:
|
||
errorStr = "设置商品下架 " + errorStr
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
case 3:
|
||
//删除商品的任务存储到 mysql中
|
||
//删除商品 {"book_info":{"isbn":"9787543982888"},"detail":{"goods_id":935670364385,"status":3}}
|
||
DelTask(taskMsg)
|
||
errorStr = "删除商品 已转转移至删除中心"
|
||
case 4:
|
||
errorStr = "修改商品库存 " + errorStr
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
case 5:
|
||
errorStr = "修改商品价格 " + errorStr
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
case 6:
|
||
errorStr = "发布商品 " + errorStr
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
case 7:
|
||
errorStr = "删除并重新发布 " + errorStr
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
|
||
default:
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
}
|
||
// 更新任务信息
|
||
taskMsg.Detail.Status = status
|
||
taskMsg.Detail.Error = errorStr
|
||
} else if golabl.Task.Header.TaskType == 7 {
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
taskMsg.Detail.Status = status
|
||
if status != 1 {
|
||
taskMsg.Detail.Error = errorStr
|
||
}
|
||
} else {
|
||
//执行任务
|
||
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
|
||
// 更新任务信息
|
||
taskMsg.Detail.Status = status
|
||
taskMsg.Detail.Error = errorStr
|
||
}
|
||
|
||
//isbn 不为空的添加到body中,比如拉取店铺商品信息isbn可以返回空的
|
||
if taskMsg.BookInfo.Isbn != "" && (golabl.TaskType == "3" || golabl.TaskType == "4") {
|
||
// 添加任务到bodyOver、bodyData、bodyBackup
|
||
if addTaskToBodyOverErr := service.AddTaskToBodyOver(taskMsg, []string{}); addTaskToBodyOverErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务失败 添加到BodyOver失败-原因:%v", addTaskToBodyOverErr))
|
||
}
|
||
} else {
|
||
if taskMsg.BookInfo.Isbn == "" && taskMsg.BookInfo.BookName == "" {
|
||
taskMsg.BookInfo.BookName = "暂无书品信息"
|
||
}
|
||
// 添加任务到bodyOver、bodyData、bodyBackup
|
||
if addTaskToBodyOverErr := service.AddTaskToBodyOver(taskMsg, []string{}); addTaskToBodyOverErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务失败 添加到BodyOver失败-原因:%v", addTaskToBodyOverErr))
|
||
}
|
||
}
|
||
|
||
// 更新 footer信息
|
||
if updateTaskFooterErr := service.UpdateTaskFooter(status, 1); updateTaskFooterErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务失败 添加到BodyOver失败-原因:%v", updateTaskFooterErr))
|
||
}
|
||
|
||
// 如果错误是 店铺商品发布达到上限则暂停程序
|
||
if strings.Contains(errorStr, "店铺内发布商品总数已达到上限") {
|
||
golabl.Task.Header.LastIndex = golabl.LastIndexGoodsMaxRestriction
|
||
//暂停 B程序运行
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, "任务失败 添加到BodyOver失败-原因:店铺内发布商品总数已达到上限")
|
||
pauseTaskErr := tool.PauseTask()
|
||
if pauseTaskErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, "任务失败 添加到BodyOver失败-原因:店铺内发布商品总数已达到上限")
|
||
}
|
||
}
|
||
|
||
fmt.Println(errorStr)
|
||
}
|
||
|
||
//****************************工具**************************************//
|
||
|
||
// parseShopData 解析店铺数据
|
||
// @param shopData 店铺数据
|
||
// @return *_type.ShopInfo 店铺信息
|
||
func parseShopData(shopData string) (*planAType.ShopInfo, error) {
|
||
shopData = strings.TrimSpace(shopData)
|
||
|
||
// 直接解析为 RedisData数组
|
||
var redisData []redisType.RedisData
|
||
err := json.Unmarshal([]byte(shopData), &redisData)
|
||
if err != nil {
|
||
// 尝试另一种格式:可能是单对象而不是数组
|
||
var singleData redisType.RedisData
|
||
if singleErr := json.Unmarshal([]byte(shopData), &singleData); singleErr == nil {
|
||
redisData = []redisType.RedisData{singleData}
|
||
} else {
|
||
return nil, fmt.Errorf("JSON解析失败: %v, 原始数据: %s", err, shopData[:min(100, len(shopData))])
|
||
}
|
||
}
|
||
|
||
shopInfo := &planAType.ShopInfo{}
|
||
|
||
// 遍历所有数据,根据source_table分类
|
||
for _, item := range redisData {
|
||
switch item.SourceTable {
|
||
case "t_shop":
|
||
var shop planAType.Shop
|
||
if err := json.Unmarshal(item.Data, &shop); err == nil {
|
||
shopInfo.Shop = &shop
|
||
} else {
|
||
fmt.Printf("解析t_shop失败: %v\n", err)
|
||
}
|
||
case "t_shop_detail":
|
||
var detail planAType.ShopDetail
|
||
if err := json.Unmarshal(item.Data, &detail); err == nil {
|
||
shopInfo.ShopDetail = &detail
|
||
} else {
|
||
fmt.Printf("解析t_shop_detail失败: %v\n", err)
|
||
}
|
||
case "t_shop_context":
|
||
var context planAType.ShopContext
|
||
if err := json.Unmarshal(item.Data, &context); err == nil {
|
||
shopInfo.ShopContext = &context
|
||
} else {
|
||
fmt.Printf("解析t_shop_context失败: %v\n", err)
|
||
}
|
||
case "t_spec":
|
||
var spec planAType.Spec
|
||
if err := json.Unmarshal(item.Data, &spec); err == nil {
|
||
shopInfo.Spec = &spec
|
||
} else {
|
||
fmt.Printf("解析t_spec失败: %v\n", err)
|
||
}
|
||
case "t_price_template":
|
||
var template planAType.PriceTemplate
|
||
if err := json.Unmarshal(item.Data, &template); err == nil {
|
||
shopInfo.PriceTemplate = &template
|
||
} else {
|
||
fmt.Printf("解析t_price_template失败: %v\n", err)
|
||
}
|
||
default:
|
||
fmt.Printf("未知的source_table: %s\n", item.SourceTable)
|
||
}
|
||
}
|
||
|
||
return shopInfo, nil
|
||
}
|
||
|
||
// 调度任务
|
||
func exeTask(taskMsg planAType.TaskBody, status int64, errorStr string) (int64, string, planAType.TaskBody) {
|
||
// 任务调度
|
||
bodyOverJson, err := dispatcher.Go(taskMsg)
|
||
if err != nil {
|
||
//任务调度失败
|
||
status = golabl.BodyStatusError
|
||
errorStr = fmt.Sprintf("任务调度失败-原因来自:%v", err)
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务调度失败-原因来自:%v", err))
|
||
} else {
|
||
//任务调度成功
|
||
var bodyOver planAType.TaskBody
|
||
unmarshalErr := json.Unmarshal([]byte(bodyOverJson), &bodyOver)
|
||
if unmarshalErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("bodyOver json.Unmarshal错误-原因:%v", unmarshalErr))
|
||
}
|
||
//更新 taskMsg
|
||
taskMsg = bodyOver
|
||
}
|
||
return status, errorStr, taskMsg
|
||
}
|
||
|
||
// DelTask 删除任务
|
||
func DelTask(taskMsg planAType.TaskBody) {
|
||
//删除商品的任务存储到 mysql中
|
||
//删除商品 {"book_info":{"isbn":"9787543982888"},"detail":{"goods_id":935670364385,"status":3}}
|
||
delTask, isExistDelTask, delTaskErr := service.GetDelTaskByTaskId()
|
||
if !isExistDelTask && delTaskErr == nil {
|
||
taskCount := 0
|
||
taskCountOver := 0
|
||
sta := 0
|
||
//将header 转为json
|
||
headerByte, headerJsonErr := json.Marshal(golabl.Task.Header)
|
||
if headerJsonErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("将header 转为json失败-原因来自:%v", headerJsonErr))
|
||
return
|
||
}
|
||
headerJson := string(headerByte)
|
||
currentTime := time.Now()
|
||
// 查询店铺数据
|
||
shopDataStr, getTaskShopErr := service.GetTaskShop(golabl.Task.Header.ShopId)
|
||
if getTaskShopErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("查询店铺数据失败:%v", headerJsonErr))
|
||
return
|
||
}
|
||
// 解析 json数据
|
||
shopData, parseShopDataErr := parseShopData(shopDataStr)
|
||
if parseShopDataErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("解析店铺数据失败:%v", parseShopDataErr))
|
||
return
|
||
}
|
||
userId := shopData.Shop.CreateBy
|
||
taskType := 1
|
||
//不存在 mysql任务则创建
|
||
createDelTask := planATypeMysql.DelTask{
|
||
UserID: &userId,
|
||
ShopID: &golabl.Task.Header.ShopId,
|
||
TaskID: &golabl.Task.Header.TaskId,
|
||
ShopName: &golabl.Task.Header.ShopName,
|
||
ShopType: &shopData.Shop.ShopType,
|
||
TaskCount: &taskCount,
|
||
TaskCountOver: &taskCountOver,
|
||
Status: &sta,
|
||
TaskType: &taskType,
|
||
Header: &headerJson,
|
||
CreateAt: ¤tTime,
|
||
}
|
||
var err error
|
||
delTask, err = service.CreateDelTask(createDelTask)
|
||
if err != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("创建删除任务失败-原因来自:%v", err))
|
||
return
|
||
}
|
||
} else if delTaskErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取删除任务失败-原因来自:%v", delTaskErr))
|
||
return
|
||
}
|
||
|
||
//将任务状态修改为执行中
|
||
updateDelTaskStatusToDoingErr := service.UpdateDelTaskStatusToDoing()
|
||
if updateDelTaskStatusToDoingErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("将删除任务状态修改为执行中失败-原因来自:%v", updateDelTaskStatusToDoingErr))
|
||
return
|
||
}
|
||
// 将明细的删除任务转移到 mysql中
|
||
insertDelTaskDetailErr := service.InsertDelTaskDetail(delTask.ID, taskMsg)
|
||
if insertDelTaskDetailErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("将明细的删除任务转移到 mysql中失败-原因来自:%v", insertDelTaskDetailErr))
|
||
return
|
||
}
|
||
// 添加删除任务数量
|
||
addDelTaskDetailCountErr := service.AddDelTaskDetailCount()
|
||
if addDelTaskDetailCountErr != nil {
|
||
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("添加删除任务数量失败-原因来自:%v", addDelTaskDetailCountErr))
|
||
return
|
||
}
|
||
}
|