699 lines
20 KiB
Go
699 lines
20 KiB
Go
package main
|
||
|
||
import (
	"context"
	"crypto/tls"
	"encoding/json"
	"flag"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/go-redis/redis/v8"
)
|
||
|
||
// Connection settings for the sync. Each value can be overridden through the
// environment variable named in its envOrDefault call; the literal fallbacks
// preserve the previously hard-coded behavior.
//
// TODO(security): the fallback credentials are committed to source control.
// Rotate them and remove the defaults once every deployment sets the env vars.
var (
	esAddress  = envOrDefault("ES_ADDRESS", "http://36.212.12.92:9527")
	esUsername = envOrDefault("ES_USERNAME", "elastic")
	esPassword = envOrDefault("ES_PASSWORD", "+Tz5qR_KushZ-bPgZ_H-")

	redisAddr     = envOrDefault("REDIS_ADDR", "36.212.12.247:6379")
	redisPassword = envOrDefault("REDIS_PASSWORD", "long6166@@")
)

// redisDB is the logical Redis database that receives the book records.
const redisDB = 1

// envOrDefault returns the value of the environment variable key, or def when
// the variable is unset or empty.
func envOrDefault(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}
|
||
|
||
/* ================= Client ================= */
|
||
|
||
type ESClient struct {
|
||
client *elasticsearch.Client
|
||
}
|
||
|
||
func NewESClient(addresses []string, username, password string) (*ESClient, error) {
|
||
cfg := elasticsearch.Config{
|
||
Addresses: addresses,
|
||
Username: username,
|
||
Password: password,
|
||
Transport: &http.Transport{
|
||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||
MaxIdleConnsPerHost: 100,
|
||
ResponseHeaderTimeout: 60 * time.Second,
|
||
},
|
||
}
|
||
cli, err := elasticsearch.NewClient(cfg)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &ESClient{client: cli}, nil
|
||
}
|
||
|
||
type RedisClient struct {
|
||
client *redis.Client
|
||
}
|
||
|
||
func NewRedisClient(addr, password string, db int) (*RedisClient, error) {
|
||
rdb := redis.NewClient(&redis.Options{
|
||
Addr: addr,
|
||
Password: password,
|
||
DB: db,
|
||
PoolSize: 100,
|
||
DialTimeout: 10 * time.Second,
|
||
ReadTimeout: 10 * time.Second,
|
||
WriteTimeout: 10 * time.Second,
|
||
})
|
||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||
defer cancel()
|
||
if _, err := rdb.Ping(ctx).Result(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
// 解决 Redis MISCONF 错误:禁用 bgsave 错误时的写入限制
|
||
if err := rdb.ConfigSet(ctx, "stop-writes-on-bgsave-error", "no").Err(); err != nil {
|
||
log.Printf("警告:无法设置 Redis 配置 stop-writes-on-bgsave-error=no: %v", err)
|
||
} else {
|
||
log.Println("已设置 Redis 配置:stop-writes-on-bgsave-error=no")
|
||
}
|
||
|
||
return &RedisClient{client: rdb}, nil
|
||
}
|
||
|
||
/* ================= Data ================= */
|
||
|
||
// Image references and category IDs embedded in an ES book document.
// The image types pair a local file path with a Pinduoduo ("pdd") path.
type (
	// BookPic is the main/default cover image reference.
	BookPic struct {
		LocalPath string `json:"localPath"`
		PddPath   string `json:"pddPath"`
	}

	// BookPicS is an image reference that also carries the raw Pinduoduo
	// response and an extra "localh" value.
	BookPicS struct {
		LocalPath   string `json:"localPath"`
		PddPath     string `json:"pddPath"`
		PddResponse string `json:"pddResponse"`
		Localh      string `json:"localh"`
	}

	// BookDetailImage is the detail (introduction) image reference.
	BookDetailImage struct {
		LocalPath string `json:"localPath"`
		PddPath   string `json:"pddPath"`
	}

	// BookDirectoryImage is the table-of-contents image reference.
	BookDirectoryImage struct {
		LocalPath string `json:"localPath"`
		PddPath   string `json:"pddPath"`
	}

	// ESCatIdObject holds the per-platform category IDs stored in ES
	// (Pinduoduo, Kongfuzi, Xianyu).
	ESCatIdObject struct {
		PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"`
		KongFuZiCatId  string `json:"kong_fu_zi_cat_id"`
		XianYuCatId    string `json:"xian_yu_cat_id"`
	}
)
|
||
|
||
type BookData struct {
|
||
ID int64 `json:"id"`
|
||
BookName StringOrArray `json:"book_name"`
|
||
BookPic BookPic `json:"book_pic"`
|
||
BookDefPic BookPic `json:"book_def_pic"`
|
||
BookPicS BookPicS `json:"book_pic_s"`
|
||
BookPicObj string `json:"book_pic_obj"`
|
||
BookDetailImage BookDetailImage `json:"book_detail_image"`
|
||
BookPicB string `json:"book_pic_b"`
|
||
BookDirectoryImage BookDirectoryImage `json:"book_directory_image"`
|
||
ISBN string `json:"isbn"`
|
||
Author string `json:"author"`
|
||
Category string `json:"category"`
|
||
Publisher string `json:"publisher"`
|
||
PublicationTime string `json:"publication_time"`
|
||
BindingLayout string `json:"binding_layout"`
|
||
FixPrice FlexInt64 `json:"fix_price"`
|
||
PageCount StringOrArray `json:"page_count"`
|
||
WordCount StringOrArray `json:"word_count"`
|
||
CatId ESCatIdObject `json:"cat_id"`
|
||
IsSuit int64 `json:"is_suit"`
|
||
}
|
||
|
||
type ESResponse struct {
|
||
ScrollID string `json:"_scroll_id"`
|
||
Hits struct {
|
||
Total struct {
|
||
Value int64 `json:"value"`
|
||
} `json:"total"`
|
||
Hits []struct {
|
||
Source BookData `json:"_source"`
|
||
} `json:"hits"`
|
||
} `json:"hits"`
|
||
}
|
||
|
||
/* ================= Custom Types ================= */
|
||
|
||
// StringOrArray decodes a JSON value that may be a string, a number or an
// array, keeping a single string:
//   - "abc"        -> "abc"
//   - 320          -> "320" (the number's literal text)
//   - ["a", "b"]   -> "a"   (first element, decoded by the same rules)
//
// Any other value (null, bool, object, empty array) leaves the receiver unchanged.
type StringOrArray string

// FlexInt64 decodes an integer that may arrive as a JSON number (integer or
// fractional) or as a numeric string. Fractional values are truncated toward
// zero. Anything unparseable, out of the int64 range, null or "" decodes to 0.
type FlexInt64 int64

// UnmarshalJSON implements json.Unmarshaler. It never returns an error;
// unrecognized input yields 0.
func (f *FlexInt64) UnmarshalJSON(data []byte) error {
	*f = 0

	// Plain JSON integer (also covers null, which leaves num at 0).
	var num int64
	if json.Unmarshal(data, &num) == nil {
		*f = FlexInt64(num)
		return nil
	}

	// Fractional JSON number such as 29.8: truncate, exactly like the string
	// path below does for "29.8".
	var fl float64
	if json.Unmarshal(data, &fl) == nil {
		if v, ok := flexFromFloat(fl); ok {
			*f = v
		}
		return nil
	}

	// Numeric string, trimmed like Float64OrString does.
	var str string
	if json.Unmarshal(data, &str) != nil {
		return nil
	}
	str = strings.TrimSpace(str)
	if v, err := strconv.ParseInt(str, 10, 64); err == nil {
		*f = FlexInt64(v)
		return nil
	}
	if v, err := strconv.ParseFloat(str, 64); err == nil {
		if t, ok := flexFromFloat(v); ok {
			*f = t
		}
	}
	return nil
}

// flexFromFloat truncates v toward zero. It reports false for NaN or values
// outside the int64 range, where Go's float-to-int conversion is
// implementation-defined.
func flexFromFloat(v float64) (FlexInt64, bool) {
	const limit = float64(1 << 63)
	if !(v >= -limit && v < limit) {
		return 0, false
	}
	return FlexInt64(int64(v)), true
}

// UnmarshalJSON implements json.Unmarshaler. It never returns an error.
func (s *StringOrArray) UnmarshalJSON(data []byte) error {
	if len(data) == 0 {
		return nil
	}
	switch c := data[0]; {
	case c == '"':
		var str string
		// encoding/json validates the value before calling UnmarshalJSON, so a
		// string literal always decodes.
		_ = json.Unmarshal(data, &str)
		*s = StringOrArray(str)
	case c == '-' || (c >= '0' && c <= '9'):
		// Bare number (e.g. a numeric page_count): keep its literal text so
		// callers can parse it.
		*s = StringOrArray(strings.TrimSpace(string(data)))
	case c == '[':
		var arr []string
		if json.Unmarshal(data, &arr) == nil {
			if len(arr) > 0 {
				*s = StringOrArray(arr[0])
			}
			return nil
		}
		// Array containing non-string elements: decode just the first one.
		var raw []json.RawMessage
		if json.Unmarshal(data, &raw) == nil && len(raw) > 0 {
			return s.UnmarshalJSON(raw[0])
		}
	}
	return nil
}
|
||
|
||
// Float64OrString decodes a float that may arrive either as a JSON number or
// as a (whitespace-trimmed) numeric string.
type Float64OrString float64

// UnmarshalJSON implements json.Unmarshaler. It never returns an error. A
// JSON number (or null, which decodes as 0) is stored directly. A numeric
// string is parsed after trimming whitespace. Any other input leaves the
// receiver unchanged.
func (f *Float64OrString) UnmarshalJSON(data []byte) error {
	var asNumber float64
	if err := json.Unmarshal(data, &asNumber); err == nil {
		*f = Float64OrString(asNumber)
		return nil
	}

	var asText string
	if err := json.Unmarshal(data, &asText); err != nil {
		return nil
	}

	parsed, err := strconv.ParseFloat(strings.TrimSpace(asText), 64)
	if err != nil {
		return nil
	}
	*f = Float64OrString(parsed)
	return nil
}
|
||
|
||
/* ================= Main ================= */
|
||
|
||
func main() {
|
||
flag.Parse()
|
||
|
||
log.Println("开始全量同步 ES → Redis(强制全量写入)")
|
||
|
||
esClient, err := NewESClient([]string{esAddress}, esUsername, esPassword)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
redisClient, err := NewRedisClient(redisAddr, redisPassword, redisDB)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
defer redisClient.client.Close()
|
||
|
||
// 执行同步并获取总数
|
||
totalDocs, err := fetchAndWriteToRedis(context.Background(), esClient, redisClient)
|
||
if err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
|
||
// 验证数据完整性
|
||
verifyDataCount(context.Background(), redisClient, totalDocs)
|
||
}
|
||
|
||
func fetchAndWriteToRedis(ctx context.Context, esClient *ESClient, redisClient *RedisClient) (int64, error) {
|
||
// 详细统计变量
|
||
var (
|
||
readCount int64 // 从ES读取的总文档数
|
||
skipEmptyIsbn int64 // ISBN为空跳过的数量
|
||
convertFailed int64 // 转换或序列化失败的数量
|
||
writeSuccess int64 // 实际写入Redis成功的数量
|
||
writeFailed int64 // 写入失败的数量
|
||
sentToBatchChan int64 // 发送到batchChan的文档总数
|
||
receivedByWorkers int64 // 写入协程接收到的文档总数
|
||
)
|
||
|
||
const (
|
||
batchSize = 500
|
||
esWorkerCount = 8
|
||
redisWorkerCount = 32
|
||
bookChanSize = 10000
|
||
batchChanSize = 200
|
||
)
|
||
|
||
bookChan := make(chan BookData, bookChanSize)
|
||
batchChan := make(chan []BookData, batchChanSize)
|
||
var wg sync.WaitGroup
|
||
|
||
// ================= Redis批量写入协程 =================
|
||
for i := 0; i < redisWorkerCount; i++ {
|
||
wg.Add(1)
|
||
go func(workerID int) {
|
||
defer wg.Done()
|
||
for batch := range batchChan {
|
||
atomic.AddInt64(&receivedByWorkers, int64(len(batch)))
|
||
|
||
// 准备要写入的键值对
|
||
type kv struct {
|
||
key string
|
||
val []byte
|
||
}
|
||
kvs := make([]kv, 0, len(batch))
|
||
|
||
for _, book := range batch {
|
||
// 跳过空ISBN
|
||
if book.ISBN == "" {
|
||
atomic.AddInt64(&skipEmptyIsbn, 1)
|
||
continue
|
||
}
|
||
|
||
// 安全转换(带panic恢复)
|
||
bookInfo, err := safeConvertBookData(book)
|
||
if err != nil {
|
||
atomic.AddInt64(&convertFailed, 1)
|
||
log.Printf("Worker %d 转换失败 ISBN=%s: %v", workerID, book.ISBN, err)
|
||
continue
|
||
}
|
||
|
||
data, err := json.Marshal(bookInfo)
|
||
if err != nil {
|
||
atomic.AddInt64(&convertFailed, 1)
|
||
log.Printf("Worker %d JSON序列化失败 ISBN=%s: %v", workerID, book.ISBN, err)
|
||
continue
|
||
}
|
||
kvs = append(kvs, kv{key: book.ISBN, val: data})
|
||
}
|
||
|
||
if len(kvs) == 0 {
|
||
continue
|
||
}
|
||
|
||
// 使用Pipeline批量写入,并逐个检查结果
|
||
pipe := redisClient.client.Pipeline()
|
||
cmds := make([]*redis.StatusCmd, 0, len(kvs))
|
||
for _, kv := range kvs {
|
||
cmd := pipe.Set(ctx, kv.key, kv.val, 0)
|
||
cmds = append(cmds, cmd)
|
||
}
|
||
|
||
// 执行Pipeline
|
||
_, err := pipe.Exec(ctx)
|
||
if err != nil && err != redis.Nil {
|
||
// Pipeline整体失败(如网络错误),整批标记为失败
|
||
atomic.AddInt64(&writeFailed, int64(len(kvs)))
|
||
log.Printf("Worker %d Pipeline执行失败: %v, 影响 %d 条", workerID, err, len(kvs))
|
||
continue
|
||
}
|
||
|
||
// 逐个检查每条命令的结果
|
||
successInBatch := 0
|
||
for i, cmd := range cmds {
|
||
if cmd.Err() != nil {
|
||
atomic.AddInt64(&writeFailed, 1)
|
||
log.Printf("Worker %d 单条写入失败 key=%s: %v", workerID, kvs[i].key, cmd.Err())
|
||
} else {
|
||
successInBatch++
|
||
}
|
||
}
|
||
atomic.AddInt64(&writeSuccess, int64(successInBatch))
|
||
if successInBatch < len(kvs) {
|
||
log.Printf("Worker %d 批次部分失败: 预期 %d, 成功 %d", workerID, len(kvs), successInBatch)
|
||
}
|
||
}
|
||
log.Printf("Redis写入协程 %d 退出,累计接收文档数 %d", workerID, atomic.LoadInt64(&receivedByWorkers))
|
||
}(i)
|
||
}
|
||
|
||
// ================= 批处理协程 =================
|
||
wg.Add(1)
|
||
go func() {
|
||
defer wg.Done()
|
||
var batch []BookData
|
||
batchSeq := 0
|
||
for book := range bookChan {
|
||
batch = append(batch, book)
|
||
if len(batch) >= batchSize {
|
||
batchSeq++
|
||
atomic.AddInt64(&sentToBatchChan, int64(len(batch)))
|
||
batchChan <- batch
|
||
batch = []BookData{}
|
||
}
|
||
}
|
||
// 处理剩余数据
|
||
if len(batch) > 0 {
|
||
batchSeq++
|
||
atomic.AddInt64(&sentToBatchChan, int64(len(batch)))
|
||
batchChan <- batch
|
||
}
|
||
close(batchChan)
|
||
log.Printf("批处理协程退出,共发送 %d 个批次,总文档数 %d", batchSeq, atomic.LoadInt64(&sentToBatchChan))
|
||
}()
|
||
|
||
// ================= ES 初始查询 =================
|
||
size := 10000
|
||
req := esapi.SearchRequest{
|
||
Index: []string{"books-from-mysql-v2"},
|
||
Size: &size,
|
||
Scroll: time.Minute,
|
||
}
|
||
res, err := req.Do(ctx, esClient.client)
|
||
if err != nil {
|
||
return 0, fmt.Errorf("初始查询失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
var esResp ESResponse
|
||
if err := json.NewDecoder(res.Body).Decode(&esResp); err != nil {
|
||
return 0, fmt.Errorf("解析初始响应失败: %w", err)
|
||
}
|
||
|
||
totalDocs := esResp.Hits.Total.Value
|
||
log.Printf("ES 总文档数: %d", totalDocs)
|
||
|
||
scrollID := esResp.ScrollID
|
||
defer clearScroll(ctx, esClient, scrollID)
|
||
|
||
// ================= 进度报告协程 =================
|
||
ticker := time.NewTicker(5 * time.Second)
|
||
go func() {
|
||
for range ticker.C {
|
||
read := atomic.LoadInt64(&readCount)
|
||
skip := atomic.LoadInt64(&skipEmptyIsbn)
|
||
convFail := atomic.LoadInt64(&convertFailed)
|
||
writeOK := atomic.LoadInt64(&writeSuccess)
|
||
writeErr := atomic.LoadInt64(&writeFailed)
|
||
sent := atomic.LoadInt64(&sentToBatchChan)
|
||
recv := atomic.LoadInt64(&receivedByWorkers)
|
||
progress := float64(read) / float64(totalDocs) * 100
|
||
log.Printf("进度: %.2f%% | 已读: %d | 跳过空ISBN: %d | 转换失败: %d | 写入成功: %d | 写入失败: %d | 发送到批处理: %d | 写入协程接收: %d",
|
||
progress, read, skip, convFail, writeOK, writeErr, sent, recv)
|
||
}
|
||
}()
|
||
|
||
// ================= 处理初始结果 =================
|
||
for _, hit := range esResp.Hits.Hits {
|
||
bookChan <- hit.Source
|
||
atomic.AddInt64(&readCount, 1)
|
||
}
|
||
|
||
// ================= 并行处理ES数据转换 =================
|
||
var processWg sync.WaitGroup
|
||
processChan := make(chan []BookData, esWorkerCount*2)
|
||
|
||
for i := 0; i < esWorkerCount; i++ {
|
||
processWg.Add(1)
|
||
go func(workerID int) {
|
||
defer processWg.Done()
|
||
for books := range processChan {
|
||
for _, book := range books {
|
||
bookChan <- book
|
||
atomic.AddInt64(&readCount, 1)
|
||
}
|
||
}
|
||
log.Printf("ES处理协程 %d 退出", workerID)
|
||
}(i)
|
||
}
|
||
|
||
// ================= Scroll 循环拉取剩余数据 =================
|
||
currentScrollID := scrollID
|
||
batchCount := 0
|
||
|
||
for {
|
||
scrollReq := esapi.ScrollRequest{
|
||
ScrollID: currentScrollID,
|
||
Scroll: time.Minute,
|
||
}
|
||
scrollRes, err := scrollReq.Do(ctx, esClient.client)
|
||
if err != nil {
|
||
log.Printf("Scroll请求失败: %v", err)
|
||
break
|
||
}
|
||
|
||
var scrollResp ESResponse
|
||
if err := json.NewDecoder(scrollRes.Body).Decode(&scrollResp); err != nil {
|
||
scrollRes.Body.Close()
|
||
log.Printf("解析Scroll响应失败: %v", err)
|
||
break
|
||
}
|
||
scrollRes.Body.Close()
|
||
|
||
if len(scrollResp.Hits.Hits) == 0 {
|
||
break
|
||
}
|
||
|
||
// 更新scrollID
|
||
currentScrollID = scrollResp.ScrollID
|
||
|
||
// 收集本批数据
|
||
books := make([]BookData, 0, len(scrollResp.Hits.Hits))
|
||
for _, hit := range scrollResp.Hits.Hits {
|
||
books = append(books, hit.Source)
|
||
}
|
||
processChan <- books
|
||
batchCount++
|
||
}
|
||
|
||
// 关闭处理通道,等待所有处理协程完成
|
||
close(processChan)
|
||
processWg.Wait()
|
||
close(bookChan)
|
||
wg.Wait()
|
||
ticker.Stop()
|
||
|
||
// ================= 最终统计 =================
|
||
log.Printf("同步完成汇总: 总读取=%d, 发送到批处理=%d, 写入协程接收=%d, 跳过空ISBN=%d, 转换失败=%d, 写入成功=%d, 写入失败=%d",
|
||
readCount, sentToBatchChan, receivedByWorkers, skipEmptyIsbn, convertFailed, writeSuccess, writeFailed)
|
||
|
||
// 检查数据完整性
|
||
if writeSuccess+writeFailed+skipEmptyIsbn+convertFailed != readCount {
|
||
log.Printf("⚠️ 统计不一致: 写入成功(%d)+失败(%d)+跳过(%d)+转换失败(%d)=%d, 但读取总数=%d",
|
||
writeSuccess, writeFailed, skipEmptyIsbn, convertFailed,
|
||
writeSuccess+writeFailed+skipEmptyIsbn+convertFailed, readCount)
|
||
}
|
||
|
||
return totalDocs, nil
|
||
}
|
||
|
||
// cleanKongFuZiCatId normalizes a Kongfuzi category path. It splits on ">",
// trims the spaces around every segment and joins the segments with "/".
// Empty segments are kept, so "a>>b" becomes "a//b".
func cleanKongFuZiCatId(raw string) string {
	var b strings.Builder
	b.Grow(len(raw))
	for idx, segment := range strings.Split(raw, ">") {
		if idx > 0 {
			b.WriteByte('/')
		}
		b.WriteString(strings.TrimSpace(segment))
	}
	return b.String()
}
|
||
|
||
// safeConvertBookData 带 panic 恢复的转换函数
|
||
func safeConvertBookData(bookData BookData) (bookInfo BookInfo, err error) {
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
err = fmt.Errorf("panic in convertBookDataToBookInfoSafe: %v", r)
|
||
}
|
||
}()
|
||
return convertBookDataToBookInfoSafe(bookData)
|
||
}
|
||
|
||
// 安全的转换函数,返回error
|
||
func convertBookDataToBookInfoSafe(bookData BookData) (BookInfo, error) {
|
||
// 检查必要字段
|
||
if bookData.ISBN == "" {
|
||
return BookInfo{}, fmt.Errorf("ISBN为空")
|
||
}
|
||
|
||
bookInfo := BookInfo{
|
||
Isbn: bookData.ISBN,
|
||
BookName: string(bookData.BookName),
|
||
Author: bookData.Author,
|
||
Publishing: bookData.Publisher,
|
||
PublicationDate: bookData.PublicationTime,
|
||
Binding: bookData.BindingLayout,
|
||
Format: 0,
|
||
CatIdObject: CatIdObject{
|
||
PinDuoDuoCatId: bookData.CatId.PinDuoDuoCatId,
|
||
KongFuZiCatId: cleanKongFuZiCatId(bookData.CatId.KongFuZiCatId),
|
||
XianYuCatId: bookData.CatId.XianYuCatId,
|
||
},
|
||
}
|
||
|
||
// 页数转换
|
||
if pageCountStr := strings.TrimSpace(string(bookData.PageCount)); pageCountStr != "" {
|
||
if pageCount, err := strconv.ParseInt(pageCountStr, 10, 64); err == nil {
|
||
bookInfo.PagesCount = pageCount
|
||
}
|
||
}
|
||
|
||
// 字数转换
|
||
if wordCountStr := strings.TrimSpace(string(bookData.WordCount)); wordCountStr != "" {
|
||
if wordCount, err := strconv.ParseInt(wordCountStr, 10, 64); err == nil {
|
||
bookInfo.WordsCount = wordCount
|
||
}
|
||
}
|
||
|
||
// 出版时间转换(修复逻辑:支持多种格式)
|
||
if publicationTimeStr := strings.TrimSpace(bookData.PublicationTime); publicationTimeStr != "" {
|
||
publicationTime, err := strconv.ParseInt(publicationTimeStr, 10, 64)
|
||
publicationTime = publicationTime - 5364000000
|
||
timestamp, err := strconv.ParseInt(strconv.FormatInt(publicationTime, 10), 10, 64)
|
||
//fmt.Println(timestamp)
|
||
if err == nil {
|
||
var t time.Time
|
||
// 判断是秒级还是毫秒级时间戳(假设大于 1e9 的是毫秒级)
|
||
//if timestamp > 1e12 { // 毫秒级
|
||
// t = time.UnixMilli(timestamp)
|
||
//} else { // 秒级
|
||
t = time.Unix(timestamp, 0)
|
||
//}
|
||
|
||
bookInfo.PublicationDate = t.Format("2006-01")
|
||
|
||
//fmt.Println("=======PublicationDate", bookInfo.PublicationDate, bookData.ISBN)
|
||
|
||
} else {
|
||
// 转换失败,赋值为空字符串
|
||
bookInfo.PublicationDate = ""
|
||
}
|
||
}
|
||
|
||
// 价格
|
||
bookInfo.Price = int64(bookData.FixPrice)
|
||
|
||
// 构建图片对象
|
||
imageObject := ImageObject{
|
||
CarouselUrlArray: []string{},
|
||
DetailUrlObject: DetailImageObject{
|
||
IntroductionUrl: []string{},
|
||
CatalogueUrl: []string{},
|
||
LiveShootingUrl: []string{},
|
||
OtherUrl: []string{},
|
||
},
|
||
}
|
||
|
||
if bookData.BookPic.PddPath != "" {
|
||
imageObject.CarouselUrlArray = append(imageObject.CarouselUrlArray, bookData.BookPic.PddPath)
|
||
}
|
||
if bookData.BookDefPic.PddPath != "" {
|
||
imageObject.DefaultImageUrl = bookData.BookDefPic.PddPath
|
||
}
|
||
if bookData.BookPicS.PddResponse != "" {
|
||
imageObject.WhiteBackgroundUrl = bookData.BookPicS.PddResponse
|
||
}
|
||
if bookData.BookDetailImage.PddPath != "" {
|
||
imageObject.DetailUrlObject.IntroductionUrl = append(imageObject.DetailUrlObject.IntroductionUrl, bookData.BookDetailImage.PddPath)
|
||
}
|
||
if bookData.BookDirectoryImage.PddPath != "" {
|
||
imageObject.DetailUrlObject.CatalogueUrl = append(imageObject.DetailUrlObject.CatalogueUrl, bookData.BookDirectoryImage.PddPath)
|
||
}
|
||
|
||
bookInfo.ImageObject = &imageObject
|
||
return bookInfo, nil
|
||
}
|
||
|
||
// 验证数据完整性
|
||
func verifyDataCount(ctx context.Context, redisClient *RedisClient, expectedCount int64) {
|
||
log.Println("开始验证数据完整性...")
|
||
|
||
var cursor uint64
|
||
var totalCount int64
|
||
var batchSize int64 = 1000
|
||
|
||
for {
|
||
var keys []string
|
||
var err error
|
||
keys, cursor, err = redisClient.client.Scan(ctx, cursor, "*", batchSize).Result()
|
||
if err != nil {
|
||
log.Printf("Scan失败: %v", err)
|
||
break
|
||
}
|
||
|
||
totalCount += int64(len(keys))
|
||
log.Printf("Scan进度: 已获取 %d 个key", totalCount)
|
||
|
||
if cursor == 0 {
|
||
break
|
||
}
|
||
}
|
||
|
||
log.Printf("验证结果 - ES文档数: %d, Redis存储数: %d", expectedCount, totalCount)
|
||
|
||
if totalCount == expectedCount {
|
||
log.Println("✅ 数据完整性验证通过,所有数据已完整写入")
|
||
} else if totalCount < expectedCount {
|
||
log.Printf("⚠️ 数据不完整,缺失 %d 条", expectedCount-totalCount)
|
||
} else {
|
||
log.Printf("⚠️ Redis数据多于ES,多出 %d 条", totalCount-expectedCount)
|
||
}
|
||
}
|
||
|
||
func clearScroll(ctx context.Context, esClient *ESClient, scrollID string) {
|
||
if scrollID == "" {
|
||
return
|
||
}
|
||
req := esapi.ClearScrollRequest{ScrollID: []string{scrollID}}
|
||
res, err := req.Do(ctx, esClient.client)
|
||
if err != nil {
|
||
log.Printf("清除Scroll失败: %v", err)
|
||
return
|
||
}
|
||
defer res.Body.Close()
|
||
log.Println("Scroll已清除")
|
||
}
|
||
|
||
// Records stored in Redis. Field order is significant: encoding/json emits
// fields in declaration order, which defines the stored JSON layout.
type (
	// BookInfo is the JSON value stored in Redis for one book, keyed by ISBN.
	BookInfo struct {
		Isbn            string       `json:"isbn"`             // ISBN
		BookName        string       `json:"book_name"`        // title
		Author          string       `json:"author"`           // author
		Publishing      string       `json:"publishing"`       // publisher
		PublicationDate string       `json:"publication_date"` // publication date
		Binding         string       `json:"binding"`          // binding
		PagesCount      int64        `json:"pages_count"`      // page count
		WordsCount      int64        `json:"words_count"`      // word count
		Format          int64        `json:"format"`           // format (book size)
		ImageObject     *ImageObject `json:"image_object"`     // images
		Price           int64        `json:"price"`            // sale price (in fen, i.e. cents)
		CatIdObject     CatIdObject  `json:"cat_id"`           // categories
		IsSuit          int64        `json:"is_suit"`          // set/bundle flag
	}

	// ImageObject groups the image URLs of a book.
	ImageObject struct {
		CarouselUrlArray   []string          `json:"carousel_url_array"`   // carousel images
		WhiteBackgroundUrl string            `json:"white_background_url"` // white-background image
		DetailUrlObject    DetailImageObject `json:"detail_url_object"`    // detail images
		DefaultImageUrl    string            `json:"default_image_url"`    // default image
	}

	// DetailImageObject groups the detail-page image URLs.
	DetailImageObject struct {
		IntroductionUrl []string `json:"introduction_url"`  // introduction images
		CatalogueUrl    []string `json:"catalogue_url"`     // table-of-contents images
		LiveShootingUrl []string `json:"live_shooting_url"` // real-photo images
		OtherUrl        []string `json:"other_url"`         // other images
	}

	// CatIdObject holds per-platform category IDs.
	CatIdObject struct {
		PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` // Pinduoduo category ID
		KongFuZiCatId  string `json:"kong_fu_zi_cat_id"`  // Kongfuzi category ID
		XianYuCatId    string `json:"xian_yu_cat_id"`     // Xianyu category ID
	}
)
|