From bb6d91cbd0c316ddf1b8cb2f298947173933404b Mon Sep 17 00:00:00 2001 From: "97694732@qq.com" Date: Sat, 27 Jun 2026 16:53:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=20=E6=97=BA=E5=BA=97?= =?UTF-8?q?=E9=80=9A=E7=9A=84=E4=BB=93=E5=BA=93=E3=80=81=E8=B4=A7=E5=93=81?= =?UTF-8?q?=E3=80=81=E4=BE=9B=E5=BA=94=E5=95=86=E7=9A=84=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E5=8F=8A=E9=87=87=E8=B4=AD=E5=8D=95=E7=9A=84=E5=88=9B=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- config.yaml | 70 +++++ config/config.go | 32 +- controllers/wangdian.go | 185 +++++++++--- database/mysql.go | 5 + database/tenant.go | 39 +++ models/wangdian_sync_task.go | 23 ++ routes/routes.go | 8 +- service/wangdian.go | 540 ++++++++++++++++++++++++++++++---- service/wangdian_sync_task.go | 194 ++++++++++++ 9 files changed, 976 insertions(+), 120 deletions(-) create mode 100644 config.yaml create mode 100644 models/wangdian_sync_task.go create mode 100644 service/wangdian_sync_task.go diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..1d8fa0d --- /dev/null +++ b/config.yaml @@ -0,0 +1,70 @@ +server: + port: "9090" +# host: "https://psi.api.buzhiyushu.cn/" + host: "http://192.168.101.213:9090/" + +database: + host: 175.27.224.66 + port: "3306" + user: root + password: 5e07c0eec1770c94 + name: psi + encrypt_key: "0123456789abcdef0123456789abcdef" + +task_database: + host: nj-cynosdbmysql-grp-1v6vxn5f.sql.tencentcdb.com + port: "26247" + user: root + password: Long6166@@ + name: task + +jwt: + secret: "0123456789abcdef0123456789abcdef" + expire_hours: 24 + +api_sign: + app_key: "psi" + app_secret: "psi_api_sign_secret" + client_id: "psi" + sign_method: "md5" + timestamp_tolerance: 300 + +log: + max_age: 600 + rotate_time: 600 + root_path: "./runtime/logs" + channel: + sql: "/sql/err.log" + work: "/work/err.log" + request: "/request/err.log" + es: "/es/err.log" + redis: "/redis/err.log" + +es: + host: "http://36.212.12.92:9527" + index: "books-from-mysql-v2" + username: "elastic" + password: "+Tz5qR_KushZ-bPgZ_H-" + +ocr: + service_url: "http://127.0.0.1:35569/ocr" + exe_url: "./ocr/OCRService.exe" + +external_api: + # sync_product_url: "http://192.168.101.127:8080/zhishu/filterSet/save" + sync_product_url: "https://api.buzhiyushu.cn/zhishu/filterSet/save" + es_update_book_url: "https://book.center.yushutx.com/api/es/updateBookFieldsByISBN" + # sync_task_url: "http://192.168.101.156:8080/task/create" + sync_task_url: "http://36.212.7.246:8283/task/create" + # sync_task_body_url: "http://192.168.101.156:8080/task/setTaskBody" + sync_task_body_url: "http://36.212.7.246:8283/task/setTaskBody" + timeout: 30 + +wangdian: + host: "api.wangdian.cn" + sandbox_host: "sandbox.wangdian.cn" + sandbox: true + sid: "apidevnew2" + appkey: "skxz2-test" + appsecret: "85bf423bb" + timeout: 30 \ No newline at end of file diff --git a/config/config.go b/config/config.go index ac5b931..f3524e4 100644 --- a/config/config.go +++ b/config/config.go @@ -76,19 +76,25 @@ type ExternalAPIConfig struct { } type WangdianConfig struct { - URL string `yaml:"url"` - SandboxURL string `yaml:"sandbox_url"` - ProviderQueryURL string `yaml:"provider_query_url"` - ProviderQuerySandbox string `yaml:"provider_query_sandbox"` - WarehouseQueryURL string `yaml:"warehouse_query_url"` - WarehouseQuerySandbox string `yaml:"warehouse_query_sandbox"` - GoodsQueryURL string `yaml:"goods_query_url"` - GoodsQuerySandbox string `yaml:"goods_query_sandbox"` - Sandbox bool `yaml:"sandbox"` - Sid string `yaml:"sid"` - AppKey string `yaml:"appkey"` - AppSecret string `yaml:"appsecret"` - Timeout int `yaml:"timeout"` + Host string `yaml:"host"` + SandboxHost string `yaml:"sandbox_host"` + Sandbox bool `yaml:"sandbox"` + Sid string `yaml:"sid"` + AppKey string `yaml:"appkey"` + AppSecret string `yaml:"appsecret"` + Timeout int `yaml:"timeout"` +} + +// GetURL 根据 sandbox 标志和 API 名称构建完整请求地址 +func (c *WangdianConfig) GetURL(apiName string) string { + host := c.Host + if c.Sandbox && c.SandboxHost != "" { + host = c.SandboxHost + } + if host == "" { + host = "api.wangdian.cn" + } + return fmt.Sprintf("https://%s/openapi2/%s", host, apiName) } var AppConfig *Config diff --git a/controllers/wangdian.go b/controllers/wangdian.go index 495f42e..8727af4 100644 --- a/controllers/wangdian.go +++ b/controllers/wangdian.go @@ -1,6 +1,8 @@ package controllers import ( + "fmt" + "log" "net/http" "psi/constant" "psi/database" @@ -9,6 +11,7 @@ import ( "strconv" "github.com/gin-gonic/gin" + "gorm.io/gorm" ) type WangdianApi struct{} @@ -37,79 +40,169 @@ func (i *WangdianApi) CreatePurchaseOrder(c *gin.Context) { } func (i *WangdianApi) QueryProvider(c *gin.Context) { - column := c.Query("column") - providerNo := c.Query("provider_no") - providerName := c.Query("provider_name") - pageSizeStr := c.Query("page_size") - pageNoStr := c.Query("page_no") + db := database.GetDB(c) - pageSize, _ := strconv.Atoi(pageSizeStr) - pageNo, _ := strconv.Atoi(pageNoStr) - - resp, err := service.QueryProvider(column, providerNo, providerName, pageSize, pageNo) + taskID, err := service.WrapSyncTask(service.TaskTypeSyncProvider, db, func(taskID int64, db *gorm.DB) { + if err := service.SyncProvider(db, taskID); err != nil { + service.FailSyncTask(db, taskID, err.Error()) + log.Printf("[旺店通供应商同步] 异步同步失败: %v", err) + } + }) if err != nil { - utils.FailWithRequestLog(constant.LoggerChannelWork, "查询旺店通供应商失败: "+err.Error(), err, c, nil) + utils.FailWithRequestLog(constant.LoggerChannelWork, err.Error(), err, c, nil) return } c.JSON(http.StatusOK, gin.H{ - "code": resp.Code, - "message": resp.Message, - "total_count": resp.TotalCount, - "provider_list": resp.ProviderList, + "code": 0, + "message": "供应商同步任务已启动", + "task_id": taskID, }) } func (i *WangdianApi) QueryWarehouse(c *gin.Context) { - warehouseNo := c.Query("warehouse_no") - pageSizeStr := c.Query("page_size") - pageNoStr := c.Query("page_no") - isDisabled := c.Query("is_disabled") + db := database.GetDB(c) - pageSize, _ := strconv.Atoi(pageSizeStr) - pageNo, _ := strconv.Atoi(pageNoStr) - - resp, err := service.QueryWarehouse(warehouseNo, 0, 0, pageSize, pageNo, isDisabled) + taskID, err := service.WrapSyncTask(service.TaskTypeSyncWarehouse, db, func(taskID int64, db *gorm.DB) { + if err := service.SyncWarehouse(db, taskID); err != nil { + service.FailSyncTask(db, taskID, err.Error()) + log.Printf("[旺店通仓库同步] 异步同步失败: %v", err) + } + }) if err != nil { - utils.FailWithRequestLog(constant.LoggerChannelWork, "查询旺店通仓库失败: "+err.Error(), err, c, nil) + utils.FailWithRequestLog(constant.LoggerChannelWork, err.Error(), err, c, nil) return } c.JSON(http.StatusOK, gin.H{ - "code": resp.Code, - "message": resp.Message, - "total_count": resp.TotalCount, - "warehouses": resp.Warehouses, + "code": 0, + "message": "仓库同步任务已启动", + "task_id": taskID, }) } func (i *WangdianApi) QueryGoods(c *gin.Context) { - specNo := c.Query("spec_no") - goodsNo := c.Query("goods_no") - brandNo := c.Query("brand_no") - className := c.Query("class_name") - barcode := c.Query("barcode") + db := database.GetDB(c) startTime := c.Query("start_time") endTime := c.Query("end_time") - pageSizeStr := c.Query("page_size") - pageNoStr := c.Query("page_no") + aboutID := c.GetInt64("about_id") - pageSize, _ := strconv.Atoi(pageSizeStr) - if pageSize <= 0 { - pageSize = 40 - } - pageNo, _ := strconv.Atoi(pageNoStr) - - resp, err := service.QueryGoods(specNo, goodsNo, brandNo, className, barcode, startTime, endTime, 0, pageSize, pageNo) + taskID, err := service.WrapSyncTask(service.TaskTypeSyncGoods, db, func(taskID int64, db *gorm.DB) { + if err := service.SyncGoods(db, taskID, startTime, endTime, aboutID); err != nil { + service.FailSyncTask(db, taskID, err.Error()) + log.Printf("[旺店通商品同步] 异步同步失败: %v", err) + } + }) if err != nil { - utils.FailWithRequestLog(constant.LoggerChannelWork, "查询旺店通商品失败: "+err.Error(), err, c, nil) + utils.FailWithRequestLog(constant.LoggerChannelWork, err.Error(), err, c, nil) return } c.JSON(http.StatusOK, gin.H{ - "code": resp.Code, - "message": resp.Message, - "total_count": resp.TotalCount, - "goods_list": resp.GoodsList, + "code": 0, + "message": "商品同步任务已启动", + "task_id": taskID, + }) +} + +func (i *WangdianApi) GoodsPush(c *gin.Context) { + goodsListJSON := c.PostForm("goods_list") + if goodsListJSON == "" { + goodsListJSON = c.Query("goods_list") + } + if goodsListJSON == "" { + utils.FailWithRequestLog(constant.LoggerChannelRequest, "参数错误: goods_list 不能为空", fmt.Errorf("goods_list为空"), c, nil) + return + } + + resp, err := service.PushGoods(goodsListJSON) + if err != nil { + utils.FailWithRequestLog(constant.LoggerChannelWork, "推送货品到旺店通失败: "+err.Error(), err, c, nil) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": resp.Code, + "message": resp.Message, + }) +} + +func (i *WangdianApi) GetSyncTaskStatus(c *gin.Context) { + db := database.GetDB(c) + taskType := c.Query("task_type") + if taskType == "" { + taskType = c.Param("task_type") + } + if taskType == "" { + utils.FailWithRequestLog(constant.LoggerChannelRequest, "参数错误: task_type 不能为空", nil, c, nil) + return + } + + task, err := service.GetSyncTaskStatus(db, taskType) + if err != nil { + utils.FailWithRequestLog(constant.LoggerChannelWork, "查询任务状态失败: "+err.Error(), err, c, nil) + return + } + + if task == nil { + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "暂无任务记录", + "data": nil, + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "ok", + "data": task, + }) +} + +func (i *WangdianApi) CancelSyncTask(c *gin.Context) { + db := database.GetDB(c) + taskIDStr := c.PostForm("task_id") + if taskIDStr == "" { + taskIDStr = c.Query("task_id") + } + taskID, err := strconv.ParseInt(taskIDStr, 10, 64) + if err != nil || taskID <= 0 { + utils.FailWithRequestLog(constant.LoggerChannelRequest, "参数错误: task_id 必须为正整数", nil, c, nil) + return + } + + if err := service.CancelSyncTask(db, taskID); err != nil { + utils.FailWithRequestLog(constant.LoggerChannelWork, "取消任务失败: "+err.Error(), err, c, nil) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "任务已取消", + }) +} + +func (i *WangdianApi) ListSyncTask(c *gin.Context) { + db := database.GetDB(c) + taskType := c.Query("task_type") + pageNo, _ := strconv.Atoi(c.DefaultQuery("page_no", "1")) + pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20")) + + tasks, total, err := service.ListSyncTask(db, taskType, pageNo, pageSize) + if err != nil { + utils.FailWithRequestLog(constant.LoggerChannelWork, "查询同步任务列表失败: "+err.Error(), err, c, nil) + return + } + + c.JSON(http.StatusOK, gin.H{ + "code": 0, + "message": "ok", + "data": gin.H{ + "list": tasks, + "total": total, + "page_no": pageNo, + "page_size": pageSize, + }, }) } diff --git a/database/mysql.go b/database/mysql.go index 126d836..6a608e1 100644 --- a/database/mysql.go +++ b/database/mysql.go @@ -276,6 +276,11 @@ func Init() { log.Fatal("Config表迁移失败:", err) } + err = DB.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{}) + if err != nil { + log.Fatal("WangdianSyncTask表迁移失败:", err) + } + err = DB.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='统计表'").AutoMigrate(&models.Statist{}) if err != nil { log.Fatal("Statist表迁移失败:", err) diff --git a/database/tenant.go b/database/tenant.go index 2f237b8..2c7c5b0 100644 --- a/database/tenant.go +++ b/database/tenant.go @@ -322,6 +322,11 @@ func migrateTenantTables(db *gorm.DB) { log.Printf("Config表迁移警告: %v", err) } + err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{}) + if err != nil { + log.Printf("WangdianSyncTask表迁移警告: %v", err) + } + err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仪表盘每日统计表'").AutoMigrate(&models.DashboardDailyStat{}) if err != nil { log.Printf("DashboardDailyStat表迁移警告: %v", err) @@ -333,6 +338,9 @@ func migrateTenantTables(db *gorm.DB) { } log.Println("租户业务表迁移完成") + + // 重置旺店通运行中的任务(服务重启后清理) + resetWangdianRunningTasks(db) //tableOptions := "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" // //modelsToMigrate := []interface{}{ @@ -440,4 +448,35 @@ func migrateIncrementalTenantTables(db *gorm.DB) { log.Println("UserDailyStat表增量迁移成功") } } + + if !db.Migrator().HasTable(&models.WangdianSyncTask{}) { + err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{}) + if err != nil { + log.Printf("WangdianSyncTask表增量迁移警告: %v", err) + } else { + log.Println("WangdianSyncTask表增量迁移成功") + } + } + + // 重置旺店通运行中的任务(服务重启后清理) + resetWangdianRunningTasks(db) +} + +// resetWangdianRunningTasks 重置旺店通运行中的任务为失败状态(用于服务重启后清理) +func resetWangdianRunningTasks(db *gorm.DB) { + if !db.Migrator().HasTable(&models.WangdianSyncTask{}) { + return + } + now := time.Now().Unix() + result := db.Model(&models.WangdianSyncTask{}). + Where("status = ?", "running"). + Updates(map[string]interface{}{ + "status": "failed", + "error_msg": "服务重启,任务已重置", + "finished_at": now, + "updated_at": now, + }) + if result.Error == nil && result.RowsAffected > 0 { + log.Printf("[数据库] 服务重启, 已重置 %d 个旺店通运行中的任务为失败状态", result.RowsAffected) + } } diff --git a/models/wangdian_sync_task.go b/models/wangdian_sync_task.go new file mode 100644 index 0000000..9c886b3 --- /dev/null +++ b/models/wangdian_sync_task.go @@ -0,0 +1,23 @@ +package models + +// WangdianSyncTask 旺店通同步任务表 +type WangdianSyncTask struct { + ID int64 `json:"id" gorm:"primarykey;comment:自增ID"` + TaskType string `json:"task_type" gorm:"size:30;not null;default:'';index;comment:任务类型(purchase_push/sync_provider/sync_warehouse/sync_goods)"` + Status string `json:"status" gorm:"size:20;not null;default:'running';comment:状态(running/completed/cancelled/failed)"` + Progress int `json:"progress" gorm:"not null;default:0;comment:已处理数量"` + Total int `json:"total" gorm:"not null;default:0;comment:总数量"` + ErrorMsg string `json:"error_msg" gorm:"type:text;comment:错误信息"` + StartedAt int64 `json:"started_at" gorm:"type:bigint;not null;default:0;comment:开始时间"` + FinishedAt int64 `json:"finished_at" gorm:"type:bigint;not null;default:0;comment:完成时间"` + CreatedAt int64 `json:"created_at" gorm:"type:bigint;not null;default:0;comment:创建时间"` + UpdatedAt int64 `json:"updated_at" gorm:"type:bigint;not null;default:0;comment:更新时间"` +} + +func (WangdianSyncTask) TableName() string { + return "wangdian_sync_task" +} + +func (WangdianSyncTask) TableOptions() string { + return "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'" +} diff --git a/routes/routes.go b/routes/routes.go index 2d43748..66f19d1 100644 --- a/routes/routes.go +++ b/routes/routes.go @@ -127,8 +127,12 @@ func initRouter() (r *gin.Engine) { { authOnly.POST("/wangdian/purchase-order-push", wangdianApi.CreatePurchaseOrder) // 推送采购单到旺店通 authOnly.GET("/wangdian/query-provider", wangdianApi.QueryProvider) // 查询旺店通供应商 - authOnly.GET("/wangdian/query-warehouse", wangdianApi.QueryWarehouse) // 查询旺店通仓库 - authOnly.GET("/wangdian/query-goods", wangdianApi.QueryGoods) // 查询旺店通商品 + authOnly.GET("/wangdian/query-warehouse", wangdianApi.QueryWarehouse) // 同步旺店通仓库 + authOnly.GET("/wangdian/query-goods", wangdianApi.QueryGoods) // 同步旺店通商品 + authOnly.POST("/wangdian/goods-push", wangdianApi.GoodsPush) // 推送货品到旺店通 + authOnly.GET("/wangdian/sync-status/:task_type", wangdianApi.GetSyncTaskStatus) // 查询同步任务状态 + authOnly.POST("/wangdian/sync-cancel", wangdianApi.CancelSyncTask) // 取消同步任务 + authOnly.GET("/wangdian/sync-task-list", wangdianApi.ListSyncTask) // 同步任务列表 } // 需要认证的接口 diff --git a/service/wangdian.go b/service/wangdian.go index 903bc0d..c24428c 100644 --- a/service/wangdian.go +++ b/service/wangdian.go @@ -11,6 +11,7 @@ import ( "strconv" "time" + "gorm.io/datatypes" "gorm.io/gorm" ) @@ -67,15 +68,18 @@ type WangdianWarehouseQueryResponse struct { Warehouses []map[string]interface{} `json:"warehouses"` } -func wangdianURL(sandboxURL, productionURL string) string { - cfg := config.AppConfig.Wangdian - if cfg.Sandbox && sandboxURL != "" { - return sandboxURL - } - if productionURL != "" { - return productionURL - } - return productionURL +const ( + wangdianPrefix = "【旺店通】" + apiPurchasePush = "purchase_order_push.php" + apiProviderQuery = "purchase_provider_query.php" + apiWarehouseQuery = "warehouse_query.php" + apiGoodsQuery = "goods_query.php" + apiGoodsPush = "goods_push.php" +) + +type WangdianGoodsPushResponse struct { + Code int `json:"code"` + Message string `json:"message"` } // QueryProvider 查询旺店通供应商 @@ -107,11 +111,7 @@ func QueryProvider(column, providerNo, providerName string, pageSize, pageNo int } params["sign"] = utils.WangdianSign(params, cfg.AppSecret) - url := wangdianURL(cfg.ProviderQuerySandbox, cfg.ProviderQueryURL) - if url == "" { - url = "https://api.wangdian.cn/openapi2/purchase_provider_query.php" - } - + url := cfg.GetURL(apiProviderQuery) timeout := cfg.Timeout if timeout <= 0 { timeout = 30 @@ -133,35 +133,11 @@ func QueryProvider(column, providerNo, providerName string, pageSize, pageNo int return &resp, nil } -// QueryWarehouse 查询旺店通仓库 -func QueryWarehouse(warehouseNo string, warehouseType int, subType int, pageSize, pageNo int, isDisabled string) (*WangdianWarehouseQueryResponse, error) { +// SyncWarehouse 从旺店通同步仓库数据到本地 +func SyncWarehouse(db *gorm.DB, taskID int64) error { cfg := config.AppConfig.Wangdian if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" { - return nil, fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)") - } - - params := map[string]string{ - "sid": cfg.Sid, - "appkey": cfg.AppKey, - "timestamp": strconv.FormatInt(time.Now().Unix(), 10), - } - if warehouseNo != "" { - params["warehouse_no"] = warehouseNo - } - if pageSize > 0 { - params["page_size"] = strconv.Itoa(pageSize) - } - if pageNo > 0 { - params["page_no"] = strconv.Itoa(pageNo) - } - if isDisabled != "" { - params["is_disabled"] = isDisabled - } - params["sign"] = utils.WangdianSign(params, cfg.AppSecret) - - url := wangdianURL(cfg.WarehouseQuerySandbox, cfg.WarehouseQueryURL) - if url == "" { - url = "https://api.wangdian.cn/openapi2/warehouse_query.php" + return fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)") } timeout := cfg.Timeout @@ -169,20 +145,236 @@ func QueryWarehouse(warehouseNo string, warehouseType int, subType int, pageSize timeout = 30 } - log.Printf("[旺店通仓库查询] 请求URL: %s (sandbox=%v), 参数: sid=%s appkey=%s timestamp=%s sign=%s", url, cfg.Sandbox, cfg.Sid, cfg.AppKey, params["timestamp"], params["sign"]) + pageSize := 100 + pageNo := 0 + requestCount := 0 + totalCount := 0 + totalPages := 0 + totalSynced := 0 - respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout) - if err != nil { - return nil, fmt.Errorf("查询请求失败: %v", err) + for { + params := map[string]string{ + "sid": cfg.Sid, + "appkey": cfg.AppKey, + "timestamp": strconv.FormatInt(time.Now().Unix(), 10), + "page_size": strconv.Itoa(pageSize), + "page_no": strconv.Itoa(pageNo), + } + params["sign"] = utils.WangdianSign(params, cfg.AppSecret) + + url := cfg.GetURL(apiWarehouseQuery) + respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout) + requestCount++ + if err != nil { + log.Printf("[旺店通仓库同步] 查询第%d页失败(网络): %v, 已请求%d次", pageNo, err, requestCount) + break + } + + var pageResp struct { + Code int `json:"code"` + Message string `json:"message"` + TotalCount int `json:"total_count"` + Warehouses []map[string]interface{} `json:"warehouses"` + } + if err := json.Unmarshal([]byte(respBody), &pageResp); err != nil { + log.Printf("[旺店通仓库同步] 解析第%d页失败: %v", pageNo, err) + break + } + + if pageResp.Code != 0 { + log.Printf("[旺店通仓库同步] 第%d页返回错误(code=%d): %s, 已请求%d次", pageNo, pageResp.Code, pageResp.Message, requestCount) + break + } + + if pageNo == 0 { + totalCount = pageResp.TotalCount + totalPages = (totalCount + pageSize - 1) / pageSize + } + + n := upsertWarehouses(db, pageResp.Warehouses) + totalSynced += n + UpdateSyncTaskProgress(db, taskID, totalSynced, totalCount) + log.Printf("[旺店通仓库同步] 第%d/%d页完成, 本页写入%d个仓库, 累计%d/%d, 已请求%d次", pageNo+1, totalPages, n, totalSynced, totalCount, requestCount) + + if len(pageResp.Warehouses) < pageSize { + break + } + pageNo++ + time.Sleep(500 * time.Millisecond) } - log.Printf("[旺店通仓库查询] 返回信息: %s", string(respBody)) + log.Printf("[旺店通仓库同步] 同步结束, 共同步 %d/%d 个仓库", totalSynced, totalCount) + CompleteSyncTask(db, taskID) + return nil +} - var resp WangdianWarehouseQueryResponse - if err := json.Unmarshal([]byte(respBody), &resp); err != nil { - return nil, fmt.Errorf("解析响应失败(raw=%s): %v", respBody, err) +func upsertWarehouses(db *gorm.DB, warehouses []map[string]interface{}) int { + now := time.Now().Unix() + synced := 0 + for _, wh := range warehouses { + code := toString(wh["warehouse_no"]) + if code == "" { + continue + } + + var existing models.Warehouse + err := db.Where("code = ? AND is_del = 0", code).First(&existing).Error + if err == nil { + continue + } + + status := int8(1) + if toString(wh["is_disabled"]) == "1" { + status = 0 + } + + warehouse := models.Warehouse{ + Code: code, + Name: wangdianPrefix + toString(wh["name"]), + ContactPerson: toString(wh["contact"]), + ContactPhone: toString(wh["telno"]), + Province: toString(wh["province"]), + City: toString(wh["city"]), + District: toString(wh["district"]), + Address: toString(wh["address"]), + Status: status, + CreatedAt: now, + UpdatedAt: now, + Type: 1, + } + if err := db.Create(&warehouse).Error; err != nil { + log.Printf("[旺店通仓库同步] 创建仓库 %s 失败: %v", code, err) + continue + } + synced++ } - return &resp, nil + return synced +} + +func toString(v interface{}) string { + if v == nil { + return "" + } + switch val := v.(type) { + case string: + return val + default: + return fmt.Sprintf("%v", val) + } +} + +// SyncProvider 从旺店通同步供应商数据到本地 +func SyncProvider(db *gorm.DB, taskID int64) error { + cfg := config.AppConfig.Wangdian + if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" { + return fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)") + } + + timeout := cfg.Timeout + if timeout <= 0 { + timeout = 30 + } + + pageSize := 100 + pageNo := 0 + requestCount := 0 + totalCount := 0 + totalPages := 0 + totalSynced := 0 + + for { + params := map[string]string{ + "sid": cfg.Sid, + "appkey": cfg.AppKey, + "timestamp": strconv.FormatInt(time.Now().Unix(), 10), + "page_size": strconv.Itoa(pageSize), + "page_no": strconv.Itoa(pageNo), + } + params["sign"] = utils.WangdianSign(params, cfg.AppSecret) + + url := cfg.GetURL(apiProviderQuery) + respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout) + requestCount++ + if err != nil { + log.Printf("[旺店通供应商同步] 查询第%d页失败(网络): %v, 已请求%d次", pageNo, err, requestCount) + break + } + + var pageResp struct { + Code int `json:"code"` + Message string `json:"message"` + TotalCount int `json:"total_count"` + ProviderList []map[string]interface{} `json:"provider_list"` + } + if err := json.Unmarshal([]byte(respBody), &pageResp); err != nil { + log.Printf("[旺店通供应商同步] 解析第%d页失败: %v", pageNo, err) + break + } + + if pageResp.Code != 0 { + log.Printf("[旺店通供应商同步] 第%d页返回错误(code=%d): %s, 已请求%d次", pageNo, pageResp.Code, pageResp.Message, requestCount) + break + } + + if pageNo == 0 { + totalCount = pageResp.TotalCount + totalPages = (totalCount + pageSize - 1) / pageSize + } + + n := upsertProviders(db, pageResp.ProviderList) + totalSynced += n + UpdateSyncTaskProgress(db, taskID, totalSynced, totalCount) + log.Printf("[旺店通供应商同步] 第%d/%d页完成, 本页写入%d个供应商, 累计%d/%d, 已请求%d次", pageNo+1, totalPages, n, totalSynced, totalCount, requestCount) + + if len(pageResp.ProviderList) < pageSize { + break + } + pageNo++ + time.Sleep(500 * time.Millisecond) + } + + log.Printf("[旺店通供应商同步] 同步结束, 共同步 %d/%d 个供应商", totalSynced, totalCount) + CompleteSyncTask(db, taskID) + return nil +} + +func upsertProviders(db *gorm.DB, providers []map[string]interface{}) int { + now := time.Now().Unix() + synced := 0 + for _, p := range providers { + code := toString(p["provider_no"]) + if code == "" { + continue + } + + var existing models.Supplier + err := db.Where("code = ? AND is_del = 0", code).First(&existing).Error + if err == nil { + continue + } + + status := int8(1) + if toString(p["is_disabled"]) == "1" { + status = 0 + } + + supplier := models.Supplier{ + Code: code, + Name: wangdianPrefix + toString(p["provider_name"]), + ContactPerson: toString(p["contact"]), + ContactPhone: toString(p["telno"]), + Address: toString(p["address"]), + Status: status, + CreatedAt: now, + UpdatedAt: now, + } + if err := db.Create(&supplier).Error; err != nil { + log.Printf("[旺店通供应商同步] 创建供应商 %s 失败: %v", code, err) + continue + } + synced++ + } + return synced } type WangdianGoodsQueryResponse struct { @@ -229,11 +421,7 @@ func QueryGoods(specNo, goodsNo, brandNo, className, barcode, startTime, endTime params["page_no"] = strconv.Itoa(pageNo) params["sign"] = utils.WangdianSign(params, cfg.AppSecret) - url := wangdianURL(cfg.GoodsQuerySandbox, cfg.GoodsQueryURL) - if url == "" { - url = "https://api.wangdian.cn/openapi2/goods_query.php" - } - + url := cfg.GetURL(apiGoodsQuery) timeout := cfg.Timeout if timeout <= 0 { timeout = 30 @@ -255,6 +443,243 @@ func QueryGoods(specNo, goodsNo, brandNo, className, barcode, startTime, endTime return &resp, nil } +// SyncGoods 从旺店通同步商品数据到本地 +func SyncGoods(db *gorm.DB, taskID int64, startTime, endTime string, aboutID int64) error { + cfg := config.AppConfig.Wangdian + if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" { + return fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)") + } + + timeout := cfg.Timeout + if timeout <= 0 { + timeout = 30 + } + + if startTime == "" || endTime == "" { + return fmt.Errorf("start_time 和 end_time 不能为空") + } + + startT, err := time.Parse("2006-01-02 15:04:05", startTime) + if err != nil { + return fmt.Errorf("start_time 格式无效, 正确格式: 2006-01-02 15:04:05") + } + endT, err := time.Parse("2006-01-02 15:04:05", endTime) + if err != nil { + return fmt.Errorf("end_time 格式无效, 正确格式: 2006-01-02 15:04:05") + } + if endT.Sub(startT) > 30*24*time.Hour { + return fmt.Errorf("时间跨度不能超过30天") + } + + pageSize := 100 + pageNo := 0 + requestCount := 0 + totalCount := 0 + totalPages := 0 + totalSynced := 0 + + for { + params := map[string]string{ + "sid": cfg.Sid, + "appkey": cfg.AppKey, + "timestamp": strconv.FormatInt(time.Now().Unix(), 10), + "page_size": strconv.Itoa(pageSize), + "page_no": strconv.Itoa(pageNo), + "start_time": startTime, + "end_time": endTime, + } + params["sign"] = utils.WangdianSign(params, cfg.AppSecret) + + url := cfg.GetURL(apiGoodsQuery) + respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout) + requestCount++ + if err != nil { + log.Printf("[旺店通商品同步] 查询第%d页失败(网络): %v, 已请求%d次", pageNo, err, requestCount) + break + } + + var pageResp struct { + Code int `json:"code"` + Message string `json:"message"` + TotalCount int `json:"total_count"` + GoodsList []map[string]interface{} `json:"goods_list"` + } + if err := json.Unmarshal([]byte(respBody), &pageResp); err != nil { + log.Printf("[旺店通商品同步] 解析第%d页失败: %v", pageNo, err) + break + } + + if pageResp.Code != 0 { + log.Printf("[旺店通商品同步] 第%d页返回错误(code=%d): %s, 已请求%d次", pageNo, pageResp.Code, pageResp.Message, requestCount) + break + } + + if pageNo == 0 { + totalCount = pageResp.TotalCount + totalPages = (totalCount + pageSize - 1) / pageSize + } + + n := upsertGoods(db, pageResp.GoodsList, aboutID) + totalSynced += n + UpdateSyncTaskProgress(db, taskID, totalSynced, totalCount) + log.Printf("[旺店通商品同步] 第%d/%d页完成, 本页写入%d个商品, 累计%d/%d, 已请求%d次", pageNo+1, totalPages, n, totalSynced, totalCount, requestCount) + + if len(pageResp.GoodsList) < pageSize { + break + } + pageNo++ + time.Sleep(500 * time.Millisecond) + } + + log.Printf("[旺店通商品同步] 同步结束, 共同步 %d/%d 个商品", totalSynced, totalCount) + CompleteSyncTask(db, taskID) + return nil +} + +func upsertGoods(db *gorm.DB, goodsList []map[string]interface{}, aboutID int64) int { + now := time.Now().Unix() + synced := 0 + for _, g := range goodsList { + goodsName := toString(g["goods_name"]) + if goodsName == "" { + continue + } + + goodsPic := toString(g["pic_url"]) + + specList, ok := g["spec_list"].([]interface{}) + if !ok || len(specList) == 0 { + barcode := toString(g["barcode"]) + if barcode == "" { + continue + } + + var existing models.Product + if err := db.Where("barcode = ? AND is_del = 0", barcode).First(&existing).Error; err == nil { + continue + } + + product := models.Product{ + AboutId: aboutID, + Name: wangdianPrefix + goodsName, + Barcode: barcode, + Status: 1, + LiveImage: goodsPicToJSON(goodsPic), + Price: parsePrice(g["lowest_price"]), + SalePrice: parsePrice(g["retail_price"]), + CreatedAt: now, + UpdatedAt: now, + } + if err := db.Create(&product).Error; err != nil { + log.Printf("[旺店通商品同步] 创建商品 %s 失败: %v", barcode, err) + continue + } + synced++ + continue + } + + for _, spec := range specList { + specMap, ok := spec.(map[string]interface{}) + if !ok { + continue + } + + barcode := toString(specMap["barcode"]) + if barcode == "" { + barcode = toString(specMap["spec_no"]) + if barcode == "" { + continue + } + } + + var existing models.Product + if err := db.Where("barcode = ? AND is_del = 0", barcode).First(&existing).Error; err == nil { + continue + } + + specPic := toString(specMap["pic_url"]) + if specPic == "" { + specPic = goodsPic + } + + product := models.Product{ + AboutId: aboutID, + Name: wangdianPrefix + goodsName, + Barcode: barcode, + Status: 1, + LiveImage: goodsPicToJSON(specPic), + Price: parsePrice(specMap["lowest_price"]), + SalePrice: parsePrice(specMap["retail_price"]), + CreatedAt: now, + UpdatedAt: now, + } + if err := db.Create(&product).Error; err != nil { + log.Printf("[旺店通商品同步] 创建商品 %s 失败: %v", barcode, err) + continue + } + synced++ + } + } + return synced +} + +func goodsPicToJSON(picURL string) datatypes.JSON { + if picURL == "" { + return datatypes.JSON([]byte("[]")) + } + return datatypes.JSON([]byte(`["` + picURL + `"]`)) +} + +func parsePrice(v interface{}) int64 { + s := toString(v) + if s == "" { + return 0 + } + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return 0 + } + return int64(f * 100) +} + +// PushGoods 推送货品档案到旺店通 +func PushGoods(goodsListJSON string) (*WangdianGoodsPushResponse, error) { + cfg := config.AppConfig.Wangdian + if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" { + return nil, fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)") + } + + timestamp := time.Now().Unix() + params := map[string]string{ + "sid": cfg.Sid, + "appkey": cfg.AppKey, + "timestamp": strconv.FormatInt(timestamp, 10), + "goods_list": goodsListJSON, + } + params["sign"] = utils.WangdianSign(params, cfg.AppSecret) + + url := cfg.GetURL(apiGoodsPush) + timeout := cfg.Timeout + if timeout <= 0 { + timeout = 30 + } + + log.Printf("[旺店通货品推送] 请求URL: %s (sandbox=%v)", url, cfg.Sandbox) + + respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout) + if err != nil { + return nil, fmt.Errorf("推送请求失败: %v", err) + } + + log.Printf("[旺店通货品推送] 返回信息: %s", string(respBody)) + + var resp WangdianGoodsPushResponse + if err := json.Unmarshal([]byte(respBody), &resp); err != nil { + return nil, fmt.Errorf("解析响应失败(raw=%s): %v", respBody, err) + } + return &resp, nil +} + func PushPurchaseOrder(purchaseOrderID int64, db ...*gorm.DB) (*WangdianPurchasePushResponse, error) { databaseConn := database.OptionalDB(db...) @@ -359,10 +784,7 @@ func PushPurchaseOrder(purchaseOrderID int64, db ...*gorm.DB) (*WangdianPurchase params["sign"] = utils.WangdianSign(params, cfg.AppSecret) // 9. 提交请求 - url := wangdianURL(cfg.SandboxURL, cfg.URL) - if url == "" { - url = "https://api.wangdian.cn/openapi2/purchase_order_push.php" - } + url := cfg.GetURL(apiPurchasePush) timeout := cfg.Timeout if timeout <= 0 { timeout = 30 diff --git a/service/wangdian_sync_task.go b/service/wangdian_sync_task.go new file mode 100644 index 0000000..6cae0a7 --- /dev/null +++ b/service/wangdian_sync_task.go @@ -0,0 +1,194 @@ +package service + +import ( + "errors" + "log" + "psi/models" + "time" + + "gorm.io/gorm" +) + +const ( + TaskTypePurchasePush = "purchase_push" + TaskTypeSyncProvider = "sync_provider" + TaskTypeSyncWarehouse = "sync_warehouse" + TaskTypeSyncGoods = "sync_goods" + + TaskStatusRunning = "running" + TaskStatusCompleted = "completed" + TaskStatusCancelled = "cancelled" + TaskStatusFailed = "failed" +) + +// EnsureNoRunningTask 检查是否存在该类型的运行中任务,存在则返回错误 +func EnsureNoRunningTask(db *gorm.DB, taskType string) error { + var count int64 + db.Model(&models.WangdianSyncTask{}). + Where("task_type = ? AND status = ?", taskType, TaskStatusRunning). + Count(&count) + if count > 0 { + return errors.New("该类型的同步任务正在执行中,请等待完成后重试") + } + return nil +} + +// CreateSyncTask 创建同步任务 +func CreateSyncTask(db *gorm.DB, taskType string) *models.WangdianSyncTask { + now := time.Now().Unix() + task := &models.WangdianSyncTask{ + TaskType: taskType, + Status: TaskStatusRunning, + Progress: 0, + Total: 0, + StartedAt: now, + CreatedAt: now, + UpdatedAt: now, + } + db.Create(task) + return task +} + +// UpdateSyncTaskProgress 更新任务进度 +func UpdateSyncTaskProgress(db *gorm.DB, taskID int64, progress, total int) { + db.Model(&models.WangdianSyncTask{}). + Where("id = ?", taskID). + Updates(map[string]interface{}{ + "progress": progress, + "total": total, + "updated_at": time.Now().Unix(), + }) +} + +// CompleteSyncTask 完成任务(成功) +func CompleteSyncTask(db *gorm.DB, taskID int64) { + now := time.Now().Unix() + db.Model(&models.WangdianSyncTask{}). + Where("id = ?", taskID). + Updates(map[string]interface{}{ + "status": TaskStatusCompleted, + "finished_at": now, + "updated_at": now, + }) +} + +// FailSyncTask 完成任务(失败) +func FailSyncTask(db *gorm.DB, taskID int64, errMsg string) { + now := time.Now().Unix() + db.Model(&models.WangdianSyncTask{}). + Where("id = ?", taskID). + Updates(map[string]interface{}{ + "status": TaskStatusFailed, + "error_msg": errMsg, + "finished_at": now, + "updated_at": now, + }) +} + +// CancelSyncTask 取消任务 +func CancelSyncTask(db *gorm.DB, taskID int64) error { + var task models.WangdianSyncTask + if err := db.Where("id = ?", taskID).First(&task).Error; err != nil { + return err + } + if task.Status != TaskStatusRunning { + return errors.New("只能取消运行中的任务") + } + now := time.Now().Unix() + db.Model(&models.WangdianSyncTask{}). + Where("id = ?", taskID). + Updates(map[string]interface{}{ + "status": TaskStatusCancelled, + "finished_at": now, + "updated_at": now, + }) + return nil +} + +// GetSyncTaskStatus 获取任务状态 +func GetSyncTaskStatus(db *gorm.DB, taskType string) (*models.WangdianSyncTask, error) { + var task models.WangdianSyncTask + err := db.Where("task_type = ?", taskType). + Order("id DESC").First(&task).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, nil + } + return nil, err + } + return &task, nil +} + +// WrapSyncTask 包装同步任务:检查→创建→执行→完成/失败,返回初始任务ID +// fn 传入 taskID 和 db,由外部执行实际同步逻辑 +func WrapSyncTask(taskType string, db *gorm.DB, fn func(taskID int64, db *gorm.DB)) (int64, error) { + if err := EnsureNoRunningTask(db, taskType); err != nil { + return 0, err + } + + task := CreateSyncTask(db, taskType) + + go func() { + defer func() { + if r := recover(); r != nil { + errMsg := "panic" + if err, ok := r.(error); ok { + errMsg = err.Error() + } + log.Printf("[旺店通任务] 任务%d(%s) panic: %v", task.ID, taskType, r) + FailSyncTask(db, task.ID, errMsg) + } + }() + + fn(task.ID, db) + }() + + return task.ID, nil +} + +// ResetRunningTasks 重置所有运行中的任务为失败状态(用于服务重启后清理) +func ResetRunningTasks(db *gorm.DB) { + if !db.Migrator().HasTable(&models.WangdianSyncTask{}) { + return + } + now := time.Now().Unix() + result := db.Model(&models.WangdianSyncTask{}). + Where("status = ?", TaskStatusRunning). + Updates(map[string]interface{}{ + "status": TaskStatusFailed, + "error_msg": "服务重启,任务已重置", + "finished_at": now, + "updated_at": now, + }) + if result.Error == nil && result.RowsAffected > 0 { + log.Printf("[旺店通任务] 服务重启, 已重置 %d 个运行中的任务为失败状态", result.RowsAffected) + } +} + +// ListSyncTask 获取同步任务列表 +func ListSyncTask(db *gorm.DB, taskType string, pageNo, pageSize int) ([]models.WangdianSyncTask, int64, error) { + var tasks []models.WangdianSyncTask + var total int64 + + query := db.Model(&models.WangdianSyncTask{}) + if taskType != "" { + query = query.Where("task_type = ?", taskType) + } + + if err := query.Count(&total).Error; err != nil { + return nil, 0, err + } + + if pageNo <= 0 { + pageNo = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + offset := (pageNo - 1) * pageSize + + if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&tasks).Error; err != nil { + return nil, 0, err + } + return tasks, total, nil +}