From c24d87a9885dba8232b998eec3146f7315deee78 Mon Sep 17 00:00:00 2001 From: 97694731 <97694731@qq.com> Date: Mon, 15 Jun 2026 16:34:21 +0800 Subject: [PATCH] first commit --- .idea/.gitignore | 8 + .idea/batch.iml | 9 + .idea/modules.xml | 8 + addSyncPddReject/main.go | 268 +++++ addSyncSuitBook/main.go | 722 ++++++++++++++ go.mod | 21 + go.sum | 57 ++ go_files_explanation.md | 180 ++++ goods_es_to_redis/main.go | 698 +++++++++++++ main.go | 21 + .../CameiCase/mysqlToRedisCameiCase.go | 539 ++++++++++ shop_mysql_to_redis/SanckCase/mysqlToRedis.go | 918 ++++++++++++++++++ util/dbConnectUtil/dbConnectUtil.go | 40 + 13 files changed, 3489 insertions(+) create mode 100644 .idea/.gitignore create mode 100644 .idea/batch.iml create mode 100644 .idea/modules.xml create mode 100644 addSyncPddReject/main.go create mode 100644 addSyncSuitBook/main.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 go_files_explanation.md create mode 100644 goods_es_to_redis/main.go create mode 100644 main.go create mode 100644 shop_mysql_to_redis/CameiCase/mysqlToRedisCameiCase.go create mode 100644 shop_mysql_to_redis/SanckCase/mysqlToRedis.go create mode 100644 util/dbConnectUtil/dbConnectUtil.go diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/batch.iml b/.idea/batch.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/batch.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..c65fa89 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/addSyncPddReject/main.go b/addSyncPddReject/main.go new file mode 100644 index 
0000000..b05f7fd --- /dev/null +++ b/addSyncPddReject/main.go @@ -0,0 +1,268 @@ +package main + +import ( + "database/sql" + "fmt" + "log" + "strings" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +const ( + MaxOpenConns = 20 + MaxIdleConns = 10 + ConnMaxLifetime = 10 * time.Minute + ConnMaxIdleTime = 5 * time.Minute +) + +// 雪花算法ID生成器 +type Snowflake struct { + sequence int64 + lastTimestamp int64 +} + +var snowflake = &Snowflake{} + +// 生成19位雪花算法ID +func (s *Snowflake) GenerateID() int64 { + const ( + epoch int64 = 1704067200000 // 2024-01-01 00:00:00 + machineID int64 = 1 + sequenceMask int64 = 4095 + timestampShift = 22 + machineIDShift = 12 + ) + + now := time.Now().UnixNano() / 1e6 + timestamp := now - epoch + + if timestamp == s.lastTimestamp { + s.sequence = (s.sequence + 1) & sequenceMask + if s.sequence == 0 { + for timestamp <= s.lastTimestamp { + time.Sleep(100 * time.Microsecond) + now = time.Now().UnixNano() / 1e6 + timestamp = now - epoch + } + } + } else { + s.sequence = 0 + } + + s.lastTimestamp = timestamp + + id := (timestamp << timestampShift) | (machineID << machineIDShift) | s.sequence + return id +} + +func main() { + fmt.Println("========================================") + fmt.Println(" ISBN数据迁移工具") + fmt.Println("========================================") + fmt.Println() + + // 连接源数据库 (Master) + fmt.Println("正在连接源数据库 (Master)...") + dsnMaster := "zhishu:XsRR4K3ATizyc5BK@tcp(146.56.227.42:3306)/zhishu?charset=utf8mb4&parseTime=True&loc=Local&timeout=5s" + dbMaster, err := sql.Open("mysql", dsnMaster) + if err != nil { + log.Fatalf("❌ 打开源数据库连接失败: %v", err) + } + defer dbMaster.Close() + + dbMaster.SetMaxOpenConns(MaxOpenConns) + dbMaster.SetMaxIdleConns(MaxIdleConns) + dbMaster.SetConnMaxLifetime(ConnMaxLifetime) + dbMaster.SetConnMaxIdleTime(ConnMaxIdleTime) + + err = dbMaster.Ping() + if err != nil { + log.Fatalf("❌ 连接源数据库失败: %v", err) + } + fmt.Println("✅ 源数据库连接成功") + + // 连接目标数据库 (Slave) + fmt.Println("正在连接目标数据库 (Slave)...") + 
dsnSlave := "zhishu_slave:7DpixiEdCs5p3PEr@tcp(36.212.12.247:3306)/zhishu_slave?charset=utf8mb4&parseTime=True&loc=Local&timeout=5s" + dbSlave, err := sql.Open("mysql", dsnSlave) + if err != nil { + log.Fatalf("❌ 打开目标数据库连接失败: %v", err) + } + defer dbSlave.Close() + + dbSlave.SetMaxOpenConns(MaxOpenConns) + dbSlave.SetMaxIdleConns(MaxIdleConns) + dbSlave.SetConnMaxLifetime(ConnMaxLifetime) + dbSlave.SetConnMaxIdleTime(ConnMaxIdleTime) + + err = dbSlave.Ping() + if err != nil { + log.Fatalf("❌ 连接目标数据库失败: %v", err) + } + fmt.Println("✅ 目标数据库连接成功\n") + + // 从源数据库查询数据 + fmt.Println("========================================") + fmt.Println(" 步骤1: 从源数据库读取数据") + fmt.Println("========================================") + + query := `SELECT isbn, create_time FROM shop_goods_rejection + WHERE rejection_reason = '您发布的商品涉嫌违规,请您检查相关商品并立即整改' + ORDER BY create_time ASC` + + startTime := time.Now() + + rows, err := dbMaster.Query(query) + if err != nil { + log.Fatalf("❌ 查询源数据失败: %v", err) + } + defer rows.Close() + + type Record struct { + ISBN string + CreateTime time.Time + } + + var records []Record + for rows.Next() { + var r Record + var isbn sql.NullString + var createTime sql.NullTime + + if err := rows.Scan(&isbn, &createTime); err != nil { + log.Printf("读取数据失败: %v", err) + continue + } + + if isbn.Valid && isbn.String != "" { + r.ISBN = isbn.String + if createTime.Valid { + r.CreateTime = createTime.Time + } + records = append(records, r) + } + } + + elapsed := time.Since(startTime) + fmt.Printf("查询完成,耗时: %v\n", elapsed) + fmt.Printf("共读取到 %d 条记录\n\n", len(records)) + + if len(records) == 0 { + fmt.Println("没有需要迁移的数据") + return + } + + // 查询目标表中已有的ISBN(去重) + fmt.Println("========================================") + fmt.Println(" 步骤2: 检查目标表已有数据") + fmt.Println("========================================") + + existingISBNs := make(map[string]bool) + existingRows, err := dbSlave.Query("SELECT add_txt FROM t_filter_set WHERE limitation_type = '0'") + if err != nil { + 
log.Printf("⚠️ 查询已有数据失败(表可能为空): %v", err) + } else { + for existingRows.Next() { + var isbn string + if err := existingRows.Scan(&isbn); err == nil { + existingISBNs[isbn] = true + } + } + existingRows.Close() + } + fmt.Printf("目标表中已有 %d 条ISBN记录\n\n", len(existingISBNs)) + + // 过滤掉已存在的ISBN + fmt.Println("========================================") + fmt.Println(" 步骤3: 过滤重复数据") + fmt.Println("========================================") + + var newRecords []Record + duplicateCount := 0 + for _, r := range records { + if !existingISBNs[r.ISBN] { + newRecords = append(newRecords, r) + } else { + duplicateCount++ + } + } + + fmt.Printf("过滤完成\n") + fmt.Printf("需要插入的新记录: %d 条\n", len(newRecords)) + fmt.Printf("已存在的重复记录: %d 条\n\n", duplicateCount) + + if len(newRecords) == 0 { + fmt.Println("没有新数据需要插入") + return + } + + // 批量插入数据 + fmt.Println("========================================") + fmt.Println(" 步骤4: 批量插入数据") + fmt.Println("========================================") + + batchSize := 100 + totalInserted := 0 + now := time.Now() + + for i := 0; i < len(newRecords); i += batchSize { + end := i + batchSize + if end > len(newRecords) { + end = len(newRecords) + } + + batch := newRecords[i:end] + + // 构建批量插入SQL + var valueStrings []string + var args []interface{} + + for _, r := range batch { + // 这里不需要单独生成id,后面会重新构建 + createTime := now.Format("2006-01-02 15:04:05") + valueStrings = append(valueStrings, "(?, '1', '0', '0', ?, 1, ?, 1, ?, '0', '0', '000000', NULL, NULL, '0', NULL)") + args = append(args, r.ISBN, createTime, createTime) + } + + + + // 重新构建包含ID的SQL + valueStrings = make([]string, 0) + args = make([]interface{}, 0) + + for _, r := range batch { + id := snowflake.GenerateID() + createTime := now.Format("2006-01-02 15:04:05") + + valueStrings = append(valueStrings, "(?, '1', '0', '0', ?, 1, ?, 1, ?, '0', '0', '000000', NULL, NULL, '0', NULL)") + args = append(args, id, r.ISBN, createTime, createTime) + } + + insertSQL := fmt.Sprintf("INSERT INTO t_filter_set (id, 
filter_type, limitation_type, add_way, add_txt, create_by, create_time, update_by, update_time, status, del_flag, tenant_id, create_dept, reason, sort, shop_type) VALUES %s", + strings.Join(valueStrings, ", ")) + + result, err := dbSlave.Exec(insertSQL, args...) + if err != nil { + log.Printf("❌ 批次 %d-%d 插入失败: %v", i+1, end, err) + continue + } + + rowsAffected, _ := result.RowsAffected() + totalInserted += int(rowsAffected) + + fmt.Printf("进度: %d/%d ( %.1f%% )\n", totalInserted, len(newRecords), float64(totalInserted)/float64(len(newRecords))*100) + } + + totalElapsed := time.Since(startTime) + + fmt.Println("\n========================================") + fmt.Println(" 迁移完成") + fmt.Println("========================================") + fmt.Printf("源数据总数: %d 条\n", len(records)) + fmt.Printf("重复过滤: %d 条\n", duplicateCount) + fmt.Printf("成功插入: %d 条\n", totalInserted) + fmt.Printf("总耗时: %v\n", totalElapsed) + fmt.Println("\n✅ 数据迁移完成!") +} diff --git a/addSyncSuitBook/main.go b/addSyncSuitBook/main.go new file mode 100644 index 0000000..b74f011 --- /dev/null +++ b/addSyncSuitBook/main.go @@ -0,0 +1,722 @@ +package main + +import ( + "bytes" + "database/sql" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + "time" + + _ "github.com/go-sql-driver/mysql" +) + +// ============ 配置 ============ + +// 源库 +const ( + srcHost = "36.134.185.139" + srcPort = 6603 + srcUser = "tao_zhuang_bo" + srcPassword = "Bi54NTUi0i" + srcDatabase = "kong_book_set" +) + +// 目标MySQL库 +const ( + dstHost = "175.27.224.66" + dstPort = 3306 + dstUser = "suit_book" + dstPassword = "2X8b8cBE6SmEECnm" + dstDatabase = "suit_book" +) + +// 目标ES +const ( + esURL = "http://36.212.12.92:9527" + esUser = "elastic" + esPassword = "+Tz5qR_KushZ-bPgZ_H-" + esIndexV2 = "books-from-mysql-v2" + esIndexV3 = "books-from-mysql-v3" +) + +// 批量大小 +const batchSize = 2000 + +// 增量同步状态文件 +const syncStateFile = "sync_state.json" + +// ============ 数据结构 ============ + +type 
BookRow struct { + KongID int64 + ISBN string + BookName string + Author string + Press string + PressTime interface{} + Binding string + Price float64 + ImgURL string + Edition string + Format string + Pages int64 + WordCount int64 + Category string + CreateTime interface{} + UpdateTime interface{} +} + +type ESDoc struct { + ID int64 `json:"id"` + Fid int64 `json:"fid"` + ISBN string `json:"isbn"` + BookName string `json:"book_name"` + Author string `json:"author"` + Publisher string `json:"publisher"` + PublicationTime string `json:"publication_time"` + BindingLayout string `json:"binding_layout"` + FixPrice float64 `json:"fix_price"` + BookPic ESBookPic `json:"book_pic"` + BookPicS ESBookPicS `json:"book_pic_s"` + Edition string `json:"edition"` + BookFormat string `json:"book_format"` + PageCount int64 `json:"page_count"` + WordCount int64 `json:"word_count"` + Category string `json:"category"` + CatID ESCatID `json:"cat_id"` + UpdateTime string `json:"update_time"` + IsSuit int64 `json:"is_suit"` +} + +type ESBookPic struct { + LocalPath string `json:"localPath"` + PddPath string `json:"pddPath"` +} + +type ESBookPicS struct { + LocalPath string `json:"localPath"` + PddResponse string `json:"pddResponse"` +} + +type ESCatID struct { + KongFuZiCatID string `json:"kong_fu_zi_cat_id"` +} + +type ImgURLJSON struct { + BookPic ImgBookPic `json:"book_pic"` + BookPicS ImgBookPicS `json:"book_pic_s"` +} +type ImgBookPic struct { + LocalPath string `json:"localPath"` + PddPath string `json:"pddPath"` +} +type ImgBookPicS struct { + LocalPath string `json:"localPath"` + PddResponse string `json:"pddResponse"` +} + +type SyncState struct { + LastSyncTime string `json:"last_sync_time"` +} + +// ============ 辅助函数 ============ + +func buildImgURLJSON(raw string) string { + if raw == "" { + b, _ := json.Marshal(ImgURLJSON{ + BookPic: ImgBookPic{LocalPath: "", PddPath: ""}, + BookPicS: ImgBookPicS{LocalPath: "", PddResponse: ""}, + }) + return string(b) + } + b, _ := 
json.Marshal(ImgURLJSON{ + BookPic: ImgBookPic{LocalPath: "", PddPath: ""}, + BookPicS: ImgBookPicS{LocalPath: "", PddResponse: raw}, + }) + return string(b) +} + +func rowToESDoc(r BookRow) ESDoc { + var imgJSON ImgURLJSON + json.Unmarshal([]byte(r.ImgURL), &imgJSON) + + var pubTimeStr string + if r.PressTime != nil { + if t, ok := r.PressTime.(string); ok { + if parsed, err := time.Parse("2006-01-02 15:04:05", t); err == nil { + pubTimeStr = fmt.Sprintf("%d", parsed.Unix()+5364000000) + } else if parsed, err := time.Parse("2006-01-02", t); err == nil { + pubTimeStr = fmt.Sprintf("%d", parsed.Unix()+5364000000) + } else { + pubTimeStr = t + } + } + } + + var updateTimeStr string + if r.UpdateTime != nil { + if t, ok := r.UpdateTime.(string); ok { + if parsed, err := time.Parse("2006-01-02 15:04:05", t); err == nil { + updateTimeStr = fmt.Sprintf("%d", parsed.Unix()) + } else { + updateTimeStr = t + } + } + } + + return ESDoc{ + ID: r.KongID, + Fid: 0, + ISBN: r.ISBN, + BookName: r.BookName, + Author: r.Author, + Publisher: r.Press, + PublicationTime: pubTimeStr, + BindingLayout: r.Binding, + FixPrice: r.Price, + BookPic: ESBookPic{LocalPath: "", PddPath: ""}, + BookPicS: ESBookPicS{LocalPath: "", PddResponse: imgJSON.BookPicS.PddResponse}, + Edition: r.Edition, + BookFormat: r.Format, + PageCount: r.Pages, + WordCount: r.WordCount, + Category: r.Category, + CatID: ESCatID{KongFuZiCatID: r.Category}, + UpdateTime: updateTimeStr, + } +} + +// ============ 同步状态读写 ============ + +func loadSyncState() *SyncState { + data, err := os.ReadFile(syncStateFile) + if err != nil { + return nil + } + var state SyncState + if json.Unmarshal(data, &state) != nil { + return nil + } + return &state +} + +func saveSyncState(t time.Time) { + state := SyncState{ + LastSyncTime: t.Format("2006-01-02 15:04:05"), + } + data, _ := json.MarshalIndent(state, "", " ") + os.WriteFile(syncStateFile, data, 0644) +} + +// ============ ES操作 ============ + +func queryV2ByISBNs(isbns []string) 
(map[string]bool, error) { + if len(isbns) == 0 { + return nil, nil + } + + maxTerms := 500 + result := make(map[string]bool) + + for i := 0; i < len(isbns); i += maxTerms { + end := i + maxTerms + if end > len(isbns) { + end = len(isbns) + } + batch := isbns[i:end] + + query := map[string]interface{}{ + "query": map[string]interface{}{ + "terms": map[string]interface{}{ + "isbn": batch, + }, + }, + "_source": []string{"isbn"}, + "size": len(batch), + } + queryBytes, _ := json.Marshal(query) + + req, err := http.NewRequest("POST", esURL+"/"+esIndexV2+"/_search", bytes.NewReader(queryBytes)) + if err != nil { + return nil, err + } + req.Header.Set("Content-Type", "application/json") + req.SetBasicAuth(esUser, esPassword) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("ES v2查询失败 HTTP %d: %s", resp.StatusCode, string(body)) + } + + var searchResp struct { + Hits struct { + Hits []struct { + Source struct { + ISBN string `json:"isbn"` + } `json:"_source"` + } `json:"hits"` + } `json:"hits"` + } + body, _ := io.ReadAll(resp.Body) + json.Unmarshal(body, &searchResp) + + for _, hit := range searchResp.Hits.Hits { + result[hit.Source.ISBN] = true + } + } + return result, nil +} + +func esBulkUpdateIsSuit(docs []ESDoc) (int, error) { + if len(docs) == 0 { + return 0, nil + } + + var buf bytes.Buffer + for _, doc := range docs { + meta := fmt.Sprintf(`{"update":{"_index":"%s","_id":"%d"}}`, esIndexV3, doc.ID) + buf.WriteString(meta) + buf.WriteByte('\n') + buf.WriteString(`{"script":{"source":"ctx._source.is_suit=params.val","params":{"val":1}}}`) + buf.WriteByte('\n') + } + + req, err := http.NewRequest("POST", esURL+"/_bulk", &buf) + if err != nil { + return 0, err + } + req.Header.Set("Content-Type", "application/x-ndjson") + req.SetBasicAuth(esUser, esPassword) + + resp, err := http.DefaultClient.Do(req) + if err != 
nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("ES bulk更新失败 HTTP %d: %s", resp.StatusCode, string(body)) + } + + var bulkResp struct { + Items []struct { + Update struct { + Status int `json:"status"` + } `json:"update"` + } `json:"items"` + } + body, _ := io.ReadAll(resp.Body) + json.Unmarshal(body, &bulkResp) + + success := 0 + for _, item := range bulkResp.Items { + if item.Update.Status >= 200 && item.Update.Status < 300 { + success++ + } + } + return success, nil +} + +func esBulkInsert(docs []ESDoc, index string) (int, error) { + if len(docs) == 0 { + return 0, nil + } + + var buf bytes.Buffer + for _, doc := range docs { + meta := fmt.Sprintf(`{"index":{"_index":"%s","_id":"%d"}}`, index, doc.ID) + buf.WriteString(meta) + buf.WriteByte('\n') + docBytes, _ := json.Marshal(doc) + buf.Write(docBytes) + buf.WriteByte('\n') + } + + req, err := http.NewRequest("POST", esURL+"/_bulk", &buf) + if err != nil { + return 0, err + } + req.Header.Set("Content-Type", "application/x-ndjson") + req.SetBasicAuth(esUser, esPassword) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + body, _ := io.ReadAll(resp.Body) + return 0, fmt.Errorf("ES bulk失败 HTTP %d: %s", resp.StatusCode, string(body)) + } + + var bulkResp struct { + Items []struct { + Index struct { + Status int `json:"status"` + } `json:"index"` + } `json:"items"` + } + body, _ := io.ReadAll(resp.Body) + json.Unmarshal(body, &bulkResp) + + success := 0 + for _, item := range bulkResp.Items { + if item.Index.Status >= 200 && item.Index.Status < 300 { + success++ + } + } + return success, nil +} + +// ============ MySQL批量操作 ============ + +func batchUpsertMySQL(db *sql.DB, rows []BookRow) (int, error) { + if len(rows) == 0 { + return 0, nil + } + + baseSQL := `REPLACE INTO suit_book_lib_merged +(kong_id, fid, isbn, book_name, subtitle, 
author, press, press_time, binding, price, img_url, edition, format, pages, word_count, category, create_time, update_time) +VALUES ` + + rowPlaceholder := "(?, 0, ?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + placeholders := make([]string, len(rows)) + for i := range placeholders { + placeholders[i] = rowPlaceholder + } + fullSQL := baseSQL + strings.Join(placeholders, ", ") + + args := make([]interface{}, 0, len(rows)*18) + for _, r := range rows { + args = append(args, + r.KongID, r.ISBN, r.BookName, r.Author, r.Press, + r.PressTime, r.Binding, r.Price, r.ImgURL, + r.Edition, r.Format, r.Pages, r.WordCount, r.Category, + r.CreateTime, r.UpdateTime, + ) + } + + result, err := db.Exec(fullSQL, args...) + if err != nil { + return 0, err + } + affected, _ := result.RowsAffected() + return int(affected), nil +} + +// ============ NULL处理 ============ + +func nullStr(ns sql.NullString) string { + if ns.Valid { + return ns.String + } + return "" +} + +func nullInt64(ns sql.NullInt64) int64 { + if ns.Valid { + return ns.Int64 + } + return 0 +} + +func nullFloat64(ns sql.NullFloat64) float64 { + if ns.Valid { + return ns.Float64 + } + return 0 +} + +func nullTimeInterface(ns sql.NullTime) interface{} { + if ns.Valid { + return ns.Time.Format("2006-01-02 15:04:05") + } + return nil +} + +// ============ 同步任务函数 ============ + +func runSync() error { + state := loadSyncState() + var sinceTime string + if state != nil && state.LastSyncTime != "" { + sinceTime = state.LastSyncTime + log.Printf("📅 增量同步:从 %s 开始", sinceTime) + } else { + log.Println("🚀 首次运行:全量同步") + } + + syncStart := time.Now() + + srcDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&timeout=60s&parseTime=true&readTimeout=300s", + srcUser, srcPassword, srcHost, srcPort, srcDatabase) + srcDB, err := sql.Open("mysql", srcDSN) + if err != nil { + return fmt.Errorf("连接源库失败: %v", err) + } + defer srcDB.Close() + srcDB.SetMaxOpenConns(2) + srcDB.SetMaxIdleConns(2) + + dstDSN := 
fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&timeout=60s&parseTime=true&writeTimeout=300s", + dstUser, dstPassword, dstHost, dstPort, dstDatabase) + dstDB, err := sql.Open("mysql", dstDSN) + if err != nil { + return fmt.Errorf("连接目标库失败: %v", err) + } + defer dstDB.Close() + dstDB.SetMaxOpenConns(10) + dstDB.SetMaxIdleConns(10) + + if err := srcDB.Ping(); err != nil { + return fmt.Errorf("源库连接测试失败: %v", err) + } + log.Println("✅ 源MySQL连接成功") + + if err := dstDB.Ping(); err != nil { + return fmt.Errorf("目标MySQL连接测试失败: %v", err) + } + log.Println("✅ 目标MySQL连接成功") + + req, _ := http.NewRequest("GET", esURL, nil) + req.SetBasicAuth(esUser, esPassword) + resp, err := http.DefaultClient.Do(req) + if err != nil || resp.StatusCode >= 300 { + return fmt.Errorf("ES连接测试失败: %v", err) + } + resp.Body.Close() + log.Println("✅ ES连接成功") + + query := ` +SELECT + b.id, b.isbn, b.book_name, b.author, b.press, b.press_time, + b.binding, b.price, b.img_url, b.create_time, b.update_time, + d.kong_id, d.edition, d.format, d.pages, d.word_count, d.category +FROM book_lib b +INNER JOIN book_lib_details d ON b.isbn = d.isbn +` + var rows *sql.Rows + if sinceTime != "" { + query += " WHERE b.update_time > ?" 
+ rows, err = srcDB.Query(query, sinceTime) + } else { + rows, err = srcDB.Query(query) + } + if err != nil { + return fmt.Errorf("查询源库失败: %v", err) + } + defer rows.Close() + + startTime := time.Now() + if sinceTime != "" { + log.Println("✅ 查询增量数据成功,开始同步 ...") + } else { + log.Println("✅ 查询全量数据成功,开始同步 ...") + } + + totalMySQL := 0 + totalESV3 := 0 + totalIsSuit := 0 + failCount := 0 + batch := make([]BookRow, 0, batchSize) + + for rows.Next() { + var ( + id, kongID, pages, wordCount sql.NullInt64 + isbn, bookName, author, press, binding, imgURL, + edition, format, category sql.NullString + price sql.NullFloat64 + pressTime, createTime, updateTime sql.NullTime + ) + + err := rows.Scan( + &id, &isbn, &bookName, &author, &press, &pressTime, + &binding, &price, &imgURL, &createTime, &updateTime, + &kongID, &edition, &format, &pages, &wordCount, &category, + ) + if err != nil { + failCount++ + continue + } + + row := BookRow{ + KongID: nullInt64(kongID), + ISBN: nullStr(isbn), + BookName: nullStr(bookName), + Author: nullStr(author), + Press: nullStr(press), + PressTime: nullTimeInterface(pressTime), + Binding: nullStr(binding), + Price: nullFloat64(price), + ImgURL: buildImgURLJSON(nullStr(imgURL)), + Edition: nullStr(edition), + Format: nullStr(format), + Pages: nullInt64(pages), + WordCount: nullInt64(wordCount), + Category: nullStr(category), + CreateTime: nullTimeInterface(createTime), + UpdateTime: nullTimeInterface(updateTime), + } + + batch = append(batch, row) + + if len(batch) >= batchSize { + mysqlAffected, err := batchUpsertMySQL(dstDB, batch) + if err != nil { + log.Printf("MySQL批量写入失败: %v", err) + } + totalMySQL += mysqlAffected + + esDocsV3 := make([]ESDoc, len(batch)) + isbns := make([]string, len(batch)) + for i, r := range batch { + esDocsV3[i] = rowToESDoc(r) + isbns[i] = r.ISBN + } + v3Count, err := esBulkInsert(esDocsV3, esIndexV3) + if err != nil { + log.Printf("ES v3 bulk失败: %v", err) + } + totalESV3 += v3Count + + v2Matches, err := 
queryV2ByISBNs(isbns) + if err != nil { + log.Printf("查询v2失败: %v", err) + } else { + var needUpdate []ESDoc + for _, doc := range esDocsV3 { + if v2Matches[doc.ISBN] { + doc.IsSuit = 1 + needUpdate = append(needUpdate, doc) + } + } + if len(needUpdate) > 0 { + updateCount, err := esBulkUpdateIsSuit(needUpdate) + if err != nil { + log.Printf("更新is_suit失败: %v", err) + } + totalIsSuit += updateCount + } + } + + elapsed := time.Since(startTime) + log.Printf("MySQL: %d | ES-v3: %d | is_suit: %d | 耗时 %v", + totalMySQL, totalESV3, totalIsSuit, elapsed.Round(time.Second)) + + batch = batch[:0] + } + } + + if len(batch) > 0 { + mysqlAffected, _ := batchUpsertMySQL(dstDB, batch) + totalMySQL += mysqlAffected + + esDocsV3 := make([]ESDoc, len(batch)) + isbns := make([]string, len(batch)) + for i, r := range batch { + esDocsV3[i] = rowToESDoc(r) + isbns[i] = r.ISBN + } + v3Count, _ := esBulkInsert(esDocsV3, esIndexV3) + totalESV3 += v3Count + + v2Matches, _ := queryV2ByISBNs(isbns) + if v2Matches != nil { + var needUpdate []ESDoc + for _, doc := range esDocsV3 { + if v2Matches[doc.ISBN] { + doc.IsSuit = 1 + needUpdate = append(needUpdate, doc) + } + } + if len(needUpdate) > 0 { + updateCount, _ := esBulkUpdateIsSuit(needUpdate) + totalIsSuit += updateCount + } + } + } + + totalElapsed := time.Since(startTime) + log.Printf("🎉 同步完成! 
MySQL: %d | ES v3: %d | is_suit: %d | 跳过: %d | 耗时 %v", + totalMySQL, totalESV3, totalIsSuit, failCount, totalElapsed.Round(time.Millisecond)) + + saveSyncState(syncStart) + log.Printf("📝 已记录同步时间点: %s", syncStart.Format("2006-01-02 15:04:05")) + + return nil +} + +// ============ 主流程 ============ + +func main() { + // 启动时先执行一次 + log.Println("======== 启动同步任务 ========") + log.Println("任务计划:每天凌晨 01:00 执行") + log.Printf("当前时间:%s\n", time.Now().Format("2006-01-02 15:04:05")) + + // 先执行一次同步 + if err := runSync(); err != nil { + log.Printf("❌ 同步任务执行失败: %v", err) + } else { + log.Println("✅ 启动同步完成") + } + + // 计算下一次执行时间(每天凌晨1点) + nextRun := func() time.Time { + now := time.Now() + next := time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, now.Location()) + if now.Hour() < 1 { + next = time.Date(now.Year(), now.Month(), now.Day(), 1, 0, 0, 0, now.Location()) + } + return next + } + + nextTime := nextRun() + log.Printf("⏰ 下一次执行时间:%s (等待 %v)", nextTime.Format("2006-01-02 15:04:05"), time.Until(nextTime).Round(time.Second)) + + // 定时器 + timer := time.NewTimer(time.Until(nextTime)) + defer timer.Stop() + + // 信号处理 + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + for { + select { + case <-timer.C: + log.Println("\n======== 开始执行定时同步 ========") + log.Printf("执行时间:%s", time.Now().Format("2006-01-02 15:04:05")) + + if err := runSync(); err != nil { + log.Printf("❌ 同步任务执行失败: %v", err) + } else { + log.Println("✅ 定时同步完成") + } + + // 计算下一次执行时间 + nextTime = nextRun() + log.Printf("⏰ 下一次执行时间:%s (等待 %v)", nextTime.Format("2006-01-02 15:04:05"), time.Until(nextTime).Round(time.Second)) + timer.Reset(time.Until(nextTime)) + + case sig := <-sigChan: + log.Printf("\n⚠️ 收到信号 %v,程序退出", sig) + return + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..25ae357 --- /dev/null +++ b/go.mod @@ -0,0 +1,21 @@ +module batch + +go 1.26.3 + +require ( + github.com/elastic/go-elasticsearch/v8 v8.19.6 + 
github.com/go-redis/redis/v8 v8.11.5 + github.com/go-sql-driver/mysql v1.10.0 +) + +require ( + filippo.io/edwards25519 v1.2.0 // indirect + github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/elastic/elastic-transport-go/v8 v8.9.0 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/otel v1.29.0 // indirect + go.opentelemetry.io/otel/metric v1.29.0 // indirect + go.opentelemetry.io/otel/trace v1.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..7409876 --- /dev/null +++ b/go.sum @@ -0,0 +1,57 @@ +filippo.io/edwards25519 v1.2.0 h1:crnVqOiS4jqYleHd9vaKZ+HKtHfllngJIiOpNpoJsjo= +filippo.io/edwards25519 v1.2.0/go.mod h1:xzAOLCNug/yB62zG1bQ8uziwrIqIuxhctzJT18Q77mc= +github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= +github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/elastic/elastic-transport-go/v8 v8.9.0 h1:KeT/2P54F0xS0S8Y3Pf+tFDg4HmBgReQMB+BMz8dDAs= +github.com/elastic/elastic-transport-go/v8 v8.9.0/go.mod h1:ssMTvNS2hwf7CaiGsRRsx4gQHFZ/jS/DkLcISxekWzc= +github.com/elastic/go-elasticsearch/v8 v8.19.6 h1:4qa7ecJkr5rLsoHKIVGbaqcFt2o57CnOHQJi9Pts/rk= +github.com/elastic/go-elasticsearch/v8 v8.19.6/go.mod h1:jeWebApE1oFEW/hKZqx/IRYmP/aa2+WMJkOfk+AduSI= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod 
h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= +github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/go-sql-driver/mysql v1.10.0 h1:Q+1LV8DkHJvSYAdR83XzuhDaTykuDx0l6fkXxoWCWfw= +github.com/go-sql-driver/mysql v1.10.0/go.mod h1:M+cqaI7+xxXGG9swrdeUIoPG3Y3KCkF0pZej+SK+nWk= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= +github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= +github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU= +github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.18.1/go.mod h1:0q+aL8jAiMXy9hbwj2mr5GziHiwhAIQpFmmtT5hitRs= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod 
h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +go.opentelemetry.io/otel v1.29.0 h1:PdomN/Al4q/lN6iBJEN3AwPvUiHPMlt93c8bqTG5Llw= +go.opentelemetry.io/otel v1.29.0/go.mod h1:N/WtXPs1CNCUEx+Agz5uouwCba+i+bJGFicT8SR4NP8= +go.opentelemetry.io/otel/metric v1.29.0 h1:vPf/HFWTNkPu1aYeIsc98l4ktOQaL6LeSoeV2g+8YLc= +go.opentelemetry.io/otel/metric v1.29.0/go.mod h1:auu/QWieFVWx+DmQOUMgj0F8LHWdgalxXqvp7BII/W8= +go.opentelemetry.io/otel/sdk v1.29.0 h1:vkqKjk7gwhS8VaWb0POZKmIEDimRCMsopNYnriHyryo= +go.opentelemetry.io/otel/sdk v1.29.0/go.mod h1:pM8Dx5WKnvxLCb+8lG1PRNIDxu9g9b9g59Qr7hfAAok= +go.opentelemetry.io/otel/trace v1.29.0 h1:J/8ZNK4XgR7a21DZUAsbF8pZ5Jcw1VhACmnYt39JTi4= +go.opentelemetry.io/otel/trace v1.29.0/go.mod h1:eHl3w0sp3paPkYstJOmAimxhiFXPg+MMTlEh3nsQgWQ= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781 h1:DzZ89McO9/gWPsQXS/FVKAlG02ZjaQ6AlZRBimEYOd0= +golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= +gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/go_files_explanation.md b/go_files_explanation.md new file mode 100644 index 0000000..7a34640 --- /dev/null +++ b/go_files_explanation.md @@ -0,0 +1,180 @@ +# Go 文件功能说明 + +## 项目概述 + 
+该项目是一个 **Go 批处理工具集**,主要用于数据同步和迁移任务,涉及 **MySQL**、**Elasticsearch** 和 **Redis** 之间的数据传输。 + +--- + +## 文件列表与功能 + +### 1. `main.go` — 项目入口(占位模板) + +- **位置**:根目录 +- **作用**:GoLand 自动生成的默认入口文件,仅包含一个 `Hello, world` 打印程序。 +- **说明**:不属于实际的批处理系统,可忽略或用作文档占位。 + +--- + +### 2. `util/dbConnectUtil/dbConnectUtil.go` — 数据库连接工具包 + +- **位置**:`util/dbConnectUtil/` +- **包名**:`dbConnectUtil` +- **作用**:提供可复用的 MySQL 数据库连接初始化函数。 +- **核心功能**: + - `InitDB(username, password, host, port)` — 创建并测试 MySQL 连接 + - 配置连接池参数(最大打开 20 个、最大空闲 10 个) + - 返回全局变量 `DB`(`*sql.DB`) +- **被依赖**:`shop_mysql_to_redis/` 下的两个子项目都导入此包。 + +--- + +### 3. `addSyncSuitBook/main.go` — 图书数据同步工具 + +- **位置**:`addSyncSuitBook/` +- **包名**:`main`(独立可执行程序) +- **作用**:将源 MySQL 中的图书数据同步到目标 MySQL 和 Elasticsearch。 +- **核心功能**: + - **数据源**:源库 `book_lib` JOIN `book_lib_details`(IP: `36.134.185.139:6603`) + - **目标 MySQL**:写入 `suit_book_lib_merged` 表(IP: `175.27.224.66:3306`) + - **目标 ES**:写入 `books-from-mysql-v3` 索引(IP: `36.212.12.92:9527`) + - **同步模式**:全量同步 / 增量同步(基于 `update_time` 记录上次同步时间到 `sync_state.json`) + - **套装书标记**:查询 v2 索引中已存在的 ISBN,在 v3 索引中通过 script 更新 `is_suit = 1` + - **执行计划**:程序启动时立即执行一次,之后每天凌晨 01:00 自动执行 + - **批量操作**:MySQL 使用 `REPLACE INTO`;ES 使用 `_bulk` API + +--- + +### 4. `addSyncPddReject/main.go` — 拼多多违规商品迁移工具 + +- **位置**:`addSyncPddReject/` +- **包名**:`main`(独立可执行程序) +- **作用**:将主库中因"涉嫌违规"被驳回的商品 ISBN 迁移到从库的过滤表中。 +- **核心功能**: + - **源库**:查 `shop_goods_rejection` 表(原因:`'您发布的商品涉嫌违规,请您检查相关商品并立即整改'`) + - **目标库**:写入 `t_filter_set` 表(`limitation_type = '0'` 表示 ISBN 过滤) + - **去重逻辑**:先查询目标表已有 ISBN,仅插入不重复的新数据 + - **ID 生成**:雪花算法(Snowflake)生成 19 位分布式 ID + - **批量插入**:每批 100 条,带进度打印 + +--- + +### 5. 
`goods_es_to_redis/main.go` — ES 到 Redis 全量同步工具 + +- **位置**:`goods_es_to_redis/` +- **包名**:`main`(独立可执行程序) +- **作用**:从 Elasticsearch `books-from-mysql-v2` 索引全量拉取图书数据,写入 Redis DB 1。 +- **核心功能**: + - **数据读取**:使用 ES Scroll API 全量遍历(每批 10,000 条) + - **并发模型**: + - 8 个 ES 读取协程(并行处理 scroll 分页) + - 1 个批处理协程(聚合成 500 条/批) + - 32 个 Redis 写入协程(使用 Pipeline 批量 SET) + - **数据转换**: + - 自定义 `FlexInt64` JSON 反序列化(兼容数字/字符串) + - 自定义 `StringOrArray` 类型(兼容字符串/数组) + - 分类 ID 清洗:`A > B > C` → `A/B/C` + - 出版时间时间戳转换 + - 图片信息聚合(轮播图、白底图、详情图、目录图等) + - **Redis 结构**:`key = ISBN`,`value = BookInfo JSON`(book_name/author/publisher/price/images/category 等) + - **数据校验**:同步完成后用 SCAN 命令校验 Redis 中的 key 数量是否与 ES 文档数一致 + - **进度报告**:每 5 秒打印一次同步进度 + +--- + +### 6. `shop_mysql_to_redis/CameiCase/mysqlToRedisCameiCase.go` — 店铺数据迁移(驼峰版) + +- **位置**:`shop_mysql_to_redis/CameiCase/` +- **包名**:`main`(独立可执行程序) +- **作用**:将 MySQL 中的店铺相关数据迁移到 Redis,字段名为**驼峰命名**。 +- **核心功能**: + - **查询数据**:遍历 `t_shop_detail` 中的所有 `shop_id` + - **关联表**: + - `t_shop` — 店铺主表 + - `t_shop_detail` — 店铺详情 + - `t_shop_context` — 店铺上下文(商品描述) + - `t_spec` — 规格设置 + - `t_price_template` — 价格模板(根据 `sale_template_id` 关联) + - **数据格式**: + - 每条记录以 `{"source_table": "表名", "data": {字段: 值...}}` 格式存入 Redis + - 字段名通过 `snakeToCamel()` 转为驼峰(如 `shop_id` → `shopId`) + - 价格模板的 `rangePrice` 字段做浮点精度修复 + - **Redis 结构**:`key = shopID`,`value = List`,每元素为一条 JSON 记录 + - **数据库连接**:使用 `dbConnectUtil` 工具包 + +--- + +### 7. 
`shop_mysql_to_redis/SanckCase/mysqlToRedis.go` — 店铺数据迁移(下划线版,优化版) + +- **位置**:`shop_mysql_to_redis/SanckCase/` +- **包名**:`main`(独立可执行程序) +- **作用**:与驼峰版类似,但字段名为**下划线命名**,且架构更完善。 +- **增强功能**: + - **并发处理**:3 个 Worker 并发处理店铺,每个店铺间 sleep 100ms 限流 + - **循环执行**:每 5~10 分钟自动重跑(根据耗时动态调整间隔) + - **优雅退出**:监听 `SIGINT`/`SIGTERM` 信号,打印运行次数后退出 + - **字段标准化**(`normalizeFieldValue`): + - ID 类字段(`id`, `*_id`, `create_by`, `update_by`)→ 字符串 + - 时间字段(`create_time`, `update_time`, `add_time`, `expiration_time`)→ `"2006-01-02 15:04:05"` 格式 + - `mall_id`、`district_id` 保持 `int64` 数字 + - **图片处理**:从 `t_shop_img` 表查询图片,按 type 分类: + - `type=1` → 水印图 + - `type=3/4` → 详情首/尾图 + - `type=5` → 末尾轮播图 + - `type=7` → SKU 水印图 + - **价格处理**: + - `range_price` 字段做浮点精度修复 + - `add_amount` 字段 `×100` 转为 `int64`(分单位) + - **Redis 写入**:使用 Pipeline 批量操作(先 DEL 旧 key,再 RPUSH 所有记录) + - **数据库连接**:使用 `dbConnectUtil` 工具包 + +--- + +## 数据流向总图 + +``` +┌─────────────────────────────────────────────────────────────┐ +│ addSyncSuitBook │ +│ MySQL (36.134.185.139:6603) │ +│ book_lib + book_lib_details │ +│ ┌──────────┴──────────┐ │ +│ ▼ ▼ │ +│ MySQL (175.27.224.66) ES (36.212.12.92:9527) │ +│ suit_book_lib_merged books-from-mysql-v3 │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ addSyncPddReject │ +│ MySQL Master (146.56.227.42:3306) │ +│ shop_goods_rejection │ +│ ▼ │ +│ MySQL Slave (36.212.12.247:3306) │ +│ t_filter_set │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ goods_es_to_redis │ +│ ES (36.212.12.92:9527) │ +│ books-from-mysql-v2 │ +│ ▼ │ +│ Redis (36.212.12.247:6379 DB 1) │ +│ key=ISBN → BookInfo JSON │ +└─────────────────────────────────────────────────────────────┘ + +┌─────────────────────────────────────────────────────────────┐ +│ shop_mysql_to_redis │ +│ MySQL (146.56.227.42:3306) │ +│ t_shop / t_shop_detail / t_shop_context │ +│ t_spec 
/ t_price_template / t_shop_img │ +│ ▼ │ +│ Redis (CameiCase: 36.212.20.113:7963 DB 7) │ +│ (SanckCase: 36.212.12.247:6379 DB 8) │ +│ key=shopID → List of JSON records │ +└─────────────────────────────────────────────────────────────┘ +``` + +## 依赖关系 + +- `dbConnectUtil` 被 `shop_mysql_to_redis/` 下的两个子项目导入 +- 所有可执行程序均为 `package main`,各自独立运行 +- 外部依赖:`go-sql-driver/mysql`、`go-elasticsearch`、`go-redis` diff --git a/goods_es_to_redis/main.go b/goods_es_to_redis/main.go new file mode 100644 index 0000000..cb1c400 --- /dev/null +++ b/goods_es_to_redis/main.go @@ -0,0 +1,698 @@ +package main + +import ( + "context" + "crypto/tls" + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/elastic/go-elasticsearch/v8" + "github.com/elastic/go-elasticsearch/v8/esapi" + "github.com/go-redis/redis/v8" +) + +const ( + esAddress = "http://36.212.12.92:9527" + esUsername = "elastic" + esPassword = "+Tz5qR_KushZ-bPgZ_H-" + + redisAddr = "36.212.12.247:6379" + redisPassword = "long6166@@" + redisDB = 1 +) + +/* ================= Client ================= */ + +type ESClient struct { + client *elasticsearch.Client +} + +func NewESClient(addresses []string, username, password string) (*ESClient, error) { + cfg := elasticsearch.Config{ + Addresses: addresses, + Username: username, + Password: password, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + MaxIdleConnsPerHost: 100, + ResponseHeaderTimeout: 60 * time.Second, + }, + } + cli, err := elasticsearch.NewClient(cfg) + if err != nil { + return nil, err + } + return &ESClient{client: cli}, nil +} + +type RedisClient struct { + client *redis.Client +} + +func NewRedisClient(addr, password string, db int) (*RedisClient, error) { + rdb := redis.NewClient(&redis.Options{ + Addr: addr, + Password: password, + DB: db, + PoolSize: 100, + DialTimeout: 10 * time.Second, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * 
time.Second, + }) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if _, err := rdb.Ping(ctx).Result(); err != nil { + return nil, err + } + + // 解决 Redis MISCONF 错误:禁用 bgsave 错误时的写入限制 + if err := rdb.ConfigSet(ctx, "stop-writes-on-bgsave-error", "no").Err(); err != nil { + log.Printf("警告:无法设置 Redis 配置 stop-writes-on-bgsave-error=no: %v", err) + } else { + log.Println("已设置 Redis 配置:stop-writes-on-bgsave-error=no") + } + + return &RedisClient{client: rdb}, nil +} + +/* ================= Data ================= */ + +type BookPic struct { + LocalPath string `json:"localPath"` + PddPath string `json:"pddPath"` +} + +type BookPicS struct { + LocalPath string `json:"localPath"` + PddPath string `json:"pddPath"` + PddResponse string `json:"pddResponse"` + Localh string `json:"localh"` +} + +type BookDetailImage struct { + LocalPath string `json:"localPath"` + PddPath string `json:"pddPath"` +} + +type BookDirectoryImage struct { + LocalPath string `json:"localPath"` + PddPath string `json:"pddPath"` +} + +type ESCatIdObject struct { + PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` + KongFuZiCatId string `json:"kong_fu_zi_cat_id"` + XianYuCatId string `json:"xian_yu_cat_id"` +} + +type BookData struct { + ID int64 `json:"id"` + BookName StringOrArray `json:"book_name"` + BookPic BookPic `json:"book_pic"` + BookDefPic BookPic `json:"book_def_pic"` + BookPicS BookPicS `json:"book_pic_s"` + BookPicObj string `json:"book_pic_obj"` + BookDetailImage BookDetailImage `json:"book_detail_image"` + BookPicB string `json:"book_pic_b"` + BookDirectoryImage BookDirectoryImage `json:"book_directory_image"` + ISBN string `json:"isbn"` + Author string `json:"author"` + Category string `json:"category"` + Publisher string `json:"publisher"` + PublicationTime string `json:"publication_time"` + BindingLayout string `json:"binding_layout"` + FixPrice FlexInt64 `json:"fix_price"` + PageCount StringOrArray `json:"page_count"` + WordCount 
StringOrArray `json:"word_count"` + CatId ESCatIdObject `json:"cat_id"` + IsSuit int64 `json:"is_suit"` +} + +type ESResponse struct { + ScrollID string `json:"_scroll_id"` + Hits struct { + Total struct { + Value int64 `json:"value"` + } `json:"total"` + Hits []struct { + Source BookData `json:"_source"` + } `json:"hits"` + } `json:"hits"` +} + +/* ================= Custom Types ================= */ + +type StringOrArray string + +type FlexInt64 int64 + +func (f *FlexInt64) UnmarshalJSON(data []byte) error { + // 尝试直接解析为数字 + var num int64 + if err := json.Unmarshal(data, &num); err == nil { + *f = FlexInt64(num) + return nil + } + // 尝试解析为字符串 + var str string + if err := json.Unmarshal(data, &str); err == nil { + if str == "" { + *f = 0 + return nil + } + if val, err := strconv.ParseInt(str, 10, 64); err == nil { + *f = FlexInt64(val) + return nil + } + if val, err := strconv.ParseFloat(str, 64); err == nil { + *f = FlexInt64(int64(val)) + return nil + } + } + // 默认为0 + *f = 0 + return nil +} + +func (s *StringOrArray) UnmarshalJSON(data []byte) error { + if len(data) > 0 && data[0] == '"' { + var str string + _ = json.Unmarshal(data, &str) + *s = StringOrArray(str) + return nil + } + var arr []string + if json.Unmarshal(data, &arr) == nil && len(arr) > 0 { + *s = StringOrArray(arr[0]) + } + return nil +} + +type Float64OrString float64 + +func (f *Float64OrString) UnmarshalJSON(data []byte) error { + var n float64 + if json.Unmarshal(data, &n) == nil { + *f = Float64OrString(n) + return nil + } + var s string + if json.Unmarshal(data, &s) == nil { + if v, err := strconv.ParseFloat(strings.TrimSpace(s), 64); err == nil { + *f = Float64OrString(v) + } + } + return nil +} + +/* ================= Main ================= */ + +func main() { + flag.Parse() + + log.Println("开始全量同步 ES → Redis(强制全量写入)") + + esClient, err := NewESClient([]string{esAddress}, esUsername, esPassword) + if err != nil { + log.Fatal(err) + } + redisClient, err := NewRedisClient(redisAddr, 
redisPassword, redisDB) + if err != nil { + log.Fatal(err) + } + defer redisClient.client.Close() + + // 执行同步并获取总数 + totalDocs, err := fetchAndWriteToRedis(context.Background(), esClient, redisClient) + if err != nil { + log.Fatal(err) + } + + // 验证数据完整性 + verifyDataCount(context.Background(), redisClient, totalDocs) +} + +func fetchAndWriteToRedis(ctx context.Context, esClient *ESClient, redisClient *RedisClient) (int64, error) { + // 详细统计变量 + var ( + readCount int64 // 从ES读取的总文档数 + skipEmptyIsbn int64 // ISBN为空跳过的数量 + convertFailed int64 // 转换或序列化失败的数量 + writeSuccess int64 // 实际写入Redis成功的数量 + writeFailed int64 // 写入失败的数量 + sentToBatchChan int64 // 发送到batchChan的文档总数 + receivedByWorkers int64 // 写入协程接收到的文档总数 + ) + + const ( + batchSize = 500 + esWorkerCount = 8 + redisWorkerCount = 32 + bookChanSize = 10000 + batchChanSize = 200 + ) + + bookChan := make(chan BookData, bookChanSize) + batchChan := make(chan []BookData, batchChanSize) + var wg sync.WaitGroup + + // ================= Redis批量写入协程 ================= + for i := 0; i < redisWorkerCount; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for batch := range batchChan { + atomic.AddInt64(&receivedByWorkers, int64(len(batch))) + + // 准备要写入的键值对 + type kv struct { + key string + val []byte + } + kvs := make([]kv, 0, len(batch)) + + for _, book := range batch { + // 跳过空ISBN + if book.ISBN == "" { + atomic.AddInt64(&skipEmptyIsbn, 1) + continue + } + + // 安全转换(带panic恢复) + bookInfo, err := safeConvertBookData(book) + if err != nil { + atomic.AddInt64(&convertFailed, 1) + log.Printf("Worker %d 转换失败 ISBN=%s: %v", workerID, book.ISBN, err) + continue + } + + data, err := json.Marshal(bookInfo) + if err != nil { + atomic.AddInt64(&convertFailed, 1) + log.Printf("Worker %d JSON序列化失败 ISBN=%s: %v", workerID, book.ISBN, err) + continue + } + kvs = append(kvs, kv{key: book.ISBN, val: data}) + } + + if len(kvs) == 0 { + continue + } + + // 使用Pipeline批量写入,并逐个检查结果 + pipe := redisClient.client.Pipeline() + cmds := 
make([]*redis.StatusCmd, 0, len(kvs)) + for _, kv := range kvs { + cmd := pipe.Set(ctx, kv.key, kv.val, 0) + cmds = append(cmds, cmd) + } + + // 执行Pipeline + _, err := pipe.Exec(ctx) + if err != nil && err != redis.Nil { + // Pipeline整体失败(如网络错误),整批标记为失败 + atomic.AddInt64(&writeFailed, int64(len(kvs))) + log.Printf("Worker %d Pipeline执行失败: %v, 影响 %d 条", workerID, err, len(kvs)) + continue + } + + // 逐个检查每条命令的结果 + successInBatch := 0 + for i, cmd := range cmds { + if cmd.Err() != nil { + atomic.AddInt64(&writeFailed, 1) + log.Printf("Worker %d 单条写入失败 key=%s: %v", workerID, kvs[i].key, cmd.Err()) + } else { + successInBatch++ + } + } + atomic.AddInt64(&writeSuccess, int64(successInBatch)) + if successInBatch < len(kvs) { + log.Printf("Worker %d 批次部分失败: 预期 %d, 成功 %d", workerID, len(kvs), successInBatch) + } + } + log.Printf("Redis写入协程 %d 退出,累计接收文档数 %d", workerID, atomic.LoadInt64(&receivedByWorkers)) + }(i) + } + + // ================= 批处理协程 ================= + wg.Add(1) + go func() { + defer wg.Done() + var batch []BookData + batchSeq := 0 + for book := range bookChan { + batch = append(batch, book) + if len(batch) >= batchSize { + batchSeq++ + atomic.AddInt64(&sentToBatchChan, int64(len(batch))) + batchChan <- batch + batch = []BookData{} + } + } + // 处理剩余数据 + if len(batch) > 0 { + batchSeq++ + atomic.AddInt64(&sentToBatchChan, int64(len(batch))) + batchChan <- batch + } + close(batchChan) + log.Printf("批处理协程退出,共发送 %d 个批次,总文档数 %d", batchSeq, atomic.LoadInt64(&sentToBatchChan)) + }() + + // ================= ES 初始查询 ================= + size := 10000 + req := esapi.SearchRequest{ + Index: []string{"books-from-mysql-v2"}, + Size: &size, + Scroll: time.Minute, + } + res, err := req.Do(ctx, esClient.client) + if err != nil { + return 0, fmt.Errorf("初始查询失败: %w", err) + } + defer res.Body.Close() + + var esResp ESResponse + if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { + return 0, fmt.Errorf("解析初始响应失败: %w", err) + } + + totalDocs := 
esResp.Hits.Total.Value + log.Printf("ES 总文档数: %d", totalDocs) + + scrollID := esResp.ScrollID + defer clearScroll(ctx, esClient, scrollID) + + // ================= 进度报告协程 ================= + ticker := time.NewTicker(5 * time.Second) + go func() { + for range ticker.C { + read := atomic.LoadInt64(&readCount) + skip := atomic.LoadInt64(&skipEmptyIsbn) + convFail := atomic.LoadInt64(&convertFailed) + writeOK := atomic.LoadInt64(&writeSuccess) + writeErr := atomic.LoadInt64(&writeFailed) + sent := atomic.LoadInt64(&sentToBatchChan) + recv := atomic.LoadInt64(&receivedByWorkers) + progress := float64(read) / float64(totalDocs) * 100 + log.Printf("进度: %.2f%% | 已读: %d | 跳过空ISBN: %d | 转换失败: %d | 写入成功: %d | 写入失败: %d | 发送到批处理: %d | 写入协程接收: %d", + progress, read, skip, convFail, writeOK, writeErr, sent, recv) + } + }() + + // ================= 处理初始结果 ================= + for _, hit := range esResp.Hits.Hits { + bookChan <- hit.Source + atomic.AddInt64(&readCount, 1) + } + + // ================= 并行处理ES数据转换 ================= + var processWg sync.WaitGroup + processChan := make(chan []BookData, esWorkerCount*2) + + for i := 0; i < esWorkerCount; i++ { + processWg.Add(1) + go func(workerID int) { + defer processWg.Done() + for books := range processChan { + for _, book := range books { + bookChan <- book + atomic.AddInt64(&readCount, 1) + } + } + log.Printf("ES处理协程 %d 退出", workerID) + }(i) + } + + // ================= Scroll 循环拉取剩余数据 ================= + currentScrollID := scrollID + batchCount := 0 + + for { + scrollReq := esapi.ScrollRequest{ + ScrollID: currentScrollID, + Scroll: time.Minute, + } + scrollRes, err := scrollReq.Do(ctx, esClient.client) + if err != nil { + log.Printf("Scroll请求失败: %v", err) + break + } + + var scrollResp ESResponse + if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResp); err != nil { + scrollRes.Body.Close() + log.Printf("解析Scroll响应失败: %v", err) + break + } + scrollRes.Body.Close() + + if len(scrollResp.Hits.Hits) == 0 { + break + } + + // 
更新scrollID + currentScrollID = scrollResp.ScrollID + + // 收集本批数据 + books := make([]BookData, 0, len(scrollResp.Hits.Hits)) + for _, hit := range scrollResp.Hits.Hits { + books = append(books, hit.Source) + } + processChan <- books + batchCount++ + } + + // 关闭处理通道,等待所有处理协程完成 + close(processChan) + processWg.Wait() + close(bookChan) + wg.Wait() + ticker.Stop() + + // ================= 最终统计 ================= + log.Printf("同步完成汇总: 总读取=%d, 发送到批处理=%d, 写入协程接收=%d, 跳过空ISBN=%d, 转换失败=%d, 写入成功=%d, 写入失败=%d", + readCount, sentToBatchChan, receivedByWorkers, skipEmptyIsbn, convertFailed, writeSuccess, writeFailed) + + // 检查数据完整性 + if writeSuccess+writeFailed+skipEmptyIsbn+convertFailed != readCount { + log.Printf("⚠️ 统计不一致: 写入成功(%d)+失败(%d)+跳过(%d)+转换失败(%d)=%d, 但读取总数=%d", + writeSuccess, writeFailed, skipEmptyIsbn, convertFailed, + writeSuccess+writeFailed+skipEmptyIsbn+convertFailed, readCount) + } + + return totalDocs, nil +} + +// cleanKongFuZiCatId 清理孔夫子分类ID:将 > 替换为 /,并去除各段两端的空格 +func cleanKongFuZiCatId(raw string) string { + parts := strings.Split(raw, ">") + for i, p := range parts { + parts[i] = strings.TrimSpace(p) + } + return strings.Join(parts, "/") +} + +// safeConvertBookData 带 panic 恢复的转换函数 +func safeConvertBookData(bookData BookData) (bookInfo BookInfo, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("panic in convertBookDataToBookInfoSafe: %v", r) + } + }() + return convertBookDataToBookInfoSafe(bookData) +} + +// 安全的转换函数,返回error +func convertBookDataToBookInfoSafe(bookData BookData) (BookInfo, error) { + // 检查必要字段 + if bookData.ISBN == "" { + return BookInfo{}, fmt.Errorf("ISBN为空") + } + + bookInfo := BookInfo{ + Isbn: bookData.ISBN, + BookName: string(bookData.BookName), + Author: bookData.Author, + Publishing: bookData.Publisher, + PublicationDate: bookData.PublicationTime, + Binding: bookData.BindingLayout, + Format: 0, + CatIdObject: CatIdObject{ + PinDuoDuoCatId: bookData.CatId.PinDuoDuoCatId, + KongFuZiCatId: 
cleanKongFuZiCatId(bookData.CatId.KongFuZiCatId), + XianYuCatId: bookData.CatId.XianYuCatId, + }, + } + + // 页数转换 + if pageCountStr := strings.TrimSpace(string(bookData.PageCount)); pageCountStr != "" { + if pageCount, err := strconv.ParseInt(pageCountStr, 10, 64); err == nil { + bookInfo.PagesCount = pageCount + } + } + + // 字数转换 + if wordCountStr := strings.TrimSpace(string(bookData.WordCount)); wordCountStr != "" { + if wordCount, err := strconv.ParseInt(wordCountStr, 10, 64); err == nil { + bookInfo.WordsCount = wordCount + } + } + + // 出版时间转换(修复逻辑:支持多种格式) + if publicationTimeStr := strings.TrimSpace(bookData.PublicationTime); publicationTimeStr != "" { + publicationTime, err := strconv.ParseInt(publicationTimeStr, 10, 64) + publicationTime = publicationTime - 5364000000 + timestamp, err := strconv.ParseInt(strconv.FormatInt(publicationTime, 10), 10, 64) + //fmt.Println(timestamp) + if err == nil { + var t time.Time + // 判断是秒级还是毫秒级时间戳(假设大于 1e9 的是毫秒级) + //if timestamp > 1e12 { // 毫秒级 + // t = time.UnixMilli(timestamp) + //} else { // 秒级 + t = time.Unix(timestamp, 0) + //} + + bookInfo.PublicationDate = t.Format("2006-01") + + //fmt.Println("=======PublicationDate", bookInfo.PublicationDate, bookData.ISBN) + + } else { + // 转换失败,赋值为空字符串 + bookInfo.PublicationDate = "" + } + } + + // 价格 + bookInfo.Price = int64(bookData.FixPrice) + + // 构建图片对象 + imageObject := ImageObject{ + CarouselUrlArray: []string{}, + DetailUrlObject: DetailImageObject{ + IntroductionUrl: []string{}, + CatalogueUrl: []string{}, + LiveShootingUrl: []string{}, + OtherUrl: []string{}, + }, + } + + if bookData.BookPic.PddPath != "" { + imageObject.CarouselUrlArray = append(imageObject.CarouselUrlArray, bookData.BookPic.PddPath) + } + if bookData.BookDefPic.PddPath != "" { + imageObject.DefaultImageUrl = bookData.BookDefPic.PddPath + } + if bookData.BookPicS.PddResponse != "" { + imageObject.WhiteBackgroundUrl = bookData.BookPicS.PddResponse + } + if bookData.BookDetailImage.PddPath != "" { + 
imageObject.DetailUrlObject.IntroductionUrl = append(imageObject.DetailUrlObject.IntroductionUrl, bookData.BookDetailImage.PddPath) + } + if bookData.BookDirectoryImage.PddPath != "" { + imageObject.DetailUrlObject.CatalogueUrl = append(imageObject.DetailUrlObject.CatalogueUrl, bookData.BookDirectoryImage.PddPath) + } + + bookInfo.ImageObject = &imageObject + return bookInfo, nil +} + +// 验证数据完整性 +func verifyDataCount(ctx context.Context, redisClient *RedisClient, expectedCount int64) { + log.Println("开始验证数据完整性...") + + var cursor uint64 + var totalCount int64 + var batchSize int64 = 1000 + + for { + var keys []string + var err error + keys, cursor, err = redisClient.client.Scan(ctx, cursor, "*", batchSize).Result() + if err != nil { + log.Printf("Scan失败: %v", err) + break + } + + totalCount += int64(len(keys)) + log.Printf("Scan进度: 已获取 %d 个key", totalCount) + + if cursor == 0 { + break + } + } + + log.Printf("验证结果 - ES文档数: %d, Redis存储数: %d", expectedCount, totalCount) + + if totalCount == expectedCount { + log.Println("✅ 数据完整性验证通过,所有数据已完整写入") + } else if totalCount < expectedCount { + log.Printf("⚠️ 数据不完整,缺失 %d 条", expectedCount-totalCount) + } else { + log.Printf("⚠️ Redis数据多于ES,多出 %d 条", totalCount-expectedCount) + } +} + +func clearScroll(ctx context.Context, esClient *ESClient, scrollID string) { + if scrollID == "" { + return + } + req := esapi.ClearScrollRequest{ScrollID: []string{scrollID}} + res, err := req.Do(ctx, esClient.client) + if err != nil { + log.Printf("清除Scroll失败: %v", err) + return + } + defer res.Body.Close() + log.Println("Scroll已清除") +} + +// BookInfo Redis中存储的书籍信息结构 +type BookInfo struct { + Isbn string `json:"isbn"` // ISBN + BookName string `json:"book_name"` // 书名 + Author string `json:"author"` // 作者 + Publishing string `json:"publishing"` // 出版社 + PublicationDate string `json:"publication_date"` // 出版时间 + Binding string `json:"binding"` // 装帧 + PagesCount int64 `json:"pages_count"` // 页数 + WordsCount int64 `json:"words_count"` // 字数 + 
Format int64 `json:"format"` // 开本 + ImageObject *ImageObject `json:"image_object"` // 图片 + Price int64 `json:"price"` // 售价(分) + CatIdObject CatIdObject `json:"cat_id"` // 分类 + IsSuit int64 `json:"is_suit"` // 套装书 +} + +// ImageObject 图片对象结构 +type ImageObject struct { + CarouselUrlArray []string `json:"carousel_url_array"` // 轮播图 + WhiteBackgroundUrl string `json:"white_background_url"` // 白底图 + DetailUrlObject DetailImageObject `json:"detail_url_object"` // 详情对象 + DefaultImageUrl string `json:"default_image_url"` // 默认图 +} + +// DetailImageObject 详情图片对象结构 +type DetailImageObject struct { + IntroductionUrl []string `json:"introduction_url"` // 简介图 + CatalogueUrl []string `json:"catalogue_url"` // 目录图 + LiveShootingUrl []string `json:"live_shooting_url"` // 实拍图 + OtherUrl []string `json:"other_url"` // 其他图 +} + +// CatIdObject 分类ID对象 +type CatIdObject struct { + PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` // 拼多多分类 ID + KongFuZiCatId string `json:"kong_fu_zi_cat_id"` // 孔夫子分类 ID + XianYuCatId string `json:"xian_yu_cat_id"` // 闲鱼分类 ID +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..df6b8ab --- /dev/null +++ b/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" +) + +//TIP

