daShangDao_psiServer/service/wangdian_sync_task.go

195 lines
5.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"errors"
"log"
"psi/models"
"time"
"gorm.io/gorm"
)
// Sync task type identifiers.
const (
	TaskTypePurchasePush  = "purchase_push"
	TaskTypeSyncProvider  = "sync_provider"
	TaskTypeSyncWarehouse = "sync_warehouse"
	TaskTypeSyncGoods     = "sync_goods"
)

// Sync task status values as stored in the task's status column.
const (
	TaskStatusRunning   = "running"
	TaskStatusCompleted = "completed"
	TaskStatusCancelled = "cancelled"
	TaskStatusFailed    = "failed"
)
// EnsureNoRunningTask checks whether a task of the given type is currently in
// the running state.
//
// It returns the underlying query error when the count cannot be determined,
// so a database failure is never mistaken for "no task running". It returns a
// fixed error when at least one running task of that type exists, and nil
// when there is none.
func EnsureNoRunningTask(db *gorm.DB, taskType string) error {
	var count int64
	err := db.Model(&models.WangdianSyncTask{}).
		Where("task_type = ? AND status = ?", taskType, TaskStatusRunning).
		Count(&count).Error
	if err != nil {
		return err
	}
	if count > 0 {
		return errors.New("该类型的同步任务正在执行中,请等待完成后重试")
	}
	return nil
}
// CreateSyncTask inserts a new task row of the given type in the running
// state, with zero progress/total and the current Unix time as its start,
// creation and update timestamps.
//
// The in-memory task is returned whether or not the insert succeeded, because
// the signature has no error result; an insert failure is logged instead of
// being silently dropped.
func CreateSyncTask(db *gorm.DB, taskType string) *models.WangdianSyncTask {
	now := time.Now().Unix()
	task := &models.WangdianSyncTask{
		TaskType:  taskType,
		Status:    TaskStatusRunning,
		Progress:  0,
		Total:     0,
		StartedAt: now,
		CreatedAt: now,
		UpdatedAt: now,
	}
	if err := db.Create(task).Error; err != nil {
		log.Printf("[旺店通任务] 创建任务(%s)失败: %v", taskType, err)
	}
	return task
}
// UpdateSyncTaskProgress stores the given progress and total on the task with
// the given ID and refreshes its updated_at timestamp.
//
// The function has no error result, so a failed update is logged rather than
// silently ignored.
func UpdateSyncTaskProgress(db *gorm.DB, taskID int64, progress, total int) {
	err := db.Model(&models.WangdianSyncTask{}).
		Where("id = ?", taskID).
		Updates(map[string]interface{}{
			"progress":   progress,
			"total":      total,
			"updated_at": time.Now().Unix(),
		}).Error
	if err != nil {
		log.Printf("[旺店通任务] 更新任务%d进度失败: %v", taskID, err)
	}
}
// CompleteSyncTask marks the task with the given ID as completed and stamps
// finished_at and updated_at with the current Unix time.
//
// The function has no error result, so a failed update is logged rather than
// silently ignored.
func CompleteSyncTask(db *gorm.DB, taskID int64) {
	now := time.Now().Unix()
	err := db.Model(&models.WangdianSyncTask{}).
		Where("id = ?", taskID).
		Updates(map[string]interface{}{
			"status":      TaskStatusCompleted,
			"finished_at": now,
			"updated_at":  now,
		}).Error
	if err != nil {
		log.Printf("[旺店通任务] 标记任务%d完成失败: %v", taskID, err)
	}
}
// FailSyncTask marks the task with the given ID as failed, records errMsg in
// error_msg, and stamps finished_at and updated_at with the current Unix time.
//
// The function has no error result, so a failed update is logged rather than
// silently ignored.
func FailSyncTask(db *gorm.DB, taskID int64, errMsg string) {
	now := time.Now().Unix()
	err := db.Model(&models.WangdianSyncTask{}).
		Where("id = ?", taskID).
		Updates(map[string]interface{}{
			"status":      TaskStatusFailed,
			"error_msg":   errMsg,
			"finished_at": now,
			"updated_at":  now,
		}).Error
	if err != nil {
		log.Printf("[旺店通任务] 标记任务%d失败状态时出错: %v", taskID, err)
	}
}
// CancelSyncTask marks the task with the given ID as cancelled.
//
// It returns the lookup error when the task cannot be loaded (including
// gorm.ErrRecordNotFound), a fixed error when the task is not in the running
// state, and the update error when writing the cancelled status fails. It
// returns nil only when the status update was issued without error.
func CancelSyncTask(db *gorm.DB, taskID int64) error {
	var task models.WangdianSyncTask
	if err := db.Where("id = ?", taskID).First(&task).Error; err != nil {
		return err
	}
	if task.Status != TaskStatusRunning {
		return errors.New("只能取消运行中的任务")
	}
	now := time.Now().Unix()
	// Propagate a failed update so the caller is not told the task was
	// cancelled while it is still recorded as running.
	return db.Model(&models.WangdianSyncTask{}).
		Where("id = ?", taskID).
		Updates(map[string]interface{}{
			"status":      TaskStatusCancelled,
			"finished_at": now,
			"updated_at":  now,
		}).Error
}
// GetSyncTaskStatus loads the task of the given type with the highest ID.
//
// It returns (nil, nil) when no task of that type exists, and (nil, err) for
// any other query error.
func GetSyncTaskStatus(db *gorm.DB, taskType string) (*models.WangdianSyncTask, error) {
	latest := new(models.WangdianSyncTask)
	result := db.Where("task_type = ?", taskType).Order("id DESC").First(latest)
	switch {
	case result.Error == nil:
		return latest, nil
	case errors.Is(result.Error, gorm.ErrRecordNotFound):
		// An absent record is reported as "no task", not as a failure.
		return nil, nil
	default:
		return nil, result.Error
	}
}
// WrapSyncTask checks that no task of taskType is running, creates a new task
// record, and runs fn in a new goroutine with the task ID and db. It returns
// the new task's ID immediately, without waiting for fn.
//
// It returns an error when a task of that type is already running, when the
// running-task check itself fails, or when the new task has no ID after
// creation (in which case fn is never started).
//
// A panic inside fn is recovered, logged, and recorded on the task through
// FailSyncTask. When fn returns normally this function does not touch the task
// status; presumably fn marks completion or failure itself — verify against
// callers.
//
// NOTE(review): the running-task check and the insert are separate statements,
// so two concurrent callers could both pass the check — confirm whether
// callers serialize these requests.
func WrapSyncTask(taskType string, db *gorm.DB, fn func(taskID int64, db *gorm.DB)) (int64, error) {
	if err := EnsureNoRunningTask(db, taskType); err != nil {
		return 0, err
	}
	task := CreateSyncTask(db, taskType)
	// The rest of this function relies on Create having populated task.ID. A
	// zero ID means no row was stored, so starting fn would run a sync that no
	// task record tracks.
	if task.ID == 0 {
		return 0, errors.New("创建同步任务失败")
	}
	go func() {
		defer func() {
			if r := recover(); r != nil {
				// Keep the panic payload when it is an error or a plain
				// string; fall back to a generic marker for anything else.
				errMsg := "panic"
				switch v := r.(type) {
				case error:
					errMsg = v.Error()
				case string:
					errMsg = v
				}
				log.Printf("[旺店通任务] 任务%d(%s) panic: %v", task.ID, taskType, r)
				FailSyncTask(db, task.ID, errMsg)
			}
		}()
		fn(task.ID, db)
	}()
	return task.ID, nil
}
// ResetRunningTasks marks every task that is still in the running state as
// failed, with a fixed error message and the current Unix time as finished_at
// and updated_at. It is intended for cleanup after a service restart.
//
// It does nothing when the task table does not exist. A failed update is
// logged; a successful one is logged only when at least one row was changed.
func ResetRunningTasks(db *gorm.DB) {
	if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
		return
	}
	now := time.Now().Unix()
	result := db.Model(&models.WangdianSyncTask{}).
		Where("status = ?", TaskStatusRunning).
		Updates(map[string]interface{}{
			"status":      TaskStatusFailed,
			"error_msg":   "服务重启,任务已重置",
			"finished_at": now,
			"updated_at":  now,
		})
	if result.Error != nil {
		// Without this, stale running rows would keep blocking new tasks of
		// the same type with no trace of why the reset did not happen.
		log.Printf("[旺店通任务] 服务重启, 重置运行中的任务失败: %v", result.Error)
		return
	}
	if result.RowsAffected > 0 {
		log.Printf("[旺店通任务] 服务重启, 已重置 %d 个运行中的任务为失败状态", result.RowsAffected)
	}
}
// ListSyncTask returns one page of sync tasks, newest ID first, together with
// the total number of matching tasks.
//
// An empty taskType matches tasks of every type. A pageNo of zero or less is
// treated as 1, and a pageSize of zero or less is treated as 20. On any query
// error it returns (nil, 0, err).
func ListSyncTask(db *gorm.DB, taskType string, pageNo, pageSize int) ([]models.WangdianSyncTask, int64, error) {
	if pageNo <= 0 {
		pageNo = 1
	}
	if pageSize <= 0 {
		pageSize = 20
	}

	scope := db.Model(&models.WangdianSyncTask{})
	if taskType != "" {
		scope = scope.Where("task_type = ?", taskType)
	}

	var total int64
	if err := scope.Count(&total).Error; err != nil {
		return nil, 0, err
	}

	var tasks []models.WangdianSyncTask
	err := scope.Order("id DESC").
		Offset((pageNo - 1) * pageSize).
		Limit(pageSize).
		Find(&tasks).Error
	if err != nil {
		return nil, 0, err
	}
	return tasks, total, nil
}