package main import ( "batch/util/dbConnectUtil" "context" "database/sql" "encoding/json" "fmt" "log" "math" "os" "os/signal" "reflect" "strconv" "strings" "sync" "syscall" "time" "github.com/go-redis/redis/v8" _ "github.com/go-sql-driver/mysql" ) // ShopMsg 店铺信息结构体 type ShopMsg struct { ID int64 `json:"id"` ShopAliasName string `json:"shop_alias_name"` ShopName string `json:"shop_name"` Token string `json:"token"` GoodsNamePrefix string `json:"goods_name_prefix"` GoodsNameSuffix string `json:"goods_name_suffix"` TitleConsistOf string `json:"title_consist_of"` SpaceCharacter string `json:"space_character"` WatermarkImgUrl string `json:"watermark_img_url"` CarouseLastImgUrlArray []string `json:"carouse_last_img_url_array"` GoodsDetailFirstImgUrlArray []string `json:"goods_detail_first_img_url_array"` GoodsDetailLastImgUrlArray []string `json:"goods_detail_last_img_url_array"` SpecName string `json:"spec_name"` SpecId int64 `json:"spec_id"` SpecChildName string `json:"spec_child_name"` IsFolt bool `json:"is_fotl"` IsPreSale bool `json:"is_pre_sale"` IsRefundable bool `json:"is_refundable"` IsSecondHand bool `json:"is_second_hand"` ShipmentLimitSecond int64 `json:"shipment_limit_second"` CostTemplateId int64 `json:"cost_template_id"` DefStock int64 `json:"def_stock"` TwoDiscount int64 `json:"two_discount"` } // 店铺主表数据结构 type ShopMainData struct { Id int64 `json:"id"` ShopAliasName sql.NullString `json:"shop_alias_name"` ShopName sql.NullString `json:"shop_name"` Token sql.NullString `json:"token"` } // 店铺详情数据结构 type ShopDetailData struct { Id int64 `json:"id"` ShopId int64 `json:"shop_id"` TemplateId *int64 `json:"template_id"` TitlePrefix sql.NullString `json:"title_prefix"` TitleSuffix sql.NullString `json:"title_suffix"` TitleConsistOf sql.NullString `json:"title_consist_of"` SpaceCharacter sql.NullString `json:"space_character"` StockDeff *int64 `json:"stock_deff"` TwoDiscount *int64 `json:"two_discount"` Presale sql.NullString `json:"presale"` Fake sql.NullString `json:"fake"` SevenDays sql.NullString `json:"seven_days"` IsSecondHand sql.NullString `json:"is_second_hand"` DeliveryTime sql.NullString `json:"delivery_time"` } // 规格数据结构 type SpecData struct { Id int64 `json:"id"` ShopId int64 `json:"shop_id"` SpecName sql.NullString `json:"spec_name"` SpecPrefix sql.NullString `json:"spec_prefix"` } // 图片数据结构 type ShopImageData struct { Id int64 `json:"id"` Pid int64 `json:"pid"` Type string `json:"type"` AbsolutePath string `json:"absolute_path"` } // 嵌套的JSON数据结构 type ShopDataRecord struct { SourceTable string `json:"source_table"` Data map[string]interface{} `json:"data"` } // 并发处理结果 type ShopProcessResult struct { ShopID int64 Count int Err error } // normalizeFieldValue 根据字段名和表名对值进行标准化: // - 特定表特定字段保持 int64 数字:t_shop_context.id, t_spec.id, t_price_template.id, t_shop_detail.district_id, 以及 mall_id // - 其他 id / *_id / create_by / update_by 字段 → 字符串 // - create_time/update_time/add_time/expiration_time → 格式化为 "2006-01-02 15:04:05"(无时区) // - 其他字段:[]byte → string func normalizeFieldValue(colName string, val interface{}, tableName string) interface{} { if val == nil { return nil } // 特殊处理:保持 int64 数字的字段(不转为字符串) // 1. mall_id 保持数字 // 2. t_shop_context.id // 3. t_spec.id // 4. t_price_template.id // 5. t_shop_detail.district_id if colName == "mall_id" { return ensureInt64(val) } if tableName == "t_shop_detail" && colName == "district_id" { return ensureInt64(val) } // 1) ID 类字段(除上述保留数字的字段外):转为字符串 if colName == "id" || colName == "create_by" || colName == "update_by" || strings.HasSuffix(colName, "_id") { switch v := val.(type) { case string: return v case []byte: return string(v) case int, int8, int16, int32, int64: return strconv.FormatInt(reflect.ValueOf(v).Int(), 10) case uint, uint8, uint16, uint32, uint64: return strconv.FormatUint(reflect.ValueOf(v).Uint(), 10) case float32, float64: f := reflect.ValueOf(v).Float() return strconv.FormatInt(int64(f), 10) default: return fmt.Sprint(v) } } // 2) 时间字段:格式化为 "2006-01-02 15:04:05" if colName == "create_time" || colName == "update_time" || colName == "add_time" || colName == "expiration_time" { var t time.Time switch v := val.(type) { case time.Time: t = v case []byte: s := string(v) if s == "" { return nil } t = parseTime(s) case string: if v == "" { return nil } t = parseTime(v) default: return val } if t.IsZero() { return nil } return t.Format("2006-01-02 15:04:05") } // 3) 其他字段:[]byte 转为 string switch v := val.(type) { case []byte: return string(v) default: return v } } // ensureInt64 将各种类型转换为 int64,用于保持数字的字段 func ensureInt64(val interface{}) interface{} { switch v := val.(type) { case int64: return v case int: return int64(v) case int32: return int64(v) case float64: return int64(v) case string: if i, err := strconv.ParseInt(v, 10, 64); err == nil { return i } return v case []byte: s := string(v) if i, err := strconv.ParseInt(s, 10, 64); err == nil { return i } return s default: return val } } // parseTime 尝试多种布局解析时间字符串 func parseTime(s string) time.Time { layouts := []string{ "2006-01-02 15:04:05", "2006-01-02T15:04:05Z", "2006-01-02T15:04:05", "2006-01-02 15:04:05.999999", "2006-01-02", } for _, layout := range layouts { if t, err := time.Parse(layout, s); err == nil { return t } } return time.Time{} } // 查询店铺主表数据 func queryShopMainData(db *sql.DB, shopID int64) (ShopMainData, error) { query := `SELECT id, shop_alias_name, shop_name, token FROM t_shop WHERE id = ? AND del_flag = '0'` row := db.QueryRow(query, shopID) var data ShopMainData err := row.Scan(&data.Id, &data.ShopAliasName, &data.ShopName, &data.Token) if err != nil { return data, err } return data, nil } // 查询店铺详情数据 func queryShopDetailData(db *sql.DB, shopID int64) ([]ShopDetailData, error) { query := ` SELECT id, shop_id, template_id, title_prefix, title_suffix, title_consist_of, space_character, stock_deff, two_discount, presale, fake, seven_days, is_second_hand, delivery_time FROM t_shop_detail WHERE shop_id = ? AND del_flag = '0' ` rows, err := db.Query(query, shopID) if err != nil { return nil, err } defer rows.Close() var data []ShopDetailData for rows.Next() { var item ShopDetailData err := rows.Scan( &item.Id, &item.ShopId, &item.TemplateId, &item.TitlePrefix, &item.TitleSuffix, &item.TitleConsistOf, &item.SpaceCharacter, &item.StockDeff, &item.TwoDiscount, &item.Presale, &item.Fake, &item.SevenDays, &item.IsSecondHand, &item.DeliveryTime, ) if err != nil { return nil, err } data = append(data, item) } return data, nil } // 查询规格数据 func querySpecData(db *sql.DB, shopID int64) ([]SpecData, error) { query := `SELECT id, shop_id, spec_name, spec_prefix FROM t_spec WHERE shop_id = ? AND del_flag = '0'` rows, err := db.Query(query, shopID) if err != nil { return nil, err } defer rows.Close() var data []SpecData for rows.Next() { var item SpecData err := rows.Scan(&item.Id, &item.ShopId, &item.SpecName, &item.SpecPrefix) if err != nil { return nil, err } data = append(data, item) } return data, nil } // 查询图片数据 func queryShopImageData(db *sql.DB, shopID int64) ([]ShopImageData, error) { query := `SELECT id, pid, type, absolute_path FROM t_shop_img WHERE pid = ? AND del_flag = '0'` rows, err := db.Query(query, shopID) if err != nil { return nil, err } defer rows.Close() var data []ShopImageData for rows.Next() { var item ShopImageData err := rows.Scan(&item.Id, &item.Pid, &item.Type, &item.AbsolutePath) if err != nil { return nil, err } data = append(data, item) } return data, nil } func main() { // 配置数据库连接 db, err := dbConnectUtil.InitDB("zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306) if err != nil { log.Fatal("数据库连接失败:", err) } defer db.Close() // 【优化1】数据库连接池配置 - 降低并发避免瞬时带宽压力 db.SetMaxOpenConns(10) // 最大打开连接数(降低到10) db.SetMaxIdleConns(5) // 最大空闲连接数(降低到5) db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生命周期 db.SetConnMaxIdleTime(10 * time.Minute) // 空闲连接最大存活时间 // 测试数据库连接 if err := db.Ping(); err != nil { log.Fatal("数据库ping失败:", err) } // Redis连接 - 降低连接池大小 rdb := redis.NewClient(&redis.Options{ Addr: "36.212.12.247:6379", Password: "long6166@@", DB: 8, PoolSize: 10, // 连接池大小(降低到10) MinIdleConns: 5, // 最小空闲连接数(降低到5) }) // 测试Redis连接 ctx := context.Background() if _, err := rdb.Ping(ctx).Result(); err != nil { log.Fatal("Redis连接失败:", err) } // 【新增】设置信号捕获,优雅退出 stopChan := make(chan os.Signal, 1) signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM) // 【新增】循环运行,每次间隔5分钟 runCount := 0 for { runCount++ startTime := time.Now() fmt.Printf("\n%s\n", strings.Repeat("=", 60)) fmt.Printf("第 %d 次运行开始 - %s\n", runCount, startTime.Format("2006-01-02 15:04:05")) fmt.Printf("%s\n", strings.Repeat("=", 60)) // 执行数据迁移 if err := migrateDataOptimized(db, rdb); err != nil { log.Printf("数据迁移失败: %v", err) } elapsed := time.Since(startTime) fmt.Printf("\n第 %d 次运行完成,耗时: %v\n", runCount, elapsed) fmt.Printf("下次运行时间: %s\n", time.Now().Add(5*time.Minute).Format("2006-01-02 15:04:05")) // 等待间隔时间,但可以提前退出 // 根据运行耗时动态调整间隔 var waitInterval time.Duration if elapsed < 3*time.Minute { waitInterval = 5 * time.Minute } else if elapsed < 6*time.Minute { waitInterval = 8 * time.Minute } else { waitInterval = 10 * time.Minute } fmt.Printf("\n本次耗时: %v, 下次运行间隔: %v (按 Ctrl+C 退出)...\n", elapsed, waitInterval) select { case <-stopChan: fmt.Printf("\n\n收到退出信号,程序即将退出...\n") fmt.Printf("总共运行了 %d 次\n", runCount) return case <-time.After(waitInterval): // 继续下一次循环 } } } // 【优化版本】并发处理数据迁移 func migrateDataOptimized(db *sql.DB, rdb *redis.Client) error { ctx := context.Background() fmt.Println("开始查询店铺数据...") // 第一步:查询所有店铺ID shopQuery := ` SELECT DISTINCT id FROM t_shop WHERE id IS NOT NULL ` shopRows, err := db.Query(shopQuery) if err != nil { return fmt.Errorf("查询店铺失败: %v", err) } defer shopRows.Close() var shopIDs []int64 for shopRows.Next() { var shopID int64 if err := shopRows.Scan(&shopID); err != nil { return fmt.Errorf("扫描店铺数据失败: %v", err) } shopIDs = append(shopIDs, shopID) } fmt.Printf("找到 %d 个店铺\n", len(shopIDs)) // 【优化2】使用Worker Pool并发处理 - 降低并发避免带宽压力 workers := 3 // 并发worker数量(降低到3,减少瞬时压力) jobs := make(chan int64, len(shopIDs)) results := make(chan ShopProcessResult, len(shopIDs)) var wg sync.WaitGroup // 启动worker for w := 0; w < workers; w++ { wg.Add(1) go func(workerID int) { defer wg.Done() for shopID := range jobs { count, err := processShop(db, rdb, ctx, shopID, workerID) results <- ShopProcessResult{ ShopID: shopID, Count: count, Err: err, } // 【新增】添加处理延迟,降低瞬时带宽压力 time.Sleep(100 * time.Millisecond) // 每个店铺处理完后休息100ms } }(w) } // 发送任务 go func() { for _, id := range shopIDs { jobs <- id } close(jobs) }() // 等待所有worker完成 go func() { wg.Wait() close(results) }() // 收集结果并统计 successCount := 0 failCount := 0 totalRecords := 0 for result := range results { if result.Err != nil { failCount++ log.Printf("[失败] 店铺 %d: %v", result.ShopID, result.Err) } else { successCount++ totalRecords += result.Count if successCount%10 == 0 { fmt.Printf("[进度] 已完成: %d/%d, 总记录: %d\n", successCount, len(shopIDs), totalRecords) } } } fmt.Printf("\n========== 迁移完成 ==========\n") fmt.Printf("成功: %d 店铺\n", successCount) fmt.Printf("失败: %d 店铺\n", failCount) fmt.Printf("总记录数: %d\n", totalRecords) return nil } // processShop 处理单个店铺的所有数据 func processShop(db *sql.DB, rdb *redis.Client, ctx context.Context, shopID int64, workerID int) (int, error) { // 收集该店铺的所有数据 var allRecords []ShopDataRecord // 1. 查询店铺主表数据 shopMainData, err := queryTableToShopData(db, "t_shop", ` SELECT * FROM t_shop WHERE id = ? AND del_flag = '0' `, shopID) if err != nil { return 0, fmt.Errorf("查询店铺主表失败: %v", err) } allRecords = append(allRecords, shopMainData...) // 2. 查询该店铺的所有店铺详情数据 shopDetails, err := queryTableToShopData(db, "t_shop_detail", ` SELECT * FROM t_shop_detail WHERE shop_id = ? AND del_flag = '0' `, shopID) if err != nil { return 0, fmt.Errorf("查询店铺详情失败: %v", err) } if len(shopDetails) == 0 { return 0, nil // 没有详情数据,跳过 } // 查询图片数据以获取watermark_img_url shopImgDataList, err := queryShopImageData(db, shopID) if err != nil { log.Printf("[Worker-%d] 警告: 查询图片数据失败 (店铺 %d): %v", workerID, shopID, err) } // 处理图片数据 var watermarkImgUrl string var skuWatermarkImgUrl string var carouseLastImgUrlArray = []string{} var goodsDetailFirstImgUrlArray = []string{} var goodsDetailLastImgUrlArray = []string{} if err == nil { for _, img := range shopImgDataList { if img.AbsolutePath == "" { continue } switch img.Type { case "1": watermarkImgUrl = img.AbsolutePath case "3": goodsDetailFirstImgUrlArray = append(goodsDetailFirstImgUrlArray, img.AbsolutePath) case "4": goodsDetailLastImgUrlArray = append(goodsDetailLastImgUrlArray, img.AbsolutePath) case "5": carouseLastImgUrlArray = append(carouseLastImgUrlArray, img.AbsolutePath) case "7": skuWatermarkImgUrl = img.AbsolutePath } } } // 将watermark_img_url等添加到shopdetail记录中 for i := range shopDetails { if shopDetails[i].Data == nil { shopDetails[i].Data = make(map[string]interface{}) } shopDetails[i].Data["watermark_img_url"] = watermarkImgUrl shopDetails[i].Data["carouse_last_img_url_array"] = carouseLastImgUrlArray shopDetails[i].Data["goods_detail_first_img_url_array"] = goodsDetailFirstImgUrlArray shopDetails[i].Data["goods_detail_last_img_url_array"] = goodsDetailLastImgUrlArray shopDetails[i].Data["sku_watermark_img_url"] = skuWatermarkImgUrl } allRecords = append(allRecords, shopDetails...) // 3. 查询店铺上下文数据 shopContextData, err := queryTableToShopData(db, "t_shop_context", ` SELECT * FROM t_shop_context WHERE shop_id = ? AND del_flag = '0' `, shopID) if err != nil { log.Printf("[Worker-%d] 警告: 查询店铺上下文数据失败 (店铺 %d): %v", workerID, shopID, err) } allRecords = append(allRecords, shopContextData...) // 4. 查询规格设置数据 specData, err := queryTableToShopData(db, "t_spec", ` SELECT * FROM t_spec WHERE shop_id = ? AND del_flag = '0' `, shopID) if err != nil { log.Printf("[Worker-%d] 警告: 查询规格设置数据失败 (店铺 %d): %v", workerID, shopID, err) } allRecords = append(allRecords, specData...) // 5. 从第一条店铺详情记录中获取 sale_template_id var saleTemplateID int64 if firstDetail := shopDetails[0].Data; firstDetail != nil { if v, ok := firstDetail["sale_template_id"]; ok && v != nil { switch val := v.(type) { case string: if val != "" && val != "0" && val != "null" { if id, err := strconv.ParseInt(val, 10, 64); err == nil && id > 0 { saleTemplateID = id } } case int64: if val > 0 { saleTemplateID = val } case float64: if val > 0 { saleTemplateID = int64(val) } } } } // 6. 根据 sale_template_id 查询价格模板 if saleTemplateID > 0 { priceTemplates, err := queryPriceTemplateData(db, saleTemplateID) if err != nil { log.Printf("[Worker-%d] 警告: 查询价格模板失败 (店铺 %d, sale_template_id: %d): %v", workerID, shopID, saleTemplateID, err) } else { allRecords = append(allRecords, priceTemplates...) } } // 【优化3】批量写入Redis if err := saveToRedisBatch(rdb, ctx, shopID, allRecords); err != nil { return 0, fmt.Errorf("Redis写入失败: %v", err) } return len(allRecords), nil } // 【优化3】使用Pipeline批量写入Redis func saveToRedisBatch(rdb *redis.Client, ctx context.Context, shopID int64, records []ShopDataRecord) error { if len(records) == 0 { return nil } redisKey := strconv.FormatInt(shopID, 10) // 使用Pipeline批量执行 pipe := rdb.Pipeline() // 删除旧key pipe.Del(ctx, redisKey) // 批量添加数据 for _, record := range records { jsonBytes, err := json.Marshal(record) if err != nil { log.Printf("警告: JSON编码失败 (店铺 %d): %v", shopID, err) continue } pipe.RPush(ctx, redisKey, jsonBytes) } // 一次性执行所有命令 _, err := pipe.Exec(ctx) return err } // 查询价格模板数据,特殊处理 range_price 和 add_amount 字段 func queryPriceTemplateData(db *sql.DB, templateID int64) ([]ShopDataRecord, error) { query := `SELECT * FROM t_price_template WHERE id = ?` rows, err := db.Query(query, templateID) if err != nil { return nil, err } defer rows.Close() columns, err := rows.Columns() if err != nil { return nil, err } var records []ShopDataRecord values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for rows.Next() { for i := range columns { valuePtrs[i] = &values[i] } if err := rows.Scan(valuePtrs...); err != nil { return nil, err } dataMap := make(map[string]interface{}) for i, col := range columns { snakeCol := camelToSnake(col) if snakeCol == "range_price" { processedVal, err := processRangePriceField(values[i]) if err != nil { log.Printf("警告: 处理 range_price 字段失败: %v", err) processedVal = values[i] } dataMap[snakeCol] = normalizeFieldValue(snakeCol, processedVal, "t_price_template") continue } if snakeCol == "add_amount" { processedVal, err := processAddAmountField(values[i]) if err != nil { log.Printf("警告: 处理 add_amount 字段失败: %v", err) processedVal = values[i] } dataMap[snakeCol] = normalizeFieldValue(snakeCol, processedVal, "t_price_template") continue } dataMap[snakeCol] = normalizeFieldValue(snakeCol, values[i], "t_price_template") } records = append(records, ShopDataRecord{ SourceTable: "t_price_template", Data: dataMap, }) } return records, nil } func processAddAmountField(val interface{}) (interface{}, error) { if val == nil { return nil, nil } var f float64 switch v := val.(type) { case float64: f = v case float32: f = float64(v) case int: f = float64(v) case int64: f = float64(v) case int32: f = float64(v) case string: if v == "" { return nil, nil } parsed, err := strconv.ParseFloat(v, 64) if err != nil { return nil, fmt.Errorf("无法解析字符串为浮点数: %v", err) } f = parsed case []byte: s := string(v) if s == "" { return nil, nil } parsed, err := strconv.ParseFloat(s, 64) if err != nil { return nil, fmt.Errorf("无法解析字节数组为浮点数: %v", err) } f = parsed default: return nil, fmt.Errorf("不支持的类型: %T", v) } scaled := math.Round(f * 100) return int64(scaled), nil } func processRangePriceField(val interface{}) (interface{}, error) { if val == nil { return nil, nil } var jsonStr string switch v := val.(type) { case string: jsonStr = v case []byte: jsonStr = string(v) default: return val, nil } if jsonStr == "" { return jsonStr, nil } var rawItems []map[string]interface{} if err := json.Unmarshal([]byte(jsonStr), &rawItems); err != nil { return jsonStr, fmt.Errorf("JSON解析失败: %v", err) } fixedRanges := make([]map[string]interface{}, len(rawItems)) for i, item := range rawItems { fixedItem := make(map[string]interface{}) if minPrice, ok := item["minPrice"]; ok { fixedItem["minPrice"] = convertToFloat64(minPrice) } if maxPrice, ok := item["maxPrice"]; ok { fixedItem["maxPrice"] = convertToFloat64(maxPrice) } if adjustPercent, ok := item["adjustPercent"]; ok { fixedItem["adjustPercent"] = convertToFloat64(adjustPercent) } if adjustAmount, ok := item["adjustAmount"]; ok { fixedItem["adjustAmount"] = convertToFloat64(adjustAmount) } fixedRanges[i] = fixedItem } fixedJSON, err := json.Marshal(fixedRanges) if err != nil { return jsonStr, fmt.Errorf("JSON编码失败: %v", err) } return string(fixedJSON), nil } func convertToFloat64(v interface{}) float64 { switch val := v.(type) { case float64: return fixFloatToInt(val) case float32: return fixFloatToInt(float64(val)) case int: return float64(val) case int64: return float64(val) case int32: return float64(val) case string: if f, err := strconv.ParseFloat(val, 64); err == nil { return fixFloatToInt(f) } return 0 case bool: if val { return 1 } return 0 default: return 0 } } func fixFloatToInt(value float64) float64 { return math.Floor(value) } func camelToSnake(s string) string { var result []rune for i, r := range s { if i > 0 && r >= 'A' && r <= 'Z' { result = append(result, '_') } result = append(result, r) } return strings.ToLower(string(result)) } // 通用表查询函数,自动应用字段标准化 func queryTableToShopData(db *sql.DB, tableName string, query string, args ...interface{}) ([]ShopDataRecord, error) { rows, err := db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() columns, err := rows.Columns() if err != nil { return nil, err } var records []ShopDataRecord values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for rows.Next() { for i := range columns { valuePtrs[i] = &values[i] } if err := rows.Scan(valuePtrs...); err != nil { return nil, err } dataMap := make(map[string]interface{}) for i, col := range columns { snakeCol := camelToSnake(col) dataMap[snakeCol] = normalizeFieldValue(snakeCol, values[i], tableName) } records = append(records, ShopDataRecord{ SourceTable: tableName, Data: dataMap, }) } return records, nil } // 保留原有函数(如需使用) func queryTableToMap(db *sql.DB, query string, args ...interface{}) ([]map[string]interface{}, error) { rows, err := db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() columns, err := rows.Columns() if err != nil { return nil, err } var results []map[string]interface{} values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for rows.Next() { for i := range columns { valuePtrs[i] = &values[i] } if err := rows.Scan(valuePtrs...); err != nil { return nil, err } rowMap := make(map[string]interface{}) for i, col := range columns { switch v := values[i].(type) { case []byte: rowMap[col] = string(v) default: rowMap[col] = v } } results = append(results, rowMap) } return results, nil }