// Package service contains bookkeeping helpers for sync tasks stored as
// models.WangdianSyncTask rows: creating a task, updating its progress,
// moving it to a terminal status, and querying task history.
package service

import (
	"errors"
	"log"
	"psi/models"
	"time"

	"gorm.io/gorm"
)

// Task types and task statuses written to the task_type and status columns.
const (
	TaskTypePurchasePush  = "purchase_push"
	TaskTypeSyncProvider  = "sync_provider"
	TaskTypeSyncWarehouse = "sync_warehouse"
	TaskTypeSyncGoods     = "sync_goods"

	TaskStatusRunning   = "running"
	TaskStatusCompleted = "completed"
	TaskStatusCancelled = "cancelled"
	TaskStatusFailed    = "failed"
)

// errTaskNotRunning is returned by CancelSyncTask when the task is not (or is
// no longer) in the running status.
var errTaskNotRunning = errors.New("只能取消运行中的任务")

// updateSyncTask applies fields to the task row identified by taskID and
// returns the database error, if any.
func updateSyncTask(db *gorm.DB, taskID int64, fields map[string]interface{}) error {
	return db.Model(&models.WangdianSyncTask{}).
		Where("id = ?", taskID).
		Updates(fields).Error
}

// EnsureNoRunningTask returns an error if a task of the given type is
// currently in the running status, or if the lookup itself fails.
//
// The check is not atomic with a subsequent CreateSyncTask call, so two
// concurrent callers may both pass it.
func EnsureNoRunningTask(db *gorm.DB, taskType string) error {
	var count int64
	err := db.Model(&models.WangdianSyncTask{}).
		Where("task_type = ? AND status = ?", taskType, TaskStatusRunning).
		Count(&count).Error
	if err != nil {
		// A failed lookup must not be reported as "nothing is running".
		return err
	}
	if count > 0 {
		return errors.New("该类型的同步任务正在执行中,请等待完成后重试")
	}
	return nil
}

// CreateSyncTask inserts a new task of the given type in the running status
// with zero progress, and returns it.
//
// The returned pointer is never nil. If the insert fails the error is logged
// and the returned task is the unsaved value that was passed to the database.
func CreateSyncTask(db *gorm.DB, taskType string) *models.WangdianSyncTask {
	now := time.Now().Unix()
	task := &models.WangdianSyncTask{
		TaskType:  taskType,
		Status:    TaskStatusRunning,
		Progress:  0,
		Total:     0,
		StartedAt: now,
		CreatedAt: now,
		UpdatedAt: now,
	}
	if err := db.Create(task).Error; err != nil {
		log.Printf("[旺店通任务] 创建任务(%s)失败: %v", taskType, err)
	}
	return task
}

// UpdateSyncTaskProgress stores the progress and total counters for a task.
// A database error is logged, not returned.
func UpdateSyncTaskProgress(db *gorm.DB, taskID int64, progress, total int) {
	err := updateSyncTask(db, taskID, map[string]interface{}{
		"progress":   progress,
		"total":      total,
		"updated_at": time.Now().Unix(),
	})
	if err != nil {
		log.Printf("[旺店通任务] 更新任务%d进度失败: %v", taskID, err)
	}
}

// CompleteSyncTask marks a task as completed and records the finish time.
// A database error is logged, not returned.
func CompleteSyncTask(db *gorm.DB, taskID int64) {
	now := time.Now().Unix()
	err := updateSyncTask(db, taskID, map[string]interface{}{
		"status":      TaskStatusCompleted,
		"finished_at": now,
		"updated_at":  now,
	})
	if err != nil {
		log.Printf("[旺店通任务] 标记任务%d完成失败: %v", taskID, err)
	}
}

// FailSyncTask marks a task as failed, storing errMsg and the finish time.
// A database error is logged, not returned.
func FailSyncTask(db *gorm.DB, taskID int64, errMsg string) {
	now := time.Now().Unix()
	err := updateSyncTask(db, taskID, map[string]interface{}{
		"status":      TaskStatusFailed,
		"error_msg":   errMsg,
		"finished_at": now,
		"updated_at":  now,
	})
	if err != nil {
		log.Printf("[旺店通任务] 标记任务%d失败状态时出错: %v", taskID, err)
	}
}

