套装书修改逻辑
This commit is contained in:
parent
701e7c75b1
commit
bd6635f5be
@ -26,6 +26,7 @@ import (
|
||||
)
|
||||
|
||||
const ESIndex = "books-from-mysql-v2"
|
||||
const ESIndexV3 = "books-from-mysql-v3"
|
||||
|
||||
// ESBookResponse 用于返回给Java客户端的格式,ID为简单的int64
|
||||
type ESBookResponse struct {
|
||||
@ -2585,7 +2586,7 @@ func (svc *ESSearchService) SearchBookByISBNHandlerToPsi(c *gin.Context) {
|
||||
endpoint := c.FullPath()
|
||||
|
||||
// Redis 查询(使用监控)
|
||||
db1Client, err := redisClient.GetClientByName("db1")
|
||||
db1Client, err := redisClient.GetClientByName("db10")
|
||||
if err == nil {
|
||||
monitoredRedis := monitor.NewMonitoredRedisClient(db1Client, endpoint)
|
||||
val, _, err := monitoredRedis.Get(ctx, isbn)
|
||||
|
||||
2
main.go
2
main.go
@ -315,7 +315,7 @@ func main() {
|
||||
r.POST("/api/es/addBookToES", bookController.AddBookToESHandler)
|
||||
// 更新:根据ISBN通用更新图书字段
|
||||
r.POST("/api/es/updateBookFieldsByISBN", bookController.UpdateBookFieldsByISBNHandler)
|
||||
// 更新:根据ISBN通用更新图书字段
|
||||
// 更新:根据ISBN更新商品分类字段
|
||||
r.POST("/api/es/updateBookCatIdByISBN", bookController.UpdateBookCatIdByISBNHandler)
|
||||
// 删除:根据ISBN删除ES数据
|
||||
r.GET("/api/es/DeleteBookByISBN", bookController.DeleteBookHandler)
|
||||
|
||||
260
service/book.go
260
service/book.go
@ -270,56 +270,14 @@ func (svc *BookService) UpdateBookFieldsByISBN(request *request.BookUpdateReques
|
||||
params := make(map[string]interface{})
|
||||
|
||||
//svc.AddFilterSet(request.ISBN)
|
||||
// 判断 is_suit 是否已传递,如果没传则自动检测
|
||||
if isSuitValue, exists := request.Data["is_suit"]; exists {
|
||||
// 定义一个辅助函数来检查值是否为 1
|
||||
isOne := func(v interface{}) bool {
|
||||
switch val := v.(type) {
|
||||
case int:
|
||||
return val == 1
|
||||
case int8:
|
||||
return val == 1
|
||||
case int16:
|
||||
return val == 1
|
||||
case int32:
|
||||
return val == 1
|
||||
case int64:
|
||||
return val == 1
|
||||
case float32:
|
||||
return val == 1.0
|
||||
case float64:
|
||||
return val == 1.0
|
||||
case string:
|
||||
return val == "1"
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
// 如果手动传递了 is_suit,检查是否为 1
|
||||
if isOne(isSuitValue) {
|
||||
params["is_filter"] = "100100"
|
||||
scriptParts = append(scriptParts, fmt.Sprintf("ctx._source.is_filter = params.is_filter;"))
|
||||
} else {
|
||||
// is_suit 不为 1 时,清空 is_filter
|
||||
params["is_filter"] = "000000"
|
||||
scriptParts = append(scriptParts, fmt.Sprintf("ctx._source.is_filter = params.is_filter;"))
|
||||
}
|
||||
} else {
|
||||
// 未传递 is_suit,自动检测并设置
|
||||
isSuitValue := map[bool]int{true: 1, false: 0}[es.CheckBookSuit(book.BookName.Value)]
|
||||
params["is_suit"] = isSuitValue
|
||||
scriptParts = append(scriptParts, fmt.Sprintf("ctx._source.is_suit = params.is_suit;"))
|
||||
|
||||
// 如果 is_suit 为 1,同时更新 is_filter 为 100100
|
||||
if isSuitValue == 1 {
|
||||
params["is_filter"] = "100100"
|
||||
scriptParts = append(scriptParts, fmt.Sprintf("ctx._source.is_filter = params.is_filter;"))
|
||||
}
|
||||
// 先捕获 is_suit 的值,v3 同步/删除在 v2 更新完成后执行
|
||||
var hasIsSuitInData bool
|
||||
var isSuitInData interface{}
|
||||
if val, exists := request.Data["is_suit"]; exists {
|
||||
hasIsSuitInData = true
|
||||
isSuitInData = val
|
||||
}
|
||||
for field, value := range request.Data {
|
||||
if field == "is_suit" {
|
||||
continue
|
||||
}
|
||||
// 使用配置检查字段是否允许更新
|
||||
if !fieldConfig.IsAllowUpdate(field) {
|
||||
fmt.Printf("[UpdateBookFieldsByISBN] 字段 %s 不允许更新,已跳过", field)
|
||||
@ -396,7 +354,18 @@ func (svc *BookService) UpdateBookFieldsByISBN(request *request.BookUpdateReques
|
||||
|
||||
// 同步 Redis
|
||||
_ = svc.SyncRedisByISBN(request.ISBN, "update")
|
||||
//svc.AddFilterSet(request.ISBN)
|
||||
// v2 已更新完成,此时操作 v3 索引
|
||||
if hasIsSuitInData {
|
||||
if isOneValue(isSuitInData) {
|
||||
if err := svc.syncBookToV3Index(request.ISBN); err != nil {
|
||||
log.Printf("[WARN] 同步到v3索引失败: %v", err)
|
||||
}
|
||||
} else if isZeroValue(isSuitInData) {
|
||||
if err := svc.deleteBookFromV3Index(request.ISBN); err != nil {
|
||||
log.Printf("[WARN] 从v3索引删除失败: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return &UpdateBookResult{
|
||||
ISBN: request.ISBN,
|
||||
Updated: parsed.Updated,
|
||||
@ -1093,6 +1062,199 @@ func (svc *BookService) indexDocumentToES(ctx context.Context, doc map[string]in
|
||||
return nil
|
||||
}
|
||||
|
||||
// isOneValue 判断 interface{} 类型的值是否为 1
|
||||
func isOneValue(v interface{}) bool {
|
||||
switch val := v.(type) {
|
||||
case int:
|
||||
return val == 1
|
||||
case int8:
|
||||
return val == 1
|
||||
case int16:
|
||||
return val == 1
|
||||
case int32:
|
||||
return val == 1
|
||||
case int64:
|
||||
return val == 1
|
||||
case float32:
|
||||
return val == 1.0
|
||||
case float64:
|
||||
return val == 1.0
|
||||
case string:
|
||||
return val == "1"
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// isZeroValue 判断 interface{} 类型的值是否为 0
|
||||
func isZeroValue(v interface{}) bool {
|
||||
switch val := v.(type) {
|
||||
case int:
|
||||
return val == 0
|
||||
case int8:
|
||||
return val == 0
|
||||
case int16:
|
||||
return val == 0
|
||||
case int32:
|
||||
return val == 0
|
||||
case int64:
|
||||
return val == 0
|
||||
case float32:
|
||||
return val == 0.0
|
||||
case float64:
|
||||
return val == 0.0
|
||||
case string:
|
||||
return val == "0"
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// searchBookByISBNInIndex 在指定索引中按 ISBN 查询文档
|
||||
func (svc *BookService) searchBookByISBNInIndex(isbn string, index string) (*es.ESBook, error) {
|
||||
log.Printf("[SearchBookByISBNInIndex] 开始查询 | ISBN=%s | index=%s", isbn, index)
|
||||
|
||||
query := map[string]interface{}{
|
||||
"query": map[string]interface{}{
|
||||
"term": map[string]interface{}{
|
||||
"isbn": isbn,
|
||||
},
|
||||
},
|
||||
"_source": true,
|
||||
"size": 1,
|
||||
}
|
||||
|
||||
body, err := json.Marshal(query)
|
||||
if err != nil {
|
||||
log.Printf("[SearchBookByISBNInIndex] 构建查询 JSON 失败:%v", err)
|
||||
return nil, fmt.Errorf("构建查询 JSON 失败:%v", err)
|
||||
}
|
||||
|
||||
res, err := svc.esClient.Client.Search(
|
||||
svc.esClient.Client.Search.WithIndex(index),
|
||||
svc.esClient.Client.Search.WithBody(bytes.NewReader(body)),
|
||||
svc.esClient.Client.Search.WithTrackTotalHits(true),
|
||||
)
|
||||
if err != nil {
|
||||
log.Printf("[SearchBookByISBNInIndex] ES 查询失败:%v", err)
|
||||
return nil, fmt.Errorf("ES 查询失败:%v", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
log.Printf("[SearchBookByISBNInIndex] ES 返回错误:%s", res.String())
|
||||
return nil, fmt.Errorf("ES 返回错误:%s", res.String())
|
||||
}
|
||||
|
||||
var parsed esHitsWrapper
|
||||
|
||||
if err := json.NewDecoder(res.Body).Decode(&parsed); err != nil {
|
||||
log.Printf("[SearchBookByISBNInIndex] 解析 ES 响应失败:%v", err)
|
||||
return nil, fmt.Errorf("解析 ES 响应失败:%v", err)
|
||||
}
|
||||
|
||||
if len(parsed.Hits.Hits) == 0 {
|
||||
log.Printf("[SearchBookByISBNInIndex] 未找到 ISBN=%s 对应文档", isbn)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
book := parsed.Hits.Hits[0].Source
|
||||
log.Printf("[SearchBookByISBNInIndex] 查询到文档: %+v", book)
|
||||
|
||||
return &book, nil
|
||||
}
|
||||
|
||||
// indexDocumentToIndex 将文档写入指定索引(幂等:存在则覆盖,不存在则创建)
|
||||
func (svc *BookService) indexDocumentToIndex(ctx context.Context, doc map[string]interface{}, id string, index string) error {
|
||||
jsonData, _ := json.Marshal(doc)
|
||||
|
||||
esReq := esapi.IndexRequest{
|
||||
Index: index,
|
||||
DocumentID: id,
|
||||
Body: bytes.NewReader(jsonData),
|
||||
Refresh: "true",
|
||||
}
|
||||
|
||||
res, err := esReq.Do(ctx, svc.esClient.Client.Transport)
|
||||
if err != nil {
|
||||
return fmt.Errorf("写入索引 %s 失败:%w", index, err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("写入索引 %s 错误:%s", index, res.String())
|
||||
}
|
||||
|
||||
log.Printf("[IndexDocumentToIndex] 成功 | index=%s | id=%s", index, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
// syncBookToV3Index 将 v2 索引中的文档同步到 v3 索引
|
||||
// 先检查 v3 中是否已存在同 ISBN 文档,存在则覆盖更新,不存在则新增
|
||||
func (svc *BookService) syncBookToV3Index(isbn string) error {
|
||||
// 1. 从 v2 索引查询完整文档
|
||||
book, err := svc.SearchBookByISBN(isbn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询v2索引失败: %w", err)
|
||||
}
|
||||
if book == nil {
|
||||
return fmt.Errorf("v2索引中未找到ISBN=%s的文档", isbn)
|
||||
}
|
||||
|
||||
// 2. 构建文档,追加 v3 专有字段
|
||||
doc := svc.buildBookMapForSerialization(book)
|
||||
doc["fid"] = 0
|
||||
|
||||
// 3. 检查 v3 中是否已存在
|
||||
existing, err := svc.searchBookByISBNInIndex(isbn, es.ESIndexV3)
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询v3索引失败: %w", err)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
if existing != nil {
|
||||
log.Printf("[SyncToV3] ISBN=%s 在v3中已存在,执行覆盖更新", isbn)
|
||||
} else {
|
||||
log.Printf("[SyncToV3] ISBN=%s 在v3中不存在,执行新增", isbn)
|
||||
}
|
||||
|
||||
// 4. 写入 v3 索引(IndexRequest 是幂等的:存在则覆盖,不存在则创建)
|
||||
return svc.indexDocumentToIndex(ctx, doc, isbn, es.ESIndexV3)
|
||||
}
|
||||
|
||||
// deleteBookFromV3Index 从 v3 索引中删除指定 ISBN 的文档
|
||||
func (svc *BookService) deleteBookFromV3Index(isbn string) error {
|
||||
// 先检查 v3 中是否存在
|
||||
existing, err := svc.searchBookByISBNInIndex(isbn, es.ESIndexV3)
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询v3索引失败: %w", err)
|
||||
}
|
||||
if existing == nil {
|
||||
log.Printf("[DeleteFromV3] ISBN=%s 在v3中不存在,无需删除", isbn)
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
req := esapi.DeleteRequest{
|
||||
Index: es.ESIndexV3,
|
||||
DocumentID: isbn,
|
||||
Refresh: "true",
|
||||
}
|
||||
|
||||
res, err := req.Do(ctx, svc.esClient.Client.Transport)
|
||||
if err != nil {
|
||||
return fmt.Errorf("从v3索引删除失败: %w", err)
|
||||
}
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.IsError() {
|
||||
return fmt.Errorf("从v3索引删除错误: %s", res.String())
|
||||
}
|
||||
|
||||
log.Printf("[DeleteFromV3] 成功删除 ISBN=%s 从v3索引", isbn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetLastID 获取最后一条 ID
|
||||
func (svc *BookService) GetLastID() (int, error) {
|
||||
query := `{
|
||||
|
||||
Loading…
Reference in New Issue
Block a user