195 lines
5.0 KiB
Go
195 lines
5.0 KiB
Go
package service
|
||
|
||
import (
|
||
"errors"
|
||
"log"
|
||
"psi/models"
|
||
"time"
|
||
|
||
"gorm.io/gorm"
|
||
)
|
||
|
||
// Sync task type identifiers, used as the taskType value of a sync task.
const (
	TaskTypePurchasePush  = "purchase_push"
	TaskTypeSyncProvider  = "sync_provider"
	TaskTypeSyncWarehouse = "sync_warehouse"
	TaskTypeSyncGoods     = "sync_goods"
)

// Sync task lifecycle states, written to the status column by the
// functions in this file.
const (
	TaskStatusRunning   = "running"
	TaskStatusCompleted = "completed"
	TaskStatusCancelled = "cancelled"
	TaskStatusFailed    = "failed"
)
|
||
|
||
// EnsureNoRunningTask 检查是否存在该类型的运行中任务,存在则返回错误
|
||
func EnsureNoRunningTask(db *gorm.DB, taskType string) error {
|
||
var count int64
|
||
db.Model(&models.WangdianSyncTask{}).
|
||
Where("task_type = ? AND status = ?", taskType, TaskStatusRunning).
|
||
Count(&count)
|
||
if count > 0 {
|
||
return errors.New("该类型的同步任务正在执行中,请等待完成后重试")
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// CreateSyncTask 创建同步任务
|
||
func CreateSyncTask(db *gorm.DB, taskType string) *models.WangdianSyncTask {
|
||
now := time.Now().Unix()
|
||
task := &models.WangdianSyncTask{
|
||
TaskType: taskType,
|
||
Status: TaskStatusRunning,
|
||
Progress: 0,
|
||
Total: 0,
|
||
StartedAt: now,
|
||
CreatedAt: now,
|
||
UpdatedAt: now,
|
||
}
|
||
db.Create(task)
|
||
return task
|
||
}
|
||
|
||
// UpdateSyncTaskProgress 更新任务进度
|
||
func UpdateSyncTaskProgress(db *gorm.DB, taskID int64, progress, total int) {
|
||
db.Model(&models.WangdianSyncTask{}).
|
||
Where("id = ?", taskID).
|
||
Updates(map[string]interface{}{
|
||
"progress": progress,
|
||
"total": total,
|
||
"updated_at": time.Now().Unix(),
|
||
})
|
||
}
|
||
|
||
// CompleteSyncTask 完成任务(成功)
|
||
func CompleteSyncTask(db *gorm.DB, taskID int64) {
|
||
now := time.Now().Unix()
|
||
db.Model(&models.WangdianSyncTask{}).
|
||
Where("id = ?", taskID).
|
||
Updates(map[string]interface{}{
|
||
"status": TaskStatusCompleted,
|
||
"finished_at": now,
|
||
"updated_at": now,
|
||
})
|
||
}
|
||
|
||
// FailSyncTask 完成任务(失败)
|
||
func FailSyncTask(db *gorm.DB, taskID int64, errMsg string) {
|
||
now := time.Now().Unix()
|
||
db.Model(&models.WangdianSyncTask{}).
|
||
Where("id = ?", taskID).
|
||
Updates(map[string]interface{}{
|
||
"status": TaskStatusFailed,
|
||
"error_msg": errMsg,
|
||
"finished_at": now,
|
||
"updated_at": now,
|
||
})
|
||
}
|
||
|
||
// CancelSyncTask 取消任务
|
||
func CancelSyncTask(db *gorm.DB, taskID int64) error {
|
||
var task models.WangdianSyncTask
|
||
if err := db.Where("id = ?", taskID).First(&task).Error; err != nil {
|
||
return err
|
||
}
|
||
if task.Status != TaskStatusRunning {
|
||
return errors.New("只能取消运行中的任务")
|
||
}
|
||
now := time.Now().Unix()
|
||
db.Model(&models.WangdianSyncTask{}).
|
||
Where("id = ?", taskID).
|
||
Updates(map[string]interface{}{
|
||
"status": TaskStatusCancelled,
|
||
"finished_at": now,
|
||
"updated_at": now,
|
||
})
|
||
return nil
|
||
}
|
||
|
||
// GetSyncTaskStatus 获取任务状态
|
||
func GetSyncTaskStatus(db *gorm.DB, taskType string) (*models.WangdianSyncTask, error) {
|
||
var task models.WangdianSyncTask
|
||
err := db.Where("task_type = ?", taskType).
|
||
Order("id DESC").First(&task).Error
|
||
if err != nil {
|
||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||
return nil, nil
|
||
}
|
||
return nil, err
|
||
}
|
||
return &task, nil
|
||
}
|
||
|
||
// WrapSyncTask 包装同步任务:检查→创建→执行→完成/失败,返回初始任务ID
|
||
// fn 传入 taskID 和 db,由外部执行实际同步逻辑
|
||
func WrapSyncTask(taskType string, db *gorm.DB, fn func(taskID int64, db *gorm.DB)) (int64, error) {
|
||
if err := EnsureNoRunningTask(db, taskType); err != nil {
|
||
return 0, err
|
||
}
|
||
|
||
task := CreateSyncTask(db, taskType)
|
||
|
||
go func() {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
errMsg := "panic"
|
||
if err, ok := r.(error); ok {
|
||
errMsg = err.Error()
|
||
}
|
||
log.Printf("[旺店通任务] 任务%d(%s) panic: %v", task.ID, taskType, r)
|
||
FailSyncTask(db, task.ID, errMsg)
|
||
}
|
||
}()
|
||
|
||
fn(task.ID, db)
|
||
}()
|
||
|
||
return task.ID, nil
|
||
}
|
||
|
||
// ResetRunningTasks 重置所有运行中的任务为失败状态(用于服务重启后清理)
|
||
func ResetRunningTasks(db *gorm.DB) {
|
||
if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
|
||
return
|
||
}
|
||
now := time.Now().Unix()
|
||
result := db.Model(&models.WangdianSyncTask{}).
|
||
Where("status = ?", TaskStatusRunning).
|
||
Updates(map[string]interface{}{
|
||
"status": TaskStatusFailed,
|
||
"error_msg": "服务重启,任务已重置",
|
||
"finished_at": now,
|
||
"updated_at": now,
|
||
})
|
||
if result.Error == nil && result.RowsAffected > 0 {
|
||
log.Printf("[旺店通任务] 服务重启, 已重置 %d 个运行中的任务为失败状态", result.RowsAffected)
|
||
}
|
||
}
|
||
|
||
// ListSyncTask 获取同步任务列表
|
||
func ListSyncTask(db *gorm.DB, taskType string, pageNo, pageSize int) ([]models.WangdianSyncTask, int64, error) {
|
||
var tasks []models.WangdianSyncTask
|
||
var total int64
|
||
|
||
query := db.Model(&models.WangdianSyncTask{})
|
||
if taskType != "" {
|
||
query = query.Where("task_type = ?", taskType)
|
||
}
|
||
|
||
if err := query.Count(&total).Error; err != nil {
|
||
return nil, 0, err
|
||
}
|
||
|
||
if pageNo <= 0 {
|
||
pageNo = 1
|
||
}
|
||
if pageSize <= 0 {
|
||
pageSize = 20
|
||
}
|
||
offset := (pageNo - 1) * pageSize
|
||
|
||
if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&tasks).Error; err != nil {
|
||
return nil, 0, err
|
||
}
|
||
return tasks, total, nil
|
||
}
|