daShangDao_planA/planB/logic/logic.go

411 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package logic
import (
"encoding/json"
"errors"
"fmt"
"planA/planB/dispatcher"
"planA/planB/initialization/config"
"planA/planB/initialization/golabl"
"planA/planB/initialization/task"
"planA/planB/modules/logs"
"planA/planB/service"
"planA/planB/tool"
planAType "planA/type"
planATypeMysql "planA/type/mysql"
redisType "planA/type/redis"
"strings"
"sync/atomic"
"time"
"github.com/go-redis/redis/v8"
)
var Goto bool = false
// Logic 执行任务
func Logic() {
//loop:
// 开始读取待处理任务 等待任务数必须大于0
for golabl.Task.Footer.TaskCountWait.Load() > 0 {
// 任务索引
atomic.AddInt64(&golabl.Logic.TaskIndex, 1)
//TODO 在更新config方法出去后应该去除该代码 每次重新获取配置文件
if configErr := config.GetConfigSetToG(); configErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, configErr.Error())
return
}
// 使用令牌桶进行速率控制每秒20个
if err := golabl.Speed.Wait(golabl.Ctx); err != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("令牌桶等待失败-原因来自于:%v", err))
continue
}
//TODO 重新获取任务头尾
if taskErr := task.GetTaskHeaderAndFooterSetToG(); taskErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, taskErr.Error())
continue
}
// 如果连续读出 redisNil 的次数大于100
if atomic.LoadInt64(&golabl.Logic.RedisNilCon) > 100 {
//Goto = true
// 等待所有任务完成 暂停 5 秒
golabl.Pool.Wg.Wait()
fmt.Println("等待当前所有协程完成后 暂停5秒如果等待的任务真的是0的话则通知A完成任务")
time.Sleep(5 * time.Second)
//获取任务真实的 wait数量
count, getTaskBodyWaitCountErr := service.GetTaskBodyWaitCount()
if getTaskBodyWaitCountErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取任务任务真实的 wait数量失败-原因来自:%v", getTaskBodyWaitCountErr))
return
}
// 如果数量真的是0则完成任务
if count == 0 {
break
}
atomic.StoreInt64(&golabl.Logic.RedisNilCon, 0)
}
// 创建等待
golabl.Pool.Wg.Add(1)
//协程池 提交
if golabl.Task.Header.TaskType == 7 {
// 单线程执行
taskExecute()
if taskPoolErr := golabl.Pool.Pool.Submit(taskExecute); taskPoolErr != nil {
golabl.Pool.Wg.Done()
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("协程池意外-原因来自:%d", taskPoolErr))
} else {
golabl.Pool.Wg.Done()
}
} else {
// 多线程执行
if taskPoolErr := golabl.Pool.Pool.Submit(func() {
defer golabl.Pool.Wg.Done()
taskExecute()
}); taskPoolErr != nil {
golabl.Pool.Wg.Done()
}
}
// 判断 任务数是否超过1000 并且 判断是否执行到了1000的倍数
if golabl.Task.Header.TaskCountTrue > 1000 && golabl.Logic.TaskIndex%1000 == 0 {
// 更新任务头部信息
updateTaskHeaderErr := tool.UpdateTaskHeader()
if updateTaskHeaderErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("更新任务头信息失败-原因来自:%v", updateTaskHeaderErr))
}
}
}
// 等待所有任务完成
golabl.Pool.Wg.Wait()
//等待指定时间后重新执行循环
//if Goto == true {
// golabl.Logic.RedisNilCon = 0
// golabl.Logic.LastIndex = golabl.LastIndexRedisNil
// fmt.Printf("连续读出 redisNil 的次数 %v 暂停%v毫秒", golabl.Logic.RedisNilCon, golabl.Config.Server.ErrPauseTime)
// time.Sleep(time.Duration(golabl.Config.Server.ErrPauseTime) * time.Millisecond)
// goto loop
//}
// 更新任务头部信息
if updateTaskHeaderErr := tool.UpdateTaskHeader(); updateTaskHeaderErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("更新任务头信息失败-原因来自:%v", updateTaskHeaderErr))
}
// 通知 A程序任务完成
httpTaskStatusOverErr := tool.NotifyA()
if httpTaskStatusOverErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, httpTaskStatusOverErr.Error())
}
// 延迟2分钟
time.Sleep(2 * time.Minute)
}
// 任务执行
func taskExecute() {
//初始化 变量
status := golabl.BodyStatusSuccess //默认的书籍执行状态·
errorStr := "执行成功" //默认的书籍执行描述
// 获取任务信息
taskMsg, taskMsgErr := service.GetTaskToPopFromBodyWait()
if errors.Is(taskMsgErr, redis.Nil) {
//redis 读nil空+1
fmt.Printf("第 %v 次读出 Redis Nil \n", atomic.LoadInt64(&golabl.Logic.RedisNilCon))
atomic.AddInt64(&golabl.Logic.RedisNilCon, 1)
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取任务信息失败-原因来自:%v", taskMsgErr))
return
} else if taskMsgErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取任务信息失败-原因来自:%v", taskMsgErr))
return
}
//设置混合任务成功状态
if golabl.Task.Header.TaskType == 5 || golabl.Task.Header.TaskType == 9 {
switch taskMsg.Detail.Status {
case 1:
errorStr = "设置商品上架 " + errorStr
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
case 2:
errorStr = "设置商品下架 " + errorStr
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
case 3:
//删除商品的任务存储到 mysql中
//删除商品 {"book_info":{"isbn":"9787543982888"},"detail":{"goods_id":935670364385,"status":3}}
DelTask(taskMsg)
errorStr = "删除商品 已转转移至删除中心"
case 4:
errorStr = "修改商品库存 " + errorStr
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
case 5:
errorStr = "修改商品价格 " + errorStr
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
case 6:
errorStr = "发布商品 " + errorStr
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
case 7:
errorStr = "删除并重新发布 " + errorStr
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
default:
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
}
// 更新任务信息
taskMsg.Detail.Status = status
taskMsg.Detail.Error = errorStr
} else if golabl.Task.Header.TaskType == 7 {
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
taskMsg.Detail.Status = status
if status != 1 {
taskMsg.Detail.Error = errorStr
}
} else {
//执行任务
status, errorStr, taskMsg = exeTask(taskMsg, status, errorStr)
// 更新任务信息
taskMsg.Detail.Status = status
taskMsg.Detail.Error = errorStr
}
//isbn 不为空的添加到body中比如拉取店铺商品信息isbn可以返回空的
if taskMsg.BookInfo.Isbn != "" && (golabl.TaskType == "3" || golabl.TaskType == "4") {
// 添加任务到bodyOver、bodyData、bodyBackup
if addTaskToBodyOverErr := service.AddTaskToBodyOver(taskMsg, []string{}); addTaskToBodyOverErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务失败 添加到BodyOver失败-原因:%v", addTaskToBodyOverErr))
}
} else {
if taskMsg.BookInfo.Isbn == "" && taskMsg.BookInfo.BookName == "" {
taskMsg.BookInfo.BookName = "暂无书品信息"
}
// 添加任务到bodyOver、bodyData、bodyBackup
if addTaskToBodyOverErr := service.AddTaskToBodyOver(taskMsg, []string{}); addTaskToBodyOverErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务失败 添加到BodyOver失败-原因:%v", addTaskToBodyOverErr))
}
}
// 更新 footer信息
if updateTaskFooterErr := service.UpdateTaskFooter(status, 1); updateTaskFooterErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务失败 添加到BodyOver失败-原因:%v", updateTaskFooterErr))
}
// 如果错误是 店铺商品发布达到上限则暂停程序
if strings.Contains(errorStr, "店铺内发布商品总数已达到上限") {
golabl.Task.Header.LastIndex = golabl.LastIndexGoodsMaxRestriction
//暂停 B程序运行
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, "任务失败 添加到BodyOver失败-原因:店铺内发布商品总数已达到上限")
pauseTaskErr := tool.PauseTask()
if pauseTaskErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, "任务失败 添加到BodyOver失败-原因:店铺内发布商品总数已达到上限")
}
}
fmt.Println(errorStr)
}
//****************************工具**************************************//
// parseShopData 解析店铺数据
// @param shopData 店铺数据
// @return *_type.ShopInfo 店铺信息
func parseShopData(shopData string) (*planAType.ShopInfo, error) {
shopData = strings.TrimSpace(shopData)
// 直接解析为 RedisData数组
var redisData []redisType.RedisData
err := json.Unmarshal([]byte(shopData), &redisData)
if err != nil {
// 尝试另一种格式:可能是单对象而不是数组
var singleData redisType.RedisData
if singleErr := json.Unmarshal([]byte(shopData), &singleData); singleErr == nil {
redisData = []redisType.RedisData{singleData}
} else {
return nil, fmt.Errorf("JSON解析失败: %v, 原始数据: %s", err, shopData[:min(100, len(shopData))])
}
}
shopInfo := &planAType.ShopInfo{}
// 遍历所有数据根据source_table分类
for _, item := range redisData {
switch item.SourceTable {
case "t_shop":
var shop planAType.Shop
if err := json.Unmarshal(item.Data, &shop); err == nil {
shopInfo.Shop = &shop
} else {
fmt.Printf("解析t_shop失败: %v\n", err)
}
case "t_shop_detail":
var detail planAType.ShopDetail
if err := json.Unmarshal(item.Data, &detail); err == nil {
shopInfo.ShopDetail = &detail
} else {
fmt.Printf("解析t_shop_detail失败: %v\n", err)
}
case "t_shop_context":
var context planAType.ShopContext
if err := json.Unmarshal(item.Data, &context); err == nil {
shopInfo.ShopContext = &context
} else {
fmt.Printf("解析t_shop_context失败: %v\n", err)
}
case "t_spec":
var spec planAType.Spec
if err := json.Unmarshal(item.Data, &spec); err == nil {
shopInfo.Spec = &spec
} else {
fmt.Printf("解析t_spec失败: %v\n", err)
}
case "t_price_template":
var template planAType.PriceTemplate
if err := json.Unmarshal(item.Data, &template); err == nil {
shopInfo.PriceTemplate = &template
} else {
fmt.Printf("解析t_price_template失败: %v\n", err)
}
default:
fmt.Printf("未知的source_table: %s\n", item.SourceTable)
}
}
return shopInfo, nil
}
// 调度任务
func exeTask(taskMsg planAType.TaskBody, status int64, errorStr string) (int64, string, planAType.TaskBody) {
// 任务调度
bodyOverJson, err := dispatcher.Go(taskMsg)
if err != nil {
//任务调度失败
status = golabl.BodyStatusError
errorStr = fmt.Sprintf("任务调度失败-原因来自:%v", err)
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("任务调度失败-原因来自:%v", err))
} else {
//任务调度成功
var bodyOver planAType.TaskBody
unmarshalErr := json.Unmarshal([]byte(bodyOverJson), &bodyOver)
if unmarshalErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("bodyOver json.Unmarshal错误-原因:%v", unmarshalErr))
}
//更新 taskMsg
taskMsg = bodyOver
}
return status, errorStr, taskMsg
}
// DelTask 删除任务
func DelTask(taskMsg planAType.TaskBody) {
//删除商品的任务存储到 mysql中
//删除商品 {"book_info":{"isbn":"9787543982888"},"detail":{"goods_id":935670364385,"status":3}}
delTask, isExistDelTask, delTaskErr := service.GetDelTaskByTaskId()
if !isExistDelTask && delTaskErr == nil {
taskCount := 0
taskCountOver := 0
sta := 0
//将header 转为json
headerByte, headerJsonErr := json.Marshal(golabl.Task.Header)
if headerJsonErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("将header 转为json失败-原因来自:%v", headerJsonErr))
return
}
headerJson := string(headerByte)
currentTime := time.Now()
// 查询店铺数据
shopDataStr, getTaskShopErr := service.GetTaskShop(golabl.Task.Header.ShopId)
if getTaskShopErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("查询店铺数据失败:%v", headerJsonErr))
return
}
// 解析 json数据
shopData, parseShopDataErr := parseShopData(shopDataStr)
if parseShopDataErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("解析店铺数据失败:%v", parseShopDataErr))
return
}
userId := shopData.Shop.CreateBy
taskType := 1
//不存在 mysql任务则创建
createDelTask := planATypeMysql.DelTask{
UserID: &userId,
ShopID: &golabl.Task.Header.ShopId,
TaskID: &golabl.Task.Header.TaskId,
ShopName: &golabl.Task.Header.ShopName,
ShopType: &shopData.Shop.ShopType,
TaskCount: &taskCount,
TaskCountOver: &taskCountOver,
Status: &sta,
TaskType: &taskType,
Header: &headerJson,
CreateAt: &currentTime,
}
var err error
delTask, err = service.CreateDelTask(createDelTask)
if err != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("创建删除任务失败-原因来自:%v", err))
return
}
} else if delTaskErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("获取删除任务失败-原因来自:%v", delTaskErr))
return
}
//将任务状态修改为执行中
updateDelTaskStatusToDoingErr := service.UpdateDelTaskStatusToDoing()
if updateDelTaskStatusToDoingErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("将删除任务状态修改为执行中失败-原因来自:%v", updateDelTaskStatusToDoingErr))
return
}
// 将明细的删除任务转移到 mysql中
insertDelTaskDetailErr := service.InsertDelTaskDetail(delTask.ID, taskMsg)
if insertDelTaskDetailErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("将明细的删除任务转移到 mysql中失败-原因来自:%v", insertDelTaskDetailErr))
return
}
// 添加删除任务数量
addDelTaskDetailCountErr := service.AddDelTaskDetailCount()
if addDelTaskDetailCountErr != nil {
tool.LoggingMiddleware(logs.LOG_LEVEL_ERROR, fmt.Sprintf("添加删除任务数量失败-原因来自:%v", addDelTaskDetailCountErr))
return
}
}