daShangDao_batch/goods_es_to_redis/main.go
2026-06-15 16:34:21 +08:00

699 lines
20 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"github.com/go-redis/redis/v8"
)
// Connection settings for the Elasticsearch source and the Redis target.
//
// NOTE(review): the addresses and credentials below are hard-coded in the
// source file. Consider loading them from the environment or a config file,
// and rotating any secret that has already been committed.
const (
esAddress = "http://36.212.12.92:9527"
esUsername = "elastic"
esPassword = "+Tz5qR_KushZ-bPgZ_H-"
redisAddr = "36.212.12.247:6379"
redisPassword = "long6166@@"
redisDB = 1
)
/* ================= Client ================= */
// ESClient wraps the go-elasticsearch client used by this program.
type ESClient struct {
client *elasticsearch.Client
}
// NewESClient builds an ESClient for the given node addresses, authenticating
// with username and password.
//
// The HTTP transport keeps up to 100 idle connections per host, waits at most
// 60 seconds for response headers, and does not verify TLS certificates.
// Any error from elasticsearch.NewClient is returned unchanged.
//
// NOTE(review): InsecureSkipVerify disables certificate validation; confirm
// this is acceptable for the target cluster.
func NewESClient(addresses []string, username, password string) (*ESClient, error) {
	transport := &http.Transport{
		TLSClientConfig:       &tls.Config{InsecureSkipVerify: true},
		MaxIdleConnsPerHost:   100,
		ResponseHeaderTimeout: 60 * time.Second,
	}

	inner, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: addresses,
		Username:  username,
		Password:  password,
		Transport: transport,
	})
	if err != nil {
		return nil, err
	}

	return &ESClient{client: inner}, nil
}
// RedisClient wraps the go-redis client used by this program.
type RedisClient struct {
client *redis.Client
}
// NewRedisClient connects to the Redis server at addr, selecting database db,
// and verifies the connection with a PING before returning.
//
// The client uses a pool of 100 connections and 10-second dial, read and
// write timeouts. If the PING fails the client is closed and the wrapped
// error is returned.
//
// After a successful PING it tries to set the server option
// stop-writes-on-bgsave-error to "no" (work-around for MISCONF errors). A
// failure to change that option is only logged.
//
// NOTE(review): CONFIG SET changes a server-wide setting and hides failed
// background saves from every client of that server; confirm this is intended.
func NewRedisClient(addr, password string, db int) (*RedisClient, error) {
	rdb := redis.NewClient(&redis.Options{
		Addr:         addr,
		Password:     password,
		DB:           db,
		PoolSize:     100,
		DialTimeout:  10 * time.Second,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 10 * time.Second,
	})

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := rdb.Ping(ctx).Err(); err != nil {
		// The caller never receives rdb on failure, so release its pool here.
		_ = rdb.Close()
		return nil, fmt.Errorf("ping redis %s: %w", addr, err)
	}

	// Work around Redis MISCONF errors: do not block writes when bgsave fails.
	if err := rdb.ConfigSet(ctx, "stop-writes-on-bgsave-error", "no").Err(); err != nil {
		log.Printf("警告:无法设置 Redis 配置 stop-writes-on-bgsave-error=no: %v", err)
	} else {
		log.Println("已设置 Redis 配置stop-writes-on-bgsave-error=no")
	}

	return &RedisClient{client: rdb}, nil
}
/* ================= Data ================= */
// BookPic is a picture reference decoded from the ES document; it carries a
// local path and a "pddPath" value.
type BookPic struct {
LocalPath string `json:"localPath"`
PddPath string `json:"pddPath"`
}
// BookPicS is the picture object decoded from the "book_pic_s" field. Besides
// the two paths it carries the "pddResponse" and "localh" values.
type BookPicS struct {
LocalPath string `json:"localPath"`
PddPath string `json:"pddPath"`
PddResponse string `json:"pddResponse"`
Localh string `json:"localh"`
}
// BookDetailImage is the picture object decoded from "book_detail_image".
type BookDetailImage struct {
LocalPath string `json:"localPath"`
PddPath string `json:"pddPath"`
}
// BookDirectoryImage is the picture object decoded from
// "book_directory_image".
type BookDirectoryImage struct {
LocalPath string `json:"localPath"`
PddPath string `json:"pddPath"`
}
// ESCatIdObject holds the per-platform category IDs stored under "cat_id" in
// the ES document.
type ESCatIdObject struct {
PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"`
KongFuZiCatId string `json:"kong_fu_zi_cat_id"`
XianYuCatId string `json:"xian_yu_cat_id"`
}
// BookData is the "_source" of one ES book document as read by this program.
//
// BookName, PageCount and WordCount use StringOrArray and FixPrice uses
// FlexInt64, so those fields tolerate more than one JSON representation.
type BookData struct {
ID int64 `json:"id"`
BookName StringOrArray `json:"book_name"`
BookPic BookPic `json:"book_pic"`
BookDefPic BookPic `json:"book_def_pic"`
BookPicS BookPicS `json:"book_pic_s"`
BookPicObj string `json:"book_pic_obj"`
BookDetailImage BookDetailImage `json:"book_detail_image"`
BookPicB string `json:"book_pic_b"`
BookDirectoryImage BookDirectoryImage `json:"book_directory_image"`
ISBN string `json:"isbn"`
Author string `json:"author"`
Category string `json:"category"`
Publisher string `json:"publisher"`
PublicationTime string `json:"publication_time"`
BindingLayout string `json:"binding_layout"`
FixPrice FlexInt64 `json:"fix_price"`
PageCount StringOrArray `json:"page_count"`
WordCount StringOrArray `json:"word_count"`
CatId ESCatIdObject `json:"cat_id"`
IsSuit int64 `json:"is_suit"`
}
// ESResponse is the subset of an ES search/scroll response that this program
// decodes: the scroll ID, the total hit count and the document sources.
type ESResponse struct {
ScrollID string `json:"_scroll_id"`
Hits struct {
// Total.Value is decoded from "hits.total.value".
Total struct {
Value int64 `json:"value"`
} `json:"total"`
// Hits holds one entry per returned document.
Hits []struct {
Source BookData `json:"_source"`
} `json:"hits"`
} `json:"hits"`
}
/* ================= Custom Types ================= */
// StringOrArray is a string field that may arrive in JSON either as a string
// or as an array of strings. For an array, the first element is kept.
type StringOrArray string