To run your code, right-click the code and select Run.

Alternatively, click +// the icon in the gutter and select the Run menu item from here.

+ +func main() { + //TIP

Press when your caret is at the underlined text + // to see how GoLand suggests fixing the warning.

Alternatively, if available, click the lightbulb to view possible fixes.

+ s := "gopher" + fmt.Printf("Hello and welcome, %s!\n", s) + + for i := 1; i <= 5; i++ { + //TIP

To start your debugging session, right-click your code in the editor and select the Debug option.

We have set one breakpoint + // for you, but you can always add more by pressing .

+ fmt.Println("i =", 100/i) + } +} \ No newline at end of file diff --git a/shop_mysql_to_redis/CameiCase/mysqlToRedisCameiCase.go b/shop_mysql_to_redis/CameiCase/mysqlToRedisCameiCase.go new file mode 100644 index 0000000..470dd6a --- /dev/null +++ b/shop_mysql_to_redis/CameiCase/mysqlToRedisCameiCase.go @@ -0,0 +1,539 @@ +package main + +import ( + "batch/util/dbConnectUtil" + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "math" + "strconv" + "strings" + + "github.com/go-redis/redis/v8" + _ "github.com/go-sql-driver/mysql" +) + +// 嵌套的JSON数据结构 +type ShopDataRecord struct { + SourceTable string `json:"source_table"` + Data map[string]interface{} `json:"data"` +} + +// 价格范围项结构 +type PriceRangeItem struct { + MinPrice interface{} `json:"minPrice"` // 可以是数字或字符串 + MaxPrice interface{} `json:"maxPrice"` // 可以是数字或字符串 + AdjustPercent interface{} `json:"adjustPercent"` // 可以是数字或字符串 + AdjustAmount interface{} `json:"adjustAmount"` // 可以是数字或字符串 +} + +// 数据库配置 +func main() { + // 配置数据库连接 + db, err := dbConnectUtil.InitDB("zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306) + if err != nil { + log.Fatal("数据库连接失败:", err) + } + defer db.Close() + + // 测试数据库连接 + if err := db.Ping(); err != nil { + log.Fatal("数据库ping失败:", err) + } + + // Redis连接 + rdb := redis.NewClient(&redis.Options{ + Addr: "36.212.20.113:7963", + Password: "j8nZ4jra2E", + DB: 7, + }) + + // 测试Redis连接 + ctx := context.Background() + if _, err := rdb.Ping(ctx).Result(); err != nil { + log.Fatal("Redis连接失败:", err) + } + + // 执行数据迁移 + if err := migrateData(db, rdb); err != nil { + log.Fatal("数据迁移失败:", err) + } + + fmt.Println("数据迁移完成!") +} + +func migrateData(db *sql.DB, rdb *redis.Client) error { + ctx := context.Background() + + fmt.Println("开始查询店铺数据...") + + // 第一步:查询所有店铺ID + shopQuery := ` + SELECT DISTINCT shop_id + FROM t_shop_detail + WHERE shop_id IS NOT NULL AND del_flag = 0 + ` + + shopRows, err := db.Query(shopQuery) + if err != nil { + return fmt.Errorf("查询店铺失败: %v", err) + } + defer 
shopRows.Close() + + var shopIDs []int64 + for shopRows.Next() { + var shopID int64 + if err := shopRows.Scan(&shopID); err != nil { + return fmt.Errorf("扫描店铺数据失败: %v", err) + } + shopIDs = append(shopIDs, shopID) + } + + fmt.Printf("找到 %d 个店铺\n", len(shopIDs)) + + // 第二步:为每个店铺收集数据并存储到Redis + for i, shopID := range shopIDs { + fmt.Printf("[%d/%d] 处理店铺: %d\n", i+1, len(shopIDs), shopID) + + // 收集该店铺的所有数据 + var allRecords []ShopDataRecord + + // 1. 尝试查询店铺主表数据 + shopMainData, err := queryTableToShopData(db, "t_shop", ` + SELECT * FROM t_shop WHERE id = ? + `, shopID) + if err != nil { + log.Printf("警告: 查询店铺主表失败 (店铺 %d): %v", shopID, err) + } else if len(shopMainData) > 0 { + allRecords = append(allRecords, shopMainData...) + fmt.Printf(" 找到店铺主表数据\n") + } else { + fmt.Printf(" 未找到店铺主表数据 (id=%d)\n", shopID) + } + + // 2. 查询该店铺的所有店铺详情数据 + shopDetails, err := queryTableToShopData(db, "t_shop_detail", ` + SELECT * FROM t_shop_detail WHERE shop_id = ? + `, shopID) + if err != nil { + log.Printf("警告: 查询店铺详情失败 (店铺 %d): %v", shopID, err) + continue + } + allRecords = append(allRecords, shopDetails...) + + if len(shopDetails) == 0 { + fmt.Printf(" 店铺 %d 没有详情数据,跳过\n", shopID) + continue + } + + // 3. 尝试查询店铺上下文数据(商品描述) + shopContextData, err := queryTableToShopData(db, "t_shop_context", ` + SELECT * FROM t_shop_context WHERE shop_id = ? + `, shopID) + if err != nil { + log.Printf("警告: 查询店铺上下文数据失败 (店铺 %d): %v", shopID, err) + } else if len(shopContextData) > 0 { + allRecords = append(allRecords, shopContextData...) + fmt.Printf(" 找到店铺上下文数据 (%d条)\n", len(shopContextData)) + } else { + fmt.Printf(" 未找到店铺上下文数据\n") + } + + // 4. 尝试查询规格设置数据 + specData, err := queryTableToShopData(db, "t_spec", ` + SELECT * FROM t_spec WHERE shop_id = ? + `, shopID) + if err != nil { + log.Printf("警告: 查询规格设置数据失败 (店铺 %d): %v", shopID, err) + } else if len(specData) > 0 { + allRecords = append(allRecords, specData...) 
+ fmt.Printf(" 找到规格设置数据 (%d条)\n", len(specData)) + } else { + fmt.Printf(" 未找到规格设置数据\n") + } + + // 5. 从第一条店铺详情记录中获取 sale_template_id + var saleTemplateID sql.NullInt64 + if shopDetails[0].Data["saleTemplateId"] != nil { + switch v := shopDetails[0].Data["saleTemplateId"].(type) { + case string: + if v != "" && v != "0" && v != "null" { + if id, err := strconv.ParseInt(v, 10, 64); err == nil { + saleTemplateID = sql.NullInt64{Int64: id, Valid: true} + } + } + case int64: + if v > 0 { + saleTemplateID = sql.NullInt64{Int64: v, Valid: true} + } + case float64: + if v > 0 { + saleTemplateID = sql.NullInt64{Int64: int64(v), Valid: true} + } + case []byte: + strVal := string(v) + if strVal != "" && strVal != "0" && strVal != "null" { + if id, err := strconv.ParseInt(strVal, 10, 64); err == nil { + saleTemplateID = sql.NullInt64{Int64: id, Valid: true} + } + } + } + } + + // 6. 根据 sale_template_id 查询价格模板 + if saleTemplateID.Valid && saleTemplateID.Int64 > 0 { + priceTemplates, err := queryPriceTemplateData(db, saleTemplateID.Int64) + if err != nil { + log.Printf("警告: 查询价格模板失败 (店铺 %d, sale_template_id: %d): %v", + shopID, saleTemplateID.Int64, err) + } else if len(priceTemplates) == 0 { + fmt.Printf(" 警告: 店铺 %d 的 sale_template_id %d 没有对应的价格模板\n", + shopID, saleTemplateID.Int64) + } else { + allRecords = append(allRecords, priceTemplates...) 
+ } + } else { + fmt.Printf(" 店铺 %d 没有有效的 sale_template_id\n", shopID) + } + + // 将数据存储到Redis List + redisKey := strconv.FormatInt(shopID, 10) + + // 删除旧的Key + rdb.Del(ctx, redisKey) + + // 添加每条数据到List + for _, record := range allRecords { + // 转换为JSON字符串 + jsonBytes, err := json.Marshal(record) + if err != nil { + log.Printf("警告: JSON编码失败 (店铺 %d): %v", shopID, err) + continue + } + + // 添加到Redis List + if err := rdb.RPush(ctx, redisKey, jsonBytes).Err(); err != nil { + log.Printf("警告: Redis写入失败 (店铺 %d): %v", shopID, err) + } + } + + // 设置过期时间 + //rdb.Expire(ctx, redisKey, 24*3600*time.Second) + + // 统计各类数据数量 + counts := make(map[string]int) + for _, record := range allRecords { + counts[record.SourceTable]++ + } + + fmt.Printf(" 店铺 %d: 存储了 %d 条记录\n", shopID, len(allRecords)) + for table, count := range counts { + fmt.Printf(" - %s: %d 条\n", table, count) + } + + // 显示匹配的模板ID + if saleTemplateID.Valid && saleTemplateID.Int64 > 0 { + fmt.Printf(" 匹配的saleTemplateId: %d\n", saleTemplateID.Int64) + } + } + + return nil +} + +// 查询价格模板数据,特殊处理rangePrice字段 +func queryPriceTemplateData(db *sql.DB, templateID int64) ([]ShopDataRecord, error) { + query := `SELECT * FROM t_price_template WHERE id = ?` + rows, err := db.Query(query, templateID) + if err != nil { + return nil, err + } + defer rows.Close() + + // 获取列信息 + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + // 准备扫描结果 + var records []ShopDataRecord + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for rows.Next() { + // 准备扫描指针 + for i := range columns { + valuePtrs[i] = &values[i] + } + + // 扫描行 + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + + // 创建data map,使用驼峰格式的字段名 + dataMap := make(map[string]interface{}) + for i, col := range columns { + val := values[i] + + // 转换字段名为驼峰格式 + camelCol := snakeToCamel(col) + + // 特殊处理rangePrice字段 + if camelCol == "rangePrice" { + // 处理rangePrice字段 + processedVal, err := 
processRangePriceField(val) + if err != nil { + log.Printf("警告: 处理rangePrice字段失败: %v", err) + // 如果处理失败,保留原始值 + dataMap[camelCol] = val + } else { + dataMap[camelCol] = processedVal + } + } else { + // 其他字段正常处理 + switch v := val.(type) { + case []byte: + // 字节数组直接转为字符串 + dataMap[camelCol] = string(v) + default: + dataMap[camelCol] = v + } + } + } + + // 创建ShopDataRecord + record := ShopDataRecord{ + SourceTable: "t_price_template", + Data: dataMap, + } + + records = append(records, record) + } + + return records, nil +} + +// 处理rangePrice字段,修复浮点数精度问题 +func processRangePriceField(val interface{}) (interface{}, error) { + if val == nil { + return nil, nil + } + + // 将值转换为字符串 + var jsonStr string + switch v := val.(type) { + case string: + jsonStr = v + case []byte: + jsonStr = string(v) + default: + // 如果无法转换为字符串,直接返回原值 + return val, nil + } + + // 如果为空字符串,直接返回 + if jsonStr == "" { + return jsonStr, nil + } + + // 先尝试解析为通用的[]map[string]interface{} + var rawRanges []map[string]interface{} + if err := json.Unmarshal([]byte(jsonStr), &rawRanges); err != nil { + return jsonStr, fmt.Errorf("JSON解析失败: %v", err) + } + + // 处理每个价格范围项,将字符串数字转换为float64 + fixedRanges := make([]map[string]interface{}, len(rawRanges)) + for i, item := range rawRanges { + fixedItem := make(map[string]interface{}) + + // 处理MinPrice + if minPrice, ok := item["minPrice"]; ok { + fixedItem["minPrice"] = toFloat64(minPrice) + } + + // 处理MaxPrice + if maxPrice, ok := item["maxPrice"]; ok { + fixedItem["maxPrice"] = toFloat64(maxPrice) + } + + // 处理AdjustPercent + if adjustPercent, ok := item["adjustPercent"]; ok { + fixedItem["adjustPercent"] = toFloat64(adjustPercent) + } + + // 处理AdjustAmount + if adjustAmount, ok := item["adjustAmount"]; ok { + fixedItem["adjustAmount"] = toFloat64(adjustAmount) + } + + fixedRanges[i] = fixedItem + } + + // 重新编码为JSON字符串 + fixedJSON, err := json.Marshal(fixedRanges) + if err != nil { + return jsonStr, fmt.Errorf("JSON编码失败: %v", err) + } + + return string(fixedJSON), 
nil +} + +// 修复浮点数:如果包含小数点,只保留整数部分 +func fixFloatToInt(value float64) float64 { + // 使用math.Trunc直接截断小数部分 + // 注意:这里使用float64返回,但值是整数 + return math.Trunc(value) +} + +// 将下划线命名转换为驼峰命名 +func snakeToCamel(s string) string { + parts := strings.Split(s, "_") + for i := range parts { + if i > 0 { + // 首字母大写 + if len(parts[i]) > 0 { + parts[i] = strings.ToUpper(parts[i][:1]) + parts[i][1:] + } + } + } + return strings.Join(parts, "") +} + +// 查询表数据并返回ShopDataRecord切片,自动转换字段名为驼峰格式 +func queryTableToShopData(db *sql.DB, tableName string, query string, args ...interface{}) ([]ShopDataRecord, error) { + rows, err := db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + // 获取列信息 + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + // 准备扫描结果 + var records []ShopDataRecord + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for rows.Next() { + // 准备扫描指针 + for i := range columns { + valuePtrs[i] = &values[i] + } + + // 扫描行 + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + + // 创建data map,使用驼峰格式的字段名 + dataMap := make(map[string]interface{}) + for i, col := range columns { + val := values[i] + + // 转换字段名为驼峰格式 + camelCol := snakeToCamel(col) + + // 直接赋值,不进行任何转换 + switch v := val.(type) { + case []byte: + // 字节数组直接转为字符串 + dataMap[camelCol] = string(v) + default: + dataMap[camelCol] = v + } + } + + // 创建ShopDataRecord + record := ShopDataRecord{ + SourceTable: tableName, + Data: dataMap, + } + + records = append(records, record) + } + + return records, nil +} + +// 原有的查询函数保持不变 +func queryTableToMap(db *sql.DB, query string, args ...interface{}) ([]map[string]interface{}, error) { + rows, err := db.Query(query, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + // 获取列信息 + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + // 准备扫描结果 + var results []map[string]interface{} + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for rows.Next() { + // 准备扫描指针 + for i := range columns { + valuePtrs[i] = &values[i] + } + + // 扫描行 + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + + // 创建map + rowMap := make(map[string]interface{}) + for i, col := range columns { + val := values[i] + + // 直接赋值,不进行任何转换 + switch v := val.(type) { + case []byte: + // 字节数组直接转为字符串 + rowMap[col] = string(v) + default: + rowMap[col] = v + } + } + results = append(results, rowMap) + } + + return results, nil +} + +func toFloat64(v interface{}) float64 { + if v == nil { + return 0 + } + + switch val := v.(type) { + case float64: + return fixFloatToInt(val) + case float32: + return fixFloatToInt(float64(val)) + case int: + return float64(val) + case int64: + return float64(val) + case string: + // 尝试将字符串转换为float64 + if f, err := strconv.ParseFloat(val, 64); err == nil { + return fixFloatToInt(f) + } + return 0 + default: + return 0 + } +} diff --git a/shop_mysql_to_redis/SanckCase/mysqlToRedis.go b/shop_mysql_to_redis/SanckCase/mysqlToRedis.go new file mode 100644 index 0000000..4524eff --- /dev/null +++ b/shop_mysql_to_redis/SanckCase/mysqlToRedis.go @@ -0,0 +1,918 @@ +package main + +import ( + "batch/util/dbConnectUtil" + "context" + "database/sql" + "encoding/json" + "fmt" + "log" + "math" + "os" + "os/signal" + "reflect" + "strconv" + "strings" + "sync" + "syscall" + "time" + + "github.com/go-redis/redis/v8" + _ "github.com/go-sql-driver/mysql" +) + +// ShopMsg 店铺信息结构体 +type ShopMsg struct { + ID int64 `json:"id"` + ShopAliasName string `json:"shop_alias_name"` + ShopName string `json:"shop_name"` + Token string `json:"token"` + GoodsNamePrefix string `json:"goods_name_prefix"` + GoodsNameSuffix 
string `json:"goods_name_suffix"` + TitleConsistOf string `json:"title_consist_of"` + SpaceCharacter string `json:"space_character"` + WatermarkImgUrl string `json:"watermark_img_url"` + CarouseLastImgUrlArray []string `json:"carouse_last_img_url_array"` + GoodsDetailFirstImgUrlArray []string `json:"goods_detail_first_img_url_array"` + GoodsDetailLastImgUrlArray []string `json:"goods_detail_last_img_url_array"` + SpecName string `json:"spec_name"` + SpecId int64 `json:"spec_id"` + SpecChildName string `json:"spec_child_name"` + IsFolt bool `json:"is_fotl"` + IsPreSale bool `json:"is_pre_sale"` + IsRefundable bool `json:"is_refundable"` + IsSecondHand bool `json:"is_second_hand"` + ShipmentLimitSecond int64 `json:"shipment_limit_second"` + CostTemplateId int64 `json:"cost_template_id"` + DefStock int64 `json:"def_stock"` + TwoDiscount int64 `json:"two_discount"` +} + +// 店铺主表数据结构 +type ShopMainData struct { + Id int64 `json:"id"` + ShopAliasName sql.NullString `json:"shop_alias_name"` + ShopName sql.NullString `json:"shop_name"` + Token sql.NullString `json:"token"` +} + +// 店铺详情数据结构 +type ShopDetailData struct { + Id int64 `json:"id"` + ShopId int64 `json:"shop_id"` + TemplateId *int64 `json:"template_id"` + TitlePrefix sql.NullString `json:"title_prefix"` + TitleSuffix sql.NullString `json:"title_suffix"` + TitleConsistOf sql.NullString `json:"title_consist_of"` + SpaceCharacter sql.NullString `json:"space_character"` + StockDeff *int64 `json:"stock_deff"` + TwoDiscount *int64 `json:"two_discount"` + Presale sql.NullString `json:"presale"` + Fake sql.NullString `json:"fake"` + SevenDays sql.NullString `json:"seven_days"` + IsSecondHand sql.NullString `json:"is_second_hand"` + DeliveryTime sql.NullString `json:"delivery_time"` +} + +// 规格数据结构 +type SpecData struct { + Id int64 `json:"id"` + ShopId int64 `json:"shop_id"` + SpecName sql.NullString `json:"spec_name"` + SpecPrefix sql.NullString `json:"spec_prefix"` +} + +// 图片数据结构 +type ShopImageData struct { + Id 
int64 `json:"id"` + Pid int64 `json:"pid"` + Type string `json:"type"` + AbsolutePath string `json:"absolute_path"` +} + +// 嵌套的JSON数据结构 +type ShopDataRecord struct { + SourceTable string `json:"source_table"` + Data map[string]interface{} `json:"data"` +} + +// 并发处理结果 +type ShopProcessResult struct { + ShopID int64 + Count int + Err error +} + +// normalizeFieldValue 根据字段名和表名对值进行标准化: +// - 特定表特定字段保持 int64 数字:t_shop_context.id, t_spec.id, t_price_template.id, t_shop_detail.district_id, 以及 mall_id +// - 其他 id / *_id / create_by / update_by 字段 → 字符串 +// - create_time/update_time/add_time/expiration_time → 格式化为 "2006-01-02 15:04:05"(无时区) +// - 其他字段:[]byte → string +func normalizeFieldValue(colName string, val interface{}, tableName string) interface{} { + if val == nil { + return nil + } + + // 特殊处理:保持 int64 数字的字段(不转为字符串) + // 1. mall_id 保持数字 + // 2. t_shop_context.id + // 3. t_spec.id + // 4. t_price_template.id + // 5. t_shop_detail.district_id + if colName == "mall_id" { + return ensureInt64(val) + } + if tableName == "t_shop_detail" && colName == "district_id" { + return ensureInt64(val) + } + + // 1) ID 类字段(除上述保留数字的字段外):转为字符串 + if colName == "id" || colName == "create_by" || colName == "update_by" || strings.HasSuffix(colName, "_id") { + switch v := val.(type) { + case string: + return v + case []byte: + return string(v) + case int, int8, int16, int32, int64: + return strconv.FormatInt(reflect.ValueOf(v).Int(), 10) + case uint, uint8, uint16, uint32, uint64: + return strconv.FormatUint(reflect.ValueOf(v).Uint(), 10) + case float32, float64: + f := reflect.ValueOf(v).Float() + return strconv.FormatInt(int64(f), 10) + default: + return fmt.Sprint(v) + } + } + + // 2) 时间字段:格式化为 "2006-01-02 15:04:05" + if colName == "create_time" || colName == "update_time" || colName == "add_time" || colName == "expiration_time" { + var t time.Time + switch v := val.(type) { + case time.Time: + t = v + case []byte: + s := string(v) + if s == "" { + return nil + } + t = parseTime(s) 
+ case string: + if v == "" { + return nil + } + t = parseTime(v) + default: + return val + } + if t.IsZero() { + return nil + } + return t.Format("2006-01-02 15:04:05") + } + + // 3) 其他字段:[]byte 转为 string + switch v := val.(type) { + case []byte: + return string(v) + default: + return v + } +} + +// ensureInt64 将各种类型转换为 int64,用于保持数字的字段 +func ensureInt64(val interface{}) interface{} { + switch v := val.(type) { + case int64: + return v + case int: + return int64(v) + case int32: + return int64(v) + case float64: + return int64(v) + case string: + if i, err := strconv.ParseInt(v, 10, 64); err == nil { + return i + } + return v + case []byte: + s := string(v) + if i, err := strconv.ParseInt(s, 10, 64); err == nil { + return i + } + return s + default: + return val + } +} + +// parseTime 尝试多种布局解析时间字符串 +func parseTime(s string) time.Time { + layouts := []string{ + "2006-01-02 15:04:05", + "2006-01-02T15:04:05Z", + "2006-01-02T15:04:05", + "2006-01-02 15:04:05.999999", + "2006-01-02", + } + for _, layout := range layouts { + if t, err := time.Parse(layout, s); err == nil { + return t + } + } + return time.Time{} +} + +// 查询店铺主表数据 +func queryShopMainData(db *sql.DB, shopID int64) (ShopMainData, error) { + query := `SELECT id, shop_alias_name, shop_name, token FROM t_shop WHERE id = ? AND del_flag = '0'` + row := db.QueryRow(query, shopID) + + var data ShopMainData + err := row.Scan(&data.Id, &data.ShopAliasName, &data.ShopName, &data.Token) + if err != nil { + return data, err + } + + return data, nil +} + +// 查询店铺详情数据 +func queryShopDetailData(db *sql.DB, shopID int64) ([]ShopDetailData, error) { + query := ` + SELECT id, shop_id, template_id, title_prefix, title_suffix, title_consist_of, + space_character, stock_deff, two_discount, presale, fake, seven_days, + is_second_hand, delivery_time + FROM t_shop_detail + WHERE shop_id = ? 
AND del_flag = '0' + ` + rows, err := db.Query(query, shopID) + if err != nil { + return nil, err + } + defer rows.Close() + + var data []ShopDetailData + for rows.Next() { + var item ShopDetailData + err := rows.Scan( + &item.Id, &item.ShopId, &item.TemplateId, &item.TitlePrefix, &item.TitleSuffix, + &item.TitleConsistOf, &item.SpaceCharacter, &item.StockDeff, &item.TwoDiscount, + &item.Presale, &item.Fake, &item.SevenDays, &item.IsSecondHand, &item.DeliveryTime, + ) + if err != nil { + return nil, err + } + data = append(data, item) + } + + return data, nil +} + +// 查询规格数据 +func querySpecData(db *sql.DB, shopID int64) ([]SpecData, error) { + query := `SELECT id, shop_id, spec_name, spec_prefix FROM t_spec WHERE shop_id = ? AND del_flag = '0'` + rows, err := db.Query(query, shopID) + if err != nil { + return nil, err + } + defer rows.Close() + + var data []SpecData + for rows.Next() { + var item SpecData + err := rows.Scan(&item.Id, &item.ShopId, &item.SpecName, &item.SpecPrefix) + if err != nil { + return nil, err + } + data = append(data, item) + } + + return data, nil +} + +// 查询图片数据 +func queryShopImageData(db *sql.DB, shopID int64) ([]ShopImageData, error) { + query := `SELECT id, pid, type, absolute_path FROM t_shop_img WHERE pid = ? 
AND del_flag = '0'` + rows, err := db.Query(query, shopID) + if err != nil { + return nil, err + } + defer rows.Close() + + var data []ShopImageData + for rows.Next() { + var item ShopImageData + err := rows.Scan(&item.Id, &item.Pid, &item.Type, &item.AbsolutePath) + if err != nil { + return nil, err + } + data = append(data, item) + } + + return data, nil +} + +func main() { + // 配置数据库连接 + db, err := dbConnectUtil.InitDB("zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306) + if err != nil { + log.Fatal("数据库连接失败:", err) + } + defer db.Close() + + // 【优化1】数据库连接池配置 - 降低并发避免瞬时带宽压力 + db.SetMaxOpenConns(10) // 最大打开连接数(降低到10) + db.SetMaxIdleConns(5) // 最大空闲连接数(降低到5) + db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生命周期 + db.SetConnMaxIdleTime(10 * time.Minute) // 空闲连接最大存活时间 + + // 测试数据库连接 + if err := db.Ping(); err != nil { + log.Fatal("数据库ping失败:", err) + } + + // Redis连接 - 降低连接池大小 + rdb := redis.NewClient(&redis.Options{ + Addr: "36.212.12.247:6379", + Password: "long6166@@", + DB: 8, + PoolSize: 10, // 连接池大小(降低到10) + MinIdleConns: 5, // 最小空闲连接数(降低到5) + }) + + // 测试Redis连接 + ctx := context.Background() + if _, err := rdb.Ping(ctx).Result(); err != nil { + log.Fatal("Redis连接失败:", err) + } + + // 【新增】设置信号捕获,优雅退出 + stopChan := make(chan os.Signal, 1) + signal.Notify(stopChan, syscall.SIGINT, syscall.SIGTERM) + + // 【新增】循环运行,每次间隔5分钟 + runCount := 0 + for { + runCount++ + startTime := time.Now() + + fmt.Printf("\n%s\n", strings.Repeat("=", 60)) + fmt.Printf("第 %d 次运行开始 - %s\n", runCount, startTime.Format("2006-01-02 15:04:05")) + fmt.Printf("%s\n", strings.Repeat("=", 60)) + + // 执行数据迁移 + if err := migrateDataOptimized(db, rdb); err != nil { + log.Printf("数据迁移失败: %v", err) + } + + elapsed := time.Since(startTime) + fmt.Printf("\n第 %d 次运行完成,耗时: %v\n", runCount, elapsed) + fmt.Printf("下次运行时间: %s\n", time.Now().Add(5*time.Minute).Format("2006-01-02 15:04:05")) + + // 等待间隔时间,但可以提前退出 + // 根据运行耗时动态调整间隔 + var waitInterval time.Duration + if elapsed < 3*time.Minute { + waitInterval 
= 5 * time.Minute + } else if elapsed < 6*time.Minute { + waitInterval = 8 * time.Minute + } else { + waitInterval = 10 * time.Minute + } + + fmt.Printf("\n本次耗时: %v, 下次运行间隔: %v (按 Ctrl+C 退出)...\n", elapsed, waitInterval) + + select { + case <-stopChan: + fmt.Printf("\n\n收到退出信号,程序即将退出...\n") + fmt.Printf("总共运行了 %d 次\n", runCount) + return + case <-time.After(waitInterval): + // 继续下一次循环 + } + } +} + +// 【优化版本】并发处理数据迁移 +func migrateDataOptimized(db *sql.DB, rdb *redis.Client) error { + ctx := context.Background() + + fmt.Println("开始查询店铺数据...") + + // 第一步:查询所有店铺ID + shopQuery := ` + SELECT DISTINCT id + FROM t_shop + WHERE id IS NOT NULL + ` + + shopRows, err := db.Query(shopQuery) + if err != nil { + return fmt.Errorf("查询店铺失败: %v", err) + } + defer shopRows.Close() + + var shopIDs []int64 + for shopRows.Next() { + var shopID int64 + if err := shopRows.Scan(&shopID); err != nil { + return fmt.Errorf("扫描店铺数据失败: %v", err) + } + shopIDs = append(shopIDs, shopID) + } + + fmt.Printf("找到 %d 个店铺\n", len(shopIDs)) + + // 【优化2】使用Worker Pool并发处理 - 降低并发避免带宽压力 + workers := 3 // 并发worker数量(降低到3,减少瞬时压力) + jobs := make(chan int64, len(shopIDs)) + results := make(chan ShopProcessResult, len(shopIDs)) + + var wg sync.WaitGroup + + // 启动worker + for w := 0; w < workers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for shopID := range jobs { + count, err := processShop(db, rdb, ctx, shopID, workerID) + results <- ShopProcessResult{ + ShopID: shopID, + Count: count, + Err: err, + } + // 【新增】添加处理延迟,降低瞬时带宽压力 + time.Sleep(100 * time.Millisecond) // 每个店铺处理完后休息100ms + } + }(w) + } + + // 发送任务 + go func() { + for _, id := range shopIDs { + jobs <- id + } + close(jobs) + }() + + // 等待所有worker完成 + go func() { + wg.Wait() + close(results) + }() + + // 收集结果并统计 + successCount := 0 + failCount := 0 + totalRecords := 0 + + for result := range results { + if result.Err != nil { + failCount++ + log.Printf("[失败] 店铺 %d: %v", result.ShopID, result.Err) + } else { + successCount++ + 
totalRecords += result.Count + if successCount%10 == 0 { + fmt.Printf("[进度] 已完成: %d/%d, 总记录: %d\n", + successCount, len(shopIDs), totalRecords) + } + } + } + + fmt.Printf("\n========== 迁移完成 ==========\n") + fmt.Printf("成功: %d 店铺\n", successCount) + fmt.Printf("失败: %d 店铺\n", failCount) + fmt.Printf("总记录数: %d\n", totalRecords) + + return nil +} + +// processShop 处理单个店铺的所有数据 +func processShop(db *sql.DB, rdb *redis.Client, ctx context.Context, shopID int64, workerID int) (int, error) { + // 收集该店铺的所有数据 + var allRecords []ShopDataRecord + + // 1. 查询店铺主表数据 + shopMainData, err := queryTableToShopData(db, "t_shop", ` + SELECT * FROM t_shop WHERE id = ? AND del_flag = '0' + `, shopID) + if err != nil { + return 0, fmt.Errorf("查询店铺主表失败: %v", err) + } + allRecords = append(allRecords, shopMainData...) + + // 2. 查询该店铺的所有店铺详情数据 + shopDetails, err := queryTableToShopData(db, "t_shop_detail", ` + SELECT * FROM t_shop_detail WHERE shop_id = ? AND del_flag = '0' + `, shopID) + if err != nil { + return 0, fmt.Errorf("查询店铺详情失败: %v", err) + } + + if len(shopDetails) == 0 { + return 0, nil // 没有详情数据,跳过 + } + + // 查询图片数据以获取watermark_img_url + shopImgDataList, err := queryShopImageData(db, shopID) + if err != nil { + log.Printf("[Worker-%d] 警告: 查询图片数据失败 (店铺 %d): %v", workerID, shopID, err) + } + + // 处理图片数据 + var watermarkImgUrl string + var skuWatermarkImgUrl string + var carouseLastImgUrlArray = []string{} + var goodsDetailFirstImgUrlArray = []string{} + var goodsDetailLastImgUrlArray = []string{} + + if err == nil { + for _, img := range shopImgDataList { + if img.AbsolutePath == "" { + continue + } + switch img.Type { + case "1": + watermarkImgUrl = img.AbsolutePath + case "3": + goodsDetailFirstImgUrlArray = append(goodsDetailFirstImgUrlArray, img.AbsolutePath) + case "4": + goodsDetailLastImgUrlArray = append(goodsDetailLastImgUrlArray, img.AbsolutePath) + case "5": + carouseLastImgUrlArray = append(carouseLastImgUrlArray, img.AbsolutePath) + case "7": + skuWatermarkImgUrl = 
img.AbsolutePath + } + } + } + + // 将watermark_img_url等添加到shopdetail记录中 + for i := range shopDetails { + if shopDetails[i].Data == nil { + shopDetails[i].Data = make(map[string]interface{}) + } + shopDetails[i].Data["watermark_img_url"] = watermarkImgUrl + shopDetails[i].Data["carouse_last_img_url_array"] = carouseLastImgUrlArray + shopDetails[i].Data["goods_detail_first_img_url_array"] = goodsDetailFirstImgUrlArray + shopDetails[i].Data["goods_detail_last_img_url_array"] = goodsDetailLastImgUrlArray + shopDetails[i].Data["sku_watermark_img_url"] = skuWatermarkImgUrl + } + + allRecords = append(allRecords, shopDetails...) + + // 3. 查询店铺上下文数据 + shopContextData, err := queryTableToShopData(db, "t_shop_context", ` + SELECT * FROM t_shop_context WHERE shop_id = ? AND del_flag = '0' + `, shopID) + if err != nil { + log.Printf("[Worker-%d] 警告: 查询店铺上下文数据失败 (店铺 %d): %v", workerID, shopID, err) + } + allRecords = append(allRecords, shopContextData...) + + // 4. 查询规格设置数据 + specData, err := queryTableToShopData(db, "t_spec", ` + SELECT * FROM t_spec WHERE shop_id = ? AND del_flag = '0' + `, shopID) + if err != nil { + log.Printf("[Worker-%d] 警告: 查询规格设置数据失败 (店铺 %d): %v", workerID, shopID, err) + } + allRecords = append(allRecords, specData...) + + // 5. 从第一条店铺详情记录中获取 sale_template_id + var saleTemplateID int64 + if firstDetail := shopDetails[0].Data; firstDetail != nil { + if v, ok := firstDetail["sale_template_id"]; ok && v != nil { + switch val := v.(type) { + case string: + if val != "" && val != "0" && val != "null" { + if id, err := strconv.ParseInt(val, 10, 64); err == nil && id > 0 { + saleTemplateID = id + } + } + case int64: + if val > 0 { + saleTemplateID = val + } + case float64: + if val > 0 { + saleTemplateID = int64(val) + } + } + } + } + + // 6. 
根据 sale_template_id 查询价格模板 + if saleTemplateID > 0 { + priceTemplates, err := queryPriceTemplateData(db, saleTemplateID) + if err != nil { + log.Printf("[Worker-%d] 警告: 查询价格模板失败 (店铺 %d, sale_template_id: %d): %v", + workerID, shopID, saleTemplateID, err) + } else { + allRecords = append(allRecords, priceTemplates...) + } + } + + // 【优化3】批量写入Redis + if err := saveToRedisBatch(rdb, ctx, shopID, allRecords); err != nil { + return 0, fmt.Errorf("Redis写入失败: %v", err) + } + + return len(allRecords), nil +} + +// 【优化3】使用Pipeline批量写入Redis +func saveToRedisBatch(rdb *redis.Client, ctx context.Context, shopID int64, records []ShopDataRecord) error { + if len(records) == 0 { + return nil + } + + redisKey := strconv.FormatInt(shopID, 10) + + // 使用Pipeline批量执行 + pipe := rdb.Pipeline() + + // 删除旧key + pipe.Del(ctx, redisKey) + + // 批量添加数据 + for _, record := range records { + jsonBytes, err := json.Marshal(record) + if err != nil { + log.Printf("警告: JSON编码失败 (店铺 %d): %v", shopID, err) + continue + } + pipe.RPush(ctx, redisKey, jsonBytes) + } + + // 一次性执行所有命令 + _, err := pipe.Exec(ctx) + return err +} + +// 查询价格模板数据,特殊处理 range_price 和 add_amount 字段 +func queryPriceTemplateData(db *sql.DB, templateID int64) ([]ShopDataRecord, error) { + query := `SELECT * FROM t_price_template WHERE id = ?` + rows, err := db.Query(query, templateID) + if err != nil { + return nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + var records []ShopDataRecord + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for rows.Next() { + for i := range columns { + valuePtrs[i] = &values[i] + } + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + + dataMap := make(map[string]interface{}) + for i, col := range columns { + snakeCol := camelToSnake(col) + + if snakeCol == "range_price" { + processedVal, err := processRangePriceField(values[i]) + if err != nil { + log.Printf("警告: 处理 
range_price 字段失败: %v", err) + processedVal = values[i] + } + dataMap[snakeCol] = normalizeFieldValue(snakeCol, processedVal, "t_price_template") + continue + } + + if snakeCol == "add_amount" { + processedVal, err := processAddAmountField(values[i]) + if err != nil { + log.Printf("警告: 处理 add_amount 字段失败: %v", err) + processedVal = values[i] + } + dataMap[snakeCol] = normalizeFieldValue(snakeCol, processedVal, "t_price_template") + continue + } + + dataMap[snakeCol] = normalizeFieldValue(snakeCol, values[i], "t_price_template") + } + + records = append(records, ShopDataRecord{ + SourceTable: "t_price_template", + Data: dataMap, + }) + } + return records, nil +} + +func processAddAmountField(val interface{}) (interface{}, error) { + if val == nil { + return nil, nil + } + var f float64 + switch v := val.(type) { + case float64: + f = v + case float32: + f = float64(v) + case int: + f = float64(v) + case int64: + f = float64(v) + case int32: + f = float64(v) + case string: + if v == "" { + return nil, nil + } + parsed, err := strconv.ParseFloat(v, 64) + if err != nil { + return nil, fmt.Errorf("无法解析字符串为浮点数: %v", err) + } + f = parsed + case []byte: + s := string(v) + if s == "" { + return nil, nil + } + parsed, err := strconv.ParseFloat(s, 64) + if err != nil { + return nil, fmt.Errorf("无法解析字节数组为浮点数: %v", err) + } + f = parsed + default: + return nil, fmt.Errorf("不支持的类型: %T", v) + } + scaled := math.Round(f * 100) + return int64(scaled), nil +} + +func processRangePriceField(val interface{}) (interface{}, error) { + if val == nil { + return nil, nil + } + var jsonStr string + switch v := val.(type) { + case string: + jsonStr = v + case []byte: + jsonStr = string(v) + default: + return val, nil + } + if jsonStr == "" { + return jsonStr, nil + } + var rawItems []map[string]interface{} + if err := json.Unmarshal([]byte(jsonStr), &rawItems); err != nil { + return jsonStr, fmt.Errorf("JSON解析失败: %v", err) + } + fixedRanges := make([]map[string]interface{}, len(rawItems)) + 
for i, item := range rawItems { + fixedItem := make(map[string]interface{}) + if minPrice, ok := item["minPrice"]; ok { + fixedItem["minPrice"] = convertToFloat64(minPrice) + } + if maxPrice, ok := item["maxPrice"]; ok { + fixedItem["maxPrice"] = convertToFloat64(maxPrice) + } + if adjustPercent, ok := item["adjustPercent"]; ok { + fixedItem["adjustPercent"] = convertToFloat64(adjustPercent) + } + if adjustAmount, ok := item["adjustAmount"]; ok { + fixedItem["adjustAmount"] = convertToFloat64(adjustAmount) + } + fixedRanges[i] = fixedItem + } + fixedJSON, err := json.Marshal(fixedRanges) + if err != nil { + return jsonStr, fmt.Errorf("JSON编码失败: %v", err) + } + return string(fixedJSON), nil +} + +func convertToFloat64(v interface{}) float64 { + switch val := v.(type) { + case float64: + return fixFloatToInt(val) + case float32: + return fixFloatToInt(float64(val)) + case int: + return float64(val) + case int64: + return float64(val) + case int32: + return float64(val) + case string: + if f, err := strconv.ParseFloat(val, 64); err == nil { + return fixFloatToInt(f) + } + return 0 + case bool: + if val { + return 1 + } + return 0 + default: + return 0 + } +} + +func fixFloatToInt(value float64) float64 { + return math.Floor(value) +} + +func camelToSnake(s string) string { + var result []rune + for i, r := range s { + if i > 0 && r >= 'A' && r <= 'Z' { + result = append(result, '_') + } + result = append(result, r) + } + return strings.ToLower(string(result)) +} + +// 通用表查询函数,自动应用字段标准化 +func queryTableToShopData(db *sql.DB, tableName string, query string, args ...interface{}) ([]ShopDataRecord, error) { + rows, err := db.Query(query, args...) 
+ if err != nil { + return nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + var records []ShopDataRecord + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for rows.Next() { + for i := range columns { + valuePtrs[i] = &values[i] + } + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + dataMap := make(map[string]interface{}) + for i, col := range columns { + snakeCol := camelToSnake(col) + dataMap[snakeCol] = normalizeFieldValue(snakeCol, values[i], tableName) + } + records = append(records, ShopDataRecord{ + SourceTable: tableName, + Data: dataMap, + }) + } + return records, nil +} + +// 保留原有函数(如需使用) +func queryTableToMap(db *sql.DB, query string, args ...interface{}) ([]map[string]interface{}, error) { + rows, err := db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + columns, err := rows.Columns() + if err != nil { + return nil, err + } + + var results []map[string]interface{} + values := make([]interface{}, len(columns)) + valuePtrs := make([]interface{}, len(columns)) + + for rows.Next() { + for i := range columns { + valuePtrs[i] = &values[i] + } + if err := rows.Scan(valuePtrs...); err != nil { + return nil, err + } + rowMap := make(map[string]interface{}) + for i, col := range columns { + switch v := values[i].(type) { + case []byte: + rowMap[col] = string(v) + default: + rowMap[col] = v + } + } + results = append(results, rowMap) + } + return results, nil +} diff --git a/util/dbConnectUtil/dbConnectUtil.go b/util/dbConnectUtil/dbConnectUtil.go new file mode 100644 index 0000000..3595ddb --- /dev/null +++ b/util/dbConnectUtil/dbConnectUtil.go @@ -0,0 +1,40 @@ +package dbConnectUtil + +import ( + "database/sql" + "fmt" + "log" + + _ "github.com/go-sql-driver/mysql" +) + +// DB 数据库连接池 +var DB *sql.DB + +// InitDB 初始化数据库连接 +func InitDB(username, password, host string, port int) (*sql.DB, 
error) { + log.Printf("开始初始化数据库连接") + dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/zhishu?charset=utf8mb4&parseTime=True&loc=Local", + username, password, host, port) + + log.Printf("数据库连接参数: %s", dsn) + log.Print("开始连接数据库") + + var err error + DB, err = sql.Open("mysql", dsn) + if err != nil { + return DB, fmt.Errorf("数据库连接失败: %v", err) + } + + // 测试数据库连接 + err = DB.Ping() + if err != nil { + return DB, fmt.Errorf("数据库连接测试失败: %v", err) + } + + // 设置连接池参数 + DB.SetMaxOpenConns(20) + DB.SetMaxIdleConns(10) + + return DB, nil +}