完成 旺店通的仓库、货品、供应商的同步及采购单的创建

This commit is contained in:
97694732@qq.com 2026-06-27 16:53:53 +08:00
parent 8ffe485fa2
commit bb6d91cbd0
9 changed files with 976 additions and 120 deletions

70
config.yaml Normal file
View File

@ -0,0 +1,70 @@
server:
port: "9090"
# host: "https://psi.api.buzhiyushu.cn/"
host: "http://192.168.101.213:9090/"
database:
host: 175.27.224.66
port: "3306"
user: root
password: 5e07c0eec1770c94
name: psi
encrypt_key: "0123456789abcdef0123456789abcdef"
task_database:
host: nj-cynosdbmysql-grp-1v6vxn5f.sql.tencentcdb.com
port: "26247"
user: root
password: Long6166@@
name: task
jwt:
secret: "0123456789abcdef0123456789abcdef"
expire_hours: 24
api_sign:
app_key: "psi"
app_secret: "psi_api_sign_secret"
client_id: "psi"
sign_method: "md5"
timestamp_tolerance: 300
log:
max_age: 600
rotate_time: 600
root_path: "./runtime/logs"
channel:
sql: "/sql/err.log"
work: "/work/err.log"
request: "/request/err.log"
es: "/es/err.log"
redis: "/redis/err.log"
es:
host: "http://36.212.12.92:9527"
index: "books-from-mysql-v2"
username: "elastic"
password: "+Tz5qR_KushZ-bPgZ_H-"
ocr:
service_url: "http://127.0.0.1:35569/ocr"
exe_url: "./ocr/OCRService.exe"
external_api:
# sync_product_url: "http://192.168.101.127:8080/zhishu/filterSet/save"
sync_product_url: "https://api.buzhiyushu.cn/zhishu/filterSet/save"
es_update_book_url: "https://book.center.yushutx.com/api/es/updateBookFieldsByISBN"
# sync_task_url: "http://192.168.101.156:8080/task/create"
sync_task_url: "http://36.212.7.246:8283/task/create"
# sync_task_body_url: "http://192.168.101.156:8080/task/setTaskBody"
sync_task_body_url: "http://36.212.7.246:8283/task/setTaskBody"
timeout: 30
wangdian:
host: "api.wangdian.cn"
sandbox_host: "sandbox.wangdian.cn"
sandbox: true
sid: "apidevnew2"
appkey: "skxz2-test"
appsecret: "85bf423bb"
timeout: 30

View File

