2089 lines
57 KiB
Go
2089 lines
57 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"crypto/md5"
|
||
"crypto/tls"
|
||
"database/sql"
|
||
"encoding/hex"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/elastic/go-elasticsearch/v8"
|
||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||
"golang.org/x/image/bmp"
|
||
"golang.org/x/image/draw"
|
||
"golang.org/x/image/tiff"
|
||
"golang.org/x/image/webp"
|
||
"image"
|
||
"image/color"
|
||
"image/jpeg"
|
||
"image/png"
|
||
"io"
|
||
"io/ioutil"
|
||
"log"
|
||
"mime/multipart"
|
||
"net/http"
|
||
"os"
|
||
"path/filepath"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
_ "github.com/go-sql-driver/mysql"
|
||
"github.com/nfnt/resize"
|
||
)
|
||
|
||
// Elasticsearch connection and PDD (Pinduoduo open platform) application settings.
//
// SECURITY(review): the ES password and the PDD client secret were committed in plain
// text. Every value can now be overridden through the environment variable named in the
// call next to it; the literal fallbacks only keep existing runs working unchanged —
// remove them and rotate the credentials once the environment is configured.
var (
	esAddress  = configFromEnv("ES_ADDRESS", "http://103.236.91.138:9200")
	esUsername = configFromEnv("ES_USERNAME", "elastic")
	esPassword = configFromEnv("ES_PASSWORD", "5mRDIUg52VC0fp14nw-F")
	esIndex    = configFromEnv("ES_INDEX", "books-from-mysql")

	ClientID     = configFromEnv("PDD_CLIENT_ID", "203c5a7ba8bd4b8488d5e26f93052642")
	ClientSecret = configFromEnv("PDD_CLIENT_SECRET", "892ffaa86e12b7a3d8d2942b669d9aa520ad8179")
	PDDApiURL    = configFromEnv("PDD_API_URL", "https://gw-upload.pinduoduo.com/api/upload")
)

