919 lines
24 KiB
Go
919 lines
24 KiB
Go
package main
|
||
|
||
import (
|
||
"batch/util/dbConnectUtil"
|
||
"context"
|
||
"database/sql"
|
||
"encoding/json"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"os"
|
||
"os/signal"
|
||
"reflect"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis/v8"
|
||
_ "github.com/go-sql-driver/mysql"
|
||
)
|
||
|
||
// ShopMsg is a flat shop configuration record. The json tags define the field
// names used when the value is serialized.
type ShopMsg struct {
	// Shop identity fields.
	ID            int64  `json:"id"`
	ShopAliasName string `json:"shop_alias_name"`
	ShopName      string `json:"shop_name"`
	Token         string `json:"token"`

	// Title fields.
	GoodsNamePrefix string `json:"goods_name_prefix"`
	GoodsNameSuffix string `json:"goods_name_suffix"`
	TitleConsistOf  string `json:"title_consist_of"`
	SpaceCharacter  string `json:"space_character"`

	// Image URL fields.
	WatermarkImgUrl             string   `json:"watermark_img_url"`
	CarouseLastImgUrlArray      []string `json:"carouse_last_img_url_array"`
	GoodsDetailFirstImgUrlArray []string `json:"goods_detail_first_img_url_array"`
	GoodsDetailLastImgUrlArray  []string `json:"goods_detail_last_img_url_array"`

	// Spec fields.
	SpecName      string `json:"spec_name"`
	SpecId        int64  `json:"spec_id"`
	SpecChildName string `json:"spec_child_name"`

	// Boolean flags.
	// NOTE(review): the tag below reads "is_fotl" while the field is IsFolt.
	// It is kept as-is because the tag is the wire name; confirm the spelling.
	IsFolt       bool `json:"is_fotl"`
	IsPreSale    bool `json:"is_pre_sale"`
	IsRefundable bool `json:"is_refundable"`
	IsSecondHand bool `json:"is_second_hand"`

	// Numeric settings.
	ShipmentLimitSecond int64 `json:"shipment_limit_second"`
	CostTemplateId      int64 `json:"cost_template_id"`
	DefStock            int64 `json:"def_stock"`
	TwoDiscount         int64 `json:"two_discount"`
}
|
||
|
||
// ShopMainData holds the columns that queryShopMainData scans from t_shop.
// The text columns are sql.NullString so that NULL values scan without error.
type ShopMainData struct {
	Id int64 `json:"id"`

	// Nullable text columns.
	ShopAliasName sql.NullString `json:"shop_alias_name"`
	ShopName      sql.NullString `json:"shop_name"`
	Token         sql.NullString `json:"token"`
}
|
||
|
||
// ShopDetailData holds the columns that queryShopDetailData scans from
// t_shop_detail. Nullable text columns are sql.NullString; nullable integer
// columns are *int64 (nil when the column is NULL).
type ShopDetailData struct {
	Id     int64 `json:"id"`
	ShopId int64 `json:"shop_id"`

	TemplateId *int64 `json:"template_id"`

	// Title fields.
	TitlePrefix    sql.NullString `json:"title_prefix"`
	TitleSuffix    sql.NullString `json:"title_suffix"`
	TitleConsistOf sql.NullString `json:"title_consist_of"`
	SpaceCharacter sql.NullString `json:"space_character"`

	// Nullable integers.
	StockDeff   *int64 `json:"stock_deff"`
	TwoDiscount *int64 `json:"two_discount"`

	// Remaining columns are scanned as nullable text.
	Presale      sql.NullString `json:"presale"`
	Fake         sql.NullString `json:"fake"`
	SevenDays    sql.NullString `json:"seven_days"`
	IsSecondHand sql.NullString `json:"is_second_hand"`
	DeliveryTime sql.NullString `json:"delivery_time"`
}
|
||
|
||
// SpecData holds the columns that querySpecData scans from t_spec. The two
// text columns are nullable.
type SpecData struct {
	Id     int64 `json:"id"`
	ShopId int64 `json:"shop_id"`

	SpecName   sql.NullString `json:"spec_name"`
	SpecPrefix sql.NullString `json:"spec_prefix"`
}
|
||
|
||
// ShopImageData holds the columns that queryShopImageData scans from
// t_shop_img. That query filters on pid using the shop id.
type ShopImageData struct {
	Id  int64 `json:"id"`
	Pid int64 `json:"pid"`

	// Type is compared against string codes such as "1", "3", "4", "5" and "7"
	// by processShop.
	Type         string `json:"type"`
	AbsolutePath string `json:"absolute_path"`
}
|
||
|
||
// ShopDataRecord is one exported row: the name of the table it came from plus
// its column values keyed by column name. It is the unit that gets
// JSON-encoded and pushed to Redis.
type ShopDataRecord struct {
	SourceTable string `json:"source_table"`

	Data map[string]interface{} `json:"data"`
}
|
||
|
||
// ShopProcessResult is what a worker reports after processing one shop: the
// shop id, the record count returned by processShop, and its error (nil on
// success).
type ShopProcessResult struct {
	ShopID int64
	Count  int

	Err error
}
|
||
|
||
// normalizeFieldValue 根据字段名和表名对值进行标准化:
|
||
// - 特定表特定字段保持 int64 数字:t_shop_context.id, t_spec.id, t_price_template.id, t_shop_detail.district_id, 以及 mall_id
|
||
// - 其他 id / *_id / create_by / update_by 字段 → 字符串
|
||
// - create_time/update_time/add_time/expiration_time → 格式化为 "2006-01-02 15:04:05"(无时区)
|
||
// - 其他字段:[]byte → string
|
||
func normalizeFieldValue(colName string, val interface{}, tableName string) interface{} {
|
||
if val == nil {
|
||
return nil
|
||
}
|
||
|
||
// 特殊处理:保持 int64 数字的字段(不转为字符串)
|
||
// 1. mall_id 保持数字
|
||
// 2. t_shop_context.id
|
||
// 3. t_spec.id
|
||
// 4. t_price_template.id
|
||
// 5. t_shop_detail.district_id
|
||
if colName == "mall_id" {
|
||
return ensureInt64(val)
|
||
}
|
||
if tableName == "t_shop_detail" && colName == "district_id" {
|
||
return ensureInt64(val)
|
||
}
|
||
|
||
// 1) ID 类字段(除上述保留数字的字段外):转为字符串
|
||
if colName == "id" || colName == "create_by" || colName == "update_by" || strings.HasSuffix(colName, "_id") {
|
||
switch v := val.(type) {
|
||
case string:
|
||
return v
|
||
case []byte:
|
||
return string(v)
|
||
case int, int8, int16, int32, int64:
|
||
return strconv.FormatInt(reflect.ValueOf(v).Int(), 10)
|
||
case uint, uint8, uint16, uint32, uint64:
|
||
return strconv.FormatUint(reflect.ValueOf(v).Uint(), 10)
|
||
case float32, float64:
|
||
f := reflect.ValueOf(v).Float()
|
||
return strconv.FormatInt(int64(f), 10)
|
||
default:
|
||
return fmt.Sprint(v)
|
||
}
|
||
}
|
||
|
||
// 2) 时间字段:格式化为 "2006-01-02 15:04:05"
|
||
if colName == "create_time" || colName == "update_time" || colName == "add_time" || colName == "expiration_time" {
|
||
var t time.Time
|
||
switch v := val.(type) {
|
||
case time.Time:
|
||
t = v
|
||
case []byte:
|
||
s := string(v)
|
||
if s == "" {
|
||
return nil
|
||
}
|
||
t = parseTime(s)
|
||
case string:
|
||
if v == "" {
|
||
return nil
|
||
}
|
||
t = parseTime(v)
|
||
default:
|
||
return val
|
||
}
|
||
if t.IsZero() {
|
||
return nil
|
||
}
|
||
return t.Format("2006-01-02 15:04:05")
|
||
}
|
||
|
||
// 3) 其他字段:[]byte 转为 string
|
||
switch v := val.(type) {
|
||
case []byte:
|
||
return string(v)
|
||
default:
|
||
return v
|
||
}
|
||
}
|
||
|
||
// ensureInt64 coerces a scanned value to int64 for columns that must stay
// numeric.
//
// Every built-in integer width, both float widths, and base-10 numeric text
// (string or []byte) are converted; floats are truncated toward zero. Values
// that cannot be represented are handed back rather than mangled: text that
// does not parse is returned as a string, an unsigned value above
// math.MaxInt64 is returned unchanged, and any other type is returned as-is.
func ensureInt64(val interface{}) interface{} {
	switch v := val.(type) {
	case int64:
		return v
	case int:
		return int64(v)
	case int8:
		return int64(v)
	case int16:
		return int64(v)
	case int32:
		return int64(v)
	case uint8:
		return int64(v)
	case uint16:
		return int64(v)
	case uint32:
		return int64(v)
	case uint:
		// Guard against wrap-around into negative numbers.
		if uint64(v) > math.MaxInt64 {
			return val
		}
		return int64(v)
	case uint64:
		if v > math.MaxInt64 {
			return val
		}
		return int64(v)
	case float32:
		return int64(v)
	case float64:
		return int64(v)
	case string:
		if n, err := strconv.ParseInt(v, 10, 64); err == nil {
			return n
		}
		return v
	case []byte:
		text := string(v)
		if n, err := strconv.ParseInt(text, 10, 64); err == nil {
			return n
		}
		return text
	default:
		return val
	}
}
|
||
|
||
// parseTime tries a fixed list of layouts in order and returns the first
// successful time.Parse result. When no layout matches it returns the zero
// time.Time, which callers detect with IsZero.
func parseTime(s string) time.Time {
	candidates := [...]string{
		"2006-01-02 15:04:05",
		"2006-01-02T15:04:05Z",
		"2006-01-02T15:04:05",
		"2006-01-02 15:04:05.999999",
		"2006-01-02",
	}

	for i := 0; i < len(candidates); i++ {
		parsed, err := time.Parse(candidates[i], s)
		if err != nil {
			continue
		}
		return parsed
	}

	var zero time.Time
	return zero
}
|
||
|
||
// 查询店铺主表数据
|
||
func queryShopMainData(db *sql.DB, shopID int64) (ShopMainData, error) {
|
||
query := `SELECT id, shop_alias_name, shop_name, token FROM t_shop WHERE id = ? AND del_flag = '0'`
|
||
row := db.QueryRow(query, shopID)
|
||
|
||
var data ShopMainData
|
||
err := row.Scan(&data.Id, &data.ShopAliasName, &data.ShopName, &data.Token)
|
||
if err != nil {
|
||
return data, err
|
||
}
|
||
|
||
return data, nil
|
||
}
|
||
|
||
// 查询店铺详情数据
|
||
func queryShopDetailData(db *sql.DB, shopID int64) ([]ShopDetailData, error) {
|
||
query := `
|
||
SELECT id, shop_id, template_id, title_prefix, title_suffix, title_consist_of,
|
||
space_character, stock_deff, two_discount, presale, fake, seven_days,
|
||
is_second_hand, delivery_time
|
||
FROM t_shop_detail
|
||
WHERE shop_id = ? AND del_flag = '0'
|
||
`
|
||
rows, err := db.Query(query, shopID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var data []ShopDetailData
|
||
for rows.Next() {
|
||
var item ShopDetailData
|
||
err := rows.Scan(
|
||
&item.Id, &item.ShopId, &item.TemplateId, &item.TitlePrefix, &item.TitleSuffix,
|
||
&item.TitleConsistOf, &item.SpaceCharacter, &item.StockDeff, &item.TwoDiscount,
|
||
&item.Presale, &item.Fake, &item.SevenDays, &item.IsSecondHand, &item.DeliveryTime,
|
||
)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
data = append(data, item)
|
||
}
|
||
|
||
return data, nil
|
||
}
|
||
|
||
// 查询规格数据
|
||
func querySpecData(db *sql.DB, shopID int64) ([]SpecData, error) {
|
||
query := `SELECT id, shop_id, spec_name, spec_prefix FROM t_spec WHERE shop_id = ? AND del_flag = '0'`
|
||
rows, err := db.Query(query, shopID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var data []SpecData
|
||
for rows.Next() {
|
||
var item SpecData
|
||
err := rows.Scan(&item.Id, &item.ShopId, &item.SpecName, &item.SpecPrefix)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
data = append(data, item)
|
||
}
|
||
|
||
return data, nil
|
||
}
|
||
|
||
// 查询图片数据
|
||
func queryShopImageData(db *sql.DB, shopID int64) ([]ShopImageData, error) {
|
||
query := `SELECT id, pid, type, absolute_path FROM t_shop_img WHERE pid = ? AND del_flag = '0'`
|
||
rows, err := db.Query(query, shopID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var data []ShopImageData
|
||
for rows.Next() {
|
||
var item ShopImageData
|
||
err := rows.Scan(&item.Id, &item.Pid, &item.Type, &item.AbsolutePath)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
data = append(data, item)
|
||
}
|
||
|
||
return data, nil
|
||
}
|
||
|
||
func main() {
|
||
// 配置数据库连接
|
||
db, err := dbConnectUtil.InitDB("zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306)
|
||
if err != nil {
|
||
log.Fatal("数据库连接失败:", err)
|
||
}
|
||
defer db.Close()
|
||
|
||
// 【优化1】数据库连接池配置 - 降低并发避免瞬时带宽压力
|
||
db.SetMaxOpenConns(10) // 最大打开连接数(降低到10)
|
||
db.SetMaxIdleConns(5) // 最大空闲连接数(降低到5)
|
||
db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生命周期
|
||
db.SetConnMaxIdleTime(10 * time.Minute) // 空闲连接最大存活时间
|
||
|
||
// 测试数据库连接
|
||
if err := db.Ping(); err != nil {
|
||
log.Fatal("数据库ping失败:", err)
|
||
}
|
||
|
||
// Redis连接 - 降低连接池大小
|
||
rdb := redis.NewClient(&redis.Options{
|
||
Addr: "36.212.12.247:6379",
|
||
Password: "long6166@@",
|
||
DB: 8,
|
||
PoolSize: 10, // 连接池大小(降低到10)
|
||
MinIdleConns: 5, // 最小空闲连接数(降低到5)
|
||
})
|
||
|
||
// 测试Redis连接
|
||
ctx := context.Background()
|
||
if _, err := rdb.Ping(ctx).Result(); err != nil {
|
||
log.Fatal("Redis连接失败:", err)
|
||
}
|
||
|
||
// 【新增】设置信号捕获,优雅退出
|
||
stopChan := make(chan os.Signal, 1)
|
||
signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM)
|
||
|
||
// 【新增】循环运行,每次间隔5分钟
|
||
runCount := 0
|
||
for {
|
||
runCount++
|
||
startTime := time.Now()
|
||
|
||
fmt.Printf("\n%s\n", strings.Repeat("=", 60))
|
||
fmt.Printf("第 %d 次运行开始 - %s\n", runCount, startTime.Format("2006-01-02 15:04:05"))
|
||
fmt.Printf("%s\n", strings.Repeat("=", 60))
|
||
|
||
// 执行数据迁移
|
||
if err := migrateDataOptimized(db, rdb); err != nil {
|
||
log.Printf("数据迁移失败: %v", err)
|
||
}
|
||
|
||
elapsed := time.Since(startTime)
|
||
fmt.Printf("\n第 %d 次运行完成,耗时: %v\n", runCount, elapsed)
|
||
fmt.Printf("下次运行时间: %s\n", time.Now().Add(5*time.Minute).Format("2006-01-02 15:04:05"))
|
||
|
||
// 等待间隔时间,但可以提前退出
|
||
// 根据运行耗时动态调整间隔
|
||
var waitInterval time.Duration
|
||
if elapsed < 3*time.Minute {
|
||
waitInterval = 5 * time.Minute
|
||
} else if elapsed < 6*time.Minute {
|
||
waitInterval = 8 * time.Minute
|
||
} else {
|
||
waitInterval = 10 * time.Minute
|
||
}
|
||
|
||
fmt.Printf("\n本次耗时: %v, 下次运行间隔: %v (按 Ctrl+C 退出)...\n", elapsed, waitInterval)
|
||
|
||
select {
|
||
case <-stopChan:
|
||
fmt.Printf("\n\n收到退出信号,程序即将退出...\n")
|
||
fmt.Printf("总共运行了 %d 次\n", runCount)
|
||
return
|
||
case <-time.After(waitInterval):
|
||
// 继续下一次循环
|
||
}
|
||
}
|
||
}
|
||
|
||
// 【优化版本】并发处理数据迁移
|
||
func migrateDataOptimized(db *sql.DB, rdb *redis.Client) error {
|
||
ctx := context.Background()
|
||
|
||
fmt.Println("开始查询店铺数据...")
|
||
|
||
// 第一步:查询所有店铺ID
|
||
shopQuery := `
|
||
SELECT DISTINCT id
|
||
FROM t_shop
|
||
WHERE id IS NOT NULL
|
||
`
|
||
|
||
shopRows, err := db.Query(shopQuery)
|
||
if err != nil {
|
||
return fmt.Errorf("查询店铺失败: %v", err)
|
||
}
|
||
defer shopRows.Close()
|
||
|
||
var shopIDs []int64
|
||
for shopRows.Next() {
|
||
var shopID int64
|
||
if err := shopRows.Scan(&shopID); err != nil {
|
||
return fmt.Errorf("扫描店铺数据失败: %v", err)
|
||
}
|
||
shopIDs = append(shopIDs, shopID)
|
||
}
|
||
|
||
fmt.Printf("找到 %d 个店铺\n", len(shopIDs))
|
||
|
||
// 【优化2】使用Worker Pool并发处理 - 降低并发避免带宽压力
|
||
workers := 3 // 并发worker数量(降低到3,减少瞬时压力)
|
||
jobs := make(chan int64, len(shopIDs))
|
||
results := make(chan ShopProcessResult, len(shopIDs))
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动worker
|
||
for w := 0; w < workers; w++ {
|
||
wg.Add(1)
|
||
go func(workerID int) {
|
||
defer wg.Done()
|
||
for shopID := range jobs {
|
||
count, err := processShop(db, rdb, ctx, shopID, workerID)
|
||
results <- ShopProcessResult{
|
||
ShopID: shopID,
|
||
Count: count,
|
||
Err: err,
|
||
}
|
||
// 【新增】添加处理延迟,降低瞬时带宽压力
|
||
time.Sleep(100 * time.Millisecond) // 每个店铺处理完后休息100ms
|
||
}
|
||
}(w)
|
||
}
|
||
|
||
// 发送任务
|
||
go func() {
|
||
for _, id := range shopIDs {
|
||
jobs <- id
|
||
}
|
||
close(jobs)
|
||
}()
|
||
|
||
// 等待所有worker完成
|
||
go func() {
|
||
wg.Wait()
|
||
close(results)
|
||
}()
|
||
|
||
// 收集结果并统计
|
||
successCount := 0
|
||
failCount := 0
|
||
totalRecords := 0
|
||
|
||
for result := range results {
|
||
if result.Err != nil {
|
||
failCount++
|
||
log.Printf("[失败] 店铺 %d: %v", result.ShopID, result.Err)
|
||
} else {
|
||
successCount++
|
||
totalRecords += result.Count
|
||
if successCount%10 == 0 {
|
||
fmt.Printf("[进度] 已完成: %d/%d, 总记录: %d\n",
|
||
successCount, len(shopIDs), totalRecords)
|
||
}
|
||
}
|
||
}
|
||
|
||
fmt.Printf("\n========== 迁移完成 ==========\n")
|
||
fmt.Printf("成功: %d 店铺\n", successCount)
|
||
fmt.Printf("失败: %d 店铺\n", failCount)
|
||
fmt.Printf("总记录数: %d\n", totalRecords)
|
||
|
||
return nil
|
||
}
|
||
|
||
// processShop 处理单个店铺的所有数据
|
||
func processShop(db *sql.DB, rdb *redis.Client, ctx context.Context, shopID int64, workerID int) (int, error) {
|
||
// 收集该店铺的所有数据
|
||
var allRecords []ShopDataRecord
|
||
|
||
// 1. 查询店铺主表数据
|
||
shopMainData, err := queryTableToShopData(db, "t_shop", `
|
||
SELECT * FROM t_shop WHERE id = ? AND del_flag = '0'
|
||
`, shopID)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("查询店铺主表失败: %v", err)
|
||
}
|
||
allRecords = append(allRecords, shopMainData...)
|
||
|
||
// 2. 查询该店铺的所有店铺详情数据
|
||
shopDetails, err := queryTableToShopData(db, "t_shop_detail", `
|
||
SELECT * FROM t_shop_detail WHERE shop_id = ? AND del_flag = '0'
|
||
`, shopID)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("查询店铺详情失败: %v", err)
|
||
}
|
||
|
||
if len(shopDetails) == 0 {
|
||
return 0, nil // 没有详情数据,跳过
|
||
}
|
||
|
||
// 查询图片数据以获取watermark_img_url
|
||
shopImgDataList, err := queryShopImageData(db, shopID)
|
||
if err != nil {
|
||
log.Printf("[Worker-%d] 警告: 查询图片数据失败 (店铺 %d): %v", workerID, shopID, err)
|
||
}
|
||
|
||
// 处理图片数据
|
||
var watermarkImgUrl string
|
||
var skuWatermarkImgUrl string
|
||
var carouseLastImgUrlArray = []string{}
|
||
var goodsDetailFirstImgUrlArray = []string{}
|
||
var goodsDetailLastImgUrlArray = []string{}
|
||
|
||
if err == nil {
|
||
for _, img := range shopImgDataList {
|
||
if img.AbsolutePath == "" {
|
||
continue
|
||
}
|
||
switch img.Type {
|
||
case "1":
|
||
watermarkImgUrl = img.AbsolutePath
|
||
case "3":
|
||
goodsDetailFirstImgUrlArray = append(goodsDetailFirstImgUrlArray, img.AbsolutePath)
|
||
case "4":
|
||
goodsDetailLastImgUrlArray = append(goodsDetailLastImgUrlArray, img.AbsolutePath)
|
||
case "5":
|
||
carouseLastImgUrlArray = append(carouseLastImgUrlArray, img.AbsolutePath)
|
||
case "7":
|
||
skuWatermarkImgUrl = img.AbsolutePath
|
||
}
|
||
}
|
||
}
|
||
|
||
// 将watermark_img_url等添加到shopdetail记录中
|
||
for i := range shopDetails {
|
||
if shopDetails[i].Data == nil {
|
||
shopDetails[i].Data = make(map[string]interface{})
|
||
}
|
||
shopDetails[i].Data["watermark_img_url"] = watermarkImgUrl
|
||
shopDetails[i].Data["carouse_last_img_url_array"] = carouseLastImgUrlArray
|
||
shopDetails[i].Data["goods_detail_first_img_url_array"] = goodsDetailFirstImgUrlArray
|
||
shopDetails[i].Data["goods_detail_last_img_url_array"] = goodsDetailLastImgUrlArray
|
||
shopDetails[i].Data["sku_watermark_img_url"] = skuWatermarkImgUrl
|
||
}
|
||
|
||
allRecords = append(allRecords, shopDetails...)
|
||
|
||
// 3. 查询店铺上下文数据
|
||
shopContextData, err := queryTableToShopData(db, "t_shop_context", `
|
||
SELECT * FROM t_shop_context WHERE shop_id = ? AND del_flag = '0'
|
||
`, shopID)
|
||
if err != nil {
|
||
log.Printf("[Worker-%d] 警告: 查询店铺上下文数据失败 (店铺 %d): %v", workerID, shopID, err)
|
||
}
|
||
allRecords = append(allRecords, shopContextData...)
|
||
|
||
// 4. 查询规格设置数据
|
||
specData, err := queryTableToShopData(db, "t_spec", `
|
||
SELECT * FROM t_spec WHERE shop_id = ? AND del_flag = '0'
|
||
`, shopID)
|
||
if err != nil {
|
||
log.Printf("[Worker-%d] 警告: 查询规格设置数据失败 (店铺 %d): %v", workerID, shopID, err)
|
||
}
|
||
allRecords = append(allRecords, specData...)
|
||
|
||
// 5. 从第一条店铺详情记录中获取 sale_template_id
|
||
var saleTemplateID int64
|
||
if firstDetail := shopDetails[0].Data; firstDetail != nil {
|
||
if v, ok := firstDetail["sale_template_id"]; ok && v != nil {
|
||
switch val := v.(type) {
|
||
case string:
|
||
if val != "" && val != "0" && val != "null" {
|
||
if id, err := strconv.ParseInt(val, 10, 64); err == nil && id > 0 {
|
||
saleTemplateID = id
|
||
}
|
||
}
|
||
case int64:
|
||
if val > 0 {
|
||
saleTemplateID = val
|
||
}
|
||
case float64:
|
||
if val > 0 {
|
||
saleTemplateID = int64(val)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// 6. 根据 sale_template_id 查询价格模板
|
||
if saleTemplateID > 0 {
|
||
priceTemplates, err := queryPriceTemplateData(db, saleTemplateID)
|
||
if err != nil {
|
||
log.Printf("[Worker-%d] 警告: 查询价格模板失败 (店铺 %d, sale_template_id: %d): %v",
|
||
workerID, shopID, saleTemplateID, err)
|
||
} else {
|
||
allRecords = append(allRecords, priceTemplates...)
|
||
}
|
||
}
|
||
|
||
// 【优化3】批量写入Redis
|
||
if err := saveToRedisBatch(rdb, ctx, shopID, allRecords); err != nil {
|
||
return 0, fmt.Errorf("Redis写入失败: %v", err)
|
||
}
|
||
|
||
return len(allRecords), nil
|
||
}
|
||
|
||
// 【优化3】使用Pipeline批量写入Redis
|
||
func saveToRedisBatch(rdb *redis.Client, ctx context.Context, shopID int64, records []ShopDataRecord) error {
|
||
if len(records) == 0 {
|
||
return nil
|
||
}
|
||
|
||
redisKey := strconv.FormatInt(shopID, 10)
|
||
|
||
// 使用Pipeline批量执行
|
||
pipe := rdb.Pipeline()
|
||
|
||
// 删除旧key
|
||
pipe.Del(ctx, redisKey)
|
||
|
||
// 批量添加数据
|
||
for _, record := range records {
|
||
jsonBytes, err := json.Marshal(record)
|
||
if err != nil {
|
||
log.Printf("警告: JSON编码失败 (店铺 %d): %v", shopID, err)
|
||
continue
|
||
}
|
||
pipe.RPush(ctx, redisKey, jsonBytes)
|
||
}
|
||
|
||
// 一次性执行所有命令
|
||
_, err := pipe.Exec(ctx)
|
||
return err
|
||
}
|
||
|
||
// 查询价格模板数据,特殊处理 range_price 和 add_amount 字段
|
||
func queryPriceTemplateData(db *sql.DB, templateID int64) ([]ShopDataRecord, error) {
|
||
query := `SELECT * FROM t_price_template WHERE id = ?`
|
||
rows, err := db.Query(query, templateID)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
columns, err := rows.Columns()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var records []ShopDataRecord
|
||
values := make([]interface{}, len(columns))
|
||
valuePtrs := make([]interface{}, len(columns))
|
||
|
||
for rows.Next() {
|
||
for i := range columns {
|
||
valuePtrs[i] = &values[i]
|
||
}
|
||
if err := rows.Scan(valuePtrs...); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
dataMap := make(map[string]interface{})
|
||
for i, col := range columns {
|
||
snakeCol := camelToSnake(col)
|
||
|
||
if snakeCol == "range_price" {
|
||
processedVal, err := processRangePriceField(values[i])
|
||
if err != nil {
|
||
log.Printf("警告: 处理 range_price 字段失败: %v", err)
|
||
processedVal = values[i]
|
||
}
|
||
dataMap[snakeCol] = normalizeFieldValue(snakeCol, processedVal, "t_price_template")
|
||
continue
|
||
}
|
||
|
||
if snakeCol == "add_amount" {
|
||
processedVal, err := processAddAmountField(values[i])
|
||
if err != nil {
|
||
log.Printf("警告: 处理 add_amount 字段失败: %v", err)
|
||
processedVal = values[i]
|
||
}
|
||
dataMap[snakeCol] = normalizeFieldValue(snakeCol, processedVal, "t_price_template")
|
||
continue
|
||
}
|
||
|
||
dataMap[snakeCol] = normalizeFieldValue(snakeCol, values[i], "t_price_template")
|
||
}
|
||
|
||
records = append(records, ShopDataRecord{
|
||
SourceTable: "t_price_template",
|
||
Data: dataMap,
|
||
})
|
||
}
|
||
return records, nil
|
||
}
|
||
|
||
// processAddAmountField converts an amount to an int64 equal to the value
// multiplied by 100 and rounded to the nearest integer (math.Round).
//
// Accepted inputs are float32/float64, int/int32/int64, and decimal text as
// string or []byte. nil and empty text yield (nil, nil). Unparseable text and
// unsupported types yield (nil, error).
func processAddAmountField(val interface{}) (interface{}, error) {
	if val == nil {
		return nil, nil
	}

	var (
		amount    float64
		text      string
		isText    bool
		fromBytes bool
	)

	switch n := val.(type) {
	case int:
		amount = float64(n)
	case int32:
		amount = float64(n)
	case int64:
		amount = float64(n)
	case float32:
		amount = float64(n)
	case float64:
		amount = n
	case string:
		text, isText = n, true
	case []byte:
		text, isText, fromBytes = string(n), true, true
	default:
		return nil, fmt.Errorf("不支持的类型: %T", n)
	}

	if isText {
		if len(text) == 0 {
			return nil, nil
		}
		parsed, err := strconv.ParseFloat(text, 64)
		if err != nil {
			// The wording differs depending on where the text came from.
			if fromBytes {
				return nil, fmt.Errorf("无法解析字节数组为浮点数: %v", err)
			}
			return nil, fmt.Errorf("无法解析字符串为浮点数: %v", err)
		}
		amount = parsed
	}

	return int64(math.Round(amount * 100)), nil
}
|
||
|
||
func processRangePriceField(val interface{}) (interface{}, error) {
|
||
if val == nil {
|
||
return nil, nil
|
||
}
|
||
var jsonStr string
|
||
switch v := val.(type) {
|
||
case string:
|
||
jsonStr = v
|
||
case []byte:
|
||
jsonStr = string(v)
|
||
default:
|
||
return val, nil
|
||
}
|
||
if jsonStr == "" {
|
||
return jsonStr, nil
|
||
}
|
||
var rawItems []map[string]interface{}
|
||
if err := json.Unmarshal([]byte(jsonStr), &rawItems); err != nil {
|
||
return jsonStr, fmt.Errorf("JSON解析失败: %v", err)
|
||
}
|
||
fixedRanges := make([]map[string]interface{}, len(rawItems))
|
||
for i, item := range rawItems {
|
||
fixedItem := make(map[string]interface{})
|
||
if minPrice, ok := item["minPrice"]; ok {
|
||
fixedItem["minPrice"] = convertToFloat64(minPrice)
|
||
}
|
||
if maxPrice, ok := item["maxPrice"]; ok {
|
||
fixedItem["maxPrice"] = convertToFloat64(maxPrice)
|
||
}
|
||
if adjustPercent, ok := item["adjustPercent"]; ok {
|
||
fixedItem["adjustPercent"] = convertToFloat64(adjustPercent)
|
||
}
|
||
if adjustAmount, ok := item["adjustAmount"]; ok {
|
||
fixedItem["adjustAmount"] = convertToFloat64(adjustAmount)
|
||
}
|
||
fixedRanges[i] = fixedItem
|
||
}
|
||
fixedJSON, err := json.Marshal(fixedRanges)
|
||
if err != nil {
|
||
return jsonStr, fmt.Errorf("JSON编码失败: %v", err)
|
||
}
|
||
return string(fixedJSON), nil
|
||
}
|
||
|
||
func convertToFloat64(v interface{}) float64 {
|
||
switch val := v.(type) {
|
||
case float64:
|
||
return fixFloatToInt(val)
|
||
case float32:
|
||
return fixFloatToInt(float64(val))
|
||
case int:
|
||
return float64(val)
|
||
case int64:
|
||
return float64(val)
|
||
case int32:
|
||
return float64(val)
|
||
case string:
|
||
if f, err := strconv.ParseFloat(val, 64); err == nil {
|
||
return fixFloatToInt(f)
|
||
}
|
||
return 0
|
||
case bool:
|
||
if val {
|
||
return 1
|
||
}
|
||
return 0
|
||
default:
|
||
return 0
|
||
}
|
||
}
|
||
|
||
// fixFloatToInt rounds value down to the nearest whole number using
// math.Floor, so negative fractions move away from zero. The result is still a
// float64.
func fixFloatToInt(value float64) float64 {
	floored := math.Floor(value)
	return floored
}
|
||
|
||
// camelToSnake inserts an underscore before every ASCII upper-case letter
// except one at the very start, then lower-cases the whole string. Consecutive
// capitals are each separated ("ID" becomes "i_d"); names that are already
// snake_case come back unchanged.
func camelToSnake(s string) string {
	var sb strings.Builder
	for pos, ch := range s {
		if pos != 0 && 'A' <= ch && ch <= 'Z' {
			sb.WriteByte('_')
		}
		sb.WriteRune(ch)
	}
	return strings.ToLower(sb.String())
}
|
||
|
||
// 通用表查询函数,自动应用字段标准化
|
||
func queryTableToShopData(db *sql.DB, tableName string, query string, args ...interface{}) ([]ShopDataRecord, error) {
|
||
rows, err := db.Query(query, args...)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
columns, err := rows.Columns()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
var records []ShopDataRecord
|
||
values := make([]interface{}, len(columns))
|
||
valuePtrs := make([]interface{}, len(columns))
|
||
|
||
for rows.Next() {
|
||
for i := range columns {
|
||
valuePtrs[i] = &values[i]
|
||
}
|
||
if err := rows.Scan(valuePtrs...); err != nil {
|
||
return nil, err
|
||
}
|
||
dataMap := make(map[string]interface{})
|
||
for i, col := range columns {
|
||
snakeCol := camelToSnake(col)
|
||
dataMap[snakeCol] = normalizeFieldValue(snakeCol, values[i], tableName)
|
||
}
|
||
records = append(records, ShopDataRecord{
|
||
SourceTable: tableName,
|
||
Data: dataMap,
|
||
})
|
||
}
|
||
return records, nil
|
||
}
|
||
|
||
// queryTableToMap runs query with args and returns each row as a map keyed by
// the column name exactly as the driver reports it. []byte values are
// converted to string; every other value is stored as scanned.
//
// Kept from an earlier version of this file for callers that still need raw
// maps. It returns (nil, err) if the query, a row scan, or the row iteration
// itself fails.
func queryTableToMap(db *sql.DB, query string, args ...interface{}) ([]map[string]interface{}, error) {
	rows, err := db.Query(query, args...)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return nil, err
	}

	var results []map[string]interface{}
	values := make([]interface{}, len(columns))
	valuePtrs := make([]interface{}, len(columns))
	// The scan targets never change, so wire them up once.
	for i := range columns {
		valuePtrs[i] = &values[i]
	}

	for rows.Next() {
		if err := rows.Scan(valuePtrs...); err != nil {
			return nil, err
		}
		rowMap := make(map[string]interface{}, len(columns))
		for i, col := range columns {
			if raw, isBytes := values[i].([]byte); isBytes {
				rowMap[col] = string(raw)
			} else {
				rowMap[col] = values[i]
			}
		}
		results = append(results, rowMap)
	}
	// Distinguish "no more rows" from "iteration failed part-way".
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return results, nil
}
|