daShangDao_batch/shop_mysql_to_redis/CameiCase/mysqlToRedisCameiCase.go
2026-06-15 16:34:21 +08:00

540 lines
13 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"batch/util/dbConnectUtil"
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"math"
"strconv"
"strings"
"github.com/go-redis/redis/v8"
_ "github.com/go-sql-driver/mysql"
)
// 嵌套的JSON数据结构
type ShopDataRecord struct {
SourceTable string `json:"source_table"`
Data map[string]interface{} `json:"data"`
}
// 价格范围项结构
type PriceRangeItem struct {
MinPrice interface{} `json:"minPrice"` // 可以是数字或字符串
MaxPrice interface{} `json:"maxPrice"` // 可以是数字或字符串
AdjustPercent interface{} `json:"adjustPercent"` // 可以是数字或字符串
AdjustAmount interface{} `json:"adjustAmount"` // 可以是数字或字符串
}
// 数据库配置
func main() {
// 配置数据库连接
db, err := dbConnectUtil.InitDB("zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306)
if err != nil {
log.Fatal("数据库连接失败:", err)
}
defer db.Close()
// 测试数据库连接
if err := db.Ping(); err != nil {
log.Fatal("数据库ping失败:", err)
}
// Redis连接
rdb := redis.NewClient(&redis.Options{
Addr: "36.212.20.113:7963",
Password: "j8nZ4jra2E",
DB: 7,
})
// 测试Redis连接
ctx := context.Background()
if _, err := rdb.Ping(ctx).Result(); err != nil {
log.Fatal("Redis连接失败:", err)
}
// 执行数据迁移
if err := migrateData(db, rdb); err != nil {
log.Fatal("数据迁移失败:", err)
}
fmt.Println("数据迁移完成!")
}
func migrateData(db *sql.DB, rdb *redis.Client) error {
ctx := context.Background()
fmt.Println("开始查询店铺数据...")
// 第一步查询所有店铺ID
shopQuery := `
SELECT DISTINCT shop_id
FROM t_shop_detail
WHERE shop_id IS NOT NULL AND del_flag = 0
`
shopRows, err := db.Query(shopQuery)
if err != nil {
return fmt.Errorf("查询店铺失败: %v", err)
}
defer shopRows.Close()
var shopIDs []int64
for shopRows.Next() {
var shopID int64
if err := shopRows.Scan(&shopID); err != nil {
return fmt.Errorf("扫描店铺数据失败: %v", err)
}
shopIDs = append(shopIDs, shopID)
}
fmt.Printf("找到 %d 个店铺\n", len(shopIDs))
// 第二步为每个店铺收集数据并存储到Redis
for i, shopID := range shopIDs {
fmt.Printf("[%d/%d] 处理店铺: %d\n", i+1, len(shopIDs), shopID)
// 收集该店铺的所有数据
var allRecords []ShopDataRecord
// 1. 尝试查询店铺主表数据
shopMainData, err := queryTableToShopData(db, "t_shop", `
SELECT * FROM t_shop WHERE id = ?
`, shopID)
if err != nil {
log.Printf("警告: 查询店铺主表失败 (店铺 %d): %v", shopID, err)
} else if len(shopMainData) > 0 {
allRecords = append(allRecords, shopMainData...)
fmt.Printf(" 找到店铺主表数据\n")
} else {
fmt.Printf(" 未找到店铺主表数据 (id=%d)\n", shopID)
}
// 2. 查询该店铺的所有店铺详情数据
shopDetails, err := queryTableToShopData(db, "t_shop_detail", `
SELECT * FROM t_shop_detail WHERE shop_id = ?
`, shopID)
if err != nil {
log.Printf("警告: 查询店铺详情失败 (店铺 %d): %v", shopID, err)
continue
}
allRecords = append(allRecords, shopDetails...)
if len(shopDetails) == 0 {
fmt.Printf(" 店铺 %d 没有详情数据,跳过\n", shopID)
continue
}
// 3. 尝试查询店铺上下文数据(商品描述)
shopContextData, err := queryTableToShopData(db, "t_shop_context", `
SELECT * FROM t_shop_context WHERE shop_id = ?
`, shopID)
if err != nil {
log.Printf("警告: 查询店铺上下文数据失败 (店铺 %d): %v", shopID, err)
} else if len(shopContextData) > 0 {
allRecords = append(allRecords, shopContextData...)
fmt.Printf(" 找到店铺上下文数据 (%d条)\n", len(shopContextData))
} else {
fmt.Printf(" 未找到店铺上下文数据\n")
}
// 4. 尝试查询规格设置数据
specData, err := queryTableToShopData(db, "t_spec", `
SELECT * FROM t_spec WHERE shop_id = ?
`, shopID)
if err != nil {
log.Printf("警告: 查询规格设置数据失败 (店铺 %d): %v", shopID, err)
} else if len(specData) > 0 {
allRecords = append(allRecords, specData...)
fmt.Printf(" 找到规格设置数据 (%d条)\n", len(specData))
} else {
fmt.Printf(" 未找到规格设置数据\n")
}
// 5. 从第一条店铺详情记录中获取 sale_template_id
var saleTemplateID sql.NullInt64
if shopDetails[0].Data["saleTemplateId"] != nil {
switch v := shopDetails[0].Data["saleTemplateId"].(type) {
case string:
if v != "" && v != "0" && v != "null" {
if id, err := strconv.ParseInt(v, 10, 64); err == nil {
saleTemplateID = sql.NullInt64{Int64: id, Valid: true}
}
}
case int64:
if v > 0 {
saleTemplateID = sql.NullInt64{Int64: v, Valid: true}
}
case float64:
if v > 0 {
saleTemplateID = sql.NullInt64{Int64: int64(v), Valid: true}
}
case []byte:
strVal := string(v)
if strVal != "" && strVal != "0" && strVal != "null" {
if id, err := strconv.ParseInt(strVal, 10, 64); err == nil {
saleTemplateID = sql.NullInt64{Int64: id, Valid: true}
}
}
}
}
// 6. 根据 sale_template_id 查询价格模板
if saleTemplateID.Valid && saleTemplateID.Int64 > 0 {
priceTemplates, err := queryPriceTemplateData(db, saleTemplateID.Int64)
if err != nil {
log.Printf("警告: 查询价格模板失败 (店铺 %d, sale_template_id: %d): %v",
shopID, saleTemplateID.Int64, err)
} else if len(priceTemplates) == 0 {
fmt.Printf(" 警告: 店铺 %d 的 sale_template_id %d 没有对应的价格模板\n",
shopID, saleTemplateID.Int64)
} else {
allRecords = append(allRecords, priceTemplates...)
}
} else {
fmt.Printf(" 店铺 %d 没有有效的 sale_template_id\n", shopID)
}
// 将数据存储到Redis List
redisKey := strconv.FormatInt(shopID, 10)
// 删除旧的Key
rdb.Del(ctx, redisKey)
// 添加每条数据到List
for _, record := range allRecords {
// 转换为JSON字符串
jsonBytes, err := json.Marshal(record)
if err != nil {
log.Printf("警告: JSON编码失败 (店铺 %d): %v", shopID, err)
continue
}
// 添加到Redis List
if err := rdb.RPush(ctx, redisKey, jsonBytes).Err(); err != nil {
log.Printf("警告: Redis写入失败 (店铺 %d): %v", shopID, err)
}
}
// 设置过期时间
//rdb.Expire(ctx, redisKey, 24*3600*time.Second)
// 统计各类数据数量
counts := make(map[string]int)
for _, record := range allRecords {
counts[record.SourceTable]++
}
fmt.Printf(" 店铺 %d: 存储了 %d 条记录\n", shopID, len(allRecords))
for table, count := range counts {
fmt.Printf(" - %s: %d 条\n", table, count)
}
// 显示匹配的模板ID
if saleTemplateID.Valid && saleTemplateID.Int64 > 0 {
fmt.Printf(" 匹配的saleTemplateId: %d\n", saleTemplateID.Int64)
}
}
return nil
}
// 查询价格模板数据特殊处理rangePrice字段
func queryPriceTemplateData(db *sql.DB, templateID int64) ([]ShopDataRecord, error) {
query := `SELECT * FROM t_price_template WHERE id = ?`
rows, err := db.Query(query, templateID)
if err != nil {
return nil, err
}
defer rows.Close()
// 获取列信息
columns, err := rows.Columns()
if err != nil {
return nil, err
}
// 准备扫描结果
var records []ShopDataRecord
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for rows.Next() {
// 准备扫描指针
for i := range columns {
valuePtrs[i] = &values[i]
}
// 扫描行
if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}
// 创建data map使用驼峰格式的字段名
dataMap := make(map[string]interface{})
for i, col := range columns {
val := values[i]
// 转换字段名为驼峰格式
camelCol := snakeToCamel(col)
// 特殊处理rangePrice字段
if camelCol == "rangePrice" {
// 处理rangePrice字段
processedVal, err := processRangePriceField(val)
if err != nil {
log.Printf("警告: 处理rangePrice字段失败: %v", err)
// 如果处理失败,保留原始值
dataMap[camelCol] = val
} else {
dataMap[camelCol] = processedVal
}
} else {
// 其他字段正常处理
switch v := val.(type) {
case []byte:
// 字节数组直接转为字符串
dataMap[camelCol] = string(v)
default:
dataMap[camelCol] = v
}
}
}
// 创建ShopDataRecord
record := ShopDataRecord{
SourceTable: "t_price_template",
Data: dataMap,
}
records = append(records, record)
}
return records, nil
}
// 处理rangePrice字段修复浮点数精度问题
func processRangePriceField(val interface{}) (interface{}, error) {
if val == nil {
return nil, nil
}
// 将值转换为字符串
var jsonStr string
switch v := val.(type) {
case string:
jsonStr = v
case []byte:
jsonStr = string(v)
default:
// 如果无法转换为字符串,直接返回原值
return val, nil
}
// 如果为空字符串,直接返回
if jsonStr == "" {
return jsonStr, nil
}
// 先尝试解析为通用的[]map[string]interface{}
var rawRanges []map[string]interface{}
if err := json.Unmarshal([]byte(jsonStr), &rawRanges); err != nil {
return jsonStr, fmt.Errorf("JSON解析失败: %v", err)
}
// 处理每个价格范围项将字符串数字转换为float64
fixedRanges := make([]map[string]interface{}, len(rawRanges))
for i, item := range rawRanges {
fixedItem := make(map[string]interface{})
// 处理MinPrice
if minPrice, ok := item["minPrice"]; ok {
fixedItem["minPrice"] = toFloat64(minPrice)
}
// 处理MaxPrice
if maxPrice, ok := item["maxPrice"]; ok {
fixedItem["maxPrice"] = toFloat64(maxPrice)
}
// 处理AdjustPercent
if adjustPercent, ok := item["adjustPercent"]; ok {
fixedItem["adjustPercent"] = toFloat64(adjustPercent)
}
// 处理AdjustAmount
if adjustAmount, ok := item["adjustAmount"]; ok {
fixedItem["adjustAmount"] = toFloat64(adjustAmount)
}
fixedRanges[i] = fixedItem
}
// 重新编码为JSON字符串
fixedJSON, err := json.Marshal(fixedRanges)
if err != nil {
return jsonStr, fmt.Errorf("JSON编码失败: %v", err)
}
return string(fixedJSON), nil
}
// 修复浮点数:如果包含小数点,只保留整数部分
func fixFloatToInt(value float64) float64 {
// 使用math.Trunc直接截断小数部分
// 注意这里使用float64返回但值是整数
return math.Trunc(value)
}
// 将下划线命名转换为驼峰命名
func snakeToCamel(s string) string {
parts := strings.Split(s, "_")
for i := range parts {
if i > 0 {
// 首字母大写
if len(parts[i]) > 0 {
parts[i] = strings.ToUpper(parts[i][:1]) + parts[i][1:]
}
}
}
return strings.Join(parts, "")
}
// 查询表数据并返回ShopDataRecord切片自动转换字段名为驼峰格式
func queryTableToShopData(db *sql.DB, tableName string, query string, args ...interface{}) ([]ShopDataRecord, error) {
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
// 获取列信息
columns, err := rows.Columns()
if err != nil {
return nil, err
}
// 准备扫描结果
var records []ShopDataRecord
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for rows.Next() {
// 准备扫描指针
for i := range columns {
valuePtrs[i] = &values[i]
}
// 扫描行
if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}
// 创建data map使用驼峰格式的字段名
dataMap := make(map[string]interface{})
for i, col := range columns {
val := values[i]
// 转换字段名为驼峰格式
camelCol := snakeToCamel(col)
// 直接赋值,不进行任何转换
switch v := val.(type) {
case []byte:
// 字节数组直接转为字符串
dataMap[camelCol] = string(v)
default:
dataMap[camelCol] = v
}
}
// 创建ShopDataRecord
record := ShopDataRecord{
SourceTable: tableName,
Data: dataMap,
}
records = append(records, record)
}
return records, nil
}
// 原有的查询函数保持不变
func queryTableToMap(db *sql.DB, query string, args ...interface{}) ([]map[string]interface{}, error) {
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
// 获取列信息
columns, err := rows.Columns()
if err != nil {
return nil, err
}
// 准备扫描结果
var results []map[string]interface{}
values := make([]interface{}, len(columns))
valuePtrs := make([]interface{}, len(columns))
for rows.Next() {
// 准备扫描指针
for i := range columns {
valuePtrs[i] = &values[i]
}
// 扫描行
if err := rows.Scan(valuePtrs...); err != nil {
return nil, err
}
// 创建map
rowMap := make(map[string]interface{})
for i, col := range columns {
val := values[i]
// 直接赋值,不进行任何转换
switch v := val.(type) {
case []byte:
// 字节数组直接转为字符串
rowMap[col] = string(v)
default:
rowMap[col] = v
}
}
results = append(results, rowMap)
}
return results, nil
}
func toFloat64(v interface{}) float64 {
if v == nil {
return 0
}
switch val := v.(type) {
case float64:
return fixFloatToInt(val)
case float32:
return fixFloatToInt(float64(val))
case int:
return float64(val)
case int64:
return float64(val)
case string:
// 尝试将字符串转换为float64
if f, err := strconv.ParseFloat(val, 64); err == nil {
return fixFloatToInt(f)
}
return 0
default:
return 0
}
}