// FlexInt64 is an int64 field that may arrive in JSON as a number or as a
// numeric string. Values that cannot be interpreted decode to 0.
type FlexInt64 int64

// flexInt64FromFloat truncates v toward zero. NaN and values outside the
// int64 range yield 0, because converting them to int64 is not well defined.
func flexInt64FromFloat(v float64) FlexInt64 {
	const limit = 9.223372036854775808e18 // 2^63
	if v >= -limit && v < limit {
		return FlexInt64(int64(v))
	}
	return 0
}

// UnmarshalJSON implements json.Unmarshaler.
//
// Accepted inputs, in the order they are tried:
//   - an integer JSON number (or null, which yields 0);
//   - any other JSON number (fraction or exponent), truncated toward zero;
//   - a JSON string holding an integer or a float, surrounding spaces
//     ignored; a float is truncated toward zero.
//
// Anything else yields 0. The method never returns an error, so one
// malformed field does not fail decoding of the whole document.
func (f *FlexInt64) UnmarshalJSON(data []byte) error {
	*f = 0

	var num int64
	if err := json.Unmarshal(data, &num); err == nil {
		*f = FlexInt64(num)
		return nil
	}

	// Numbers such as 12.9 or 1e3 do not fit the int64 path above; treat them
	// the same way as their string form instead of silently storing 0.
	var fnum float64
	if err := json.Unmarshal(data, &fnum); err == nil {
		*f = flexInt64FromFloat(fnum)
		return nil
	}

	var str string
	if err := json.Unmarshal(data, &str); err != nil {
		return nil
	}
	str = strings.TrimSpace(str)
	if str == "" {
		return nil
	}
	if val, err := strconv.ParseInt(str, 10, 64); err == nil {
		*f = FlexInt64(val)
		return nil
	}
	if val, err := strconv.ParseFloat(str, 64); err == nil {
		*f = flexInt64FromFloat(val)
	}
	return nil
}

