// Package service provides BookService, which searches and maintains book
// documents in Elasticsearch and mirrors them to Redis.
package service

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"strconv"
	"strings"
	"time"

	"centerBook/es"
	"centerBook/model/request"
	"centerBook/tail"
	"centerBook/util"
	"centerBook/util/redisClient"

	"github.com/elastic/go-elasticsearch/v8/esapi"
	jsoniter "github.com/json-iterator/go"
)

// BookService searches and maintains book documents in Elasticsearch.
type BookService struct {
	esClient *es.ESClient
}

// NewBookService returns a BookService that talks to ES through esClient.
func NewBookService(esClient *es.ESClient) *BookService {
	return &BookService{esClient: esClient}
}

// UpdateBookResult reports the outcome of an update-by-query request.
type UpdateBookResult struct {
	ISBN    string
	Updated int
	Fields  map[string]interface{}
}

// AddBookResult is the outcome of AddBookToESHandler.
type AddBookResult struct {
	Book   *es.ESBook
	Source string // "es" when the book already existed, "service" when it was newly indexed
}

// esHitsWrapper mirrors the part of an ES search response this package reads.
type esHitsWrapper struct {
	Hits struct {
		Total struct {
			Value int `json:"value"`
		} `json:"total"`
		Hits []struct {
			Index  string    `json:"_index"`
			ID     string    `json:"_id"`
			Source es.ESBook `json:"_source"`
		} `json:"hits"`
	} `json:"hits"`
}

// QueryCondition describes a single query clause for ESQueryBuilder.AddQuery.
type QueryCondition struct {
	Field    string                   // ES field name
	Value    interface{}              // value to query for
	Type     string                   // clause type: term, match, match_phrase, prefix, range, wildcard, exists
	Operator string                   // match operator: and, or
	GTE      interface{}              // range: lower bound (inclusive); nil means unbounded
	LTE      interface{}              // range: upper bound (inclusive); nil means unbounded
	Pattern  string                   // wildcard: pattern to match
	Must     []map[string]interface{} `json:"must,omitempty"`     // bool: must
	MustNot  []map[string]interface{} `json:"must_not,omitempty"` // bool: must_not
	Should   []map[string]interface{} `json:"should,omitempty"`   // bool: should
}

// SalesInfo holds the sales counters stored on a book document.
type SalesInfo struct {
	DaySale7, DaySale15, DaySale30, DaySale60, DaySale90, DaySale180, DaySale365 int
	ThisYearSale, LastYearSale, TotalSale                                        int
}

// ESQueryBuilder accumulates clauses and assembles them into one bool query.
type ESQueryBuilder struct {
	mustQueries    []map[string]interface{}
	boolMust       []map[string]interface{}
	boolMustNot    []map[string]interface{}
	boolShould     []map[string]interface{}
	minShouldMatch int
}

// updatableBookFields is the allow-list of document fields that
// UpdateBookFieldsByISBN may write. Field names end up inside a painless
// script, so anything not listed here is dropped.
var updatableBookFields = map[string]bool{
	"book_name": true, "book_pic": true, "book_pic_s": true, "book_pic_b": true,
	"book_pic_w": true, "author": true, "category": true, "publisher": true,
	"publication_time": true, "binding_layout": true, "fix_price": true,
	"content": true, "is_suit": true, "day_sale_7": true, "day_sale_15": true,
	"day_sale_30": true, "day_sale_60": true, "day_sale_90": true,
	"day_sale_180": true, "day_sale_365": true, "this_year_sale": true,
	"last_year_sale": true, "total_sale": true, "buy_counts": true,
	"sell_counts": true, "is_illegal": true, "is_return": true, "is_filter": true,
	"update_time": true, "page_count": true, "word_count": true,
	"book_format": true, "cat_id": true,
}

// SearchBookBaseInfo runs a filtered, paginated search and returns the matching
// books together with the total hit count reported by ES.
func (svc *BookService) SearchBookBaseInfo(request *request.BookSearchRequest) ([]es.ESBook, int, error) {
	queryBuilder := NewESQueryBuilder()

	// Map the saleSelect option onto the sales field it refers to.
	saleField := map[string]string{
		"7":   "day_sale_7",
		"15":  "day_sale_15",
		"30":  "day_sale_30",
		"60":  "day_sale_60",
		"90":  "day_sale_90",
		"180": "day_sale_180",
		"365": "day_sale_365",
		"0":   "this_year_sale",
		"1":   "last_year_sale",
	}[request.SaleSelect]
	log.Printf("[DEBUG] saleSelect=%s saleField=%s", request.SaleSelect, saleField)

	// Selecting a sales window implies "at least one sale in that window".
	if saleField != "" {
		queryBuilder.AddQuery(&QueryCondition{Field: saleField, Type: "range", GTE: 1})
	}

	// Filter clauses.
	svc.buildISuitCondition(queryBuilder, request.IsSuit)
	svc.buildIsReturnCondition(queryBuilder, request.IsReturn)
	svc.buildIsFilterCondition(queryBuilder, request.IsFilter, request.ShopType)
	svc.buildCategoryTypeCondition(queryBuilder, request.CategoryType)
	svc.buildCategoryCondition(queryBuilder, request.Category)
	svc.buildBookPicCondition(queryBuilder, request.BookPic, request.PicType)
	svc.buildBuyCountsCondition(queryBuilder, request.BuyCounts, saleField)
	svc.buildTotalSaleRangeCondition(queryBuilder, request.TotalSaleRange)
	svc.buildNumericRangeConditions(queryBuilder, request, saleField)
	svc.buildExactMatchConditions(queryBuilder, request)
	svc.buildFuzzyMatchConditions(queryBuilder, request)
	svc.buildDefaultPrefixConditions(queryBuilder, request)

	// Pagination. A page below 1 would produce a negative offset, which ES
	// rejects, so clamp to the first page.
	from := (request.Page - 1) * request.PageSize
	if from < 0 {
		from = 0
	}
	log.Printf("[DEBUG] page=%d pageSize=%d from=%d", request.Page, request.PageSize, from)

	// Large pages are ordered by id ascending, everything else by most
	// recently updated first.
	sortKey, sortOrder := "update_time", "desc"
	if request.PageSize >= 500 {
		sortKey, sortOrder = "id", "asc"
	}
	sort := []map[string]interface{}{
		{sortKey: map[string]interface{}{"order": sortOrder}},
	}

	query := queryBuilder.Build(from, request.PageSize, sort)
	body, err := json.MarshalIndent(query, "", "  ")
	if err != nil {
		return nil, 0, fmt.Errorf("构建查询 JSON 失败:%w", err)
	}
	log.Printf("[DEBUG] ES Query Body:\n%s", string(body))

	res, err := svc.esClient.Client.Search(
		svc.esClient.Client.Search.WithIndex(es.ESIndex),
		svc.esClient.Client.Search.WithBody(bytes.NewReader(body)),
		svc.esClient.Client.Search.WithTrackTotalHits(true),
	)
	if err != nil {
		log.Printf("[ERROR] ES.Client.Search error: %v", err)
		return nil, 0, err
	}
	defer res.Body.Close()

	// Only touch res after the error check; Status is a method, not a field.
	log.Printf("[DEBUG] ES Query Response Status: %s", res.Status())
	if res.IsError() {
		return nil, 0, fmt.Errorf("ES 返回错误:%s", res.String())
	}

	// Read the whole response body.
	var buf bytes.Buffer
	writer := bufio.NewWriterSize(&buf, 8192)
	if _, err := io.Copy(writer, res.Body); err != nil {
		return nil, 0, fmt.Errorf("复制响应数据失败: %w", err)
	}
	if err := writer.Flush(); err != nil {
		return nil, 0, fmt.Errorf("刷新缓冲区失败:%w", err)
	}
	rawData := buf.Bytes()

	if len(rawData) == 0 {
		return nil, 0, fmt.Errorf("ES返回空响应")
	}
	// Cheap sanity check so a non-JSON payload yields a readable error.
	if rawData[0] != '{' {
		return nil, 0, fmt.Errorf("ES返回非JSON响应: %s", string(rawData[:min(100, len(rawData))]))
	}
	fmt.Println("[DEBUG] ES query executed successfully")

	var parsed esHitsWrapper
	if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(rawData, &parsed); err != nil {
		return nil, 0, fmt.Errorf("JSON解析失败: %v, 原始数据: %s", err, string(rawData[:min(200, len(rawData))]))
	}

	list := make([]es.ESBook, 0, len(parsed.Hits.Hits))
	for i := range parsed.Hits.Hits {
		list = append(list, parsed.Hits.Hits[i].Source)
	}
	return list, parsed.Hits.Total.Value, nil
}