// CancelSyncTask marks a running task as cancelled.
//
// It returns the lookup error when the task cannot be loaded (including
// gorm.ErrRecordNotFound), an error when the task is not running, and the
// database error when the update fails.
func CancelSyncTask(db *gorm.DB, taskID int64) error {
	var task models.WangdianSyncTask
	if err := db.Where("id = ?", taskID).First(&task).Error; err != nil {
		return err
	}
	if task.Status != TaskStatusRunning {
		return errTaskNotRunning
	}

	now := time.Now().Unix()
	// The status condition is repeated in the UPDATE so that a task which
	// reached a terminal status after the read above is not overwritten.
	result := db.Model(&models.WangdianSyncTask{}).
		Where("id = ? AND status = ?", taskID, TaskStatusRunning).
		Updates(map[string]interface{}{
			"status":      TaskStatusCancelled,
			"finished_at": now,
			"updated_at":  now,
		})
	if result.Error != nil {
		return result.Error
	}
	if result.RowsAffected == 0 {
		return errTaskNotRunning
	}
	return nil
}

// GetSyncTaskStatus returns the task of the given type with the highest ID.
// It returns (nil, nil) when no task of that type exists.
func GetSyncTaskStatus(db *gorm.DB, taskType string) (*models.WangdianSyncTask, error) {
	var task models.WangdianSyncTask
	err := db.Where("task_type = ?", taskType).
		Order("id DESC").
		First(&task).Error
	if err != nil {
		if errors.Is(err, gorm.ErrRecordNotFound) {
			return nil, nil
		}
		return nil, err
	}
	return &task, nil
}

// WrapSyncTask checks that no task of taskType is running, creates a new
// task, and runs fn in a new goroutine with the task ID and db. It returns
// the new task's ID without waiting for fn.
//
// If fn panics, the panic is recovered, logged, and the task is marked as
// failed. WrapSyncTask does not mark the task as completed when fn returns
// normally.
// NOTE(review): presumably fn is expected to call CompleteSyncTask or
// FailSyncTask itself — confirm against callers.
func WrapSyncTask(taskType string, db *gorm.DB, fn func(taskID int64, db *gorm.DB)) (int64, error) {
	if err := EnsureNoRunningTask(db, taskType); err != nil {
		return 0, err
	}

	task := CreateSyncTask(db, taskType)
	if task.ID == 0 {
		// The ID is never set by this package, so a zero ID means the insert
		// did not succeed; running fn against a non-existent row is pointless.
		return 0, errors.New("创建同步任务失败")
	}

	go func() {
		defer func() {
			r := recover()
			if r == nil {
				return
			}
			errMsg := "panic"
			switch v := r.(type) {
			case error:
				errMsg = v.Error()
			case string:
				// panic("message") carries a plain string; keep its text.
				errMsg = v
			}
			log.Printf("[旺店通任务] 任务%d(%s) panic: %v", task.ID, taskType, r)
			FailSyncTask(db, task.ID, errMsg)
		}()
		fn(task.ID, db)
	}()

	return task.ID, nil
}

// ResetRunningTasks marks every task in the running status as failed. It is
// intended for cleanup after a service restart. It does nothing when the task
// table does not exist; a database error is logged, not returned.
func ResetRunningTasks(db *gorm.DB) {
	if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
		return
	}

	now := time.Now().Unix()
	result := db.Model(&models.WangdianSyncTask{}).
		Where("status = ?", TaskStatusRunning).
		Updates(map[string]interface{}{
			"status":      TaskStatusFailed,
			"error_msg":   "服务重启,任务已重置",
			"finished_at": now,
			"updated_at":  now,
		})
	if result.Error != nil {
		log.Printf("[旺店通任务] 重置运行中的任务失败: %v", result.Error)
		return
	}
	if result.RowsAffected > 0 {
		log.Printf("[旺店通任务] 服务重启, 已重置 %d 个运行中的任务为失败状态", result.RowsAffected)
	}
}

// ListSyncTask returns one page of tasks ordered by ID descending, together
// with the total number of matching tasks.
//
// An empty taskType matches all types. A pageNo <= 0 is treated as 1 and a
// pageSize <= 0 is treated as 20.
func ListSyncTask(db *gorm.DB, taskType string, pageNo, pageSize int) ([]models.WangdianSyncTask, int64, error) {
	var tasks []models.WangdianSyncTask
	var total int64

	query := db.Model(&models.WangdianSyncTask{})
	if taskType != "" {
		query = query.Where("task_type = ?", taskType)
	}
	if err := query.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	if pageNo <= 0 {
		pageNo = 1
	}
	if pageSize <= 0 {
		pageSize = 20
	}
	offset := (pageNo - 1) * pageSize

	if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&tasks).Error; err != nil {
		return nil, 0, err
	}
	return tasks, total, nil
}