// UnmarshalJSON implements json.Unmarshaler.
//
// A JSON string is stored as is. A non-empty array of strings stores its first
// element. Every other input (empty array, number, null, object, ...) leaves
// the receiver unchanged. The method never returns an error.
func (s *StringOrArray) UnmarshalJSON(data []byte) error {
	if len(data) > 0 && data[0] == '"' {
		var str string
		// A decode error leaves str empty, which is then stored.
		_ = json.Unmarshal(data, &str)
		*s = StringOrArray(str)
		return nil
	}

	var arr []string
	if json.Unmarshal(data, &arr) == nil && len(arr) > 0 {
		*s = StringOrArray(arr[0])
	}
	return nil
}
// Float64OrString is a float64 field that may arrive in JSON either as a
// number or as a string containing a number.
type Float64OrString float64

// UnmarshalJSON implements json.Unmarshaler.
//
// A JSON number (or null) is stored directly. A JSON string is trimmed and
// parsed as a float. If the input is neither, or the string does not parse,
// the receiver keeps its previous value. The method never returns an error.
func (f *Float64OrString) UnmarshalJSON(data []byte) error {
	var number float64
	if err := json.Unmarshal(data, &number); err == nil {
		*f = Float64OrString(number)
		return nil
	}

	var text string
	if err := json.Unmarshal(data, &text); err != nil {
		return nil
	}

	parsed, err := strconv.ParseFloat(strings.TrimSpace(text), 64)
	if err != nil {
		return nil
	}
	*f = Float64OrString(parsed)
	return nil
}
/* ================= Main ================= */
// main runs one full ES-to-Redis synchronisation followed by a key-count
// verification. Any setup or synchronisation error terminates the process
// through log.Fatal.
func main() {
	flag.Parse()
	log.Println("开始全量同步 ES → Redis强制全量写入")

	esClient, esErr := NewESClient([]string{esAddress}, esUsername, esPassword)
	if esErr != nil {
		log.Fatal(esErr)
	}

	redisClient, redisErr := NewRedisClient(redisAddr, redisPassword, redisDB)
	if redisErr != nil {
		log.Fatal(redisErr)
	}
	defer redisClient.client.Close()

	// Run the synchronisation; it reports the document total seen in ES.
	totalDocs, syncErr := fetchAndWriteToRedis(context.Background(), esClient, redisClient)
	if syncErr != nil {
		log.Fatal(syncErr)
	}

	// Compare the number of keys in Redis with the ES total.
	verifyDataCount(context.Background(), redisClient, totalDocs)
}
// fetchAndWriteToRedis reads every document of the "books-from-mysql-v2"
// index through the scroll API and stores each book in Redis as a JSON value
// keyed by its ISBN (SET without expiry).
//
// Pipeline: the scroll loop hands each page to processChan; esWorkerCount
// goroutines forward single books to bookChan; one batching goroutine groups
// them into slices of batchSize; redisWorkerCount goroutines convert each
// batch and write it with a Redis pipeline. A reporter goroutine logs the
// counters every 5 seconds and is stopped before the function returns.
//
// It returns the total hit count reported by the first search response. An
// error is returned only when the initial search fails; that query runs
// before any goroutine is started, so an early return leaves nothing behind.
// A failure on a later scroll page is logged and ends the read loop, after
// which the data already queued is still written.
func fetchAndWriteToRedis(ctx context.Context, esClient *ESClient, redisClient *RedisClient) (int64, error) {
	// Counters shared between goroutines; always updated through sync/atomic.
	var (
		readCount         int64 // documents read from ES
		skipEmptyIsbn     int64 // documents skipped because the ISBN is empty
		convertFailed     int64 // conversion or JSON encoding failures
		writeSuccess      int64 // documents written to Redis
		writeFailed       int64 // documents whose write failed
		sentToBatchChan   int64 // documents sent to batchChan
		receivedByWorkers int64 // documents received by the Redis workers
	)
	const (
		batchSize        = 500
		esWorkerCount    = 8
		redisWorkerCount = 32
		bookChanSize     = 10000
		batchChanSize    = 200
	)

	// ================= Initial ES query =================
	size := 10000
	req := esapi.SearchRequest{
		Index:  []string{"books-from-mysql-v2"},
		Size:   &size,
		Scroll: time.Minute,
	}
	res, err := req.Do(ctx, esClient.client)
	if err != nil {
		return 0, fmt.Errorf("初始查询失败: %w", err)
	}
	defer res.Body.Close()
	// An HTTP error status would otherwise decode into an empty response and
	// look like an empty index.
	if res.IsError() {
		return 0, fmt.Errorf("初始查询失败: %s", res.Status())
	}
	var esResp ESResponse
	if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
		return 0, fmt.Errorf("解析初始响应失败: %w", err)
	}
	totalDocs := esResp.Hits.Total.Value
	log.Printf("ES 总文档数: %d", totalDocs)

	// The scroll ID may change between pages; clear whichever one is current
	// when the function returns.
	currentScrollID := esResp.ScrollID
	defer func() { clearScroll(ctx, esClient, currentScrollID) }()

	bookChan := make(chan BookData, bookChanSize)
	batchChan := make(chan []BookData, batchChanSize)
	var wg sync.WaitGroup

	// ================= Redis batch writers =================
	type kv struct {
		key string
		val []byte
	}
	for i := 0; i < redisWorkerCount; i++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for batch := range batchChan {
				atomic.AddInt64(&receivedByWorkers, int64(len(batch)))

				// Build the key/value pairs to write.
				kvs := make([]kv, 0, len(batch))
				for _, book := range batch {
					if book.ISBN == "" {
						atomic.AddInt64(&skipEmptyIsbn, 1)
						continue
					}
					// Conversion is wrapped with panic recovery.
					bookInfo, err := safeConvertBookData(book)
					if err != nil {
						atomic.AddInt64(&convertFailed, 1)
						log.Printf("Worker %d 转换失败 ISBN=%s: %v", workerID, book.ISBN, err)
						continue
					}
					data, err := json.Marshal(bookInfo)
					if err != nil {
						atomic.AddInt64(&convertFailed, 1)
						log.Printf("Worker %d JSON序列化失败 ISBN=%s: %v", workerID, book.ISBN, err)
						continue
					}
					kvs = append(kvs, kv{key: book.ISBN, val: data})
				}
				if len(kvs) == 0 {
					continue
				}

				// Queue one SET per document and send them in one pipeline.
				pipe := redisClient.client.Pipeline()
				cmds := make([]*redis.StatusCmd, 0, len(kvs))
				for _, item := range kvs {
					cmds = append(cmds, pipe.Set(ctx, item.key, item.val, 0))
				}

				// NOTE(review): any Exec error marks the whole batch as
				// failed; presumably Exec can also report a single command's
				// error — confirm against the go-redis documentation.
				if _, err := pipe.Exec(ctx); err != nil && err != redis.Nil {
					atomic.AddInt64(&writeFailed, int64(len(kvs)))
					log.Printf("Worker %d Pipeline执行失败: %v, 影响 %d 条", workerID, err, len(kvs))
					continue
				}

				// Check the result of every command.
				successInBatch := 0
				for idx, cmd := range cmds {
					if cmd.Err() != nil {
						atomic.AddInt64(&writeFailed, 1)
						log.Printf("Worker %d 单条写入失败 key=%s: %v", workerID, kvs[idx].key, cmd.Err())
						continue
					}
					successInBatch++
				}
				atomic.AddInt64(&writeSuccess, int64(successInBatch))
				if successInBatch < len(kvs) {
					log.Printf("Worker %d 批次部分失败: 预期 %d, 成功 %d", workerID, len(kvs), successInBatch)
				}
			}
			// The logged count is the total over all workers, not this one.
			log.Printf("Redis写入协程 %d 退出,累计接收文档数 %d", workerID, atomic.LoadInt64(&receivedByWorkers))
		}(i)
	}

	// ================= Batching goroutine =================
	wg.Add(1)
	go func() {
		defer wg.Done()
		batch := make([]BookData, 0, batchSize)
		batchSeq := 0
		for book := range bookChan {
			batch = append(batch, book)
			if len(batch) >= batchSize {
				batchSeq++
				atomic.AddInt64(&sentToBatchChan, int64(len(batch)))
				batchChan <- batch
				// The sent slice now belongs to a worker; start a new one.
				batch = make([]BookData, 0, batchSize)
			}
		}
		// Flush the remainder.
		if len(batch) > 0 {
			batchSeq++
			atomic.AddInt64(&sentToBatchChan, int64(len(batch)))
			batchChan <- batch
		}
		close(batchChan)
		log.Printf("批处理协程退出,共发送 %d 个批次,总文档数 %d", batchSeq, atomic.LoadInt64(&sentToBatchChan))
	}()

	// ================= Progress reporter =================
	ticker := time.NewTicker(5 * time.Second)
	progressDone := make(chan struct{})
	var progressWg sync.WaitGroup
	progressWg.Add(1)
	go func() {
		defer progressWg.Done()
		for {
			select {
			case <-progressDone:
				// ticker.Stop does not close ticker.C, so an explicit signal
				// is needed to end this goroutine.
				return
			case <-ticker.C:
				read := atomic.LoadInt64(&readCount)
				skip := atomic.LoadInt64(&skipEmptyIsbn)
				convFail := atomic.LoadInt64(&convertFailed)
				writeOK := atomic.LoadInt64(&writeSuccess)
				writeErr := atomic.LoadInt64(&writeFailed)
				sent := atomic.LoadInt64(&sentToBatchChan)
				recv := atomic.LoadInt64(&receivedByWorkers)
				var progress float64
				if totalDocs > 0 { // avoid NaN for an empty index
					progress = float64(read) / float64(totalDocs) * 100
				}
				log.Printf("进度: %.2f%% | 已读: %d | 跳过空ISBN: %d | 转换失败: %d | 写入成功: %d | 写入失败: %d | 发送到批处理: %d | 写入协程接收: %d",
					progress, read, skip, convFail, writeOK, writeErr, sent, recv)
			}
		}
	}()

	// ================= First page =================
	for _, hit := range esResp.Hits.Hits {
		bookChan <- hit.Source
		atomic.AddInt64(&readCount, 1)
	}

	// ================= Forwarding workers for scroll pages =================
	var processWg sync.WaitGroup
	processChan := make(chan []BookData, esWorkerCount*2)
	for i := 0; i < esWorkerCount; i++ {
		processWg.Add(1)
		go func(workerID int) {
			defer processWg.Done()
			for books := range processChan {
				for _, book := range books {
					bookChan <- book
					atomic.AddInt64(&readCount, 1)
				}
			}
			log.Printf("ES处理协程 %d 退出", workerID)
		}(i)
	}

	// ================= Scroll loop for the remaining pages =================
	for {
		scrollReq := esapi.ScrollRequest{
			ScrollID: currentScrollID,
			Scroll:   time.Minute,
		}
		scrollRes, err := scrollReq.Do(ctx, esClient.client)
		if err != nil {
			log.Printf("Scroll请求失败: %v", err)
			break
		}
		if scrollRes.IsError() {
			// Without this check an error body decodes to zero hits and the
			// loop would end as if the index had been fully read.
			log.Printf("Scroll请求失败: %s", scrollRes.Status())
			scrollRes.Body.Close()
			break
		}
		var scrollResp ESResponse
		decodeErr := json.NewDecoder(scrollRes.Body).Decode(&scrollResp)
		scrollRes.Body.Close()
		if decodeErr != nil {
			log.Printf("解析Scroll响应失败: %v", decodeErr)
			break
		}
		// Remember the newest scroll ID, also for the deferred clearScroll.
		if scrollResp.ScrollID != "" {
			currentScrollID = scrollResp.ScrollID
		}
		if len(scrollResp.Hits.Hits) == 0 {
			break
		}

		// Collect this page and hand it to the forwarding workers.
		books := make([]BookData, 0, len(scrollResp.Hits.Hits))
		for _, hit := range scrollResp.Hits.Hits {
			books = append(books, hit.Source)
		}
		processChan <- books
	}

	// Shut the pipeline down stage by stage, then stop the reporter.
	close(processChan)
	processWg.Wait()
	close(bookChan)
	wg.Wait()
	ticker.Stop()
	close(progressDone)
	progressWg.Wait()

	// ================= Final statistics =================
	// All goroutines have finished, so the counters can be read directly.
	log.Printf("同步完成汇总: 总读取=%d, 发送到批处理=%d, 写入协程接收=%d, 跳过空ISBN=%d, 转换失败=%d, 写入成功=%d, 写入失败=%d",
		readCount, sentToBatchChan, receivedByWorkers, skipEmptyIsbn, convertFailed, writeSuccess, writeFailed)

	// Every document read must end up in exactly one outcome counter.
	if writeSuccess+writeFailed+skipEmptyIsbn+convertFailed != readCount {
		log.Printf("⚠️ 统计不一致: 写入成功(%d)+失败(%d)+跳过(%d)+转换失败(%d)=%d, 但读取总数=%d",
			writeSuccess, writeFailed, skipEmptyIsbn, convertFailed,
			writeSuccess+writeFailed+skipEmptyIsbn+convertFailed, readCount)
	}
	return totalDocs, nil
}
// cleanKongFuZiCatId normalises a Kongfuzi category path: raw is split on
// ">", every segment is stripped of surrounding whitespace, and the segments
// are re-joined with "/".
func cleanKongFuZiCatId(raw string) string {
	segments := strings.Split(raw, ">")
	cleaned := make([]string, 0, len(segments))
	for _, segment := range segments {
		cleaned = append(cleaned, strings.TrimSpace(segment))
	}
	return strings.Join(cleaned, "/")
}
// safeConvertBookData calls convertBookDataToBookInfoSafe and converts a
// panic raised during the conversion into an ordinary error. When a panic is
// recovered, bookInfo is the zero value.
func safeConvertBookData(bookData BookData) (bookInfo BookInfo, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		err = fmt.Errorf("panic in convertBookDataToBookInfoSafe: %v", recovered)
	}()

	bookInfo, err = convertBookDataToBookInfoSafe(bookData)
	return bookInfo, err
}
// convertBookDataToBookInfoSafe maps an ES BookData document to the BookInfo
// structure stored in Redis.
//
// It returns an error only when the ISBN is empty. Page and word counts that
// do not parse as integers are left at 0. A non-empty publication time is
// parsed as an integer, shifted by a fixed offset, interpreted as Unix
// seconds and formatted as "2006-01" in the local time zone; if it does not
// parse, PublicationDate is set to the empty string. An empty publication
// time is copied through unchanged.
//
// NOTE(review): BookData.IsSuit is not copied to BookInfo.IsSuit, so the
// stored value is always 0 — confirm whether that is intended.
func convertBookDataToBookInfoSafe(bookData BookData) (BookInfo, error) {
	// NOTE(review): the meaning of this offset is not visible in this file;
	// it is subtracted from the stored value before it is read as Unix seconds.
	const publicationTimeOffset int64 = 5364000000

	// Required field.
	if bookData.ISBN == "" {
		return BookInfo{}, fmt.Errorf("ISBN为空")
	}

	bookInfo := BookInfo{
		Isbn:            bookData.ISBN,
		BookName:        string(bookData.BookName),
		Author:          bookData.Author,
		Publishing:      bookData.Publisher,
		PublicationDate: bookData.PublicationTime,
		Binding:         bookData.BindingLayout,
		Format:          0,
		CatIdObject: CatIdObject{
			PinDuoDuoCatId: bookData.CatId.PinDuoDuoCatId,
			KongFuZiCatId:  cleanKongFuZiCatId(bookData.CatId.KongFuZiCatId),
			XianYuCatId:    bookData.CatId.XianYuCatId,
		},
	}

	// Page count.
	if pageCountStr := strings.TrimSpace(string(bookData.PageCount)); pageCountStr != "" {
		if pageCount, err := strconv.ParseInt(pageCountStr, 10, 64); err == nil {
			bookInfo.PagesCount = pageCount
		}
	}

	// Word count.
	if wordCountStr := strings.TrimSpace(string(bookData.WordCount)); wordCountStr != "" {
		if wordCount, err := strconv.ParseInt(wordCountStr, 10, 64); err == nil {
			bookInfo.WordsCount = wordCount
		}
	}

	// Publication time. The parse error must be checked here: previously it
	// was overwritten, so unparsable input produced a date derived from the
	// offset alone instead of an empty string.
	if publicationTimeStr := strings.TrimSpace(bookData.PublicationTime); publicationTimeStr != "" {
		bookInfo.PublicationDate = ""
		if stored, err := strconv.ParseInt(publicationTimeStr, 10, 64); err == nil {
			timestamp := stored - publicationTimeOffset
			bookInfo.PublicationDate = time.Unix(timestamp, 0).Format("2006-01")
		}
	}

	// Price.
	bookInfo.Price = int64(bookData.FixPrice)

	// Image object; slices are non-nil so they encode as [] rather than null.
	imageObject := ImageObject{
		CarouselUrlArray: []string{},
		DetailUrlObject: DetailImageObject{
			IntroductionUrl: []string{},
			CatalogueUrl:    []string{},
			LiveShootingUrl: []string{},
			OtherUrl:        []string{},
		},
	}
	if bookData.BookPic.PddPath != "" {
		imageObject.CarouselUrlArray = append(imageObject.CarouselUrlArray, bookData.BookPic.PddPath)
	}
	if bookData.BookDefPic.PddPath != "" {
		imageObject.DefaultImageUrl = bookData.BookDefPic.PddPath
	}
	if bookData.BookPicS.PddResponse != "" {
		imageObject.WhiteBackgroundUrl = bookData.BookPicS.PddResponse
	}
	if bookData.BookDetailImage.PddPath != "" {
		imageObject.DetailUrlObject.IntroductionUrl = append(imageObject.DetailUrlObject.IntroductionUrl, bookData.BookDetailImage.PddPath)
	}
	if bookData.BookDirectoryImage.PddPath != "" {
		imageObject.DetailUrlObject.CatalogueUrl = append(imageObject.DetailUrlObject.CatalogueUrl, bookData.BookDirectoryImage.PddPath)
	}
	bookInfo.ImageObject = &imageObject

	return bookInfo, nil
}
// verifyDataCount counts the keys in the selected Redis database with SCAN
// (pattern "*", count hint 1000) and logs how that number compares with
// expectedCount.
//
// If a SCAN call fails the verification is aborted: a partial count would
// otherwise be reported as a data-integrity verdict.
//
// NOTE(review): the count covers every key in the database, and documents
// with an empty or duplicate ISBN do not produce a key of their own, so a
// mismatch does not necessarily mean a failed write.
func verifyDataCount(ctx context.Context, redisClient *RedisClient, expectedCount int64) {
	log.Println("开始验证数据完整性...")

	const scanBatch int64 = 1000
	var (
		cursor     uint64
		totalCount int64
	)
	for {
		keys, next, err := redisClient.client.Scan(ctx, cursor, "*", scanBatch).Result()
		if err != nil {
			log.Printf("Scan失败: %v", err)
			log.Printf("验证中止: Scan未完成, 已统计 %d 个key", totalCount)
			return
		}
		cursor = next
		totalCount += int64(len(keys))
		log.Printf("Scan进度: 已获取 %d 个key", totalCount)
		// A zero cursor marks the end of the iteration.
		if cursor == 0 {
			break
		}
	}

	log.Printf("验证结果 - ES文档数: %d, Redis存储数: %d", expectedCount, totalCount)
	switch {
	case totalCount == expectedCount:
		log.Println("✅ 数据完整性验证通过,所有数据已完整写入")
	case totalCount < expectedCount:
		log.Printf("⚠️ 数据不完整,缺失 %d 条", expectedCount-totalCount)
	default:
		log.Printf("⚠️ Redis数据多于ES多出 %d 条", totalCount-expectedCount)
	}
}
// clearScroll asks Elasticsearch to release the scroll context identified by
// scrollID. An empty ID is ignored. A request error is logged; otherwise the
// response body is closed after the success message is logged.
func clearScroll(ctx context.Context, esClient *ESClient, scrollID string) {
	if len(scrollID) == 0 {
		return
	}

	clearReq := esapi.ClearScrollRequest{ScrollID: []string{scrollID}}
	resp, err := clearReq.Do(ctx, esClient.client)
	if err == nil {
		log.Println("Scroll已清除")
		resp.Body.Close()
		return
	}
	log.Printf("清除Scroll失败: %v", err)
}
// BookInfo is the book record stored in Redis (JSON-encoded).
type BookInfo struct {
Isbn string `json:"isbn"` // ISBN
BookName string `json:"book_name"` // book title
Author string `json:"author"` // author
Publishing string `json:"publishing"` // publisher
PublicationDate string `json:"publication_date"` // publication date
Binding string `json:"binding"` // binding
PagesCount int64 `json:"pages_count"` // number of pages
WordsCount int64 `json:"words_count"` // number of words
Format int64 `json:"format"` // book format (page size)
ImageObject *ImageObject `json:"image_object"` // images
Price int64 `json:"price"` // selling price (in fen)
CatIdObject CatIdObject `json:"cat_id"` // categories
IsSuit int64 `json:"is_suit"` // boxed-set flag
}
// ImageObject groups the image URLs of a book.
type ImageObject struct {
CarouselUrlArray []string `json:"carousel_url_array"` // carousel images
WhiteBackgroundUrl string `json:"white_background_url"` // white-background image
DetailUrlObject DetailImageObject `json:"detail_url_object"` // detail images
DefaultImageUrl string `json:"default_image_url"` // default image
}
// DetailImageObject groups the detail-page image URLs of a book.
type DetailImageObject struct {
IntroductionUrl []string `json:"introduction_url"` // introduction images
CatalogueUrl []string `json:"catalogue_url"` // table-of-contents images
LiveShootingUrl []string `json:"live_shooting_url"` // real-shot photos
OtherUrl []string `json:"other_url"` // other images
}
// CatIdObject holds the per-platform category IDs written to Redis.
type CatIdObject struct {
PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` // Pinduoduo category ID
KongFuZiCatId string `json:"kong_fu_zi_cat_id"` // Kongfuzi category ID
XianYuCatId string `json:"xian_yu_cat_id"` // Xianyu category ID
}