// AddBookToESHandler looks the book up by ISBN. When it exists, fields that are
// empty in ES are filled in from req; otherwise req is indexed as a new
// document.
//
// A failure while indexing a new book is logged and reported as a result with
// a nil Book and an empty Source, not as an error.
func (svc *BookService) AddBookToESHandler(ctx context.Context, req *es.ESBook) (*AddBookResult, error) {
	book, err := svc.SearchBookByISBN(req.ISBN)
	if err != nil {
		return nil, fmt.Errorf("查询 ES 失败:%w", err)
	}

	if book != nil {
		updateData := svc.buildUpdateData(book, req)
		if len(updateData) > 0 {
			updateData["update_time"] = strconv.FormatInt(time.Now().Unix(), 10)
			if _, err := svc.UpdateBookFieldsByISBN(&request.BookUpdateRequest{
				ISBN: req.ISBN,
				Data: updateData,
			}); err != nil {
				return nil, fmt.Errorf("补全 ES 字段失败:%w", err)
			}

			// Re-read the document so the caller sees the merged state. If
			// the re-read fails, fall back to the copy fetched above rather
			// than returning a nil book.
			refreshed, err := svc.SearchBookByISBN(req.ISBN)
			switch {
			case err != nil:
				log.Printf("[AddBookToESHandler] re-query after update failed | ISBN=%s | err=%v", req.ISBN, err)
			case refreshed != nil:
				book = refreshed
			}
		}
		return &AddBookResult{Book: book, Source: "es"}, nil
	}

	newBook, err := svc.addBookToES(ctx, req)
	if err != nil {
		log.Printf("[AddBookToESHandler] add book failed | ISBN=%s | err=%v", req.ISBN, err)
		return &AddBookResult{Book: nil, Source: ""}, nil
	}
	return &AddBookResult{Book: newBook, Source: "service"}, nil
}

// suitFlag returns 1 when es.CheckBookSuit reports true for bookName, else 0.
func suitFlag(bookName string) int {
	if es.CheckBookSuit(bookName) {
		return 1
	}
	return 0
}

// updateByISBN runs a painless script against every document whose isbn term
// equals isbn and returns the "updated" count from the ES response.
func (svc *BookService) updateByISBN(isbn, script string, params map[string]interface{}) (int, error) {
	body := map[string]interface{}{
		"script": map[string]interface{}{
			"source": script,
			"lang":   "painless",
			"params": params,
		},
		"query": map[string]interface{}{
			"term": map[string]interface{}{"isbn": isbn},
		},
	}
	payload, err := json.Marshal(body)
	if err != nil {
		return 0, fmt.Errorf("构建更新请求失败:%w", err)
	}

	res, err := svc.esClient.Client.UpdateByQuery(
		[]string{es.ESIndex},
		svc.esClient.Client.UpdateByQuery.WithBody(bytes.NewReader(payload)),
		svc.esClient.Client.UpdateByQuery.WithRefresh(true),
		svc.esClient.Client.UpdateByQuery.WithConflicts("proceed"),
	)
	if err != nil {
		return 0, fmt.Errorf("ES更新失败: %s", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return 0, fmt.Errorf("ES返回错误: %s", res.String())
	}

	var parsed struct {
		Updated int `json:"updated"`
	}
	if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
		return 0, fmt.Errorf("解析 ES 响应失败:%w", err)
	}
	return parsed.Updated, nil
}