@ -76,14 +76,8 @@ type ExternalAPIConfig struct {
}
type WangdianConfig struct {
URL string `yaml:"url"`
SandboxURL string `yaml:"sandbox_url"`
ProviderQueryURL string `yaml:"provider_query_url"`
ProviderQuerySandbox string `yaml:"provider_query_sandbox"`
WarehouseQueryURL string `yaml:"warehouse_query_url"`
WarehouseQuerySandbox string `yaml:"warehouse_query_sandbox"`
GoodsQueryURL string `yaml:"goods_query_url"`
GoodsQuerySandbox string `yaml:"goods_query_sandbox"`
Host string `yaml:"host"`
SandboxHost string `yaml:"sandbox_host"`
Sandbox bool `yaml:"sandbox"`
Sid string `yaml:"sid"`
AppKey string `yaml:"appkey"`
@ -91,6 +85,18 @@ type WangdianConfig struct {
Timeout int `yaml:"timeout"`
}
// GetURL 根据 sandbox 标志和 API 名称构建完整请求地址
func (c *WangdianConfig) GetURL(apiName string) string {
host := c.Host
if c.Sandbox && c.SandboxHost != "" {
host = c.SandboxHost
}
if host == "" {
host = "api.wangdian.cn"
}
return fmt.Sprintf("https://%s/openapi2/%s", host, apiName)
}
var AppConfig *Config
func Init() {

View File

@ -1,6 +1,8 @@
package controllers
import (
"fmt"
"log"
"net/http"
"psi/constant"
"psi/database"
@ -9,6 +11,7 @@ import (
"strconv"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
type WangdianApi struct{}
@ -37,79 +40,169 @@ func (i *WangdianApi) CreatePurchaseOrder(c *gin.Context) {
}
func (i *WangdianApi) QueryProvider(c *gin.Context) {
column := c.Query("column")
providerNo := c.Query("provider_no")
providerName := c.Query("provider_name")
pageSizeStr := c.Query("page_size")
pageNoStr := c.Query("page_no")
db := database.GetDB(c)
pageSize, _ := strconv.Atoi(pageSizeStr)
pageNo, _ := strconv.Atoi(pageNoStr)
resp, err := service.QueryProvider(column, providerNo, providerName, pageSize, pageNo)
taskID, err := service.WrapSyncTask(service.TaskTypeSyncProvider, db, func(taskID int64, db *gorm.DB) {
if err := service.SyncProvider(db, taskID); err != nil {
service.FailSyncTask(db, taskID, err.Error())
log.Printf("[旺店通供应商同步] 异步同步失败: %v", err)
}
})
if err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "查询旺店通供应商失败: "+err.Error(), err, c, nil)
utils.FailWithRequestLog(constant.LoggerChannelWork, err.Error(), err, c, nil)
return
}
c.JSON(http.StatusOK, gin.H{
"code": resp.Code,
"message": resp.Message,
"total_count": resp.TotalCount,
"provider_list": resp.ProviderList,
"code": 0,
"message": "供应商同步任务已启动",
"task_id": taskID,
})
}
func (i *WangdianApi) QueryWarehouse(c *gin.Context) {
warehouseNo := c.Query("warehouse_no")
pageSizeStr := c.Query("page_size")
pageNoStr := c.Query("page_no")
isDisabled := c.Query("is_disabled")
db := database.GetDB(c)
pageSize, _ := strconv.Atoi(pageSizeStr)
pageNo, _ := strconv.Atoi(pageNoStr)
resp, err := service.QueryWarehouse(warehouseNo, 0, 0, pageSize, pageNo, isDisabled)
taskID, err := service.WrapSyncTask(service.TaskTypeSyncWarehouse, db, func(taskID int64, db *gorm.DB) {
if err := service.SyncWarehouse(db, taskID); err != nil {
service.FailSyncTask(db, taskID, err.Error())
log.Printf("[旺店通仓库同步] 异步同步失败: %v", err)
}
})
if err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "查询旺店通仓库失败: "+err.Error(), err, c, nil)
utils.FailWithRequestLog(constant.LoggerChannelWork, err.Error(), err, c, nil)
return
}
c.JSON(http.StatusOK, gin.H{
"code": resp.Code,
"message": resp.Message,
"total_count": resp.TotalCount,
"warehouses": resp.Warehouses,
"code": 0,
"message": "仓库同步任务已启动",
"task_id": taskID,
})
}
func (i *WangdianApi) QueryGoods(c *gin.Context) {
specNo := c.Query("spec_no")
goodsNo := c.Query("goods_no")
brandNo := c.Query("brand_no")
className := c.Query("class_name")
barcode := c.Query("barcode")
db := database.GetDB(c)
startTime := c.Query("start_time")
endTime := c.Query("end_time")
pageSizeStr := c.Query("page_size")
pageNoStr := c.Query("page_no")
aboutID := c.GetInt64("about_id")
pageSize, _ := strconv.Atoi(pageSizeStr)
if pageSize <= 0 {
pageSize = 40
taskID, err := service.WrapSyncTask(service.TaskTypeSyncGoods, db, func(taskID int64, db *gorm.DB) {
if err := service.SyncGoods(db, taskID, startTime, endTime, aboutID); err != nil {
service.FailSyncTask(db, taskID, err.Error())
log.Printf("[旺店通商品同步] 异步同步失败: %v", err)
}
pageNo, _ := strconv.Atoi(pageNoStr)
resp, err := service.QueryGoods(specNo, goodsNo, brandNo, className, barcode, startTime, endTime, 0, pageSize, pageNo)
})
if err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "查询旺店通商品失败: "+err.Error(), err, c, nil)
utils.FailWithRequestLog(constant.LoggerChannelWork, err.Error(), err, c, nil)
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "商品同步任务已启动",
"task_id": taskID,
})
}
func (i *WangdianApi) GoodsPush(c *gin.Context) {
goodsListJSON := c.PostForm("goods_list")
if goodsListJSON == "" {
goodsListJSON = c.Query("goods_list")
}
if goodsListJSON == "" {
utils.FailWithRequestLog(constant.LoggerChannelRequest, "参数错误: goods_list 不能为空", fmt.Errorf("goods_list为空"), c, nil)
return
}
resp, err := service.PushGoods(goodsListJSON)
if err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "推送货品到旺店通失败: "+err.Error(), err, c, nil)
return
}
c.JSON(http.StatusOK, gin.H{
"code": resp.Code,
"message": resp.Message,
"total_count": resp.TotalCount,
"goods_list": resp.GoodsList,
})
}
func (i *WangdianApi) GetSyncTaskStatus(c *gin.Context) {
db := database.GetDB(c)
taskType := c.Query("task_type")
if taskType == "" {
taskType = c.Param("task_type")
}
if taskType == "" {
utils.FailWithRequestLog(constant.LoggerChannelRequest, "参数错误: task_type 不能为空", nil, c, nil)
return
}
task, err := service.GetSyncTaskStatus(db, taskType)
if err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "查询任务状态失败: "+err.Error(), err, c, nil)
return
}
if task == nil {
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "暂无任务记录",
"data": nil,
})
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "ok",
"data": task,
})
}
func (i *WangdianApi) CancelSyncTask(c *gin.Context) {
db := database.GetDB(c)
taskIDStr := c.PostForm("task_id")
if taskIDStr == "" {
taskIDStr = c.Query("task_id")
}
taskID, err := strconv.ParseInt(taskIDStr, 10, 64)
if err != nil || taskID <= 0 {
utils.FailWithRequestLog(constant.LoggerChannelRequest, "参数错误: task_id 必须为正整数", nil, c, nil)
return
}
if err := service.CancelSyncTask(db, taskID); err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "取消任务失败: "+err.Error(), err, c, nil)
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "任务已取消",
})
}
func (i *WangdianApi) ListSyncTask(c *gin.Context) {
db := database.GetDB(c)
taskType := c.Query("task_type")
pageNo, _ := strconv.Atoi(c.DefaultQuery("page_no", "1"))
pageSize, _ := strconv.Atoi(c.DefaultQuery("page_size", "20"))
tasks, total, err := service.ListSyncTask(db, taskType, pageNo, pageSize)
if err != nil {
utils.FailWithRequestLog(constant.LoggerChannelWork, "查询同步任务列表失败: "+err.Error(), err, c, nil)
return
}
c.JSON(http.StatusOK, gin.H{
"code": 0,
"message": "ok",
"data": gin.H{
"list": tasks,
"total": total,
"page_no": pageNo,
"page_size": pageSize,
},
})
}

