package main import ( "context" "crypto/tls" "encoding/json" "flag" "fmt" "log" "net/http" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/go-redis/redis/v8" ) const ( esAddress = "http://36.212.12.92:9527" esUsername = "elastic" esPassword = "+Tz5qR_KushZ-bPgZ_H-" redisAddr = "36.212.12.247:6379" redisPassword = "long6166@@" redisDB = 1 ) /* ================= Client ================= */ type ESClient struct { client *elasticsearch.Client } func NewESClient(addresses []string, username, password string) (*ESClient, error) { cfg := elasticsearch.Config{ Addresses: addresses, Username: username, Password: password, Transport: &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, MaxIdleConnsPerHost: 100, ResponseHeaderTimeout: 60 * time.Second, }, } cli, err := elasticsearch.NewClient(cfg) if err != nil { return nil, err } return &ESClient{client: cli}, nil } type RedisClient struct { client *redis.Client } func NewRedisClient(addr, password string, db int) (*RedisClient, error) { rdb := redis.NewClient(&redis.Options{ Addr: addr, Password: password, DB: db, PoolSize: 100, DialTimeout: 10 * time.Second, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, }) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() if _, err := rdb.Ping(ctx).Result(); err != nil { return nil, err } // 解决 Redis MISCONF 错误:禁用 bgsave 错误时的写入限制 if err := rdb.ConfigSet(ctx, "stop-writes-on-bgsave-error", "no").Err(); err != nil { log.Printf("警告:无法设置 Redis 配置 stop-writes-on-bgsave-error=no: %v", err) } else { log.Println("已设置 Redis 配置:stop-writes-on-bgsave-error=no") } return &RedisClient{client: rdb}, nil } /* ================= Data ================= */ type BookPic struct { LocalPath string `json:"localPath"` PddPath string `json:"pddPath"` } type BookPicS struct { LocalPath string `json:"localPath"` PddPath string `json:"pddPath"` PddResponse string `json:"pddResponse"` Localh string `json:"localh"` } type BookDetailImage struct { LocalPath string `json:"localPath"` PddPath string `json:"pddPath"` } type BookDirectoryImage struct { LocalPath string `json:"localPath"` PddPath string `json:"pddPath"` } type ESCatIdObject struct { PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` KongFuZiCatId string `json:"kong_fu_zi_cat_id"` XianYuCatId string `json:"xian_yu_cat_id"` } type BookData struct { ID int64 `json:"id"` BookName StringOrArray `json:"book_name"` BookPic BookPic `json:"book_pic"` BookDefPic BookPic `json:"book_def_pic"` BookPicS BookPicS `json:"book_pic_s"` BookPicObj string `json:"book_pic_obj"` BookDetailImage BookDetailImage `json:"book_detail_image"` BookPicB string `json:"book_pic_b"` BookDirectoryImage BookDirectoryImage `json:"book_directory_image"` ISBN string `json:"isbn"` Author string `json:"author"` Category string `json:"category"` Publisher string `json:"publisher"` PublicationTime string `json:"publication_time"` BindingLayout string `json:"binding_layout"` FixPrice FlexInt64 `json:"fix_price"` PageCount StringOrArray `json:"page_count"` WordCount StringOrArray `json:"word_count"` CatId ESCatIdObject `json:"cat_id"` IsSuit int64 `json:"is_suit"` } type ESResponse struct { ScrollID string `json:"_scroll_id"` Hits struct { Total struct { Value int64 `json:"value"` } `json:"total"` Hits []struct { Source BookData `json:"_source"` } `json:"hits"` } `json:"hits"` } /* ================= Custom Types ================= */ type StringOrArray string type FlexInt64 int64 func (f *FlexInt64) UnmarshalJSON(data []byte) error { // 尝试直接解析为数字 var num int64 if err := json.Unmarshal(data, &num); err == nil { *f = FlexInt64(num) return nil } // 尝试解析为字符串 var str string if err := json.Unmarshal(data, &str); err == nil { if str == "" { *f = 0 return nil } if val, err := strconv.ParseInt(str, 10, 64); err == nil { *f = FlexInt64(val) return nil } if val, err := strconv.ParseFloat(str, 64); err == nil { *f = FlexInt64(int64(val)) return nil } } // 默认为0 *f = 0 return nil } func (s *StringOrArray) UnmarshalJSON(data []byte) error { if len(data) > 0 && data[0] == '"' { var str string _ = json.Unmarshal(data, &str) *s = StringOrArray(str) return nil } var arr []string if json.Unmarshal(data, &arr) == nil && len(arr) > 0 { *s = StringOrArray(arr[0]) } return nil } type Float64OrString float64 func (f *Float64OrString) UnmarshalJSON(data []byte) error { var n float64 if json.Unmarshal(data, &n) == nil { *f = Float64OrString(n) return nil } var s string if json.Unmarshal(data, &s) == nil { if v, err := strconv.ParseFloat(strings.TrimSpace(s), 64); err == nil { *f = Float64OrString(v) } } return nil } /* ================= Main ================= */ func main() { flag.Parse() log.Println("开始全量同步 ES → Redis(强制全量写入)") esClient, err := NewESClient([]string{esAddress}, esUsername, esPassword) if err != nil { log.Fatal(err) } redisClient, err := NewRedisClient(redisAddr, redisPassword, redisDB) if err != nil { log.Fatal(err) } defer redisClient.client.Close() // 执行同步并获取总数 totalDocs, err := fetchAndWriteToRedis(context.Background(), esClient, redisClient) if err != nil { log.Fatal(err) } // 验证数据完整性 verifyDataCount(context.Background(), redisClient, totalDocs) } func fetchAndWriteToRedis(ctx context.Context, esClient *ESClient, redisClient *RedisClient) (int64, error) { // 详细统计变量 var ( readCount int64 // 从ES读取的总文档数 skipEmptyIsbn int64 // ISBN为空跳过的数量 convertFailed int64 // 转换或序列化失败的数量 writeSuccess int64 // 实际写入Redis成功的数量 writeFailed int64 // 写入失败的数量 sentToBatchChan int64 // 发送到batchChan的文档总数 receivedByWorkers int64 // 写入协程接收到的文档总数 ) const ( batchSize = 500 esWorkerCount = 8 redisWorkerCount = 32 bookChanSize = 10000 batchChanSize = 200 ) bookChan := make(chan BookData, bookChanSize) batchChan := make(chan []BookData, batchChanSize) var wg sync.WaitGroup // ================= Redis批量写入协程 ================= for i := 0; i < redisWorkerCount; i++ { wg.Add(1) go func(workerID int) { defer wg.Done() for batch := range batchChan { atomic.AddInt64(&receivedByWorkers, int64(len(batch))) // 准备要写入的键值对 type kv struct { key string val []byte } kvs := make([]kv, 0, len(batch)) for _, book := range batch { // 跳过空ISBN if book.ISBN == "" { atomic.AddInt64(&skipEmptyIsbn, 1) continue } // 安全转换(带panic恢复) bookInfo, err := safeConvertBookData(book) if err != nil { atomic.AddInt64(&convertFailed, 1) log.Printf("Worker %d 转换失败 ISBN=%s: %v", workerID, book.ISBN, err) continue } data, err := json.Marshal(bookInfo) if err != nil { atomic.AddInt64(&convertFailed, 1) log.Printf("Worker %d JSON序列化失败 ISBN=%s: %v", workerID, book.ISBN, err) continue } kvs = append(kvs, kv{key: book.ISBN, val: data}) } if len(kvs) == 0 { continue } // 使用Pipeline批量写入,并逐个检查结果 pipe := redisClient.client.Pipeline() cmds := make([]*redis.StatusCmd, 0, len(kvs)) for _, kv := range kvs { cmd := pipe.Set(ctx, kv.key, kv.val, 0) cmds = append(cmds, cmd) } // 执行Pipeline _, err := pipe.Exec(ctx) if err != nil && err != redis.Nil { // Pipeline整体失败(如网络错误),整批标记为失败 atomic.AddInt64(&writeFailed, int64(len(kvs))) log.Printf("Worker %d Pipeline执行失败: %v, 影响 %d 条", workerID, err, len(kvs)) continue } // 逐个检查每条命令的结果 successInBatch := 0 for i, cmd := range cmds { if cmd.Err() != nil { atomic.AddInt64(&writeFailed, 1) log.Printf("Worker %d 单条写入失败 key=%s: %v", workerID, kvs[i].key, cmd.Err()) } else { successInBatch++ } } atomic.AddInt64(&writeSuccess, int64(successInBatch)) if successInBatch < len(kvs) { log.Printf("Worker %d 批次部分失败: 预期 %d, 成功 %d", workerID, len(kvs), successInBatch) } } log.Printf("Redis写入协程 %d 退出,累计接收文档数 %d", workerID, atomic.LoadInt64(&receivedByWorkers)) }(i) } // ================= 批处理协程 ================= wg.Add(1) go func() { defer wg.Done() var batch []BookData batchSeq := 0 for book := range bookChan { batch = append(batch, book) if len(batch) >= batchSize { batchSeq++ atomic.AddInt64(&sentToBatchChan, int64(len(batch))) batchChan <- batch batch = []BookData{} } } // 处理剩余数据 if len(batch) > 0 { batchSeq++ atomic.AddInt64(&sentToBatchChan, int64(len(batch))) batchChan <- batch } close(batchChan) log.Printf("批处理协程退出,共发送 %d 个批次,总文档数 %d", batchSeq, atomic.LoadInt64(&sentToBatchChan)) }() // ================= ES 初始查询 ================= size := 10000 req := esapi.SearchRequest{ Index: []string{"books-from-mysql-v2"}, Size: &size, Scroll: time.Minute, } res, err := req.Do(ctx, esClient.client) if err != nil { return 0, fmt.Errorf("初始查询失败: %w", err) } defer res.Body.Close() var esResp ESResponse if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil { return 0, fmt.Errorf("解析初始响应失败: %w", err) } totalDocs := esResp.Hits.Total.Value log.Printf("ES 总文档数: %d", totalDocs) scrollID := esResp.ScrollID defer clearScroll(ctx, esClient, scrollID) // ================= 进度报告协程 ================= ticker := time.NewTicker(5 * time.Second) go func() { for range ticker.C { read := atomic.LoadInt64(&readCount) skip := atomic.LoadInt64(&skipEmptyIsbn) convFail := atomic.LoadInt64(&convertFailed) writeOK := atomic.LoadInt64(&writeSuccess) writeErr := atomic.LoadInt64(&writeFailed) sent := atomic.LoadInt64(&sentToBatchChan) recv := atomic.LoadInt64(&receivedByWorkers) progress := float64(read) / float64(totalDocs) * 100 log.Printf("进度: %.2f%% | 已读: %d | 跳过空ISBN: %d | 转换失败: %d | 写入成功: %d | 写入失败: %d | 发送到批处理: %d | 写入协程接收: %d", progress, read, skip, convFail, writeOK, writeErr, sent, recv) } }() // ================= 处理初始结果 ================= for _, hit := range esResp.Hits.Hits { bookChan <- hit.Source atomic.AddInt64(&readCount, 1) } // ================= 并行处理ES数据转换 ================= var processWg sync.WaitGroup processChan := make(chan []BookData, esWorkerCount*2) for i := 0; i < esWorkerCount; i++ { processWg.Add(1) go func(workerID int) { defer processWg.Done() for books := range processChan { for _, book := range books { bookChan <- book atomic.AddInt64(&readCount, 1) } } log.Printf("ES处理协程 %d 退出", workerID) }(i) } // ================= Scroll 循环拉取剩余数据 ================= currentScrollID := scrollID batchCount := 0 for { scrollReq := esapi.ScrollRequest{ ScrollID: currentScrollID, Scroll: time.Minute, } scrollRes, err := scrollReq.Do(ctx, esClient.client) if err != nil { log.Printf("Scroll请求失败: %v", err) break } var scrollResp ESResponse if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResp); err != nil { scrollRes.Body.Close() log.Printf("解析Scroll响应失败: %v", err) break } scrollRes.Body.Close() if len(scrollResp.Hits.Hits) == 0 { break } // 更新scrollID currentScrollID = scrollResp.ScrollID // 收集本批数据 books := make([]BookData, 0, len(scrollResp.Hits.Hits)) for _, hit := range scrollResp.Hits.Hits { books = append(books, hit.Source) } processChan <- books batchCount++ } // 关闭处理通道,等待所有处理协程完成 close(processChan) processWg.Wait() close(bookChan) wg.Wait() ticker.Stop() // ================= 最终统计 ================= log.Printf("同步完成汇总: 总读取=%d, 发送到批处理=%d, 写入协程接收=%d, 跳过空ISBN=%d, 转换失败=%d, 写入成功=%d, 写入失败=%d", readCount, sentToBatchChan, receivedByWorkers, skipEmptyIsbn, convertFailed, writeSuccess, writeFailed) // 检查数据完整性 if writeSuccess+writeFailed+skipEmptyIsbn+convertFailed != readCount { log.Printf("⚠️ 统计不一致: 写入成功(%d)+失败(%d)+跳过(%d)+转换失败(%d)=%d, 但读取总数=%d", writeSuccess, writeFailed, skipEmptyIsbn, convertFailed, writeSuccess+writeFailed+skipEmptyIsbn+convertFailed, readCount) } return totalDocs, nil } // cleanKongFuZiCatId 清理孔夫子分类ID:将 > 替换为 /,并去除各段两端的空格 func cleanKongFuZiCatId(raw string) string { parts := strings.Split(raw, ">") for i, p := range parts { parts[i] = strings.TrimSpace(p) } return strings.Join(parts, "/") } // safeConvertBookData 带 panic 恢复的转换函数 func safeConvertBookData(bookData BookData) (bookInfo BookInfo, err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("panic in convertBookDataToBookInfoSafe: %v", r) } }() return convertBookDataToBookInfoSafe(bookData) } // 安全的转换函数,返回error func convertBookDataToBookInfoSafe(bookData BookData) (BookInfo, error) { // 检查必要字段 if bookData.ISBN == "" { return BookInfo{}, fmt.Errorf("ISBN为空") } bookInfo := BookInfo{ Isbn: bookData.ISBN, BookName: string(bookData.BookName), Author: bookData.Author, Publishing: bookData.Publisher, PublicationDate: bookData.PublicationTime, Binding: bookData.BindingLayout, Format: 0, CatIdObject: CatIdObject{ PinDuoDuoCatId: bookData.CatId.PinDuoDuoCatId, KongFuZiCatId: cleanKongFuZiCatId(bookData.CatId.KongFuZiCatId), XianYuCatId: bookData.CatId.XianYuCatId, }, } // 页数转换 if pageCountStr := strings.TrimSpace(string(bookData.PageCount)); pageCountStr != "" { if pageCount, err := strconv.ParseInt(pageCountStr, 10, 64); err == nil { bookInfo.PagesCount = pageCount } } // 字数转换 if wordCountStr := strings.TrimSpace(string(bookData.WordCount)); wordCountStr != "" { if wordCount, err := strconv.ParseInt(wordCountStr, 10, 64); err == nil { bookInfo.WordsCount = wordCount } } // 出版时间转换(修复逻辑:支持多种格式) if publicationTimeStr := strings.TrimSpace(bookData.PublicationTime); publicationTimeStr != "" { publicationTime, err := strconv.ParseInt(publicationTimeStr, 10, 64) publicationTime = publicationTime - 5364000000 timestamp, err := strconv.ParseInt(strconv.FormatInt(publicationTime, 10), 10, 64) //fmt.Println(timestamp) if err == nil { var t time.Time // 判断是秒级还是毫秒级时间戳(假设大于 1e9 的是毫秒级) //if timestamp > 1e12 { // 毫秒级 // t = time.UnixMilli(timestamp) //} else { // 秒级 t = time.Unix(timestamp, 0) //} bookInfo.PublicationDate = t.Format("2006-01") //fmt.Println("=======PublicationDate", bookInfo.PublicationDate, bookData.ISBN) } else { // 转换失败,赋值为空字符串 bookInfo.PublicationDate = "" } } // 价格 bookInfo.Price = int64(bookData.FixPrice) // 构建图片对象 imageObject := ImageObject{ CarouselUrlArray: []string{}, DetailUrlObject: DetailImageObject{ IntroductionUrl: []string{}, CatalogueUrl: []string{}, LiveShootingUrl: []string{}, OtherUrl: []string{}, }, } if bookData.BookPic.PddPath != "" { imageObject.CarouselUrlArray = append(imageObject.CarouselUrlArray, bookData.BookPic.PddPath) } if bookData.BookDefPic.PddPath != "" { imageObject.DefaultImageUrl = bookData.BookDefPic.PddPath } if bookData.BookPicS.PddResponse != "" { imageObject.WhiteBackgroundUrl = bookData.BookPicS.PddResponse } if bookData.BookDetailImage.PddPath != "" { imageObject.DetailUrlObject.IntroductionUrl = append(imageObject.DetailUrlObject.IntroductionUrl, bookData.BookDetailImage.PddPath) } if bookData.BookDirectoryImage.PddPath != "" { imageObject.DetailUrlObject.CatalogueUrl = append(imageObject.DetailUrlObject.CatalogueUrl, bookData.BookDirectoryImage.PddPath) } bookInfo.ImageObject = &imageObject return bookInfo, nil } // 验证数据完整性 func verifyDataCount(ctx context.Context, redisClient *RedisClient, expectedCount int64) { log.Println("开始验证数据完整性...") var cursor uint64 var totalCount int64 var batchSize int64 = 1000 for { var keys []string var err error keys, cursor, err = redisClient.client.Scan(ctx, cursor, "*", batchSize).Result() if err != nil { log.Printf("Scan失败: %v", err) break } totalCount += int64(len(keys)) log.Printf("Scan进度: 已获取 %d 个key", totalCount) if cursor == 0 { break } } log.Printf("验证结果 - ES文档数: %d, Redis存储数: %d", expectedCount, totalCount) if totalCount == expectedCount { log.Println("✅ 数据完整性验证通过,所有数据已完整写入") } else if totalCount < expectedCount { log.Printf("⚠️ 数据不完整,缺失 %d 条", expectedCount-totalCount) } else { log.Printf("⚠️ Redis数据多于ES,多出 %d 条", totalCount-expectedCount) } } func clearScroll(ctx context.Context, esClient *ESClient, scrollID string) { if scrollID == "" { return } req := esapi.ClearScrollRequest{ScrollID: []string{scrollID}} res, err := req.Do(ctx, esClient.client) if err != nil { log.Printf("清除Scroll失败: %v", err) return } defer res.Body.Close() log.Println("Scroll已清除") } // BookInfo Redis中存储的书籍信息结构 type BookInfo struct { Isbn string `json:"isbn"` // ISBN BookName string `json:"book_name"` // 书名 Author string `json:"author"` // 作者 Publishing string `json:"publishing"` // 出版社 PublicationDate string `json:"publication_date"` // 出版时间 Binding string `json:"binding"` // 装帧 PagesCount int64 `json:"pages_count"` // 页数 WordsCount int64 `json:"words_count"` // 字数 Format int64 `json:"format"` // 开本 ImageObject *ImageObject `json:"image_object"` // 图片 Price int64 `json:"price"` // 售价(分) CatIdObject CatIdObject `json:"cat_id"` // 分类 IsSuit int64 `json:"is_suit"` // 套装书 } // ImageObject 图片对象结构 type ImageObject struct { CarouselUrlArray []string `json:"carousel_url_array"` // 轮播图 WhiteBackgroundUrl string `json:"white_background_url"` // 白底图 DetailUrlObject DetailImageObject `json:"detail_url_object"` // 详情对象 DefaultImageUrl string `json:"default_image_url"` // 默认图 } // DetailImageObject 详情图片对象结构 type DetailImageObject struct { IntroductionUrl []string `json:"introduction_url"` // 简介图 CatalogueUrl []string `json:"catalogue_url"` // 目录图 LiveShootingUrl []string `json:"live_shooting_url"` // 实拍图 OtherUrl []string `json:"other_url"` // 其他图 } // CatIdObject 分类ID对象 type CatIdObject struct { PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` // 拼多多分类 ID KongFuZiCatId string `json:"kong_fu_zi_cat_id"` // 孔夫子分类 ID XianYuCatId string `json:"xian_yu_cat_id"` // 闲鱼分类 ID }