// UpdateBookFieldsByISBN writes the allow-listed entries of request.Data onto
// the document with the given ISBN, then refreshes the Redis copy.
//
// It returns an error when the ISBN is unknown or when request.Data contains
// no allow-listed field. When is_suit is not supplied it is recomputed from
// the stored book name.
func (svc *BookService) UpdateBookFieldsByISBN(request *request.BookUpdateRequest) (*UpdateBookResult, error) {
	book, err := svc.SearchBookByISBN(request.ISBN)
	if err != nil {
		return nil, fmt.Errorf("查询 ES 失败:%w", err)
	}
	if book == nil {
		return nil, fmt.Errorf("未找到该 ISBN 对应的图书")
	}

	scriptParts := make([]string, 0, len(request.Data)+1)
	params := make(map[string]interface{}, len(request.Data)+1)
	for field, value := range request.Data {
		if !updatableBookFields[field] {
			continue
		}
		scriptParts = append(scriptParts, fmt.Sprintf("ctx._source.%s = params.%s;", field, field))
		params[field] = value
	}
	// Validate before adding the automatic is_suit statement; otherwise this
	// check could never trigger.
	if len(scriptParts) == 0 {
		return nil, fmt.Errorf("没有有效的更新字段")
	}

	if _, exists := request.Data["is_suit"]; !exists {
		params["is_suit"] = suitFlag(book.BookName.Value)
		scriptParts = append(scriptParts, "ctx._source.is_suit = params.is_suit;")
	}

	updated, err := svc.updateByISBN(request.ISBN, strings.Join(scriptParts, " "), params)
	if err != nil {
		return nil, err
	}
	log.Printf("[INFO] UpdateBookFieldsByISBN | ISBN=%s | updated=%d", request.ISBN, updated)

	// Redis sync is best-effort; SyncRedisByISBN logs its own failures.
	_ = svc.SyncRedisByISBN(request.ISBN, "update")

	return &UpdateBookResult{
		ISBN:    request.ISBN,
		Updated: updated,
		Fields:  request.Data,
	}, nil
}

// UpdateBookCatIdByISBNHandler sets cat_id on the document with the given ISBN
// from request.Data["cat_id"], then refreshes the Redis copy.
func (svc *BookService) UpdateBookCatIdByISBNHandler(request *request.BookUpdateRequest) (*UpdateBookResult, error) {
	book, err := svc.SearchBookByISBN(request.ISBN)
	if err != nil {
		return nil, fmt.Errorf("查询 ES 失败:%w", err)
	}
	if book == nil {
		return nil, fmt.Errorf("未找到该 ISBN 对应的图书")
	}

	catIdValue, exists := request.Data["cat_id"]
	if !exists {
		return nil, fmt.Errorf("没有有效的更新字段")
	}

	updated, err := svc.updateByISBN(
		request.ISBN,
		"ctx._source.cat_id = params.cat_id;",
		map[string]interface{}{"cat_id": catIdValue},
	)
	if err != nil {
		return nil, err
	}
	log.Printf("[INFO] UpdateBookCatIdByISBNHandler | ISBN=%s | updated=%d", request.ISBN, updated)

	// Redis sync is best-effort; SyncRedisByISBN logs its own failures.
	_ = svc.SyncRedisByISBN(request.ISBN, "update")

	return &UpdateBookResult{
		ISBN:    request.ISBN,
		Updated: updated,
		Fields:  map[string]interface{}{"cat_id": catIdValue},
	}, nil
}

// deleteByTerm issues a delete-by-query for documents whose field equals value.
// The body is produced by json.Marshal so that quotes or backslashes in value
// cannot break out of the JSON string.
func (svc *BookService) deleteByTerm(field, value string) error {
	payload, err := json.Marshal(map[string]interface{}{
		"query": map[string]interface{}{
			"term": map[string]interface{}{field: value},
		},
	})
	if err != nil {
		return fmt.Errorf("构建删除请求失败:%w", err)
	}

	res, err := svc.esClient.Client.DeleteByQuery(
		[]string{es.ESIndex},
		bytes.NewReader(payload),
		svc.esClient.Client.DeleteByQuery.WithRefresh(true),
	)
	if err != nil {
		return fmt.Errorf("执行 DeleteByQuery 失败: %v", err)
	}
	defer res.Body.Close()
	if res.IsError() {
		return fmt.Errorf("ES 返回错误: %s", res.String())
	}
	return nil
}

// DeleteBookByISBN deletes the documents with the given ISBN and removes the
// matching Redis key.
func (svc *BookService) DeleteBookByISBN(request *request.BookDelByIsbnRequest) error {
	isbn := strings.TrimSpace(request.ISBN)
	log.Printf("[DeleteBookByISBN] 开始删除 | ISBN=%s", isbn)
	if isbn == "" {
		return fmt.Errorf("ISBN 不能为空")
	}

	if err := svc.deleteByTerm("isbn", isbn); err != nil {
		return err
	}

	// Use the trimmed ISBN so the Redis key matches the one deleted from ES.
	// Redis sync is best-effort; SyncRedisByISBN logs its own failures.
	_ = svc.SyncRedisByISBN(isbn, "del")

	log.Printf("[DeleteBookByISBN] 成功删除 ISBN=%s 对应的文档", isbn)
	return nil
}

// DeleteBookByID deletes the documents whose id field equals the given ID. The
// ISBN is looked up first so the matching Redis key can be removed afterwards.
func (svc *BookService) DeleteBookByID(request *request.BookDelByIdRequest) error {
	id := strings.TrimSpace(request.ID)
	log.Printf("[DeleteBookByID] 开始删除 | ID=%s", id)
	if id == "" {
		return fmt.Errorf("ID 不能为空")
	}

	// A failed lookup only means the Redis key cannot be cleaned up; the ES
	// delete still proceeds.
	isbn, err := svc.SearchBookISBNByID(id)
	if err != nil {
		log.Printf("[DeleteBookByID] 获取 ISBN 失败:%v", err)
	}

	if err := svc.deleteByTerm("id", id); err != nil {
		return err
	}

	if isbn != "" {
		// Best-effort; SyncRedisByISBN logs its own failures.
		_ = svc.SyncRedisByISBN(isbn, "del")
	}

	log.Printf("[DeleteBookByID] 成功删除 ID=%s 对应的文档", id)
	return nil
}

// searchFirstByTerm runs a size-1 term query on field and returns the first
// hit, or (nil, nil) when nothing matches. source is passed through as the
// _source option and tag prefixes the log lines.
func (svc *BookService) searchFirstByTerm(tag, field, value string, source interface{}) (*es.ESBook, error) {
	query := map[string]interface{}{
		"query": map[string]interface{}{
			"term": map[string]interface{}{field: value},
		},
		"_source": source,
		"size":    1,
	}
	body, err := json.Marshal(query)
	if err != nil {
		log.Printf("[%s] 构建查询 JSON 失败:%v", tag, err)
		return nil, fmt.Errorf("构建查询 JSON 失败:%w", err)
	}

	res, err := svc.esClient.Client.Search(
		svc.esClient.Client.Search.WithIndex(es.ESIndex),
		svc.esClient.Client.Search.WithBody(bytes.NewReader(body)),
		svc.esClient.Client.Search.WithTrackTotalHits(true),
	)
	if err != nil {
		log.Printf("[%s] ES 查询失败:%v", tag, err)
		return nil, fmt.Errorf("ES 查询失败:%w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		msg := res.String()
		log.Printf("[%s] ES 返回错误:%s", tag, msg)
		return nil, fmt.Errorf("ES 返回错误:%s", msg)
	}

	var parsed esHitsWrapper
	if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
		log.Printf("[%s] 解析 ES 响应失败:%v", tag, err)
		return nil, fmt.Errorf("解析 ES 响应失败:%w", err)
	}
	if len(parsed.Hits.Hits) == 0 {
		return nil, nil
	}

	book := parsed.Hits.Hits[0].Source
	return &book, nil
}