// configFromEnv returns the value of the environment variable key, or def when the
// variable is unset or empty.
func configFromEnv(key, def string) string {
	if v, ok := os.LookupEnv(key); ok && v != "" {
		return v
	}
	return def
}
|
||
|
||
// Worker-pool tuning used by processRecordsConcurrently, worker and progressReporter.
const (
	// maxWorkers is the number of concurrent worker goroutines.
	maxWorkers = 15
	// maxRetries is the total number of attempts per record, including the first one.
	maxRetries = 3
	// retryDelay is the pause between two attempts on the same record.
	retryDelay = 2 * time.Second
	// progressInterval is how often progressReporter prints a status line.
	progressInterval = 5 * time.Second
)
|
||
|
||
// ES 客户端封装
|
||
type ESClient struct {
|
||
client *elasticsearch.Client
|
||
}
|
||
|
||
// CrawlerRecord is one row read from dk_crawler_record_info. Both columns may be NULL,
// hence sql.NullString.
type CrawlerRecord struct {
	BookISBN    sql.NullString // book_isbn
	BookPicture sql.NullString // book_picture: URL of the source image to download
}
|
||
|
||
// 处理结果结构体
|
||
type ProcessResult struct {
|
||
Record CrawlerRecord
|
||
Success bool
|
||
LocalPaths []string
|
||
PDDURLs []string
|
||
Error error
|
||
WorkerID int
|
||
ProcessedAt time.Time
|
||
}
|
||
|
||
// Statistics holds run-wide counters shared by all workers. The int32 counters are
// updated and read with sync/atomic; Total and StartTime are set once before any worker
// starts and are only read afterwards.
type Statistics struct {
	Total        int32     // number of records queued for this run
	Success      int32     // records processed successfully
	Failed       int32     // records that still failed after all attempts
	Skipped      int32     // invalid records (missing ISBN or picture URL)
	CurrentIndex int32     // records picked up so far (incremented when a worker starts one)
	StartTime    time.Time // start of the run, used for rate and ETA
}
|
||
|
||
// NewESClient 初始化 ES 客户端
|
||
// 说明:与 main(2).go 保持一致的连接方式(禁用证书校验、设置超时和连接池参数)
|
||
func NewESClient(addresses []string, username, password string) (*ESClient, error) {
|
||
cfg := elasticsearch.Config{
|
||
Addresses: addresses,
|
||
Username: username,
|
||
Password: password,
|
||
Transport: &http.Transport{
|
||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||
MaxIdleConnsPerHost: 100,
|
||
ResponseHeaderTimeout: 60 * time.Second,
|
||
},
|
||
}
|
||
cli, err := elasticsearch.NewClient(cfg)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return &ESClient{client: cli}, nil
|
||
}
|
||
|
||
// CheckHealth 检查 ES 集群健康
|
||
// 行为:等待状态至少为 yellow,输出基本信息
|
||
func (es *ESClient) CheckHealth() error {
|
||
res, err := es.client.Cluster.Health(
|
||
es.client.Cluster.Health.WithWaitForStatus("yellow"),
|
||
es.client.Cluster.Health.WithTimeout(30*time.Second),
|
||
)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer res.Body.Close()
|
||
if res.IsError() {
|
||
return fmt.Errorf("Elasticsearch 健康检查失败: %s", res.String())
|
||
}
|
||
var m map[string]interface{}
|
||
if err := json.NewDecoder(res.Body).Decode(&m); err == nil {
|
||
log.Printf("ES status=%v nodes=%v cluster=%v", m["status"], m["number_of_nodes"], m["cluster_name"])
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// PDDImageProcessor 实现图片处理器
|
||
// pdd上传图片官方接口
|
||
// 上传图片到拼多多
|
||
func uploadToPDD(token, imagePath string) (string, error) {
|
||
// 检查token是否有效
|
||
if len(token) == 0 {
|
||
return "", fmt.Errorf("获取到的token为空")
|
||
}
|
||
result, err := ProcessAndUploadImage(imagePath, token)
|
||
if err != nil {
|
||
return "", fmt.Errorf("拼多多图片上传失败: %v", err)
|
||
}
|
||
|
||
// 解析JSON响应获取URL
|
||
var response struct {
|
||
RequestID string `json:"request_id"`
|
||
URL string `json:"url"`
|
||
}
|
||
|
||
err = json.Unmarshal([]byte(result), &response)
|
||
if err != nil {
|
||
return "", fmt.Errorf("解析上传响应失败: %v", err)
|
||
}
|
||
|
||
if response.URL == "" {
|
||
return "", fmt.Errorf("上传响应中未找到URL")
|
||
}
|
||
|
||
return response.URL, nil
|
||
}
|
||
func ProcessAndUploadImage(imagePath, token string) (string, error) {
|
||
// 打开图片文件
|
||
file, err := os.Open(imagePath)
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to open image file: %v", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
// 准备参数 - 不包含文件路径
|
||
params := map[string]string{
|
||
"access_token": token,
|
||
"data_type": "JSON",
|
||
"type": "pdd.goods.img.upload",
|
||
"client_id": ClientID,
|
||
"timestamp": fmt.Sprintf("%d", time.Now().Unix()),
|
||
}
|
||
|
||
// 生成签名(不包含文件路径)
|
||
params["sign"] = generateSign(params)
|
||
|
||
// 创建multipart表单
|
||
body := &strings.Builder{}
|
||
writer := multipart.NewWriter(body)
|
||
|
||
// 写入文本参数
|
||
for key, value := range params {
|
||
if err := writer.WriteField(key, value); err != nil {
|
||
return "", fmt.Errorf("failed to write field %s: %v", key, err)
|
||
}
|
||
}
|
||
|
||
// 写入文件流 - 使用正确的字段名 "file"
|
||
part, err := writer.CreateFormFile("file", filepath.Base(imagePath))
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to create form file: %v", err)
|
||
}
|
||
|
||
if _, err := io.Copy(part, file); err != nil {
|
||
return "", fmt.Errorf("failed to copy file data: %v", err)
|
||
}
|
||
|
||
// 关闭writer
|
||
if err := writer.Close(); err != nil {
|
||
return "", fmt.Errorf("failed to close writer: %v", err)
|
||
}
|
||
|
||
// 创建请求
|
||
req, err := http.NewRequest("POST", PDDApiURL, strings.NewReader(body.String()))
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to create request: %v", err)
|
||
}
|
||
|
||
// 设置请求头
|
||
req.Header.Set("Content-Type", writer.FormDataContentType())
|
||
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
|
||
|
||
// 发送请求
|
||
client := &http.Client{Timeout: 30 * time.Second}
|
||
resp, err := client.Do(req)
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to send request: %v", err)
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// 读取响应
|
||
respBody, err := io.ReadAll(resp.Body)
|
||
if err != nil {
|
||
return "", fmt.Errorf("failed to read response: %v", err)
|
||
}
|
||
|
||
log.Printf("拼多多API响应状态: %d", resp.StatusCode)
|
||
log.Printf("拼多多API响应内容: %s", string(respBody))
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return "", fmt.Errorf("API returned error status: %d, body: %s", resp.StatusCode, string(respBody))
|
||
}
|
||
|
||
// 解析响应
|
||
var result map[string]interface{}
|
||
if err := json.Unmarshal(respBody, &result); err != nil {
|
||
return "", fmt.Errorf("failed to parse response: %v", err)
|
||
}
|
||
|
||
// 检查API返回的错误
|
||
if errorResponse, exists := result["error_response"]; exists {
|
||
errorMsg, _ := json.Marshal(errorResponse)
|
||
return "", fmt.Errorf("API returned error: %s", string(errorMsg))
|
||
}
|
||
|
||
// 查找成功的响应
|
||
for key, value := range result {
|
||
if key != "error_response" {
|
||
successResponse, _ := json.Marshal(value)
|
||
return string(successResponse), nil
|
||
}
|
||
}
|
||
|
||
return string(respBody), nil
|
||
}
|
||
|
||
// generateSign 生成拼多多API签名
|
||
func generateSign(params map[string]string) string {
|
||
// 按参数名排序
|
||
var keys []string
|
||
for k := range params {
|
||
keys = append(keys, k)
|
||
}
|
||
sort.Strings(keys)
|
||
// 拼接参数字符串
|
||
var signStr string
|
||
for _, k := range keys {
|
||
signStr += k + params[k]
|
||
}
|
||
signStr = ClientSecret + signStr + ClientSecret
|
||
// 计算MD5并转为大写
|
||
hasher := md5.New()
|
||
hasher.Write([]byte(signStr))
|
||
result := strings.ToUpper(hex.EncodeToString(hasher.Sum(nil)))
|
||
return result
|
||
}
|
||
|
||
// GetPddToken fetches a PDD access token from the buzhiyushu token endpoint.
//
// The endpoint is expected to answer with JSON shaped like
// {"code":200,"msg":...,"data":"<token>"}. A transport error, a non-200 HTTP status, an
// undecodable body, a code other than 200, or a missing/non-string/empty "data" field is
// returned as an error instead of panicking or yielding an empty token.
func GetPddToken() (string, error) {
	const tokenURL = "https://api.buzhiyushu.cn/huidiao/pdd/getPddChildrenBooksToken"

	// http.Get uses the default client, which has no timeout; a hung endpoint would block forever.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Get(tokenURL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return "", err
	}
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("token 接口返回状态码 %d: %s", resp.StatusCode, string(body))
	}

	var result map[string]interface{}
	if err := json.Unmarshal(body, &result); err != nil {
		return "", fmt.Errorf("解析 token 响应失败: %w", err)
	}

	// Business status check.
	if code, ok := result["code"].(float64); !ok || code != 200 {
		return "", fmt.Errorf("API错误: %v", result["msg"])
	}

	// Checked assertion: an unchecked .(string) panics when "data" is missing or not a string.
	token, ok := result["data"].(string)
	if !ok || token == "" {
		return "", fmt.Errorf("token 响应中缺少有效的 data 字段: %s", string(body))
	}
	return token, nil
}
|
||
|
||
// 并发处理记录的主要函数
|
||
func processRecordsConcurrently(records []CrawlerRecord, imageDir string, es *ESClient, maxWorkers int, token string) []ProcessResult {
|
||
var stats Statistics
|
||
stats.Total = int32(len(records))
|
||
stats.StartTime = time.Now()
|
||
|
||
// 创建通道
|
||
recordChan := make(chan CrawlerRecord, len(records))
|
||
resultChan := make(chan ProcessResult, len(records))
|
||
|
||
// 启动worker
|
||
var wg sync.WaitGroup
|
||
for i := 0; i < maxWorkers; i++ {
|
||
wg.Add(1)
|
||
go worker(token, i, &wg, recordChan, resultChan, imageDir, es, &stats)
|
||
}
|
||
|
||
// 发送任务到通道
|
||
go func() {
|
||
for _, record := range records {
|
||
recordChan <- record
|
||
}
|
||
close(recordChan)
|
||
}()
|
||
|
||
// 启动进度报告
|
||
go progressReporter(&stats)
|
||
|
||
// 收集结果
|
||
var results []ProcessResult
|
||
go func() {
|
||
for result := range resultChan {
|
||
results = append(results, result)
|
||
}
|
||
}()
|
||
|
||
// 等待所有worker完成
|
||
wg.Wait()
|
||
close(resultChan)
|
||
|
||
return results
|
||
}
|
||
|
||
// worker 处理函数
|
||
func worker(token string, id int, wg *sync.WaitGroup, recordChan <-chan CrawlerRecord, resultChan chan<- ProcessResult, imageDir string, es *ESClient, stats *Statistics) {
|
||
defer wg.Done()
|
||
|
||
for record := range recordChan {
|
||
currentIndex := atomic.AddInt32(&stats.CurrentIndex, 1)
|
||
|
||
result := ProcessResult{
|
||
Record: record,
|
||
WorkerID: id,
|
||
ProcessedAt: time.Now(),
|
||
}
|
||
|
||
// 检查记录有效性
|
||
if !isRecordValid(record) {
|
||
atomic.AddInt32(&stats.Skipped, 1)
|
||
result.Success = false
|
||
result.Error = fmt.Errorf("无效记录: ISBN或图片URL为空")
|
||
resultChan <- result
|
||
continue
|
||
}
|
||
|
||
// 处理记录(带重试机制)
|
||
var localPaths, pddURLs []string
|
||
var err error
|
||
|
||
for attempt := 1; attempt <= maxRetries; attempt++ {
|
||
localPaths, pddURLs, err = processSingleRecord(token, record, imageDir, es)
|
||
if err == nil {
|
||
break
|
||
}
|
||
|
||
// 如果是ES记录未找到的错误,不需要重试
|
||
if strings.Contains(err.Error(), "ES记录未找到") {
|
||
break
|
||
}
|
||
|
||
if attempt < maxRetries {
|
||
log.Printf("Worker %d: 第 %d 次尝试处理 ISBN %s 失败, %d 秒后重试: %v",
|
||
id, attempt, record.BookISBN.String, retryDelay/time.Second, err)
|
||
time.Sleep(retryDelay)
|
||
}
|
||
}
|
||
|
||
if err != nil {
|
||
atomic.AddInt32(&stats.Failed, 1)
|
||
result.Success = false
|
||
result.Error = err
|
||
// 即使失败,也记录已处理的本地路径(如果有)
|
||
result.LocalPaths = localPaths
|
||
result.PDDURLs = pddURLs
|
||
|
||
// 根据错误类型记录不同的日志
|
||
if strings.Contains(err.Error(), "ES记录未找到") {
|
||
log.Printf("Worker %d: ES记录未找到 [%d/%d] ISBN: %s",
|
||
id, currentIndex, stats.Total, record.BookISBN.String)
|
||
} else {
|
||
log.Printf("Worker %d: 处理失败 [%d/%d] ISBN: %s, 错误: %v",
|
||
id, currentIndex, stats.Total, record.BookISBN.String, err)
|
||
}
|
||
} else {
|
||
atomic.AddInt32(&stats.Success, 1)
|
||
result.Success = true
|
||
result.LocalPaths = localPaths
|
||
result.PDDURLs = pddURLs
|
||
|
||
log.Printf("Worker %d: 成功处理 [%d/%d] ISBN: %s, 生成 %d 个文件, 上传 %d 个URL",
|
||
id, currentIndex, stats.Total, record.BookISBN.String, len(localPaths), len(pddURLs))
|
||
}
|
||
|
||
resultChan <- result
|
||
}
|
||
}
|
||
|
||
// 检查记录有效性
|
||
func isRecordValid(record CrawlerRecord) bool {
|
||
if !record.BookISBN.Valid || record.BookISBN.String == "" {
|
||
return false
|
||
}
|
||
if !record.BookPicture.Valid || record.BookPicture.String == "" {
|
||
return false
|
||
}
|
||
return true
|
||
}
|
||
|
||
// 进度报告器
|
||
func progressReporter(stats *Statistics) {
|
||
ticker := time.NewTicker(progressInterval)
|
||
defer ticker.Stop()
|
||
|
||
for range ticker.C {
|
||
processed := atomic.LoadInt32(&stats.CurrentIndex)
|
||
success := atomic.LoadInt32(&stats.Success)
|
||
failed := atomic.LoadInt32(&stats.Failed)
|
||
skipped := atomic.LoadInt32(&stats.Skipped)
|
||
|
||
elapsed := time.Since(stats.StartTime)
|
||
rate := float64(processed) / elapsed.Seconds()
|
||
|
||
// 计算预估剩余时间
|
||
var eta time.Duration
|
||
if processed > 0 && rate > 0 {
|
||
remaining := float64(stats.Total - processed)
|
||
eta = time.Duration(remaining/rate) * time.Second
|
||
}
|
||
|
||
fmt.Printf("[进度] 已处理: %d/%d (成功: %d, 失败: %d, 跳过: %d) | 速率: %.2f 条/秒 | 运行: %v | ETA: %v\n",
|
||
processed, stats.Total, success, failed, skipped, rate, elapsed.Round(time.Second), eta.Round(time.Second))
|
||
|
||
if processed >= stats.Total {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 打印最终统计
|
||
func printFinalStatistics(results []ProcessResult) {
|
||
var success, failed, skipped int
|
||
var totalFilesGenerated int
|
||
var totalURLsUploaded int
|
||
|
||
// 失败原因分类
|
||
failureReasons := make(map[string]int)
|
||
|
||
for _, result := range results {
|
||
if result.Success {
|
||
success++
|
||
totalFilesGenerated += len(result.LocalPaths)
|
||
totalURLsUploaded += len(result.PDDURLs)
|
||
} else if result.Error != nil && strings.Contains(result.Error.Error(), "无效记录") {
|
||
skipped++
|
||
failureReasons["无效记录(ISBN或URL为空)"]++
|
||
} else {
|
||
failed++
|
||
// 即使是失败的情况,也可能生成了部分文件
|
||
totalFilesGenerated += len(result.LocalPaths)
|
||
totalURLsUploaded += len(result.PDDURLs)
|
||
|
||
// 分类失败原因
|
||
errMsg := result.Error.Error()
|
||
switch {
|
||
case strings.Contains(errMsg, "ES记录未找到"):
|
||
failureReasons["ES记录未找到"]++
|
||
case strings.Contains(errMsg, "查询ES中ID失败"):
|
||
failureReasons["ES查询失败"]++
|
||
case strings.Contains(errMsg, "下载图片失败"):
|
||
failureReasons["图片下载失败"]++
|
||
case strings.Contains(errMsg, "处理图片失败"):
|
||
failureReasons["图片处理失败"]++
|
||
case strings.Contains(errMsg, "上传PNG图片失败"):
|
||
failureReasons["PNG上传失败"]++
|
||
case strings.Contains(errMsg, "上传JPG图片失败"):
|
||
failureReasons["JPG上传失败"]++
|
||
case strings.Contains(errMsg, "更新ES数据失败"):
|
||
failureReasons["ES更新失败"]++
|
||
default:
|
||
failureReasons["其他错误"]++
|
||
}
|
||
}
|
||
}
|
||
|
||
fmt.Printf("\n=== 处理完成 ===\n")
|
||
fmt.Printf("总记录数: %d\n", len(results))
|
||
fmt.Printf("成功: %d\n", success)
|
||
fmt.Printf("失败: %d\n", failed)
|
||
fmt.Printf("跳过: %d\n", skipped)
|
||
fmt.Printf("成功率: %.2f%%\n", float64(success)/float64(len(results))*100)
|
||
fmt.Printf("生成文件总数: %d (平均每条记录 %.1f 个文件)\n", totalFilesGenerated, float64(totalFilesGenerated)/float64(len(results)))
|
||
fmt.Printf("上传URL总数: %d (平均每条记录 %.1f 个URL)\n", totalURLsUploaded, float64(totalURLsUploaded)/float64(len(results)))
|
||
|
||
// 显示失败原因统计
|
||
if len(failureReasons) > 0 {
|
||
fmt.Printf("\n=== 失败原因统计 ===\n")
|
||
for reason, count := range failureReasons {
|
||
fmt.Printf(" %s: %d\n", reason, count)
|
||
}
|
||
}
|
||
|
||
// 显示处理详情示例
|
||
fmt.Printf("\n=== 处理详情示例 ===\n")
|
||
successCount := 0
|
||
failedCount := 0
|
||
for _, result := range results {
|
||
if result.Success && successCount < 3 {
|
||
fmt.Printf("✅ 成功: ISBN %s -> 文件: %d 个, URL: %d 个\n",
|
||
result.Record.BookISBN.String,
|
||
len(result.LocalPaths),
|
||
len(result.PDDURLs))
|
||
successCount++
|
||
} else if !result.Success && failedCount < 3 && !strings.Contains(result.Error.Error(), "无效记录") {
|
||
fmt.Printf("❌ 失败: ISBN %s -> 错误: %v\n",
|
||
result.Record.BookISBN.String,
|
||
result.Error)
|
||
failedCount++
|
||
}
|
||
if successCount >= 3 && failedCount >= 3 {
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 处理单条记录
|
||
func processSingleRecord(token string, record CrawlerRecord, imageDir string, es *ESClient) ([]string, []string, error) {
|
||
// 更新ES
|
||
ids, err := es.FindIDsByISBN(esIndex, record.BookISBN.String)
|
||
if err != nil {
|
||
return nil, nil, fmt.Errorf("查询ES中ID失败: %v", err)
|
||
}
|
||
var pngImageUrl string
|
||
var jpgImageUrl string
|
||
var localPaths []string
|
||
var pddURLs []string
|
||
if ids != "" {
|
||
// 下载并处理图片
|
||
pngPath, jpgPath, err := processAndSaveImage(record, imageDir)
|
||
if err != nil {
|
||
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
|
||
if err != nil {
|
||
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
|
||
} else {
|
||
log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String)
|
||
}
|
||
return nil, nil, fmt.Errorf("处理图片失败: %v", err)
|
||
}
|
||
localPaths = []string{pngPath, jpgPath}
|
||
// 上传到PDD
|
||
pngImageUrl, err = uploadToPDD(token, pngPath)
|
||
if err != nil {
|
||
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
|
||
if err != nil {
|
||
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
|
||
} else {
|
||
log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String)
|
||
}
|
||
return nil, nil, fmt.Errorf("上传PNG图片失败: %v", err)
|
||
}
|
||
// 上传到PDD
|
||
jpgImageUrl, err = uploadToPDD(token, jpgPath)
|
||
if err != nil {
|
||
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
|
||
if err != nil {
|
||
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
|
||
} else {
|
||
log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String)
|
||
}
|
||
return nil, nil, fmt.Errorf("上传JPG图片失败: %v", err)
|
||
}
|
||
pddURLs = []string{pngImageUrl, jpgImageUrl}
|
||
err = es.UpdateBookPicsByID(esIndex, ids, "", pngImageUrl, jpgImageUrl)
|
||
if err != nil {
|
||
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
|
||
if err != nil {
|
||
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
|
||
} else {
|
||
log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String)
|
||
}
|
||
return nil, nil, fmt.Errorf("更新ES数据失败: %v", err)
|
||
}
|
||
|
||
for _, path := range localPaths {
|
||
// ES更新成功后删除本地图片
|
||
if removeErr := os.Remove(path); removeErr == nil {
|
||
log.Printf("ES更新成功,已删除本地图片: %s", path)
|
||
} else {
|
||
log.Printf("警告: 无法删除本地图片 %s: %v", path, removeErr)
|
||
}
|
||
}
|
||
} else {
|
||
// ids为空,将ISBN存储到txt文件
|
||
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
|
||
if err != nil {
|
||
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
|
||
} else {
|
||
log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String)
|
||
}
|
||
return nil, nil, fmt.Errorf("未找到ISBN %s 对应的ES记录", record.BookISBN.String)
|
||
}
|
||
return localPaths, pddURLs, nil
|
||
}
|
||
|
||
// notFoundISBNFileMu serialises saveISBNToFile. Workers call it concurrently, and its
// read-check-append sequence is not atomic. Without the lock, two goroutines could both
// miss an ISBN in the dedup check (duplicate rows), or both write the header to a new file.
var notFoundISBNFileMu sync.Mutex

// saveISBNToFile records an ISBN that could not be processed, together with its source
// image URL, in cmd/update_es_gt/xgy_not_found_isbns.txt. The path is relative to the
// working directory.
//
// Format: a "# ISBN,ImageURL" header is written when the file is empty, followed by one
// "isbn,imageUrl" line per ISBN. An ISBN already present in the first column of a
// non-comment line is not added again. Safe for concurrent use.
func saveISBNToFile(isbn string, imageUrl string) error {
	const filename = "cmd/update_es_gt/xgy_not_found_isbns.txt"

	notFoundISBNFileMu.Lock()
	defer notFoundISBNFileMu.Unlock()

	// Dedup against existing rows. A read error (typically: the file does not exist yet)
	// just means there is nothing to dedup against.
	if content, err := os.ReadFile(filename); err == nil {
		for _, line := range strings.Split(string(content), "\n") {
			line = strings.TrimRight(line, "\r") // tolerate CRLF line endings
			if line == "" || strings.HasPrefix(line, "#") {
				continue
			}
			existing := line
			if i := strings.IndexByte(line, ','); i >= 0 {
				existing = line[:i]
			}
			if existing == isbn {
				return nil
			}
		}
	}

	file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
	if err != nil {
		return fmt.Errorf("打开文件失败: %v", err)
	}
	defer file.Close()

	// An empty file gets the CSV header first.
	if stat, statErr := file.Stat(); statErr == nil && stat.Size() == 0 {
		if _, err := file.WriteString("# ISBN,ImageURL\n"); err != nil {
			return fmt.Errorf("写入表头失败: %v", err)
		}
	}

	if _, err := file.WriteString(fmt.Sprintf("%s,%s\n", isbn, imageUrl)); err != nil {
		return fmt.Errorf("写入文件失败: %v", err)
	}
	return nil
}
|
||
|
||
// 下载并处理图片
|
||
func processAndSaveImage(record CrawlerRecord, saveDir string) (string, string, error) {
|
||
// 下载图片
|
||
img, originalFormat, err := downloadImage(record.BookPicture.String)
|
||
if err != nil {
|
||
return "", "", fmt.Errorf("下载图片失败: %v", err)
|
||
}
|
||
|
||
fmt.Printf("下载成功,原始格式: %s\n", originalFormat)
|
||
|
||
// 调整图片高度为600,等比例缩放
|
||
//resizedImg := resizeImageToHeight(img, 600)
|
||
// 使用高质量缩放调整图片高度为600,等比例缩放
|
||
resizedImg := resizeToHeightHighQuality(img, 600)
|
||
fmt.Printf("缩放后尺寸: %dx%d\n", resizedImg.Bounds().Dx(), resizedImg.Bounds().Dy())
|
||
|
||
// 创建800x800的透明背景
|
||
finalImg := createCenteredImage(resizedImg, 800, 800, true)
|
||
|
||
// 创建800x800的白色背景(用于JPG)
|
||
whiteImg := createCenteredImage(resizedImg, 800, 800, false)
|
||
|
||
// 生成文件名
|
||
filename := fmt.Sprintf("%s", record.BookISBN.String)
|
||
// 清理文件名中的非法字符
|
||
filename = sanitizeFilename(filename)
|
||
|
||
// PNG文件路径
|
||
pngPath := filepath.Join(saveDir, filename+".png")
|
||
// JPG文件路径
|
||
jpgPath := filepath.Join(saveDir, filename+".jpg")
|
||
|
||
// 保存为PNG图片
|
||
err = savePNG(finalImg, pngPath)
|
||
if err != nil {
|
||
return "", "", fmt.Errorf("保存图片失败: %v", err)
|
||
}
|
||
|
||
// 保存为JPG图片(白色背景)
|
||
err = saveJPG(whiteImg, jpgPath, 95) // 95%质量
|
||
if err != nil {
|
||
return "", "", fmt.Errorf("保存JPG图片失败: %v", err)
|
||
}
|
||
|
||
fmt.Printf("转换成功: %s -> %s, 保存路径: %s\n", originalFormat, "PNG", pngPath)
|
||
fmt.Printf("转换成功: %s -> %s, 保存路径: %s\n", originalFormat, "JPG", jpgPath)
|
||
return pngPath, jpgPath, nil
|
||
}
|
||
|
||
// 下载图片
|
||
func downloadImage(url string) (image.Image, string, error) {
|
||
// 创建HTTP客户端,设置超时等参数
|
||
client := &http.Client{
|
||
Timeout: 30 * time.Second,
|
||
}
|
||
|
||
resp, err := client.Get(url)
|
||
if err != nil {
|
||
return nil, "", err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, "", fmt.Errorf("HTTP请求失败,状态码: %d", resp.StatusCode)
|
||
}
|
||
|
||
// 读取响应体前几个字节来判断图片格式
|
||
peekBytes := make([]byte, 512)
|
||
n, err := resp.Body.Read(peekBytes)
|
||
if err != nil && err != io.EOF {
|
||
return nil, "", err
|
||
}
|
||
|
||
// 创建一个新的Reader,包含已读取的数据和剩余数据
|
||
reader := io.MultiReader(strings.NewReader(string(peekBytes[:n])), resp.Body)
|
||
|
||
// 根据文件头识别图片格式
|
||
contentType := http.DetectContentType(peekBytes[:n])
|
||
fmt.Printf("检测到的Content-Type: %s\n", contentType)
|
||
|
||
var img image.Image
|
||
var format string
|
||
|
||
// 根据Content-Type或文件扩展名选择解码器
|
||
switch {
|
||
case strings.Contains(contentType, "jpeg") || strings.HasSuffix(strings.ToLower(url), ".jpg") || strings.HasSuffix(strings.ToLower(url), ".jpeg"):
|
||
img, err = jpeg.Decode(reader)
|
||
format = "JPEG"
|
||
case strings.Contains(contentType, "png") || strings.HasSuffix(strings.ToLower(url), ".png"):
|
||
img, err = png.Decode(reader)
|
||
format = "PNG"
|
||
case strings.Contains(contentType, "webp") || strings.HasSuffix(strings.ToLower(url), ".webp"):
|
||
img, err = webp.Decode(reader)
|
||
format = "WEBP"
|
||
case strings.Contains(contentType, "bmp") || strings.HasSuffix(strings.ToLower(url), ".bmp"):
|
||
img, err = bmp.Decode(reader)
|
||
format = "BMP"
|
||
case strings.Contains(contentType, "tiff") || strings.HasSuffix(strings.ToLower(url), ".tiff") || strings.HasSuffix(strings.ToLower(url), ".tif"):
|
||
img, err = tiff.Decode(reader)
|
||
format = "TIFF"
|
||
default:
|
||
// 尝试通用解码
|
||
img, format, err = image.Decode(reader)
|
||
if err != nil {
|
||
return nil, "", fmt.Errorf("不支持的图片格式: %s, 错误: %v", contentType, err)
|
||
}
|
||
}
|
||
|
||
if err != nil {
|
||
return nil, "", fmt.Errorf("解码图片失败: %v", err)
|
||
}
|
||
|
||
return img, format, nil
|
||
}
|
||
|
||
// 高质量等比例缩放到指定高度
|
||
func resizeToHeightHighQuality(src image.Image, targetHeight int) image.Image {
|
||
bounds := src.Bounds()
|
||
srcWidth := bounds.Dx()
|
||
srcHeight := bounds.Dy()
|
||
|
||
// 如果原图高度已经小于等于目标高度,且宽度合适,可以直接返回
|
||
//if srcHeight <= targetHeight {
|
||
// return src
|
||
//}
|
||
|
||
// 计算等比例缩放后的宽度
|
||
targetWidth := uint(float64(srcWidth) * float64(targetHeight) / float64(srcHeight))
|
||
|
||
// 使用 Lanczos3 插值算法进行高质量缩放
|
||
return resize.Resize(targetWidth, uint(targetHeight), src, resize.Lanczos3)
|
||
}
|
||
|
||
// createCenteredImage returns a width x height RGBA canvas with src composited
// (draw.Over) in its centre. The canvas is filled with fully transparent pixels when
// transparent is true, and with opaque white otherwise.
//
// If src is larger than the canvas, the parts outside it are clipped (the centre is kept).
func createCenteredImage(src image.Image, width, height int, transparent bool) *image.RGBA {
	dst := image.NewRGBA(image.Rect(0, 0, width, height))

	bgColor := color.RGBA{255, 255, 255, 255} // white
	if transparent {
		bgColor = color.RGBA{0, 0, 0, 0}
	}
	draw.Draw(dst, dst.Bounds(), &image.Uniform{C: bgColor}, image.Point{}, draw.Src)

	srcBounds := src.Bounds()
	x := (width - srcBounds.Dx()) / 2
	y := (height - srcBounds.Dy()) / 2

	// The source point must be srcBounds.Min. With image.Point{}, images whose bounds do
	// not start at (0,0) (e.g. SubImage results) were sampled outside their bounds, and
	// nothing was drawn.
	draw.Draw(dst, image.Rect(x, y, x+srcBounds.Dx(), y+srcBounds.Dy()), src, srcBounds.Min, draw.Over)
	return dst
}
|
||
|
||
// savePNG encodes img as PNG into filename, creating or truncating the file.
//
// The error from closing the file is returned when encoding succeeded, because buffered
// data may only fail to reach the disk at Close.
func savePNG(img image.Image, filename string) (err error) {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	return png.Encode(file, img)
}
|
||
|
||
// saveJPG encodes img as JPEG into filename, creating or truncating the file.
// quality is the JPEG quality (1-100, higher is better; out-of-range values are clamped
// by image/jpeg).
//
// The error from closing the file is returned when encoding succeeded, because buffered
// data may only fail to reach the disk at Close.
func saveJPG(img image.Image, filename string, quality int) (err error) {
	file, err := os.Create(filename)
	if err != nil {
		return err
	}
	defer func() {
		if closeErr := file.Close(); closeErr != nil && err == nil {
			err = closeErr
		}
	}()

	return jpeg.Encode(file, img, &jpeg.Options{Quality: quality})
}
|
||
|
||
// sanitizeFilename makes filename usable as a Windows file name:
//   - each of \ / : * ? " < > | becomes "_";
//   - surrounding whitespace is trimmed;
//   - an empty result becomes "unknown".
func sanitizeFilename(filename string) string {
	replacer := strings.NewReplacer(
		`\`, "_", "/", "_", ":", "_", "*", "_", "?", "_",
		`"`, "_", "<", "_", ">", "_", "|", "_",
	)
	cleaned := strings.TrimSpace(replacer.Replace(filename))
	if cleaned == "" {
		return "unknown"
	}
	return cleaned
}
|
||
|
||
// 从数据库获取记录
|
||
func getRecords(db *sql.DB) ([]CrawlerRecord, error) {
|
||
// 查询所有记录,包括 NULL 值
|
||
query := "SELECT book_isbn, book_picture FROM dk_crawler_record_info"
|
||
rows, err := db.Query(query)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer rows.Close()
|
||
|
||
var records []CrawlerRecord
|
||
for rows.Next() {
|
||
var record CrawlerRecord
|
||
// 使用 sql.NullString 来接收可能为 NULL 的字段
|
||
err := rows.Scan(&record.BookISBN, &record.BookPicture)
|
||
if err != nil {
|
||
fmt.Printf("扫描记录失败: %v\n", err)
|
||
continue
|
||
}
|
||
records = append(records, record)
|
||
}
|
||
|
||
// 检查遍历过程中是否有错误
|
||
if err = rows.Err(); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
return records, nil
|
||
}
|
||
|
||
// FindIDsByISBN 根据 ISBN 查询文档 ID 列表
|
||
func (es *ESClient) FindIDsByISBN(index, isbn string) (string, error) {
|
||
q := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"term": map[string]interface{}{"isbn": isbn},
|
||
},
|
||
"_source": false,
|
||
"size": 1000,
|
||
}
|
||
b, _ := json.Marshal(q)
|
||
res, err := es.client.Search(
|
||
es.client.Search.WithIndex(index),
|
||
es.client.Search.WithBody(strings.NewReader(string(b))),
|
||
es.client.Search.WithContext(context.Background()),
|
||
)
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
defer res.Body.Close()
|
||
if res.IsError() {
|
||
return "", fmt.Errorf("搜索失败: %s", res.String())
|
||
}
|
||
var r map[string]interface{}
|
||
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
|
||
return "", err
|
||
}
|
||
hits, _ := r["hits"].(map[string]interface{})
|
||
arr, _ := hits["hits"].([]interface{})
|
||
var ids string
|
||
for _, h := range arr {
|
||
m, _ := h.(map[string]interface{})
|
||
id, _ := m["_id"].(string)
|
||
if id != "" {
|
||
//ids = append(ids, id)
|
||
ids = id
|
||
}
|
||
}
|
||
return ids, nil
|
||
}
|
||
|
||
func (es *ESClient) UpdateBookPicsByID(index, id, localImageS, pngImageUrl, jpgImageUrl string) error {
|
||
bookPicJSON, err := json.Marshal(map[string]string{
|
||
"localPath": localImageS,
|
||
"pddPath": jpgImageUrl,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("序列化 book_pic_w 失败: %w", err)
|
||
}
|
||
|
||
bookPicBJSON, err := json.Marshal(map[string]string{
|
||
"localPath": localImageS,
|
||
"pddResponse": pngImageUrl,
|
||
})
|
||
if err != nil {
|
||
return fmt.Errorf("序列化 book_pic_b 失败: %w", err)
|
||
}
|
||
// 构建更新文档
|
||
payload := map[string]interface{}{
|
||
"doc": map[string]string{
|
||
"book_pic": string(bookPicJSON),
|
||
"book_pic_b": string(bookPicBJSON),
|
||
},
|
||
}
|
||
// JSON 序列化整个更新请求
|
||
body, err := json.Marshal(payload)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化更新请求失败: %w", err)
|
||
}
|
||
req := esapi.UpdateRequest{
|
||
Index: index,
|
||
DocumentID: id,
|
||
Body: strings.NewReader(string(body)),
|
||
}
|
||
res, err := req.Do(context.Background(), es.client)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
defer res.Body.Close()
|
||
if res.IsError() {
|
||
data, _ := io.ReadAll(res.Body)
|
||
return fmt.Errorf("ES 更新失败: %s", data)
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// getStringValue returns the string held by nullString, or the literal "NULL" when the
// value is SQL NULL.
func getStringValue(nullString sql.NullString) string {
	if !nullString.Valid {
		return "NULL"
	}
	return nullString.String
}
|
||
|
||
func main() {
|
||
//// 获取token
|
||
//token, err := GetPddToken()
|
||
//if err != nil {
|
||
// fmt.Errorf("获取拼多多token失败: %v", err)
|
||
//}
|
||
//fmt.Println("token=", token)
|
||
//// 数据源名称格式
|
||
//dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
|
||
// "root",
|
||
// "123456",
|
||
// "localhost",
|
||
// 3306,
|
||
// "book_image")
|
||
//db, err := sql.Open("mysql", dsn)
|
||
//if err != nil {
|
||
// fmt.Printf("打开数据库连接失败: %v", err)
|
||
//}
|
||
//// 设置连接池参数
|
||
//db.SetMaxOpenConns(20) // 最大打开连接数
|
||
//db.SetMaxIdleConns(10)
|
||
//err = db.Ping()
|
||
//if err != nil {
|
||
// fmt.Printf("数据库连接测试失败: %v", err)
|
||
//}
|
||
//
|
||
//// 查询数据
|
||
//records, err := getRecords(db)
|
||
//if err != nil {
|
||
// fmt.Printf("查询失败: %v", err)
|
||
//}
|
||
//imageDir := "D:\\image"
|
||
//err = os.MkdirAll(imageDir, 0755)
|
||
//if err != nil {
|
||
// fmt.Sprintf("创建目录失败: %v", err)
|
||
//}
|
||
//fmt.Printf("找到 %d 条记录需要处理\n", len(records))
|
||
//
|
||
//es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
|
||
//if err != nil {
|
||
// log.Fatalf("ES 连接失败: %v", err)
|
||
//}
|
||
//if err := es.CheckHealth(); err != nil {
|
||
// log.Fatalf("ES 健康检查失败: %v", err)
|
||
//}
|
||
//// 启动并发处理
|
||
//results := processRecordsConcurrently(records, imageDir, es, maxWorkers, token)
|
||
//
|
||
//// 输出最终统计
|
||
//printFinalStatistics(results)
|
||
|
||
//mainQuerySaleISBNs()
|
||
//mainFindESOnlyISBNs()
|
||
mainQuerySaleISBNsWithEmptyPic()
|
||
}
|
||
|
||
// 查询并导出有销售记录且book_pic字符串中pddPath为空的ISBN
|
||
func queryAndExportSaleISBNs(es *ESClient, outputFile string) error {
|
||
log.Printf("开始查询有销售记录且book_pic字符串中pddPath为空的ISBN...")
|
||
|
||
// 使用其他字段排序,比如 isbn 字段或者时间字段
|
||
query := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"bool": map[string]interface{}{
|
||
"must": []map[string]interface{}{
|
||
{
|
||
"bool": map[string]interface{}{
|
||
"should": []map[string]interface{}{
|
||
{"range": map[string]interface{}{"day_sale_7": map[string]interface{}{"gt": 0}}},
|
||
{"range": map[string]interface{}{"day_sale_15": map[string]interface{}{"gt": 0}}},
|
||
{"range": map[string]interface{}{"day_sale_30": map[string]interface{}{"gt": 0}}},
|
||
{"range": map[string]interface{}{"day_sale_60": map[string]interface{}{"gt": 0}}},
|
||
},
|
||
"minimum_should_match": 1,
|
||
},
|
||
},
|
||
{
|
||
"bool": map[string]interface{}{
|
||
"should": []map[string]interface{}{
|
||
// 匹配 pddPath:"" 的JSON字符串
|
||
{"regexp": map[string]interface{}{"book_pic": ".*\"pddPath\":\"\".*"}},
|
||
// 匹配 pddPath: "" (带空格的)
|
||
{"regexp": map[string]interface{}{"book_pic": ".*\"pddPath\":\\s*\"\".*"}},
|
||
// 匹配整个book_pic字段为空
|
||
{"term": map[string]interface{}{"book_pic": ""}},
|
||
// 匹配book_pic字段不存在
|
||
{
|
||
"bool": map[string]interface{}{
|
||
"must_not": map[string]interface{}{
|
||
"exists": map[string]interface{}{"field": "book_pic"},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"_source": []string{"isbn"},
|
||
"sort": []map[string]interface{}{
|
||
{"isbn": "asc"}, // 使用 isbn 字段排序,或者使用其他可排序字段
|
||
},
|
||
"size": 10000,
|
||
}
|
||
|
||
// 打印查询条件用于验证
|
||
queryJSON, _ := json.MarshalIndent(query, "", " ")
|
||
log.Printf("查询条件:\n%s", string(queryJSON))
|
||
|
||
var allISBNs []string
|
||
var searchAfter interface{}
|
||
totalCount := 0
|
||
page := 1
|
||
|
||
for {
|
||
// 复制基础查询
|
||
currentQuery := make(map[string]interface{})
|
||
for k, v := range query {
|
||
currentQuery[k] = v
|
||
}
|
||
|
||
// 添加游标
|
||
if searchAfter != nil {
|
||
currentQuery["search_after"] = searchAfter
|
||
}
|
||
|
||
body, err := json.Marshal(currentQuery)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化查询失败: %w", err)
|
||
}
|
||
|
||
log.Printf("执行第 %d 页查询...", page)
|
||
|
||
// 执行搜索
|
||
res, err := es.client.Search(
|
||
es.client.Search.WithIndex(esIndex),
|
||
es.client.Search.WithBody(strings.NewReader(string(body))),
|
||
es.client.Search.WithContext(context.Background()),
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("ES搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
|
||
}
|
||
|
||
// 读取并解析响应体
|
||
bodyBytes, err := io.ReadAll(res.Body)
|
||
if err != nil {
|
||
return fmt.Errorf("读取响应体失败: %w", err)
|
||
}
|
||
|
||
var result map[string]interface{}
|
||
if err := json.Unmarshal(bodyBytes, &result); err != nil {
|
||
return fmt.Errorf("解析ES响应失败: %w", err)
|
||
}
|
||
|
||
// 检查是否有错误
|
||
if errMsg, exists := result["error"]; exists {
|
||
return fmt.Errorf("ES返回错误: %v", errMsg)
|
||
}
|
||
|
||
hits, ok := result["hits"].(map[string]interface{})
|
||
if !ok {
|
||
return fmt.Errorf("无法解析hits字段")
|
||
}
|
||
|
||
// 获取总命中数
|
||
if totalHits, exists := hits["total"].(map[string]interface{}); exists {
|
||
if totalValue, exists := totalHits["value"]; exists {
|
||
log.Printf("ES返回总命中数: %.0f", totalValue)
|
||
}
|
||
}
|
||
|
||
hitList, ok := hits["hits"].([]interface{})
|
||
if !ok || len(hitList) == 0 {
|
||
log.Printf("第 %d 页没有数据,查询完成", page)
|
||
break // 没有更多数据
|
||
}
|
||
|
||
// 处理当前批次的数据
|
||
batchCount := 0
|
||
for _, hit := range hitList {
|
||
hitMap, ok := hit.(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析hit数据")
|
||
continue
|
||
}
|
||
|
||
source, ok := hitMap["_source"].(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析_source字段")
|
||
continue
|
||
}
|
||
|
||
isbn, ok := source["isbn"].(string)
|
||
if ok && isbn != "" {
|
||
allISBNs = append(allISBNs, isbn)
|
||
batchCount++
|
||
} else {
|
||
log.Printf("警告: 跳过空的ISBN字段")
|
||
}
|
||
|
||
// 更新游标(使用最后一个文档的排序值)
|
||
sortValues, ok := hitMap["sort"].([]interface{})
|
||
if ok && len(sortValues) > 0 {
|
||
searchAfter = sortValues
|
||
}
|
||
}
|
||
|
||
totalCount += batchCount
|
||
log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount)
|
||
page++
|
||
|
||
// 如果返回的数量小于请求的数量,说明已经是最后一页
|
||
if len(hitList) < 10000 {
|
||
log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList))
|
||
break
|
||
}
|
||
|
||
// 添加短暂延迟,避免对ES造成过大压力
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
if len(allISBNs) == 0 {
|
||
return fmt.Errorf("没有找到符合条件的ISBN记录")
|
||
}
|
||
|
||
// 去重
|
||
isbnSet := make(map[string]bool)
|
||
uniqueISBNs := make([]string, 0)
|
||
for _, isbn := range allISBNs {
|
||
if !isbnSet[isbn] {
|
||
isbnSet[isbn] = true
|
||
uniqueISBNs = append(uniqueISBNs, isbn)
|
||
}
|
||
}
|
||
|
||
log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs))
|
||
|
||
// 确保输出目录存在
|
||
outputDir := filepath.Dir(outputFile)
|
||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||
return fmt.Errorf("创建输出目录失败: %w", err)
|
||
}
|
||
|
||
// 写入文件
|
||
file, err := os.Create(outputFile)
|
||
if err != nil {
|
||
return fmt.Errorf("创建文件失败: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
// 写入文件头信息
|
||
header := fmt.Sprintf(`# 有销售记录且book_pic字符串中pddPath为空的ISBN列表
|
||
# 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0 OR day_sale_60 > 0) AND (book_pic包含"pddPath":"" 或 book_pic为空 或 book_pic字段不存在)
|
||
# 索引: %s
|
||
# 统计时间: %s
|
||
# 总记录数: %d
|
||
|
||
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs))
|
||
|
||
if _, err := file.WriteString(header); err != nil {
|
||
return fmt.Errorf("写入文件头失败: %w", err)
|
||
}
|
||
|
||
// 按字母顺序排序后写入
|
||
sort.Strings(uniqueISBNs)
|
||
successCount := 0
|
||
for _, isbn := range uniqueISBNs {
|
||
if _, err := file.WriteString(isbn + "\n"); err != nil {
|
||
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
|
||
continue
|
||
}
|
||
successCount++
|
||
}
|
||
|
||
log.Printf("成功导出 %d/%d 个有销售记录且book_pic字符串中pddPath为空的ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile)
|
||
return nil
|
||
}
|
||
|
||
// 查询并导出销售ISBN的主函数
|
||
func mainQuerySaleISBNs() {
|
||
// 初始化ES客户端
|
||
es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
|
||
if err != nil {
|
||
log.Fatalf("ES连接失败: %v", err)
|
||
}
|
||
|
||
// 检查ES健康状态
|
||
if err := es.CheckHealth(); err != nil {
|
||
log.Fatalf("ES健康检查失败: %v", err)
|
||
}
|
||
|
||
// 输出文件路径
|
||
outputFile := "cmd/update_es_gt/all_isbns.txt"
|
||
|
||
// 查询并导出ISBN
|
||
startTime := time.Now()
|
||
if err := exportAllISBNs(es, outputFile); err != nil {
|
||
log.Fatalf("导出销售ISBN失败: %v", err)
|
||
}
|
||
|
||
elapsed := time.Since(startTime)
|
||
log.Printf("任务完成!耗时: %v,ISBN已导出到: %s", elapsed.Round(time.Millisecond), outputFile)
|
||
}
|
||
|
||
// 导出所有ISBN到txt文件
|
||
func exportAllISBNs(es *ESClient, outputFile string) error {
|
||
log.Printf("开始导出所有ISBN...")
|
||
|
||
// 查询所有包含isbn字段的文档
|
||
query := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"exists": map[string]interface{}{
|
||
"field": "isbn",
|
||
},
|
||
},
|
||
"_source": []string{"isbn"},
|
||
"sort": []map[string]interface{}{
|
||
{"isbn": "asc"}, // 按ISBN排序
|
||
},
|
||
"size": 10000,
|
||
}
|
||
|
||
var allISBNs []string
|
||
var searchAfter interface{}
|
||
totalCount := 0
|
||
page := 1
|
||
|
||
for {
|
||
// 复制基础查询
|
||
currentQuery := make(map[string]interface{})
|
||
for k, v := range query {
|
||
currentQuery[k] = v
|
||
}
|
||
|
||
// 添加游标
|
||
if searchAfter != nil {
|
||
currentQuery["search_after"] = searchAfter
|
||
}
|
||
|
||
body, err := json.Marshal(currentQuery)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化查询失败: %w", err)
|
||
}
|
||
|
||
log.Printf("执行第 %d 页查询...", page)
|
||
|
||
// 执行搜索
|
||
res, err := es.client.Search(
|
||
es.client.Search.WithIndex(esIndex),
|
||
es.client.Search.WithBody(strings.NewReader(string(body))),
|
||
es.client.Search.WithContext(context.Background()),
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("ES搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
|
||
}
|
||
|
||
// 读取并解析响应体
|
||
bodyBytes, err := io.ReadAll(res.Body)
|
||
if err != nil {
|
||
return fmt.Errorf("读取响应体失败: %w", err)
|
||
}
|
||
|
||
var result map[string]interface{}
|
||
if err := json.Unmarshal(bodyBytes, &result); err != nil {
|
||
return fmt.Errorf("解析ES响应失败: %w", err)
|
||
}
|
||
|
||
// 检查是否有错误
|
||
if errMsg, exists := result["error"]; exists {
|
||
return fmt.Errorf("ES返回错误: %v", errMsg)
|
||
}
|
||
|
||
hits, ok := result["hits"].(map[string]interface{})
|
||
if !ok {
|
||
return fmt.Errorf("无法解析hits字段")
|
||
}
|
||
|
||
// 获取总命中数
|
||
if totalHits, exists := hits["total"].(map[string]interface{}); exists {
|
||
if totalValue, exists := totalHits["value"]; exists {
|
||
if page == 1 {
|
||
log.Printf("ES索引中共有 %.0f 条包含ISBN的记录", totalValue)
|
||
}
|
||
}
|
||
}
|
||
|
||
hitList, ok := hits["hits"].([]interface{})
|
||
if !ok || len(hitList) == 0 {
|
||
log.Printf("第 %d 页没有数据,查询完成", page)
|
||
break // 没有更多数据
|
||
}
|
||
|
||
// 处理当前批次的数据
|
||
batchCount := 0
|
||
for _, hit := range hitList {
|
||
hitMap, ok := hit.(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析hit数据")
|
||
continue
|
||
}
|
||
|
||
source, ok := hitMap["_source"].(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析_source字段")
|
||
continue
|
||
}
|
||
|
||
isbn, ok := source["isbn"].(string)
|
||
if ok && isbn != "" {
|
||
allISBNs = append(allISBNs, isbn)
|
||
batchCount++
|
||
} else {
|
||
log.Printf("警告: 跳过空的ISBN字段")
|
||
}
|
||
|
||
// 更新游标(使用最后一个文档的排序值)
|
||
sortValues, ok := hitMap["sort"].([]interface{})
|
||
if ok && len(sortValues) > 0 {
|
||
searchAfter = sortValues
|
||
}
|
||
}
|
||
|
||
totalCount += batchCount
|
||
log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount)
|
||
page++
|
||
|
||
// 如果返回的数量小于请求的数量,说明已经是最后一页
|
||
if len(hitList) < 10000 {
|
||
log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList))
|
||
break
|
||
}
|
||
|
||
// 添加短暂延迟,避免对ES造成过大压力
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
if len(allISBNs) == 0 {
|
||
return fmt.Errorf("没有找到包含ISBN字段的记录")
|
||
}
|
||
|
||
// 去重
|
||
isbnSet := make(map[string]bool)
|
||
uniqueISBNs := make([]string, 0)
|
||
for _, isbn := range allISBNs {
|
||
if !isbnSet[isbn] {
|
||
isbnSet[isbn] = true
|
||
uniqueISBNs = append(uniqueISBNs, isbn)
|
||
}
|
||
}
|
||
|
||
log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs))
|
||
|
||
// 确保输出目录存在
|
||
outputDir := filepath.Dir(outputFile)
|
||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||
return fmt.Errorf("创建输出目录失败: %w", err)
|
||
}
|
||
|
||
// 写入文件
|
||
file, err := os.Create(outputFile)
|
||
if err != nil {
|
||
return fmt.Errorf("创建文件失败: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
// 写入文件头信息
|
||
header := fmt.Sprintf(`# 所有ISBN列表
|
||
# 索引: %s
|
||
# 导出时间: %s
|
||
# 总记录数: %d
|
||
|
||
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs))
|
||
|
||
if _, err := file.WriteString(header); err != nil {
|
||
return fmt.Errorf("写入文件头失败: %w", err)
|
||
}
|
||
|
||
// 按字母顺序排序后写入
|
||
sort.Strings(uniqueISBNs)
|
||
successCount := 0
|
||
for _, isbn := range uniqueISBNs {
|
||
if _, err := file.WriteString(isbn + "\n"); err != nil {
|
||
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
|
||
continue
|
||
}
|
||
successCount++
|
||
}
|
||
|
||
log.Printf("成功导出 %d/%d 个ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile)
|
||
return nil
|
||
}
|
||
|
||
// 从ES获取所有ISBN
|
||
func getAllISBNsFromES(es *ESClient) ([]string, error) {
|
||
log.Printf("开始从ES索引 %s 获取所有ISBN...", esIndex)
|
||
|
||
var allISBNs []string
|
||
var searchAfter interface{}
|
||
totalCount := 0
|
||
page := 1
|
||
|
||
for {
|
||
query := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"exists": map[string]interface{}{
|
||
"field": "isbn",
|
||
},
|
||
},
|
||
"_source": []string{"isbn"},
|
||
"sort": []map[string]interface{}{
|
||
{"isbn": "asc"},
|
||
},
|
||
"size": 10000,
|
||
}
|
||
|
||
// 添加游标
|
||
if searchAfter != nil {
|
||
query["search_after"] = searchAfter
|
||
}
|
||
|
||
body, err := json.Marshal(query)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("序列化查询失败: %w", err)
|
||
}
|
||
|
||
log.Printf("执行第 %d 页ES查询...", page)
|
||
|
||
// 执行搜索
|
||
res, err := es.client.Search(
|
||
es.client.Search.WithIndex(esIndex),
|
||
es.client.Search.WithBody(strings.NewReader(string(body))),
|
||
es.client.Search.WithContext(context.Background()),
|
||
)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("ES搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return nil, fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
|
||
}
|
||
|
||
// 解析响应
|
||
var result map[string]interface{}
|
||
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
|
||
return nil, fmt.Errorf("解析ES响应失败: %w", err)
|
||
}
|
||
|
||
hits, ok := result["hits"].(map[string]interface{})
|
||
if !ok {
|
||
return nil, fmt.Errorf("无法解析hits字段")
|
||
}
|
||
|
||
hitList, ok := hits["hits"].([]interface{})
|
||
if !ok || len(hitList) == 0 {
|
||
log.Printf("第 %d 页没有数据,查询完成", page)
|
||
break
|
||
}
|
||
|
||
// 处理当前批次的数据
|
||
batchCount := 0
|
||
for _, hit := range hitList {
|
||
hitMap, ok := hit.(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析hit数据")
|
||
continue
|
||
}
|
||
|
||
source, ok := hitMap["_source"].(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析_source字段")
|
||
continue
|
||
}
|
||
|
||
isbn, ok := source["isbn"].(string)
|
||
if ok && isbn != "" {
|
||
allISBNs = append(allISBNs, isbn)
|
||
batchCount++
|
||
}
|
||
|
||
// 更新游标
|
||
sortValues, ok := hitMap["sort"].([]interface{})
|
||
if ok && len(sortValues) > 0 {
|
||
searchAfter = sortValues
|
||
}
|
||
}
|
||
|
||
totalCount += batchCount
|
||
log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount)
|
||
page++
|
||
|
||
// 如果返回的数量小于请求的数量,说明已经是最后一页
|
||
if len(hitList) < 10000 {
|
||
log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList))
|
||
break
|
||
}
|
||
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
if len(allISBNs) == 0 {
|
||
return nil, fmt.Errorf("ES中没有找到包含ISBN字段的记录")
|
||
}
|
||
|
||
log.Printf("从ES中获取到 %d 个ISBN", len(allISBNs))
|
||
return allISBNs, nil
|
||
}
|
||
|
||
// checkISBNsInDB looks up which of the given ISBNs exist in the xgy_base_item table.
//
// The ISBNs are queried in batches of 1000 with a parameterised "isbn IN (?, ...)"
// statement, which keeps each statement bounded. There is a 50ms pause between batches.
// The result maps every isbn value returned by the database to true; ISBNs that were not
// found are simply absent. The first query, scan or row-iteration error aborts the whole
// check and is returned wrapped.
func checkISBNsInDB(db *sql.DB, isbns []string) (map[string]bool, error) {
	log.Printf("开始检查 %d 个ISBN在数据库中的存在情况...", len(isbns))

	const batchSize = 1000
	found := make(map[string]bool)
	totalBatches := (len(isbns) + batchSize - 1) / batchSize

	// queryBatch runs one IN (...) lookup and records every ISBN it returns. The deferred
	// Close releases the rows on every exit path, including a failed Scan.
	queryBatch := func(chunk []string) error {
		placeholders := strings.TrimSuffix(strings.Repeat("?,", len(chunk)), ",")
		args := make([]interface{}, 0, len(chunk))
		for _, isbn := range chunk {
			args = append(args, isbn)
		}

		rows, err := db.Query("SELECT isbn FROM xgy_base_item WHERE isbn IN ("+placeholders+")", args...)
		if err != nil {
			return fmt.Errorf("数据库查询失败: %w", err)
		}
		defer rows.Close()

		for rows.Next() {
			var isbn string
			if err := rows.Scan(&isbn); err != nil {
				return fmt.Errorf("扫描ISBN失败: %w", err)
			}
			found[isbn] = true
		}
		if err := rows.Err(); err != nil {
			return fmt.Errorf("遍历数据库行时出错: %w", err)
		}
		return nil
	}

	for start := 0; start < len(isbns); start += batchSize {
		end := start + batchSize
		if end > len(isbns) {
			end = len(isbns)
		}
		log.Printf("处理数据库批次 %d/%d: ISBN范围 %d-%d", start/batchSize+1, totalBatches, start+1, end)

		if err := queryBatch(isbns[start:end]); err != nil {
			return nil, err
		}

		// Throttle between batches (not after the last one) to spare the database.
		if end < len(isbns) {
			time.Sleep(50 * time.Millisecond)
		}
	}

	log.Printf("数据库中存在 %d 个匹配的ISBN", len(found))
	return found, nil
}
|
||
|
||
// findESOnlyISBNs returns the ISBNs from esISBNs that are not marked true in dbExistsMap,
// i.e. those present in Elasticsearch but missing from the database.
//
// esISBNs may contain repeats (one entry per ES document); each missing ISBN is reported
// once, in first-seen order, so the exported list and its count are not inflated.
// Returns nil when every ISBN exists in the database.
func findESOnlyISBNs(esISBNs []string, dbExistsMap map[string]bool) []string {
	var esOnlyISBNs []string
	reported := make(map[string]struct{})

	for _, isbn := range esISBNs {
		if dbExistsMap[isbn] {
			continue
		}
		if _, dup := reported[isbn]; dup {
			continue
		}
		reported[isbn] = struct{}{}
		esOnlyISBNs = append(esOnlyISBNs, isbn)
	}

	log.Printf("ES中有 %d 个ISBN在数据库中不存在", len(esOnlyISBNs))
	return esOnlyISBNs
}
|
||
|
||
// 导出ES独有ISBN到txt文件
|
||
func exportESOnlyISBNs(esOnlyISBNs []string, outputFile string) error {
|
||
if len(esOnlyISBNs) == 0 {
|
||
log.Printf("没有ES独有的ISBN需要导出")
|
||
return nil
|
||
}
|
||
|
||
// 确保输出目录存在
|
||
outputDir := filepath.Dir(outputFile)
|
||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||
return fmt.Errorf("创建输出目录失败: %w", err)
|
||
}
|
||
|
||
// 写入文件
|
||
file, err := os.Create(outputFile)
|
||
if err != nil {
|
||
return fmt.Errorf("创建文件失败: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
// 写入文件头信息
|
||
header := fmt.Sprintf(`# ES中有但数据库中没有的ISBN列表
|
||
# 数据库表: xgy_base_item
|
||
# ES索引: %s
|
||
# 导出时间: %s
|
||
# 记录数: %d
|
||
# 说明: 这些ISBN在ES索引中存在但在数据库表中不存在
|
||
|
||
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(esOnlyISBNs))
|
||
|
||
if _, err := file.WriteString(header); err != nil {
|
||
return fmt.Errorf("写入文件头失败: %w", err)
|
||
}
|
||
|
||
// 按字母顺序排序后写入
|
||
sort.Strings(esOnlyISBNs)
|
||
successCount := 0
|
||
for _, isbn := range esOnlyISBNs {
|
||
if _, err := file.WriteString(isbn + "\n"); err != nil {
|
||
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
|
||
continue
|
||
}
|
||
successCount++
|
||
}
|
||
|
||
log.Printf("成功导出 %d/%d 个ES独有ISBN到文件: %s", successCount, len(esOnlyISBNs), outputFile)
|
||
return nil
|
||
}
|
||
|
||
// 主函数:查询ES中有但数据库中没有的ISBN
|
||
func mainFindESOnlyISBNs() {
|
||
// 初始化数据库连接
|
||
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
|
||
"root",
|
||
"123456",
|
||
"localhost",
|
||
3306,
|
||
"book_image") // 请根据实际情况修改数据库名
|
||
|
||
db, err := sql.Open("mysql", dsn)
|
||
if err != nil {
|
||
log.Fatalf("打开数据库连接失败: %v", err)
|
||
}
|
||
defer db.Close()
|
||
|
||
// 设置连接池参数
|
||
db.SetMaxOpenConns(20)
|
||
db.SetMaxIdleConns(10)
|
||
|
||
// 测试数据库连接
|
||
if err := db.Ping(); err != nil {
|
||
log.Fatalf("数据库连接测试失败: %v", err)
|
||
}
|
||
|
||
// 初始化ES客户端
|
||
es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
|
||
if err != nil {
|
||
log.Fatalf("ES连接失败: %v", err)
|
||
}
|
||
|
||
// 检查ES健康状态
|
||
if err := es.CheckHealth(); err != nil {
|
||
log.Fatalf("ES健康检查失败: %v", err)
|
||
}
|
||
|
||
startTime := time.Now()
|
||
log.Printf("开始处理ES与数据库的ISBN匹配...")
|
||
|
||
// 步骤1: 从ES获取所有ISBN
|
||
esISBNs, err := getAllISBNsFromES(es)
|
||
if err != nil {
|
||
log.Fatalf("获取ES ISBN失败: %v", err)
|
||
}
|
||
|
||
// 步骤2: 检查ISBN在数据库中的存在情况
|
||
dbExistsMap, err := checkISBNsInDB(db, esISBNs)
|
||
if err != nil {
|
||
log.Fatalf("检查数据库中ISBN存在情况失败: %v", err)
|
||
}
|
||
|
||
// 步骤3: 找出ES独有ISBN(ES中有但数据库中没有)
|
||
esOnlyISBNs := findESOnlyISBNs(esISBNs, dbExistsMap)
|
||
|
||
// 步骤4: 导出ES独有ISBN
|
||
outputFile := "cmd/update_es_gt/missing_isbns.txt"
|
||
if err := exportESOnlyISBNs(esOnlyISBNs, outputFile); err != nil {
|
||
log.Fatalf("导出ES独有ISBN失败: %v", err)
|
||
}
|
||
|
||
elapsed := time.Since(startTime)
|
||
|
||
// 输出统计信息
|
||
fmt.Printf("\n=== 处理完成 ===\n")
|
||
fmt.Printf("ES中ISBN总数: %d\n", len(esISBNs))
|
||
fmt.Printf("数据库中匹配的ISBN数: %d\n", len(dbExistsMap))
|
||
fmt.Printf("ES独有ISBN数(数据库中没有的): %d\n", len(esOnlyISBNs))
|
||
fmt.Printf("独有比例: %.2f%%\n", float64(len(esOnlyISBNs))/float64(len(esISBNs))*100)
|
||
fmt.Printf("耗时: %v\n", elapsed.Round(time.Millisecond))
|
||
fmt.Printf("输出文件: %s\n", outputFile)
|
||
|
||
// 显示部分ES独有ISBN示例
|
||
if len(esOnlyISBNs) > 0 {
|
||
fmt.Printf("\nES独有ISBN示例 (前10个):\n")
|
||
for i := 0; i < 10 && i < len(esOnlyISBNs); i++ {
|
||
fmt.Printf(" %s\n", esOnlyISBNs[i])
|
||
}
|
||
if len(esOnlyISBNs) > 10 {
|
||
fmt.Printf(" ... 还有 %d 个\n", len(esOnlyISBNs)-10)
|
||
}
|
||
}
|
||
}
|
||
|
||
// 查询并导出有销售记录且book_pic为空的ISBN
|
||
func queryAndExportSaleISBNsWithEmptyPic(es *ESClient, outputFile string) error {
|
||
log.Printf("开始查询有销售记录且book_pic为空的ISBN...")
|
||
|
||
// 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0) AND book_pic为空
|
||
query := map[string]interface{}{
|
||
"query": map[string]interface{}{
|
||
"bool": map[string]interface{}{
|
||
"must": []map[string]interface{}{
|
||
{
|
||
"bool": map[string]interface{}{
|
||
"should": []map[string]interface{}{
|
||
{"range": map[string]interface{}{"day_sale_7": map[string]interface{}{"gt": 0}}},
|
||
{"range": map[string]interface{}{"day_sale_15": map[string]interface{}{"gt": 0}}},
|
||
{"range": map[string]interface{}{"day_sale_30": map[string]interface{}{"gt": 0}}},
|
||
},
|
||
"minimum_should_match": 1,
|
||
},
|
||
},
|
||
{
|
||
"bool": map[string]interface{}{
|
||
"should": []map[string]interface{}{
|
||
// 匹配book_pic字段为空
|
||
{"term": map[string]interface{}{"book_pic": ""}},
|
||
// 匹配book_pic字段不存在
|
||
{
|
||
"bool": map[string]interface{}{
|
||
"must_not": map[string]interface{}{
|
||
"exists": map[string]interface{}{"field": "book_pic"},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
},
|
||
"_source": []string{"isbn"},
|
||
"sort": []map[string]interface{}{
|
||
{"isbn": "asc"}, // 按ISBN排序
|
||
},
|
||
"size": 10000,
|
||
}
|
||
|
||
// 打印查询条件用于验证
|
||
queryJSON, _ := json.MarshalIndent(query, "", " ")
|
||
log.Printf("查询条件:\n%s", string(queryJSON))
|
||
|
||
var allISBNs []string
|
||
var searchAfter interface{}
|
||
totalCount := 0
|
||
page := 1
|
||
|
||
for {
|
||
// 复制基础查询
|
||
currentQuery := make(map[string]interface{})
|
||
for k, v := range query {
|
||
currentQuery[k] = v
|
||
}
|
||
|
||
// 添加游标
|
||
if searchAfter != nil {
|
||
currentQuery["search_after"] = searchAfter
|
||
}
|
||
|
||
body, err := json.Marshal(currentQuery)
|
||
if err != nil {
|
||
return fmt.Errorf("序列化查询失败: %w", err)
|
||
}
|
||
|
||
log.Printf("执行第 %d 页查询...", page)
|
||
|
||
// 执行搜索
|
||
res, err := es.client.Search(
|
||
es.client.Search.WithIndex(esIndex),
|
||
es.client.Search.WithBody(strings.NewReader(string(body))),
|
||
es.client.Search.WithContext(context.Background()),
|
||
)
|
||
if err != nil {
|
||
return fmt.Errorf("ES搜索失败: %w", err)
|
||
}
|
||
defer res.Body.Close()
|
||
|
||
if res.IsError() {
|
||
bodyBytes, _ := io.ReadAll(res.Body)
|
||
return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
|
||
}
|
||
|
||
// 读取并解析响应体
|
||
bodyBytes, err := io.ReadAll(res.Body)
|
||
if err != nil {
|
||
return fmt.Errorf("读取响应体失败: %w", err)
|
||
}
|
||
|
||
var result map[string]interface{}
|
||
if err := json.Unmarshal(bodyBytes, &result); err != nil {
|
||
return fmt.Errorf("解析ES响应失败: %w", err)
|
||
}
|
||
|
||
// 检查是否有错误
|
||
if errMsg, exists := result["error"]; exists {
|
||
return fmt.Errorf("ES返回错误: %v", errMsg)
|
||
}
|
||
|
||
hits, ok := result["hits"].(map[string]interface{})
|
||
if !ok {
|
||
return fmt.Errorf("无法解析hits字段")
|
||
}
|
||
|
||
// 获取总命中数
|
||
if totalHits, exists := hits["total"].(map[string]interface{}); exists {
|
||
if totalValue, exists := totalHits["value"]; exists {
|
||
log.Printf("ES返回总命中数: %.0f", totalValue)
|
||
}
|
||
}
|
||
|
||
hitList, ok := hits["hits"].([]interface{})
|
||
if !ok || len(hitList) == 0 {
|
||
log.Printf("第 %d 页没有数据,查询完成", page)
|
||
break // 没有更多数据
|
||
}
|
||
|
||
// 处理当前批次的数据
|
||
batchCount := 0
|
||
for _, hit := range hitList {
|
||
hitMap, ok := hit.(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析hit数据")
|
||
continue
|
||
}
|
||
|
||
source, ok := hitMap["_source"].(map[string]interface{})
|
||
if !ok {
|
||
log.Printf("警告: 无法解析_source字段")
|
||
continue
|
||
}
|
||
|
||
isbn, ok := source["isbn"].(string)
|
||
if ok && isbn != "" {
|
||
allISBNs = append(allISBNs, isbn)
|
||
batchCount++
|
||
} else {
|
||
log.Printf("警告: 跳过空的ISBN字段")
|
||
}
|
||
|
||
// 更新游标(使用最后一个文档的排序值)
|
||
sortValues, ok := hitMap["sort"].([]interface{})
|
||
if ok && len(sortValues) > 0 {
|
||
searchAfter = sortValues
|
||
}
|
||
}
|
||
|
||
totalCount += batchCount
|
||
log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount)
|
||
page++
|
||
|
||
// 如果返回的数量小于请求的数量,说明已经是最后一页
|
||
if len(hitList) < 10000 {
|
||
log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList))
|
||
break
|
||
}
|
||
|
||
// 添加短暂延迟,避免对ES造成过大压力
|
||
time.Sleep(100 * time.Millisecond)
|
||
}
|
||
|
||
if len(allISBNs) == 0 {
|
||
return fmt.Errorf("没有找到符合条件的ISBN记录")
|
||
}
|
||
|
||
// 去重
|
||
isbnSet := make(map[string]bool)
|
||
uniqueISBNs := make([]string, 0)
|
||
for _, isbn := range allISBNs {
|
||
if !isbnSet[isbn] {
|
||
isbnSet[isbn] = true
|
||
uniqueISBNs = append(uniqueISBNs, isbn)
|
||
}
|
||
}
|
||
|
||
log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs))
|
||
|
||
// 确保输出目录存在
|
||
outputDir := filepath.Dir(outputFile)
|
||
if err := os.MkdirAll(outputDir, 0755); err != nil {
|
||
return fmt.Errorf("创建输出目录失败: %w", err)
|
||
}
|
||
|
||
// 写入文件
|
||
file, err := os.Create(outputFile)
|
||
if err != nil {
|
||
return fmt.Errorf("创建文件失败: %w", err)
|
||
}
|
||
defer file.Close()
|
||
|
||
// 写入文件头信息
|
||
header := fmt.Sprintf(`# 有销售记录且book_pic为空的ISBN列表
|
||
# 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0) AND (book_pic为空 或 book_pic字段不存在)
|
||
# 索引: %s
|
||
# 查询时间: %s
|
||
# 总记录数: %d
|
||
|
||
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs))
|
||
|
||
if _, err := file.WriteString(header); err != nil {
|
||
return fmt.Errorf("写入文件头失败: %w", err)
|
||
}
|
||
|
||
// 按字母顺序排序后写入
|
||
sort.Strings(uniqueISBNs)
|
||
successCount := 0
|
||
for _, isbn := range uniqueISBNs {
|
||
if _, err := file.WriteString(isbn + "\n"); err != nil {
|
||
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
|
||
continue
|
||
}
|
||
successCount++
|
||
}
|
||
|
||
log.Printf("成功导出 %d/%d 个符合条件的ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile)
|
||
return nil
|
||
}
|
||
|
||
// 查询并导出有销售记录且book_pic为空的ISBN主函数
|
||
func mainQuerySaleISBNsWithEmptyPic() {
|
||
// 初始化ES客户端
|
||
es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
|
||
if err != nil {
|
||
log.Fatalf("ES连接失败: %v", err)
|
||
}
|
||
|
||
// 检查ES健康状态
|
||
if err := es.CheckHealth(); err != nil {
|
||
log.Fatalf("ES健康检查失败: %v", err)
|
||
}
|
||
|
||
// 输出文件路径
|
||
outputFile := "es/sale_isbns_empty_pic.txt"
|
||
|
||
// 查询并导出ISBN
|
||
startTime := time.Now()
|
||
if err := queryAndExportSaleISBNsWithEmptyPic(es, outputFile); err != nil {
|
||
log.Fatalf("导出有销售记录且book_pic为空的ISBN失败: %v", err)
|
||
}
|
||
|
||
elapsed := time.Since(startTime)
|
||
log.Printf("任务完成!耗时: %v,ISBN已导出到: %s", elapsed.Round(time.Millisecond), outputFile)
|
||
}
|