package main import ( "batch/util/dbConnectUtil" "context" "database/sql" "encoding/json" "fmt" "log" "math" "strconv" "strings" "github.com/go-redis/redis/v8" _ "github.com/go-sql-driver/mysql" ) // 嵌套的JSON数据结构 type ShopDataRecord struct { SourceTable string `json:"source_table"` Data map[string]interface{} `json:"data"` } // 价格范围项结构 type PriceRangeItem struct { MinPrice interface{} `json:"minPrice"` // 可以是数字或字符串 MaxPrice interface{} `json:"maxPrice"` // 可以是数字或字符串 AdjustPercent interface{} `json:"adjustPercent"` // 可以是数字或字符串 AdjustAmount interface{} `json:"adjustAmount"` // 可以是数字或字符串 } // 数据库配置 func main() { // 配置数据库连接 db, err := dbConnectUtil.InitDB("zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306) if err != nil { log.Fatal("数据库连接失败:", err) } defer db.Close() // 测试数据库连接 if err := db.Ping(); err != nil { log.Fatal("数据库ping失败:", err) } // Redis连接 rdb := redis.NewClient(&redis.Options{ Addr: "36.212.20.113:7963", Password: "j8nZ4jra2E", DB: 7, }) // 测试Redis连接 ctx := context.Background() if _, err := rdb.Ping(ctx).Result(); err != nil { log.Fatal("Redis连接失败:", err) } // 执行数据迁移 if err := migrateData(db, rdb); err != nil { log.Fatal("数据迁移失败:", err) } fmt.Println("数据迁移完成!") } func migrateData(db *sql.DB, rdb *redis.Client) error { ctx := context.Background() fmt.Println("开始查询店铺数据...") // 第一步:查询所有店铺ID shopQuery := ` SELECT DISTINCT shop_id FROM t_shop_detail WHERE shop_id IS NOT NULL AND del_flag = 0 ` shopRows, err := db.Query(shopQuery) if err != nil { return fmt.Errorf("查询店铺失败: %v", err) } defer shopRows.Close() var shopIDs []int64 for shopRows.Next() { var shopID int64 if err := shopRows.Scan(&shopID); err != nil { return fmt.Errorf("扫描店铺数据失败: %v", err) } shopIDs = append(shopIDs, shopID) } fmt.Printf("找到 %d 个店铺\n", len(shopIDs)) // 第二步:为每个店铺收集数据并存储到Redis for i, shopID := range shopIDs { fmt.Printf("[%d/%d] 处理店铺: %d\n", i+1, len(shopIDs), shopID) // 收集该店铺的所有数据 var allRecords []ShopDataRecord // 1. 尝试查询店铺主表数据 shopMainData, err := queryTableToShopData(db, "t_shop", ` SELECT * FROM t_shop WHERE id = ? `, shopID) if err != nil { log.Printf("警告: 查询店铺主表失败 (店铺 %d): %v", shopID, err) } else if len(shopMainData) > 0 { allRecords = append(allRecords, shopMainData...) fmt.Printf(" 找到店铺主表数据\n") } else { fmt.Printf(" 未找到店铺主表数据 (id=%d)\n", shopID) } // 2. 查询该店铺的所有店铺详情数据 shopDetails, err := queryTableToShopData(db, "t_shop_detail", ` SELECT * FROM t_shop_detail WHERE shop_id = ? `, shopID) if err != nil { log.Printf("警告: 查询店铺详情失败 (店铺 %d): %v", shopID, err) continue } allRecords = append(allRecords, shopDetails...) if len(shopDetails) == 0 { fmt.Printf(" 店铺 %d 没有详情数据,跳过\n", shopID) continue } // 3. 尝试查询店铺上下文数据(商品描述) shopContextData, err := queryTableToShopData(db, "t_shop_context", ` SELECT * FROM t_shop_context WHERE shop_id = ? `, shopID) if err != nil { log.Printf("警告: 查询店铺上下文数据失败 (店铺 %d): %v", shopID, err) } else if len(shopContextData) > 0 { allRecords = append(allRecords, shopContextData...) fmt.Printf(" 找到店铺上下文数据 (%d条)\n", len(shopContextData)) } else { fmt.Printf(" 未找到店铺上下文数据\n") } // 4. 尝试查询规格设置数据 specData, err := queryTableToShopData(db, "t_spec", ` SELECT * FROM t_spec WHERE shop_id = ? `, shopID) if err != nil { log.Printf("警告: 查询规格设置数据失败 (店铺 %d): %v", shopID, err) } else if len(specData) > 0 { allRecords = append(allRecords, specData...) fmt.Printf(" 找到规格设置数据 (%d条)\n", len(specData)) } else { fmt.Printf(" 未找到规格设置数据\n") } // 5. 从第一条店铺详情记录中获取 sale_template_id var saleTemplateID sql.NullInt64 if shopDetails[0].Data["saleTemplateId"] != nil { switch v := shopDetails[0].Data["saleTemplateId"].(type) { case string: if v != "" && v != "0" && v != "null" { if id, err := strconv.ParseInt(v, 10, 64); err == nil { saleTemplateID = sql.NullInt64{Int64: id, Valid: true} } } case int64: if v > 0 { saleTemplateID = sql.NullInt64{Int64: v, Valid: true} } case float64: if v > 0 { saleTemplateID = sql.NullInt64{Int64: int64(v), Valid: true} } case []byte: strVal := string(v) if strVal != "" && strVal != "0" && strVal != "null" { if id, err := strconv.ParseInt(strVal, 10, 64); err == nil { saleTemplateID = sql.NullInt64{Int64: id, Valid: true} } } } } // 6. 根据 sale_template_id 查询价格模板 if saleTemplateID.Valid && saleTemplateID.Int64 > 0 { priceTemplates, err := queryPriceTemplateData(db, saleTemplateID.Int64) if err != nil { log.Printf("警告: 查询价格模板失败 (店铺 %d, sale_template_id: %d): %v", shopID, saleTemplateID.Int64, err) } else if len(priceTemplates) == 0 { fmt.Printf(" 警告: 店铺 %d 的 sale_template_id %d 没有对应的价格模板\n", shopID, saleTemplateID.Int64) } else { allRecords = append(allRecords, priceTemplates...) } } else { fmt.Printf(" 店铺 %d 没有有效的 sale_template_id\n", shopID) } // 将数据存储到Redis List redisKey := strconv.FormatInt(shopID, 10) // 删除旧的Key rdb.Del(ctx, redisKey) // 添加每条数据到List for _, record := range allRecords { // 转换为JSON字符串 jsonBytes, err := json.Marshal(record) if err != nil { log.Printf("警告: JSON编码失败 (店铺 %d): %v", shopID, err) continue } // 添加到Redis List if err := rdb.RPush(ctx, redisKey, jsonBytes).Err(); err != nil { log.Printf("警告: Redis写入失败 (店铺 %d): %v", shopID, err) } } // 设置过期时间 //rdb.Expire(ctx, redisKey, 24*3600*time.Second) // 统计各类数据数量 counts := make(map[string]int) for _, record := range allRecords { counts[record.SourceTable]++ } fmt.Printf(" 店铺 %d: 存储了 %d 条记录\n", shopID, len(allRecords)) for table, count := range counts { fmt.Printf(" - %s: %d 条\n", table, count) } // 显示匹配的模板ID if saleTemplateID.Valid && saleTemplateID.Int64 > 0 { fmt.Printf(" 匹配的saleTemplateId: %d\n", saleTemplateID.Int64) } } return nil } // 查询价格模板数据,特殊处理rangePrice字段 func queryPriceTemplateData(db *sql.DB, templateID int64) ([]ShopDataRecord, error) { query := `SELECT * FROM t_price_template WHERE id = ?` rows, err := db.Query(query, templateID) if err != nil { return nil, err } defer rows.Close() // 获取列信息 columns, err := rows.Columns() if err != nil { return nil, err } // 准备扫描结果 var records []ShopDataRecord values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for rows.Next() { // 准备扫描指针 for i := range columns { valuePtrs[i] = &values[i] } // 扫描行 if err := rows.Scan(valuePtrs...); err != nil { return nil, err } // 创建data map,使用驼峰格式的字段名 dataMap := make(map[string]interface{}) for i, col := range columns { val := values[i] // 转换字段名为驼峰格式 camelCol := snakeToCamel(col) // 特殊处理rangePrice字段 if camelCol == "rangePrice" { // 处理rangePrice字段 processedVal, err := processRangePriceField(val) if err != nil { log.Printf("警告: 处理rangePrice字段失败: %v", err) // 如果处理失败,保留原始值 dataMap[camelCol] = val } else { dataMap[camelCol] = processedVal } } else { // 其他字段正常处理 switch v := val.(type) { case []byte: // 字节数组直接转为字符串 dataMap[camelCol] = string(v) default: dataMap[camelCol] = v } } } // 创建ShopDataRecord record := ShopDataRecord{ SourceTable: "t_price_template", Data: dataMap, } records = append(records, record) } return records, nil } // 处理rangePrice字段,修复浮点数精度问题 func processRangePriceField(val interface{}) (interface{}, error) { if val == nil { return nil, nil } // 将值转换为字符串 var jsonStr string switch v := val.(type) { case string: jsonStr = v case []byte: jsonStr = string(v) default: // 如果无法转换为字符串,直接返回原值 return val, nil } // 如果为空字符串,直接返回 if jsonStr == "" { return jsonStr, nil } // 先尝试解析为通用的[]map[string]interface{} var rawRanges []map[string]interface{} if err := json.Unmarshal([]byte(jsonStr), &rawRanges); err != nil { return jsonStr, fmt.Errorf("JSON解析失败: %v", err) } // 处理每个价格范围项,将字符串数字转换为float64 fixedRanges := make([]map[string]interface{}, len(rawRanges)) for i, item := range rawRanges { fixedItem := make(map[string]interface{}) // 处理MinPrice if minPrice, ok := item["minPrice"]; ok { fixedItem["minPrice"] = toFloat64(minPrice) } // 处理MaxPrice if maxPrice, ok := item["maxPrice"]; ok { fixedItem["maxPrice"] = toFloat64(maxPrice) } // 处理AdjustPercent if adjustPercent, ok := item["adjustPercent"]; ok { fixedItem["adjustPercent"] = toFloat64(adjustPercent) } // 处理AdjustAmount if adjustAmount, ok := item["adjustAmount"]; ok { fixedItem["adjustAmount"] = toFloat64(adjustAmount) } fixedRanges[i] = fixedItem } // 重新编码为JSON字符串 fixedJSON, err := json.Marshal(fixedRanges) if err != nil { return jsonStr, fmt.Errorf("JSON编码失败: %v", err) } return string(fixedJSON), nil } // 修复浮点数:如果包含小数点,只保留整数部分 func fixFloatToInt(value float64) float64 { // 使用math.Trunc直接截断小数部分 // 注意:这里使用float64返回,但值是整数 return math.Trunc(value) } // 将下划线命名转换为驼峰命名 func snakeToCamel(s string) string { parts := strings.Split(s, "_") for i := range parts { if i > 0 { // 首字母大写 if len(parts[i]) > 0 { parts[i] = strings.ToUpper(parts[i][:1]) + parts[i][1:] } } } return strings.Join(parts, "") } // 查询表数据并返回ShopDataRecord切片,自动转换字段名为驼峰格式 func queryTableToShopData(db *sql.DB, tableName string, query string, args ...interface{}) ([]ShopDataRecord, error) { rows, err := db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() // 获取列信息 columns, err := rows.Columns() if err != nil { return nil, err } // 准备扫描结果 var records []ShopDataRecord values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for rows.Next() { // 准备扫描指针 for i := range columns { valuePtrs[i] = &values[i] } // 扫描行 if err := rows.Scan(valuePtrs...); err != nil { return nil, err } // 创建data map,使用驼峰格式的字段名 dataMap := make(map[string]interface{}) for i, col := range columns { val := values[i] // 转换字段名为驼峰格式 camelCol := snakeToCamel(col) // 直接赋值,不进行任何转换 switch v := val.(type) { case []byte: // 字节数组直接转为字符串 dataMap[camelCol] = string(v) default: dataMap[camelCol] = v } } // 创建ShopDataRecord record := ShopDataRecord{ SourceTable: tableName, Data: dataMap, } records = append(records, record) } return records, nil } // 原有的查询函数保持不变 func queryTableToMap(db *sql.DB, query string, args ...interface{}) ([]map[string]interface{}, error) { rows, err := db.Query(query, args...) if err != nil { return nil, err } defer rows.Close() // 获取列信息 columns, err := rows.Columns() if err != nil { return nil, err } // 准备扫描结果 var results []map[string]interface{} values := make([]interface{}, len(columns)) valuePtrs := make([]interface{}, len(columns)) for rows.Next() { // 准备扫描指针 for i := range columns { valuePtrs[i] = &values[i] } // 扫描行 if err := rows.Scan(valuePtrs...); err != nil { return nil, err } // 创建map rowMap := make(map[string]interface{}) for i, col := range columns { val := values[i] // 直接赋值,不进行任何转换 switch v := val.(type) { case []byte: // 字节数组直接转为字符串 rowMap[col] = string(v) default: rowMap[col] = v } } results = append(results, rowMap) } return results, nil } func toFloat64(v interface{}) float64 { if v == nil { return 0 } switch val := v.(type) { case float64: return fixFloatToInt(val) case float32: return fixFloatToInt(float64(val)) case int: return float64(val) case int64: return float64(val) case string: // 尝试将字符串转换为float64 if f, err := strconv.ParseFloat(val, 64); err == nil { return fixFloatToInt(f) } return 0 default: return 0 } }