// SearchBookISBNByID returns the ISBN of the document whose id field equals id,
// or "" (with a nil error) when no document matches.
func (svc *BookService) SearchBookISBNByID(id string) (string, error) {
	log.Printf("[SearchBookISBNByID] 开始查询 | ID=%s", id)

	book, err := svc.searchFirstByTerm("SearchBookISBNByID", "id", id, []string{"isbn"})
	if err != nil {
		return "", err
	}
	if book == nil {
		log.Printf("[SearchBookISBNByID] 未找到 ID=%s 对应文档", id)
		return "", nil
	}

	log.Printf("[SearchBookISBNByID] 查询到 ISBN: %s", book.ISBN)
	return book.ISBN, nil
}

// buildUpdateData returns the fields that are empty on existing but present on
// incoming, keyed by ES field name. Populated fields are never overwritten.
func (svc *BookService) buildUpdateData(existing, incoming *es.ESBook) map[string]interface{} {
	updateData := make(map[string]interface{})

	// fill records value under key when the stored field is missing and the
	// incoming one is not.
	fill := func(missing, available bool, key string, value interface{}) {
		if missing && available {
			updateData[key] = value
		}
	}

	fill(existing.Publisher == "", incoming.Publisher != "", "publisher", incoming.Publisher)
	fill(existing.PublicationTime == "", incoming.PublicationTime != "", "publication_time", incoming.PublicationTime)
	fill(existing.BookName.Value == "", incoming.BookName.Value != "", "book_name", incoming.BookName.Value)
	fill(existing.Author == "", incoming.Author != "", "author", incoming.Author)
	fill(existing.BookPic.PddPath == "", incoming.BookPic.PddPath != "", "book_pic",
		map[string]interface{}{"localPath": "", "pddPath": incoming.BookPic.PddPath})
	fill(existing.BookPicS.PddResponse == "", incoming.BookPicS.PddResponse != "", "book_pic_s",
		map[string]interface{}{"localPath": "", "pddResponse": incoming.BookPicS.PddResponse})

	log.Printf("更新数据:%+v", updateData)
	return updateData
}

// addBookToES indexes req as a new document (using the ISBN as the document
// ID), syncs Redis, and returns the book as it was written.
func (svc *BookService) addBookToES(ctx context.Context, req *es.ESBook) (*es.ESBook, error) {
	if req.ISBN == "" {
		return nil, fmt.Errorf("ISBN 不能为空")
	}

	// Sales data is optional: when the lookup fails parseSalesData receives
	// whatever was returned and falls back to zero counters for nil input.
	salesData, _ := tail.CheckSales([]string{req.ISBN})
	salesInfo := svc.parseSalesData(salesData, req.BuyCounts)

	// Store the publication time as a Unix timestamp string.
	publicationTime := req.PublicationTime
	if publicationTime != "" && publicationTime != "0" {
		publicationTime = strconv.FormatInt(util.ParsePublicationTime(publicationTime), 10)
	}

	// Values shared by the ES document and the returned struct are kept in
	// typed locals so no runtime type assertions are needed.
	id := svc.generateNewID()
	bookPic := es.BookPicObj{LocalPath: "", PddPath: req.BookPic.PddPath}
	bookPicS := es.BookPicSObj{LocalPath: "", PddResponse: req.BookPicS.PddResponse}
	bookPicW := make(map[string]interface{})
	isSuit := suitFlag(req.BookName.Value)
	updateTime := es.NumberOrString(strconv.FormatInt(time.Now().Unix(), 10))

	doc := map[string]interface{}{
		"id":               id,
		"book_name":        req.BookName.Value,
		"book_pic":         bookPic,
		"book_pic_s":       bookPicS,
		"book_pic_b":       req.BookPicB,
		"book_pic_w":       bookPicW,
		"isbn":             req.ISBN,
		"author":           req.Author,
		"category":         req.Category,
		"publisher":        req.Publisher,
		"publication_time": publicationTime,
		"binding_layout":   req.BindingLayout,
		"fix_price":        req.FixPrice,
		"content":          req.Content,
		"is_suit":          isSuit,
		"day_sale_7":       salesInfo.DaySale7,
		"day_sale_15":      salesInfo.DaySale15,
		"day_sale_30":      salesInfo.DaySale30,
		"day_sale_60":      salesInfo.DaySale60,
		"day_sale_90":      salesInfo.DaySale90,
		"day_sale_180":     salesInfo.DaySale180,
		"day_sale_365":     salesInfo.DaySale365,
		"this_year_sale":   salesInfo.ThisYearSale,
		"last_year_sale":   salesInfo.LastYearSale,
		"total_sale":       salesInfo.TotalSale,
		"buy_counts":       req.BuyCounts,
		"sell_counts":      req.SellCounts,
		"book_pic_obj":     req.BookPicObj,
		"book_pic_obj_s":   req.BookPicObjS,
		"is_illegal":       0,
		"is_return":        0,
		"is_filter":        "000000",
		"update_time":      updateTime,
		"page_count":       req.PageCount,
		"word_count":       req.WordCount,
		"book_format":      req.BookFormat,
	}

	if err := svc.indexDocumentToES(ctx, doc, req.ISBN); err != nil {
		return nil, err
	}

	// Redis sync is best-effort; SyncRedisByISBN logs its own failures.
	_ = svc.SyncRedisByISBN(req.ISBN, "update")

	// NOTE(review): PublicationTime below is the raw request value while the
	// stored document holds the converted timestamp — confirm this is intended.
	return &es.ESBook{
		ID:              id,
		BookName:        es.FlexibleString{Value: req.BookName.Value},
		BookPic:         bookPic,
		BookPicS:        bookPicS,
		BookPicB:        req.BookPicB,
		BookPicW:        bookPicW,
		ISBN:            req.ISBN,
		Author:          req.Author,
		Category:        req.Category,
		Publisher:       req.Publisher,
		PublicationTime: req.PublicationTime,
		BindingLayout:   req.BindingLayout,
		FixPrice:        req.FixPrice,
		Content:         req.Content,
		IsSuit:          isSuit,
		DaySale7:        salesInfo.DaySale7,
		DaySale15:       salesInfo.DaySale15,
		DaySale30:       salesInfo.DaySale30,
		DaySale60:       salesInfo.DaySale60,
		DaySale90:       salesInfo.DaySale90,
		DaySale180:      salesInfo.DaySale180,
		DaySale365:      salesInfo.DaySale365,
		ThisYearSale:    salesInfo.ThisYearSale,
		LastYearSale:    salesInfo.LastYearSale,
		TotalSale:       salesInfo.TotalSale,
		BuyCounts:       req.BuyCounts,
		SellCounts:      req.SellCounts,
		BookPicObj:      req.BookPicObj,
		BookPicObjS:     req.BookPicObjS,
		UpdateTime:      updateTime,
		IsIllegal:       0,
		IsReturn:        0,
		IsFilter:        "000000",
		PageCount:       req.PageCount,
		WordCount:       req.WordCount,
		BookFormat:      req.BookFormat,
	}, nil
}

