daShangDao_psiServer/es/es.go
2026-06-15 13:47:39 +08:00

163 lines
3.4 KiB
Go

package es
import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"net/http"
"strconv"
"strings"
"time"
)
// BookPicObj 用于 book_pic
type BookPicObj struct {
LocalPath string `json:"localPath"`
PddPath string `json:"pddPath"`
}
// BookPicSObj 用于 book_pic_s
type BookPicSObj struct {
LocalPath string `json:"localPath"`
PddResponse string `json:"pddResponse"`
}
// BookDefPicObj 用于 book_def_pic
type BookDefPicObj struct {
LocalPath string `json:"localPath"`
PddPath string `json:"pddPath"`
}
type CatIdObject struct {
PinDuoDuoCatId string `json:"pin_duo_duo_cat_id"` // 拼多多分类 ID
KongFuZiCatId string `json:"kong_fu_zi_cat_id"` // 孔夫子分类 ID
XianYuCatId string `json:"xian_yu_cat_id"` // 闲鱼分类 ID
}
// FlexibleString 处理可能是字符串或数组的字段
type FlexibleString struct {
Value string
}
// MarshalJSON 实现自定义 JSON 序列化
func (f FlexibleString) MarshalJSON() ([]byte, error) {
return json.Marshal(f.Value)
}
// UnmarshalJSON 实现自定义 JSON 反序列化
func (f *FlexibleString) UnmarshalJSON(data []byte) error {
var value string
if err := json.Unmarshal(data, &value); err != nil {
return err
}
f.Value = value
return nil
}
type NumberOrString string
func (n *NumberOrString) UnmarshalJSON(data []byte) error {
s := strings.TrimSpace(string(data))
// 如果是数字(不以引号开头)
if len(s) > 0 && s[0] != '"' {
*n = NumberOrString(s)
return nil
}
// 普通字符串
var str string
if err := json.Unmarshal(data, &str); err != nil {
return err
}
*n = NumberOrString(str)
return nil
}
type Float64OrString float64
func (f *Float64OrString) UnmarshalJSON(b []byte) error {
// 去掉空值
if string(b) == "null" || len(b) == 0 {
*f = 0
return nil
}
// 尝试解析为 float
var num float64
if err := json.Unmarshal(b, &num); err == nil {
*f = Float64OrString(num)
return nil
}
// 尝试解析为 string
var str string
if err := json.Unmarshal(b, &str); err == nil {
if str == "" {
*f = 0
return nil
}
parsed, err := strconv.ParseFloat(str, 64)
if err != nil {
return err
}
*f = Float64OrString(parsed)
return nil
}
return fmt.Errorf("无法解析 fix_price: %s", string(b))
}
// ESClient 封装 Elasticsearch 客户端
type ESClient struct {
Client *elasticsearch.Client
}
var DefaultClient *ESClient
// Init 初始化默认 ES 客户端
func Init(addresses []string, username, password string) error {
client, err := NewESClient(addresses, username, password)
if err != nil {
return err
}
DefaultClient = client
return nil
}
// NewESClient 初始化客户端
func NewESClient(addresses []string, username, password string) (*ESClient, error) {
cfg := elasticsearch.Config{
Addresses: addresses,
Username: username,
Password: password,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
MaxIdleConnsPerHost: 100,
ResponseHeaderTimeout: 60 * time.Second,
},
CompressRequestBody: true,
}
client, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, fmt.Errorf("创建 ES 客户端失败: %v", err)
}
res, err := client.Info()
if err != nil {
return nil, fmt.Errorf("ES 连接失败: %v", err)
}
defer res.Body.Close()
if res.IsError() {
return nil, fmt.Errorf("ES 返回错误: %s", res.String())
}
fmt.Println("✅ Elasticsearch 连接成功")
return &ESClient{Client: client}, nil
}