View File

@ -276,6 +276,11 @@ func Init() {
log.Fatal("Config表迁移失败:", err)
}
err = DB.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{})
if err != nil {
log.Fatal("WangdianSyncTask表迁移失败:", err)
}
err = DB.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='统计表'").AutoMigrate(&models.Statist{})
if err != nil {
log.Fatal("Statist表迁移失败:", err)

View File

@ -322,6 +322,11 @@ func migrateTenantTables(db *gorm.DB) {
log.Printf("Config表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{})
if err != nil {
log.Printf("WangdianSyncTask表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仪表盘每日统计表'").AutoMigrate(&models.DashboardDailyStat{})
if err != nil {
log.Printf("DashboardDailyStat表迁移警告: %v", err)
@ -333,6 +338,9 @@ func migrateTenantTables(db *gorm.DB) {
}
log.Println("租户业务表迁移完成")
// 重置旺店通运行中的任务(服务重启后清理)
resetWangdianRunningTasks(db)
//tableOptions := "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
//
//modelsToMigrate := []interface{}{
@ -440,4 +448,35 @@ func migrateIncrementalTenantTables(db *gorm.DB) {
log.Println("UserDailyStat表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{})
if err != nil {
log.Printf("WangdianSyncTask表增量迁移警告: %v", err)
} else {
log.Println("WangdianSyncTask表增量迁移成功")
}
}
// 重置旺店通运行中的任务(服务重启后清理)
resetWangdianRunningTasks(db)
}
// resetWangdianRunningTasks 重置旺店通运行中的任务为失败状态(用于服务重启后清理)
func resetWangdianRunningTasks(db *gorm.DB) {
if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
return
}
now := time.Now().Unix()
result := db.Model(&models.WangdianSyncTask{}).
Where("status = ?", "running").
Updates(map[string]interface{}{
"status": "failed",
"error_msg": "服务重启,任务已重置",
"finished_at": now,
"updated_at": now,
})
if result.Error == nil && result.RowsAffected > 0 {
log.Printf("[数据库] 服务重启, 已重置 %d 个旺店通运行中的任务为失败状态", result.RowsAffected)
}
}

View File

@ -0,0 +1,23 @@
package models
// WangdianSyncTask 旺店通同步任务表
type WangdianSyncTask struct {
ID int64 `json:"id" gorm:"primarykey;comment:自增ID"`
TaskType string `json:"task_type" gorm:"size:30;not null;default:'';index;comment:任务类型(purchase_push/sync_provider/sync_warehouse/sync_goods)"`
Status string `json:"status" gorm:"size:20;not null;default:'running';comment:状态(running/completed/cancelled/failed)"`
Progress int `json:"progress" gorm:"not null;default:0;comment:已处理数量"`
Total int `json:"total" gorm:"not null;default:0;comment:总数量"`
ErrorMsg string `json:"error_msg" gorm:"type:text;comment:错误信息"`
StartedAt int64 `json:"started_at" gorm:"type:bigint;not null;default:0;comment:开始时间"`
FinishedAt int64 `json:"finished_at" gorm:"type:bigint;not null;default:0;comment:完成时间"`
CreatedAt int64 `json:"created_at" gorm:"type:bigint;not null;default:0;comment:创建时间"`
UpdatedAt int64 `json:"updated_at" gorm:"type:bigint;not null;default:0;comment:更新时间"`
}
func (WangdianSyncTask) TableName() string {
return "wangdian_sync_task"
}
func (WangdianSyncTask) TableOptions() string {
return "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'"
}

View File

@ -127,8 +127,12 @@ func initRouter() (r *gin.Engine) {
{
authOnly.POST("/wangdian/purchase-order-push", wangdianApi.CreatePurchaseOrder) // 推送采购单到旺店通
authOnly.GET("/wangdian/query-provider", wangdianApi.QueryProvider) // 查询旺店通供应商
authOnly.GET("/wangdian/query-warehouse", wangdianApi.QueryWarehouse) // 查询旺店通仓库
authOnly.GET("/wangdian/query-goods", wangdianApi.QueryGoods) // 查询旺店通商品
authOnly.GET("/wangdian/query-warehouse", wangdianApi.QueryWarehouse) // 同步旺店通仓库
authOnly.GET("/wangdian/query-goods", wangdianApi.QueryGoods) // 同步旺店通商品
authOnly.POST("/wangdian/goods-push", wangdianApi.GoodsPush) // 推送货品到旺店通
authOnly.GET("/wangdian/sync-status/:task_type", wangdianApi.GetSyncTaskStatus) // 查询同步任务状态
authOnly.POST("/wangdian/sync-cancel", wangdianApi.CancelSyncTask) // 取消同步任务
authOnly.GET("/wangdian/sync-task-list", wangdianApi.ListSyncTask) // 同步任务列表
}
// 需要认证的接口

View File

@ -11,6 +11,7 @@ import (
"strconv"
"time"
"gorm.io/datatypes"
"gorm.io/gorm"
)
@ -67,15 +68,18 @@ type WangdianWarehouseQueryResponse struct {
Warehouses []map[string]interface{} `json:"warehouses"`
}
func wangdianURL(sandboxURL, productionURL string) string {
cfg := config.AppConfig.Wangdian
if cfg.Sandbox && sandboxURL != "" {
return sandboxURL
}
if productionURL != "" {
return productionURL
}
return productionURL
const (
wangdianPrefix = "【旺店通】"
apiPurchasePush = "purchase_order_push.php"
apiProviderQuery = "purchase_provider_query.php"
apiWarehouseQuery = "warehouse_query.php"
apiGoodsQuery = "goods_query.php"
apiGoodsPush = "goods_push.php"
)
type WangdianGoodsPushResponse struct {
Code int `json:"code"`
Message string `json:"message"`
}
// QueryProvider 查询旺店通供应商
@ -107,11 +111,7 @@ func QueryProvider(column, providerNo, providerName string, pageSize, pageNo int
}
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := wangdianURL(cfg.ProviderQuerySandbox, cfg.ProviderQueryURL)
if url == "" {
url = "https://api.wangdian.cn/openapi2/purchase_provider_query.php"
}
url := cfg.GetURL(apiProviderQuery)
timeout := cfg.Timeout
if timeout <= 0 {
timeout = 30
@ -133,35 +133,11 @@ func QueryProvider(column, providerNo, providerName string, pageSize, pageNo int
return &resp, nil
}
// QueryWarehouse 查询旺店通仓库
func QueryWarehouse(warehouseNo string, warehouseType int, subType int, pageSize, pageNo int, isDisabled string) (*WangdianWarehouseQueryResponse, error) {
// SyncWarehouse 从旺店通同步仓库数据到本地
func SyncWarehouse(db *gorm.DB, taskID int64) error {
cfg := config.AppConfig.Wangdian
if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" {
return nil, fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)")
}
params := map[string]string{
"sid": cfg.Sid,
"appkey": cfg.AppKey,
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
}
if warehouseNo != "" {
params["warehouse_no"] = warehouseNo
}
if pageSize > 0 {
params["page_size"] = strconv.Itoa(pageSize)
}
if pageNo > 0 {
params["page_no"] = strconv.Itoa(pageNo)
}
if isDisabled != "" {
params["is_disabled"] = isDisabled
}
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := wangdianURL(cfg.WarehouseQuerySandbox, cfg.WarehouseQueryURL)
if url == "" {
url = "https://api.wangdian.cn/openapi2/warehouse_query.php"
return fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)")
}
timeout := cfg.Timeout
@ -169,20 +145,236 @@ func QueryWarehouse(warehouseNo string, warehouseType int, subType int, pageSize
timeout = 30
}
log.Printf("[旺店通仓库查询] 请求URL: %s (sandbox=%v), 参数: sid=%s appkey=%s timestamp=%s sign=%s", url, cfg.Sandbox, cfg.Sid, cfg.AppKey, params["timestamp"], params["sign"])
pageSize := 100
pageNo := 0
requestCount := 0
totalCount := 0
totalPages := 0
totalSynced := 0
for {
params := map[string]string{
"sid": cfg.Sid,
"appkey": cfg.AppKey,
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
"page_size": strconv.Itoa(pageSize),
"page_no": strconv.Itoa(pageNo),
}
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := cfg.GetURL(apiWarehouseQuery)
respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout)
requestCount++
if err != nil {
return nil, fmt.Errorf("查询请求失败: %v", err)
log.Printf("[旺店通仓库同步] 查询第%d页失败(网络): %v, 已请求%d次", pageNo, err, requestCount)
break
}
log.Printf("[旺店通仓库查询] 返回信息: %s", string(respBody))
var resp WangdianWarehouseQueryResponse
if err := json.Unmarshal([]byte(respBody), &resp); err != nil {
return nil, fmt.Errorf("解析响应失败(raw=%s): %v", respBody, err)
var pageResp struct {
Code int `json:"code"`
Message string `json:"message"`
TotalCount int `json:"total_count"`
Warehouses []map[string]interface{} `json:"warehouses"`
}
return &resp, nil
if err := json.Unmarshal([]byte(respBody), &pageResp); err != nil {
log.Printf("[旺店通仓库同步] 解析第%d页失败: %v", pageNo, err)
break
}
if pageResp.Code != 0 {
log.Printf("[旺店通仓库同步] 第%d页返回错误(code=%d): %s, 已请求%d次", pageNo, pageResp.Code, pageResp.Message, requestCount)
break
}
if pageNo == 0 {
totalCount = pageResp.TotalCount
totalPages = (totalCount + pageSize - 1) / pageSize
}
n := upsertWarehouses(db, pageResp.Warehouses)
totalSynced += n
UpdateSyncTaskProgress(db, taskID, totalSynced, totalCount)
log.Printf("[旺店通仓库同步] 第%d/%d页完成, 本页写入%d个仓库, 累计%d/%d, 已请求%d次", pageNo+1, totalPages, n, totalSynced, totalCount, requestCount)
if len(pageResp.Warehouses) < pageSize {
break
}
pageNo++
time.Sleep(500 * time.Millisecond)
}
log.Printf("[旺店通仓库同步] 同步结束, 共同步 %d/%d 个仓库", totalSynced, totalCount)
CompleteSyncTask(db, taskID)
return nil
}
func upsertWarehouses(db *gorm.DB, warehouses []map[string]interface{}) int {
now := time.Now().Unix()
synced := 0
for _, wh := range warehouses {
code := toString(wh["warehouse_no"])
if code == "" {
continue
}
var existing models.Warehouse
err := db.Where("code = ? AND is_del = 0", code).First(&existing).Error
if err == nil {
continue
}
status := int8(1)
if toString(wh["is_disabled"]) == "1" {
status = 0
}
warehouse := models.Warehouse{
Code: code,
Name: wangdianPrefix + toString(wh["name"]),
ContactPerson: toString(wh["contact"]),
ContactPhone: toString(wh["telno"]),
Province: toString(wh["province"]),
City: toString(wh["city"]),
District: toString(wh["district"]),
Address: toString(wh["address"]),
Status: status,
CreatedAt: now,
UpdatedAt: now,
Type: 1,
}
if err := db.Create(&warehouse).Error; err != nil {
log.Printf("[旺店通仓库同步] 创建仓库 %s 失败: %v", code, err)
continue
}
synced++
}
return synced
}
func toString(v interface{}) string {
if v == nil {
return ""
}
switch val := v.(type) {
case string:
return val
default:
return fmt.Sprintf("%v", val)
}
}
// SyncProvider 从旺店通同步供应商数据到本地
func SyncProvider(db *gorm.DB, taskID int64) error {
cfg := config.AppConfig.Wangdian
if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" {
return fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)")
}
timeout := cfg.Timeout
if timeout <= 0 {
timeout = 30
}
pageSize := 100
pageNo := 0
requestCount := 0
totalCount := 0
totalPages := 0
totalSynced := 0
for {
params := map[string]string{
"sid": cfg.Sid,
"appkey": cfg.AppKey,
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
"page_size": strconv.Itoa(pageSize),
"page_no": strconv.Itoa(pageNo),
}
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := cfg.GetURL(apiProviderQuery)
respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout)
requestCount++
if err != nil {
log.Printf("[旺店通供应商同步] 查询第%d页失败(网络): %v, 已请求%d次", pageNo, err, requestCount)
break
}
var pageResp struct {
Code int `json:"code"`
Message string `json:"message"`
TotalCount int `json:"total_count"`
ProviderList []map[string]interface{} `json:"provider_list"`
}
if err := json.Unmarshal([]byte(respBody), &pageResp); err != nil {
log.Printf("[旺店通供应商同步] 解析第%d页失败: %v", pageNo, err)
break
}
if pageResp.Code != 0 {
log.Printf("[旺店通供应商同步] 第%d页返回错误(code=%d): %s, 已请求%d次", pageNo, pageResp.Code, pageResp.Message, requestCount)
break
}
if pageNo == 0 {
totalCount = pageResp.TotalCount
totalPages = (totalCount + pageSize - 1) / pageSize
}
n := upsertProviders(db, pageResp.ProviderList)
totalSynced += n
UpdateSyncTaskProgress(db, taskID, totalSynced, totalCount)
log.Printf("[旺店通供应商同步] 第%d/%d页完成, 本页写入%d个供应商, 累计%d/%d, 已请求%d次", pageNo+1, totalPages, n, totalSynced, totalCount, requestCount)
if len(pageResp.ProviderList) < pageSize {
break
}
pageNo++
time.Sleep(500 * time.Millisecond)
}
log.Printf("[旺店通供应商同步] 同步结束, 共同步 %d/%d 个供应商", totalSynced, totalCount)
CompleteSyncTask(db, taskID)
return nil
}
func upsertProviders(db *gorm.DB, providers []map[string]interface{}) int {
now := time.Now().Unix()
synced := 0
for _, p := range providers {
code := toString(p["provider_no"])
if code == "" {
continue
}
var existing models.Supplier
err := db.Where("code = ? AND is_del = 0", code).First(&existing).Error
if err == nil {
continue
}
status := int8(1)
if toString(p["is_disabled"]) == "1" {
status = 0
}
supplier := models.Supplier{
Code: code,
Name: wangdianPrefix + toString(p["provider_name"]),
ContactPerson: toString(p["contact"]),
ContactPhone: toString(p["telno"]),
Address: toString(p["address"]),
Status: status,
CreatedAt: now,
UpdatedAt: now,
}
if err := db.Create(&supplier).Error; err != nil {
log.Printf("[旺店通供应商同步] 创建供应商 %s 失败: %v", code, err)
continue
}
synced++
}
return synced
}
type WangdianGoodsQueryResponse struct {
@ -229,11 +421,7 @@ func QueryGoods(specNo, goodsNo, brandNo, className, barcode, startTime, endTime
params["page_no"] = strconv.Itoa(pageNo)
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := wangdianURL(cfg.GoodsQuerySandbox, cfg.GoodsQueryURL)
if url == "" {
url = "https://api.wangdian.cn/openapi2/goods_query.php"
}
url := cfg.GetURL(apiGoodsQuery)
timeout := cfg.Timeout
if timeout <= 0 {
timeout = 30
@ -255,6 +443,243 @@ func QueryGoods(specNo, goodsNo, brandNo, className, barcode, startTime, endTime
return &resp, nil
}
// SyncGoods 从旺店通同步商品数据到本地
func SyncGoods(db *gorm.DB, taskID int64, startTime, endTime string, aboutID int64) error {
cfg := config.AppConfig.Wangdian
if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" {
return fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)")
}
timeout := cfg.Timeout
if timeout <= 0 {
timeout = 30
}
if startTime == "" || endTime == "" {
return fmt.Errorf("start_time 和 end_time 不能为空")
}
startT, err := time.Parse("2006-01-02 15:04:05", startTime)
if err != nil {
return fmt.Errorf("start_time 格式无效, 正确格式: 2006-01-02 15:04:05")
}
endT, err := time.Parse("2006-01-02 15:04:05", endTime)
if err != nil {
return fmt.Errorf("end_time 格式无效, 正确格式: 2006-01-02 15:04:05")
}
if endT.Sub(startT) > 30*24*time.Hour {
return fmt.Errorf("时间跨度不能超过30天")
}
pageSize := 100
pageNo := 0
requestCount := 0
totalCount := 0
totalPages := 0
totalSynced := 0
for {
params := map[string]string{
"sid": cfg.Sid,
"appkey": cfg.AppKey,
"timestamp": strconv.FormatInt(time.Now().Unix(), 10),
"page_size": strconv.Itoa(pageSize),
"page_no": strconv.Itoa(pageNo),
"start_time": startTime,
"end_time": endTime,
}
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := cfg.GetURL(apiGoodsQuery)
respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout)
requestCount++
if err != nil {
log.Printf("[旺店通商品同步] 查询第%d页失败(网络): %v, 已请求%d次", pageNo, err, requestCount)
break
}
var pageResp struct {
Code int `json:"code"`
Message string `json:"message"`
TotalCount int `json:"total_count"`
GoodsList []map[string]interface{} `json:"goods_list"`
}
if err := json.Unmarshal([]byte(respBody), &pageResp); err != nil {
log.Printf("[旺店通商品同步] 解析第%d页失败: %v", pageNo, err)
break
}
if pageResp.Code != 0 {
log.Printf("[旺店通商品同步] 第%d页返回错误(code=%d): %s, 已请求%d次", pageNo, pageResp.Code, pageResp.Message, requestCount)
break
}
if pageNo == 0 {
totalCount = pageResp.TotalCount
totalPages = (totalCount + pageSize - 1) / pageSize
}
n := upsertGoods(db, pageResp.GoodsList, aboutID)
totalSynced += n
UpdateSyncTaskProgress(db, taskID, totalSynced, totalCount)
log.Printf("[旺店通商品同步] 第%d/%d页完成, 本页写入%d个商品, 累计%d/%d, 已请求%d次", pageNo+1, totalPages, n, totalSynced, totalCount, requestCount)
if len(pageResp.GoodsList) < pageSize {
break
}
pageNo++
time.Sleep(500 * time.Millisecond)
}
log.Printf("[旺店通商品同步] 同步结束, 共同步 %d/%d 个商品", totalSynced, totalCount)
CompleteSyncTask(db, taskID)
return nil
}
func upsertGoods(db *gorm.DB, goodsList []map[string]interface{}, aboutID int64) int {
now := time.Now().Unix()
synced := 0
for _, g := range goodsList {
goodsName := toString(g["goods_name"])
if goodsName == "" {
continue
}
goodsPic := toString(g["pic_url"])
specList, ok := g["spec_list"].([]interface{})
if !ok || len(specList) == 0 {
barcode := toString(g["barcode"])
if barcode == "" {
continue
}
var existing models.Product
if err := db.Where("barcode = ? AND is_del = 0", barcode).First(&existing).Error; err == nil {
continue
}
product := models.Product{
AboutId: aboutID,
Name: wangdianPrefix + goodsName,
Barcode: barcode,
Status: 1,
LiveImage: goodsPicToJSON(goodsPic),
Price: parsePrice(g["lowest_price"]),
SalePrice: parsePrice(g["retail_price"]),
CreatedAt: now,
UpdatedAt: now,
}
if err := db.Create(&product).Error; err != nil {
log.Printf("[旺店通商品同步] 创建商品 %s 失败: %v", barcode, err)
continue
}
synced++
continue
}
for _, spec := range specList {
specMap, ok := spec.(map[string]interface{})
if !ok {
continue
}
barcode := toString(specMap["barcode"])
if barcode == "" {
barcode = toString(specMap["spec_no"])
if barcode == "" {
continue
}
}
var existing models.Product
if err := db.Where("barcode = ? AND is_del = 0", barcode).First(&existing).Error; err == nil {
continue
}
specPic := toString(specMap["pic_url"])
if specPic == "" {
specPic = goodsPic
}
product := models.Product{
AboutId: aboutID,
Name: wangdianPrefix + goodsName,
Barcode: barcode,
Status: 1,
LiveImage: goodsPicToJSON(specPic),
Price: parsePrice(specMap["lowest_price"]),
SalePrice: parsePrice(specMap["retail_price"]),
CreatedAt: now,
UpdatedAt: now,
}
if err := db.Create(&product).Error; err != nil {
log.Printf("[旺店通商品同步] 创建商品 %s 失败: %v", barcode, err)
continue
}
synced++
}
}
return synced
}
func goodsPicToJSON(picURL string) datatypes.JSON {
if picURL == "" {
return datatypes.JSON([]byte("[]"))
}
return datatypes.JSON([]byte(`["` + picURL + `"]`))
}
func parsePrice(v interface{}) int64 {
s := toString(v)
if s == "" {
return 0
}
f, err := strconv.ParseFloat(s, 64)
if err != nil {
return 0
}
return int64(f * 100)
}
// PushGoods 推送货品档案到旺店通
func PushGoods(goodsListJSON string) (*WangdianGoodsPushResponse, error) {
cfg := config.AppConfig.Wangdian
if cfg.Sid == "" || cfg.AppKey == "" || cfg.AppSecret == "" {
return nil, fmt.Errorf("旺店通接口配置不完整(sid/appkey/appsecret)")
}
timestamp := time.Now().Unix()
params := map[string]string{
"sid": cfg.Sid,
"appkey": cfg.AppKey,
"timestamp": strconv.FormatInt(timestamp, 10),
"goods_list": goodsListJSON,
}
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
url := cfg.GetURL(apiGoodsPush)
timeout := cfg.Timeout
if timeout <= 0 {
timeout = 30
}
log.Printf("[旺店通货品推送] 请求URL: %s (sandbox=%v)", url, cfg.Sandbox)
respBody, err := utils.SubmitFormDataWithTimeout(url, params, timeout)
if err != nil {
return nil, fmt.Errorf("推送请求失败: %v", err)
}
log.Printf("[旺店通货品推送] 返回信息: %s", string(respBody))
var resp WangdianGoodsPushResponse
if err := json.Unmarshal([]byte(respBody), &resp); err != nil {
return nil, fmt.Errorf("解析响应失败(raw=%s): %v", respBody, err)
}
return &resp, nil
}
func PushPurchaseOrder(purchaseOrderID int64, db ...*gorm.DB) (*WangdianPurchasePushResponse, error) {
databaseConn := database.OptionalDB(db...)
@ -359,10 +784,7 @@ func PushPurchaseOrder(purchaseOrderID int64, db ...*gorm.DB) (*WangdianPurchase
params["sign"] = utils.WangdianSign(params, cfg.AppSecret)
// 9. 提交请求
url := wangdianURL(cfg.SandboxURL, cfg.URL)
if url == "" {
url = "https://api.wangdian.cn/openapi2/purchase_order_push.php"
}
url := cfg.GetURL(apiPurchasePush)
timeout := cfg.Timeout
if timeout <= 0 {
timeout = 30

View File

@ -0,0 +1,194 @@
package service
import (
"errors"
"log"
"psi/models"
"time"
"gorm.io/gorm"
)
const (
TaskTypePurchasePush = "purchase_push"
TaskTypeSyncProvider = "sync_provider"
TaskTypeSyncWarehouse = "sync_warehouse"
TaskTypeSyncGoods = "sync_goods"
TaskStatusRunning = "running"
TaskStatusCompleted = "completed"
TaskStatusCancelled = "cancelled"
TaskStatusFailed = "failed"
)
// EnsureNoRunningTask 检查是否存在该类型的运行中任务,存在则返回错误
func EnsureNoRunningTask(db *gorm.DB, taskType string) error {
var count int64
db.Model(&models.WangdianSyncTask{}).
Where("task_type = ? AND status = ?", taskType, TaskStatusRunning).
Count(&count)
if count > 0 {
return errors.New("该类型的同步任务正在执行中,请等待完成后重试")
}
return nil
}
// CreateSyncTask 创建同步任务
func CreateSyncTask(db *gorm.DB, taskType string) *models.WangdianSyncTask {
now := time.Now().Unix()
task := &models.WangdianSyncTask{
TaskType: taskType,
Status: TaskStatusRunning,
Progress: 0,
Total: 0,
StartedAt: now,
CreatedAt: now,
UpdatedAt: now,
}
db.Create(task)
return task
}
// UpdateSyncTaskProgress 更新任务进度
func UpdateSyncTaskProgress(db *gorm.DB, taskID int64, progress, total int) {
db.Model(&models.WangdianSyncTask{}).
Where("id = ?", taskID).
Updates(map[string]interface{}{
"progress": progress,
"total": total,
"updated_at": time.Now().Unix(),
})
}
// CompleteSyncTask 完成任务(成功)
func CompleteSyncTask(db *gorm.DB, taskID int64) {
now := time.Now().Unix()
db.Model(&models.WangdianSyncTask{}).
Where("id = ?", taskID).
Updates(map[string]interface{}{
"status": TaskStatusCompleted,
"finished_at": now,
"updated_at": now,
})
}
// FailSyncTask 完成任务(失败)
func FailSyncTask(db *gorm.DB, taskID int64, errMsg string) {
now := time.Now().Unix()
db.Model(&models.WangdianSyncTask{}).
Where("id = ?", taskID).
Updates(map[string]interface{}{
"status": TaskStatusFailed,
"error_msg": errMsg,
"finished_at": now,
"updated_at": now,
})
}
// CancelSyncTask 取消任务
func CancelSyncTask(db *gorm.DB, taskID int64) error {
var task models.WangdianSyncTask
if err := db.Where("id = ?", taskID).First(&task).Error; err != nil {
return err
}
if task.Status != TaskStatusRunning {
return errors.New("只能取消运行中的任务")
}
now := time.Now().Unix()
db.Model(&models.WangdianSyncTask{}).
Where("id = ?", taskID).
Updates(map[string]interface{}{
"status": TaskStatusCancelled,
"finished_at": now,
"updated_at": now,
})
return nil
}
// GetSyncTaskStatus 获取任务状态
func GetSyncTaskStatus(db *gorm.DB, taskType string) (*models.WangdianSyncTask, error) {
var task models.WangdianSyncTask
err := db.Where("task_type = ?", taskType).
Order("id DESC").First(&task).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}
return &task, nil
}
// WrapSyncTask 包装同步任务:检查→创建→执行→完成/失败返回初始任务ID
// fn 传入 taskID 和 db由外部执行实际同步逻辑
func WrapSyncTask(taskType string, db *gorm.DB, fn func(taskID int64, db *gorm.DB)) (int64, error) {
if err := EnsureNoRunningTask(db, taskType); err != nil {
return 0, err
}
task := CreateSyncTask(db, taskType)
go func() {
defer func() {
if r := recover(); r != nil {
errMsg := "panic"
if err, ok := r.(error); ok {
errMsg = err.Error()
}
log.Printf("[旺店通任务] 任务%d(%s) panic: %v", task.ID, taskType, r)
FailSyncTask(db, task.ID, errMsg)
}
}()
fn(task.ID, db)
}()
return task.ID, nil
}
// ResetRunningTasks 重置所有运行中的任务为失败状态(用于服务重启后清理)
func ResetRunningTasks(db *gorm.DB) {
if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
return
}
now := time.Now().Unix()
result := db.Model(&models.WangdianSyncTask{}).
Where("status = ?", TaskStatusRunning).
Updates(map[string]interface{}{
"status": TaskStatusFailed,
"error_msg": "服务重启,任务已重置",
"finished_at": now,
"updated_at": now,
})
if result.Error == nil && result.RowsAffected > 0 {
log.Printf("[旺店通任务] 服务重启, 已重置 %d 个运行中的任务为失败状态", result.RowsAffected)
}
}
// ListSyncTask 获取同步任务列表
func ListSyncTask(db *gorm.DB, taskType string, pageNo, pageSize int) ([]models.WangdianSyncTask, int64, error) {
var tasks []models.WangdianSyncTask
var total int64
query := db.Model(&models.WangdianSyncTask{})
if taskType != "" {
query = query.Where("task_type = ?", taskType)
}
if err := query.Count(&total).Error; err != nil {
return nil, 0, err
}
if pageNo <= 0 {
pageNo = 1
}
if pageSize <= 0 {
pageSize = 20
}
offset := (pageNo - 1) * pageSize
if err := query.Order("id DESC").Offset(offset).Limit(pageSize).Find(&tasks).Error; err != nil {
return nil, 0, err
}
return tasks, total, nil
}