// parseInt64OrZero parses s as a base-10 int64 and returns 0 when s is empty
// or not a valid number.
func parseInt64OrZero(s string) int64 {
	v, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0
	}
	return v
}

// buildRedisBookInfo converts an ES book into the structure stored in Redis.
func buildRedisBookInfo(book *es.ESBook) request.BookInfo {
	info := request.BookInfo{}
	info.Isbn = book.ISBN
	info.BookName = book.BookName.Value
	info.Author = book.Author
	info.Publishing = book.Publisher
	info.Binding = book.BindingLayout

	// publication_time is a Unix timestamp string; Redis gets "YYYY-MM"
	// formatted in the process's local time zone.
	if book.PublicationTime != "" && book.PublicationTime != "0" {
		if ts, err := strconv.ParseInt(book.PublicationTime, 10, 64); err == nil {
			info.PublicationDate = time.Unix(ts, 0).Format("2006-01")
		}
	}

	info.PagesCount = parseInt64OrZero(string(book.PageCount))
	info.WordsCount = parseInt64OrZero(string(book.WordCount))
	info.Format = parseInt64OrZero(string(book.BookFormat))

	// Non-nil empty slices so the JSON contains [] rather than null.
	carouselUrls := []string{}
	if book.BookPic.PddPath != "" {
		carouselUrls = append(carouselUrls, book.BookPic.PddPath)
	}
	liveShootingUrl := []string{}
	if book.BookPicS.PddResponse != "" {
		liveShootingUrl = append(liveShootingUrl, book.BookPicS.PddResponse)
	}
	info.ImageObject = &request.ImageObject{
		CarouselUrlArray:   carouselUrls,
		WhiteBackgroundUrl: book.BookPicB,
		DefaultImageUrl:    book.BookDefPic.PddPath,
		DetailUrlObject: request.DetailImageObject{
			IntroductionUrl: []string{},
			CatalogueUrl:    []string{},
			LiveShootingUrl: liveShootingUrl,
			OtherUrl:        []string{},
		},
	}

	info.Price = int64(book.FixPrice)
	info.CatIdObject = request.CatIdObject{
		PinDuoDuoCatId: book.CatId.PinDuoDuoCatId,
		KongFuZiCatId:  book.CatId.KongFuZiCatId,
		XianYuCatId:    book.CatId.XianYuCatId,
	}
	return info
}

// SyncRedisByISBN mirrors the ES state of one book into the "db1" Redis client,
// keyed by ISBN. With act == "del" the key is removed; any other value
// re-reads the book from ES and overwrites the key with its JSON form.
func (svc *BookService) SyncRedisByISBN(isbn string, act string) error {
	client, err := redisClient.GetClientByName("db1")
	if err != nil {
		log.Printf("[SyncRedisByISBN] 获取 Redis 客户端失败:%v", err)
		return err
	}
	ctx := context.Background()

	if act == "del" {
		// DEL ignores keys that do not exist, so no prior EXISTS is needed.
		if err := client.Del(ctx, isbn).Err(); err != nil {
			log.Printf("[SyncRedisByISBN] 删除 Redis 失败:%v", err)
			return err
		}
		log.Printf("[SyncRedisByISBN] 成功同步 ISBN=%s 到 Redis", isbn)
		return nil
	}

	book, err := svc.SearchBookByISBN(isbn)
	if err != nil {
		log.Printf("[SyncRedisByISBN] 查询 ES 错误:%v", err)
		return err
	}
	// SearchBookByISBN returns (nil, nil) when nothing matches.
	if book == nil {
		log.Printf("[SyncRedisByISBN] 未找到 ISBN=%s 对应文档", isbn)
		return fmt.Errorf("未找到该 ISBN 对应的图书")
	}

	jsonData, err := json.Marshal(buildRedisBookInfo(book))
	if err != nil {
		log.Printf("[SyncRedisByISBN] 序列化 BookInfo 失败:%v", err)
		return err
	}
	if err := client.Set(ctx, isbn, jsonData, 0).Err(); err != nil {
		log.Printf("[SyncRedisByISBN] 更新 Redis 失败:%v", err)
		return err
	}

	log.Printf("[SyncRedisByISBN] 成功同步 ISBN=%s 到 Redis", isbn)
	return nil
}

// generateNewID returns the highest id currently in ES plus one, or a
// millisecond timestamp when the lookup fails.
//
// NOTE(review): read-then-increment is not atomic, so concurrent inserts could
// obtain the same id — confirm whether that matters for callers.
func (svc *BookService) generateNewID() int64 {
	lastID, err := svc.GetLastID()
	if err != nil {
		log.Printf("[WARN] GetLastID failed: %v, using timestamp", err)
		return time.Now().UnixNano() / 1e6
	}
	return int64(lastID + 1)
}

// parseSalesData converts the first entry of salesData into a SalesInfo.
// Values that are missing, unparsable or negative become 0. When no total
// sale is available, buyCounts (if positive) is used instead.
func (svc *BookService) parseSalesData(salesData *tail.SalesResponse, buyCounts int64) *SalesInfo {
	info := &SalesInfo{}
	if salesData == nil || salesData.Data == nil {
		return info
	}

	parse := func(s string) int {
		v, err := strconv.Atoi(s)
		if err != nil || v < 0 {
			return 0
		}
		return v
	}

	// Only one ISBN is requested, so only the first entry is relevant.
	for _, s := range salesData.Data {
		info.DaySale7 = parse(s.DaySale7)
		info.DaySale15 = parse(s.DaySale15)
		info.DaySale30 = parse(s.DaySale30)
		info.DaySale60 = parse(s.DaySale60)
		info.DaySale90 = parse(s.DaySale90)
		info.DaySale180 = parse(s.DaySale180)
		info.DaySale365 = parse(s.DaySale365)
		info.ThisYearSale = parse(s.ThisYearSale)
		info.LastYearSale = parse(s.LastYearSale)
		info.TotalSale = parse(s.Sale)
		break
	}

	if info.TotalSale == 0 && buyCounts > 0 {
		info.TotalSale = int(buyCounts)
	}
	return info
}

// buildBookMapForSerialization flattens book into a map keyed by ES field name.
func (svc *BookService) buildBookMapForSerialization(book *es.ESBook) map[string]interface{} {
	return map[string]interface{}{
		"id":               book.ID,
		"book_name":        book.BookName.Value,
		"book_pic":         book.BookPic,
		"book_pic_s":       book.BookPicS,
		"book_pic_b":       book.BookPicB,
		"book_pic_w":       book.BookPicW,
		"isbn":             book.ISBN,
		"author":           book.Author,
		"category":         book.Category,
		"publisher":        book.Publisher,
		"publication_time": book.PublicationTime,
		"binding_layout":   book.BindingLayout,
		"fix_price":        book.FixPrice,
		"content":          book.Content,
		"is_suit":          book.IsSuit,
		"day_sale_7":       book.DaySale7,
		"day_sale_15":      book.DaySale15,
		"day_sale_30":      book.DaySale30,
		"day_sale_60":      book.DaySale60,
		"day_sale_90":      book.DaySale90,
		"day_sale_180":     book.DaySale180,
		"day_sale_365":     book.DaySale365,
		"this_year_sale":   book.ThisYearSale,
		"last_year_sale":   book.LastYearSale,
		"total_sale":       book.TotalSale,
		"buy_counts":       book.BuyCounts,
		"sell_counts":      book.SellCounts,
		"book_pic_obj":     book.BookPicObj,
		"book_pic_obj_s":   book.BookPicObjS,
		"is_illegal":       book.IsIllegal,
		"is_return":        book.IsReturn,
		"is_filter":        book.IsFilter,
		"update_time":      book.UpdateTime,
	}
}

// indexDocumentToES writes doc to the book index under document ID id and
// requests an immediate refresh.
func (svc *BookService) indexDocumentToES(ctx context.Context, doc map[string]interface{}, id string) error {
	jsonData, err := json.Marshal(doc)
	if err != nil {
		return fmt.Errorf("序列化 ES 文档失败:%w", err)
	}

	esReq := esapi.IndexRequest{
		Index:      es.ESIndex,
		DocumentID: id,
		Body:       bytes.NewReader(jsonData),
		Refresh:    "true",
	}
	res, err := esReq.Do(ctx, svc.esClient.Client.Transport)
	if err != nil {
		return fmt.Errorf("ES 写入失败:%w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		return fmt.Errorf("ES 错误:%s", res.String())
	}
	log.Printf("[AddBookToES] 成功 | ISBN=%s", id)
	return nil
}

// GetLastID returns the largest id field value in the index, or 0 when the
// index has no documents.
func (svc *BookService) GetLastID() (int, error) {
	query := `{ "size": 1, "sort": [{"id": {"order": "desc"}}] }`
	res, err := svc.esClient.Client.Search(
		svc.esClient.Client.Search.WithContext(context.Background()),
		svc.esClient.Client.Search.WithIndex(es.ESIndex),
		svc.esClient.Client.Search.WithBody(strings.NewReader(query)),
	)
	if err != nil {
		log.Printf("[GetLastID] ES 查询失败: %v\n", err)
		return 0, fmt.Errorf("ES 查询失败: %w", err)
	}
	defer res.Body.Close()

	if res.IsError() {
		msg := res.String()
		log.Printf("[GetLastID] ES 返回错误: %s\n", msg)
		return 0, fmt.Errorf("ES 返回错误: %s", msg)
	}

	// Only the numeric id of the top hit is needed.
	var result struct {
		Hits struct {
			Hits []struct {
				Source struct {
					ID int `json:"id"`
				} `json:"_source"`
			} `json:"hits"`
		} `json:"hits"`
	}
	if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
		log.Printf("[GetLastID] 解析 ES 返回 JSON 失败: %v\n", err)
		return 0, fmt.Errorf("解析 ES 返回 JSON 失败: %w", err)
	}
	if len(result.Hits.Hits) == 0 {
		log.Println("[GetLastID] 没有找到任何文档")
		return 0, nil
	}

	lastID := result.Hits.Hits[0].Source.ID
	log.Printf("[GetLastID] 查询到最新文档 ID: %d\n", lastID)
	return lastID, nil
}

// SearchBookByISBN returns the first document whose isbn term equals isbn.
// It returns (nil, nil) when no document matches, so callers must check the
// book for nil as well as the error.
func (svc *BookService) SearchBookByISBN(isbn string) (*es.ESBook, error) {
	log.Printf("[SearchBookByISBN] 开始查询 | ISBN=%s", isbn)

	book, err := svc.searchFirstByTerm("SearchBookByISBN", "isbn", isbn, true)
	if err != nil {
		return nil, err
	}
	if book == nil {
		log.Printf("[SearchBookByISBN] 未找到 ISBN=%s 对应文档", isbn)
		return nil, nil
	}

	log.Printf("[SearchBookByISBN] 查询到文档: %+v", *book)
	return book, nil
}

// addIntTermCondition adds a term clause on field when raw is a valid integer.
// Empty input is ignored; invalid input is logged and ignored.
func addIntTermCondition(builder *ESQueryBuilder, field, raw string) {
	if raw == "" {
		return
	}
	log.Printf("[DEBUG] %s val=%q", field, raw)

	num, err := strconv.Atoi(raw)
	if err != nil {
		log.Printf("[ERROR] %s Atoi error: %v", field, err)
		return
	}
	builder.AddQuery(&QueryCondition{Field: field, Value: num, Type: "term"})
}

// parseIntRange parses "min,max" into range bounds. Either side may be empty,
// which leaves that bound nil (open-ended). ok is false when raw does not have
// exactly two comma-separated parts, when a non-empty part is not an integer,
// or when both parts are empty.
func parseIntRange(raw string) (gte, lte interface{}, ok bool) {
	parts := strings.Split(raw, ",")
	if len(parts) != 2 {
		return nil, nil, false
	}

	bounds := make([]interface{}, 2)
	for i, part := range parts {
		part = strings.TrimSpace(part)
		if part == "" {
			continue
		}
		n, err := strconv.Atoi(part)
		if err != nil {
			return nil, nil, false
		}
		bounds[i] = n
	}
	if bounds[0] == nil && bounds[1] == nil {
		return nil, nil, false
	}
	return bounds[0], bounds[1], true
}

// addRangeCondition adds a range clause on field from a "min,max" string.
// Invalid input is logged and skipped instead of silently becoming 0..0.
func addRangeCondition(builder *ESQueryBuilder, field, raw string) {
	gte, lte, ok := parseIntRange(raw)
	if !ok {
		log.Printf("[WARN] ignoring invalid range for %s: %q", field, raw)
		return
	}
	builder.AddQuery(&QueryCondition{Field: field, Type: "range", GTE: gte, LTE: lte})
}

// fieldFilter pairs an ES field name with the raw request value for it.
type fieldFilter struct {
	esField string
	value   string
}

// buildISuitCondition adds a term clause on is_suit.
func (svc *BookService) buildISuitCondition(builder *ESQueryBuilder, isSuit string) {
	addIntTermCondition(builder, "is_suit", isSuit)
}

// buildIsReturnCondition adds a term clause on is_return.
func (svc *BookService) buildIsReturnCondition(builder *ESQueryBuilder, isReturn string) {
	addIntTermCondition(builder, "is_return", isReturn)
}

// buildIsFilterCondition adds a wildcard clause on is_filter. isFilter "1"
// looks for the character '1' and "2" for '0'; shopType "0".."3" selects which
// position of the is_filter string is tested. Any other input adds nothing.
func (svc *BookService) buildIsFilterCondition(builder *ESQueryBuilder, isFilter, shopType string) {
	if isFilter != "1" && isFilter != "2" {
		return
	}
	log.Printf("[DEBUG] is_filter val=%q", isFilter)

	targetBit := "1"
	if isFilter == "2" {
		targetBit = "0"
	}

	position, known := map[string]int{"0": 0, "1": 1, "2": 2, "3": 3}[shopType]
	if !known {
		return
	}

	// One '?' per skipped character, then the wanted character, then anything.
	builder.AddQuery(&QueryCondition{
		Field:   "is_filter",
		Type:    "wildcard",
		Pattern: strings.Repeat("?", position) + targetBit + "*",
	})
}

// buildCategoryTypeCondition filters on the "9787" ISBN prefix: categoryType
// "1" requires it, any other non-empty value excludes it.
func (svc *BookService) buildCategoryTypeCondition(builder *ESQueryBuilder, categoryType string) {
	switch categoryType {
	case "":
		return
	case "1":
		builder.AddQuery(&QueryCondition{Field: "isbn", Value: "9787", Type: "prefix"})
	default:
		builder.AddBoolQuery("must_not", []map[string]interface{}{
			{"prefix": map[string]interface{}{"isbn": "9787"}},
		})
	}
}

// buildCategoryCondition adds a match_phrase clause on category. The special
// value "排除大学教材" instead excludes the university-textbook category.
func (svc *BookService) buildCategoryCondition(builder *ESQueryBuilder, category string) {
	switch category {
	case "":
		return
	case "排除大学教材":
		builder.AddBoolQuery("must_not", []map[string]interface{}{
			{"match_phrase": map[string]interface{}{
				"category": "图书/教材教辅考试/大学教材",
			}},
		})
	default:
		builder.AddQuery(&QueryCondition{Field: category2Field, Value: category, Type: "match_phrase"})
	}
}

// category2Field is the ES field queried by buildCategoryCondition.
const category2Field = "category"

// buildBookPicCondition filters on whether a picture is present. picType ""
// or "1" checks book_pic.pddPath, anything else book_pic_s.pddResponse.
// bookPic "1" requires a picture; any other non-empty value requires none.
func (svc *BookService) buildBookPicCondition(builder *ESQueryBuilder, bookPic, picType string) {
	if bookPic == "" {
		return
	}

	targetField := "book_pic.pddPath"
	if picType != "1" && picType != "" {
		targetField = "book_pic_s.pddResponse"
	}

	if bookPic == "1" {
		// Has picture: the field must exist and be non-empty.
		builder.AddBoolQuery("must", []map[string]interface{}{
			{"exists": map[string]interface{}{"field": targetField}},
			{"wildcard": map[string]interface{}{targetField: "*"}},
		})
		return
	}

	// No picture: the keyword sub-field is missing or is the empty string.
	keywordField := targetField + ".keyword"
	builder.AddBoolQuery("should", []map[string]interface{}{
		{
			"bool": map[string]interface{}{
				"must_not": []map[string]interface{}{
					{"exists": map[string]interface{}{"field": keywordField}},
				},
			},
		},
		{
			"term": map[string]interface{}{keywordField: ""},
		},
	}, 1) // minimum_should_match = 1
}

// buildBuyCountsCondition applies buyCounts to the selected sales field:
// "min,max" becomes a range, a single integer becomes a lower bound. Nothing
// is added when either argument is empty.
func (svc *BookService) buildBuyCountsCondition(builder *ESQueryBuilder, buyCounts, saleField string) {
	if buyCounts == "" || saleField == "" {
		return
	}
	log.Printf("[DEBUG] buy_counts uses saleField=%s", saleField)

	if strings.Contains(buyCounts, ",") {
		addRangeCondition(builder, saleField, buyCounts)
		return
	}
	if num, err := strconv.Atoi(buyCounts); err == nil {
		builder.AddQuery(&QueryCondition{Field: saleField, Type: "range", GTE: num})
	}
}

// buildTotalSaleRangeCondition adds a range clause on total_sale from a
// "min,max" string.
func (svc *BookService) buildTotalSaleRangeCondition(builder *ESQueryBuilder, totalSaleRange string) {
	if totalSaleRange == "" {
		return
	}
	addRangeCondition(builder, "total_sale", totalSaleRange)
}

// buildNumericRangeConditions adds a range clause for every numeric request
// field given as "min,max". saleField is accepted for signature compatibility
// and is not used.
func (svc *BookService) buildNumericRangeConditions(builder *ESQueryBuilder, request *request.BookSearchRequest, saleField string) {
	// A slice (not a map) keeps the clause order stable between calls.
	filters := []fieldFilter{
		{"sell_counts", request.SellCounts},
		{"day_sale_7", request.DaySale7},
		{"day_sale_15", request.DaySale15},
		{"day_sale_30", request.DaySale30},
		{"day_sale_60", request.DaySale60},
		{"day_sale_90", request.DaySale90},
		{"day_sale_180", request.DaySale180},
		{"day_sale_365", request.DaySale365},
		{"this_year_sale", request.ThisYearSale},
		{"last_year_sale", request.LastYearSale},
		{"publication_time", request.PublicationTime},
	}
	for _, f := range filters {
		if f.value == "" {
			continue
		}
		addRangeCondition(builder, f.esField, f.value)
	}
}

// buildExactMatchConditions adds term clauses for isbn, id and publisher.
func (svc *BookService) buildExactMatchConditions(builder *ESQueryBuilder, request *request.BookSearchRequest) {
	filters := []fieldFilter{
		{"isbn", request.ISBN},
		{"id", request.ID},
		{"publisher", request.Publisher},
	}
	for _, f := range filters {
		if f.value == "" {
			continue
		}
		builder.AddQuery(&QueryCondition{Field: f.esField, Value: f.value, Type: "term"})
	}
}

// buildFuzzyMatchConditions adds match clauses (operator "and") for book_name
// and author.
func (svc *BookService) buildFuzzyMatchConditions(builder *ESQueryBuilder, request *request.BookSearchRequest) {
	filters := []fieldFilter{
		{"book_name", request.BookName},
		{"author", request.Author},
	}
	for _, f := range filters {
		if f.value == "" {
			continue
		}
		builder.AddQuery(&QueryCondition{
			Field:    f.esField,
			Value:    f.value,
			Type:     "match",
			Operator: "and",
		})
	}
}

// buildDefaultPrefixConditions adds prefix clauses for binding_layout and
// fix_price.
func (svc *BookService) buildDefaultPrefixConditions(builder *ESQueryBuilder, request *request.BookSearchRequest) {
	filters := []fieldFilter{
		{"binding_layout", request.BindingLayout},
		{"fix_price", request.FixPrice},
	}
	for _, f := range filters {
		if f.value == "" {
			continue
		}
		builder.AddQuery(&QueryCondition{Field: f.esField, Value: f.value, Type: "prefix"})
	}
}

// NewESQueryBuilder returns an empty ESQueryBuilder.
func NewESQueryBuilder() *ESQueryBuilder {
	return &ESQueryBuilder{
		mustQueries: make([]map[string]interface{}, 0),
		boolMust:    make([]map[string]interface{}, 0),
		boolMustNot: make([]map[string]interface{}, 0),
		boolShould:  make([]map[string]interface{}, 0),
	}
}

// AddQuery translates cond into an ES clause and appends it to the must list.
// Conditions with an unsupported Type are logged and ignored.
func (b *ESQueryBuilder) AddQuery(cond *QueryCondition) {
	var inner map[string]interface{}

	switch cond.Type {
	case "term", "match_phrase", "prefix":
		inner = map[string]interface{}{cond.Field: cond.Value}
	case "match":
		matchQuery := map[string]interface{}{
			"query":     cond.Value,
			"fuzziness": "AUTO",
		}
		// Leave the operator out when unset so ES applies its default.
		if cond.Operator != "" {
			matchQuery["operator"] = cond.Operator
		}
		inner = map[string]interface{}{cond.Field: matchQuery}
	case "range":
		rangeCond := make(map[string]interface{}, 2)
		if cond.GTE != nil {
			rangeCond["gte"] = cond.GTE
		}
		if cond.LTE != nil {
			rangeCond["lte"] = cond.LTE
		}
		inner = map[string]interface{}{cond.Field: rangeCond}
	case "wildcard":
		inner = map[string]interface{}{cond.Field: cond.Pattern}
	case "exists":
		inner = map[string]interface{}{"field": cond.Field}
	default:
		log.Printf("[WARN] AddQuery: unsupported query type %q for field %q", cond.Type, cond.Field)
		return
	}

	query := map[string]interface{}{cond.Type: inner}
	b.mustQueries = append(b.mustQueries, query)
	log.Printf("[DEBUG] Added query: %v", query)
}

// AddBoolQuery appends raw clauses to the must, must_not or should list. For
// "should", an optional first minShouldMatch value sets minimum_should_match.
// An unknown boolType is logged and ignored.
func (b *ESQueryBuilder) AddBoolQuery(boolType string, queries []map[string]interface{}, minShouldMatch ...int) {
	switch boolType {
	case "must":
		b.boolMust = append(b.boolMust, queries...)
	case "must_not":
		b.boolMustNot = append(b.boolMustNot, queries...)
	case "should":
		b.boolShould = append(b.boolShould, queries...)
		if len(minShouldMatch) > 0 {
			b.minShouldMatch = minShouldMatch[0]
		}
	default:
		log.Printf("[WARN] AddBoolQuery: unsupported bool type %q", boolType)
		return
	}
	log.Printf("[DEBUG] Added bool %s queries: %v", boolType, queries)
}

// Build assembles the accumulated clauses into a complete search body with the
// given offset, page size and sort order. With no clauses the bool query is
// empty.
func (b *ESQueryBuilder) Build(from, size int, sort []map[string]interface{}) map[string]interface{} {
	allMust := make([]map[string]interface{}, 0, len(b.mustQueries)+len(b.boolMust))
	allMust = append(allMust, b.mustQueries...)
	allMust = append(allMust, b.boolMust...)

	boolQuery := make(map[string]interface{})
	if len(allMust) > 0 {
		boolQuery["must"] = allMust
	}
	if len(b.boolMustNot) > 0 {
		boolQuery["must_not"] = b.boolMustNot
	}
	if len(b.boolShould) > 0 {
		boolQuery["should"] = b.boolShould
		if b.minShouldMatch > 0 {
			boolQuery["minimum_should_match"] = b.minShouldMatch
		}
	}

	return map[string]interface{}{
		"from":  from,
		"size":  size,
		"sort":  sort,
		"query": map[string]interface{}{"bool": boolQuery},
	}
}