daShangDao_kfzgw-info/es/main.go
2025-12-24 09:00:38 +08:00

2089 lines
57 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"context"
"crypto/md5"
"crypto/tls"
"database/sql"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"golang.org/x/image/bmp"
"golang.org/x/image/draw"
"golang.org/x/image/tiff"
"golang.org/x/image/webp"
"image"
"image/color"
"image/jpeg"
"image/png"
"io"
"io/ioutil"
"log"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
_ "github.com/go-sql-driver/mysql"
"github.com/nfnt/resize"
)
// ES 配置
const (
esAddress = "http://103.236.91.138:9200"
esUsername = "elastic"
esPassword = "5mRDIUg52VC0fp14nw-F"
esIndex = "books-from-mysql"
ClientID = "203c5a7ba8bd4b8488d5e26f93052642" // 拼多多开放平台配置
ClientSecret = "892ffaa86e12b7a3d8d2942b669d9aa520ad8179"
PDDApiURL = "https://gw-upload.pinduoduo.com/api/upload"
)
// 配置参数
const (
maxWorkers = 15 // 最大并发worker数量
maxRetries = 3 // 最大重试次数
retryDelay = 2 * time.Second // 重试延迟
progressInterval = 5 * time.Second // 进度报告间隔
)
// ES 客户端封装
type ESClient struct {
client *elasticsearch.Client
}
// 数据库记录结构体
type CrawlerRecord struct {
BookISBN sql.NullString
BookPicture sql.NullString
}
// 处理结果结构体
type ProcessResult struct {
Record CrawlerRecord
Success bool
LocalPaths []string
PDDURLs []string
Error error
WorkerID int
ProcessedAt time.Time
}
// 全局统计
type Statistics struct {
Total int32
Success int32
Failed int32
Skipped int32
CurrentIndex int32
StartTime time.Time
}
// NewESClient 初始化 ES 客户端
// 说明:保持一致的连接方式(禁用证书校验、设置超时和连接池参数)
func NewESClient(addresses []string, username, password string) (*ESClient, error) {
cfg := elasticsearch.Config{
Addresses: addresses,
Username: username,
Password: password,
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConnsPerHost: 100,
ResponseHeaderTimeout: 60 * time.Second,
},
}
cli, err := elasticsearch.NewClient(cfg)
if err != nil {
return nil, err
}
return &ESClient{client: cli}, nil
}
// CheckHealth 检查 ES 集群健康
// 行为:等待状态至少为 yellow输出基本信息
func (es *ESClient) CheckHealth() error {
res, err := es.client.Cluster.Health(
es.client.Cluster.Health.WithWaitForStatus("yellow"),
es.client.Cluster.Health.WithTimeout(30*time.Second),
)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
return fmt.Errorf("Elasticsearch 健康检查失败: %s", res.String())
}
var m map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&m); err == nil {
log.Printf("ES status=%v nodes=%v cluster=%v", m["status"], m["number_of_nodes"], m["cluster_name"])
}
return nil
}
// PDDImageProcessor 实现图片处理器
// pdd上传图片官方接口
// 上传图片到拼多多
func uploadToPDD(token, imagePath string) (string, error) {
// 检查token是否有效
if len(token) == 0 {
return "", fmt.Errorf("获取到的token为空")
}
result, err := ProcessAndUploadImage(imagePath, token)
if err != nil {
return "", fmt.Errorf("拼多多图片上传失败: %v", err)
}
// 解析JSON响应获取URL
var response struct {
RequestID string `json:"request_id"`
URL string `json:"url"`
}
err = json.Unmarshal([]byte(result), &response)
if err != nil {
return "", fmt.Errorf("解析上传响应失败: %v", err)
}
if response.URL == "" {
return "", fmt.Errorf("上传响应中未找到URL")
}
return response.URL, nil
}
func ProcessAndUploadImage(imagePath, token string) (string, error) {
// 打开图片文件
file, err := os.Open(imagePath)
if err != nil {
return "", fmt.Errorf("failed to open image file: %v", err)
}
defer file.Close()
// 准备参数 - 不包含文件路径
params := map[string]string{
"access_token": token,
"data_type": "JSON",
"type": "pdd.goods.img.upload",
"client_id": ClientID,
"timestamp": fmt.Sprintf("%d", time.Now().Unix()),
}
// 生成签名(不包含文件路径)
params["sign"] = generateSign(params)
// 创建multipart表单
body := &strings.Builder{}
writer := multipart.NewWriter(body)
// 写入文本参数
for key, value := range params {
if err := writer.WriteField(key, value); err != nil {
return "", fmt.Errorf("failed to write field %s: %v", key, err)
}
}
// 写入文件流 - 使用正确的字段名 "file"
part, err := writer.CreateFormFile("file", filepath.Base(imagePath))
if err != nil {
return "", fmt.Errorf("failed to create form file: %v", err)
}
if _, err := io.Copy(part, file); err != nil {
return "", fmt.Errorf("failed to copy file data: %v", err)
}
// 关闭writer
if err := writer.Close(); err != nil {
return "", fmt.Errorf("failed to close writer: %v", err)
}
// 创建请求
req, err := http.NewRequest("POST", PDDApiURL, strings.NewReader(body.String()))
if err != nil {
return "", fmt.Errorf("failed to create request: %v", err)
}
// 设置请求头
req.Header.Set("Content-Type", writer.FormDataContentType())
req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36")
// 发送请求
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("failed to send request: %v", err)
}
defer resp.Body.Close()
// 读取响应
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed to read response: %v", err)
}
log.Printf("拼多多API响应状态: %d", resp.StatusCode)
log.Printf("拼多多API响应内容: %s", string(respBody))
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("API returned error status: %d, body: %s", resp.StatusCode, string(respBody))
}
// 解析响应
var result map[string]interface{}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", fmt.Errorf("failed to parse response: %v", err)
}
// 检查API返回的错误
if errorResponse, exists := result["error_response"]; exists {
errorMsg, _ := json.Marshal(errorResponse)
return "", fmt.Errorf("API returned error: %s", string(errorMsg))
}
// 查找成功的响应
for key, value := range result {
if key != "error_response" {
successResponse, _ := json.Marshal(value)
return string(successResponse), nil
}
}
return string(respBody), nil
}
// generateSign 生成拼多多API签名
func generateSign(params map[string]string) string {
// 按参数名排序
var keys []string
for k := range params {
keys = append(keys, k)
}
sort.Strings(keys)
// 拼接参数字符串
var signStr string
for _, k := range keys {
signStr += k + params[k]
}
signStr = ClientSecret + signStr + ClientSecret
// 计算MD5并转为大写
hasher := md5.New()
hasher.Write([]byte(signStr))
result := strings.ToUpper(hex.EncodeToString(hasher.Sum(nil)))
return result
}
// GetPddToken 获取PDD token简化版
func GetPddToken() (string, error) {
url := "https://api.buzhiyushu.cn/huidiao/pdd/getPddChildrenBooksToken"
resp, err := http.Get(url)
if err != nil {
return "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return "", err
}
// 使用map解析JSON
var result map[string]interface{}
json.Unmarshal(body, &result)
// 检查业务状态
if code, ok := result["code"].(float64); !ok || code != 200 {
return "", fmt.Errorf("API错误: %v", result["msg"])
}
// 提取token
token := result["data"].(string)
return token, nil
}
// 并发处理记录的主要函数
func processRecordsConcurrently(records []CrawlerRecord, imageDir string, es *ESClient, maxWorkers int, token string) []ProcessResult {
var stats Statistics
stats.Total = int32(len(records))
stats.StartTime = time.Now()
// 创建通道
recordChan := make(chan CrawlerRecord, len(records))
resultChan := make(chan ProcessResult, len(records))
// 启动worker
var wg sync.WaitGroup
for i := 0; i < maxWorkers; i++ {
wg.Add(1)
go worker(token, i, &wg, recordChan, resultChan, imageDir, es, &stats)
}
// 发送任务到通道
go func() {
for _, record := range records {
recordChan <- record
}
close(recordChan)
}()
// 启动进度报告
go progressReporter(&stats)
// 收集结果
var results []ProcessResult
go func() {
for result := range resultChan {
results = append(results, result)
}
}()
// 等待所有worker完成
wg.Wait()
close(resultChan)
return results
}
// worker 处理函数
func worker(token string, id int, wg *sync.WaitGroup, recordChan <-chan CrawlerRecord, resultChan chan<- ProcessResult, imageDir string, es *ESClient, stats *Statistics) {
defer wg.Done()
for record := range recordChan {
currentIndex := atomic.AddInt32(&stats.CurrentIndex, 1)
result := ProcessResult{
Record: record,
WorkerID: id,
ProcessedAt: time.Now(),
}
// 检查记录有效性
if !isRecordValid(record) {
atomic.AddInt32(&stats.Skipped, 1)
result.Success = false
result.Error = fmt.Errorf("无效记录: ISBN或图片URL为空")
resultChan <- result
continue
}
// 处理记录(带重试机制)
var localPaths, pddURLs []string
var err error
for attempt := 1; attempt <= maxRetries; attempt++ {
localPaths, pddURLs, err = processSingleRecord(token, record, imageDir, es)
if err == nil {
break
}
// 如果是ES记录未找到的错误不需要重试
if strings.Contains(err.Error(), "ES记录未找到") {
break
}
if attempt < maxRetries {
log.Printf("Worker %d: 第 %d 次尝试处理 ISBN %s 失败, %d 秒后重试: %v",
id, attempt, record.BookISBN.String, retryDelay/time.Second, err)
time.Sleep(retryDelay)
}
}
if err != nil {
atomic.AddInt32(&stats.Failed, 1)
result.Success = false
result.Error = err
// 即使失败,也记录已处理的本地路径(如果有)
result.LocalPaths = localPaths
result.PDDURLs = pddURLs
// 根据错误类型记录不同的日志
if strings.Contains(err.Error(), "ES记录未找到") {
log.Printf("Worker %d: ES记录未找到 [%d/%d] ISBN: %s",
id, currentIndex, stats.Total, record.BookISBN.String)
} else {
log.Printf("Worker %d: 处理失败 [%d/%d] ISBN: %s, 错误: %v",
id, currentIndex, stats.Total, record.BookISBN.String, err)
}
} else {
atomic.AddInt32(&stats.Success, 1)
result.Success = true
result.LocalPaths = localPaths
result.PDDURLs = pddURLs
log.Printf("Worker %d: 成功处理 [%d/%d] ISBN: %s, 生成 %d 个文件, 上传 %d 个URL",
id, currentIndex, stats.Total, record.BookISBN.String, len(localPaths), len(pddURLs))
}
resultChan <- result
}
}
// 检查记录有效性
func isRecordValid(record CrawlerRecord) bool {
if !record.BookISBN.Valid || record.BookISBN.String == "" {
return false
}
if !record.BookPicture.Valid || record.BookPicture.String == "" {
return false
}
return true
}
// 进度报告器
func progressReporter(stats *Statistics) {
ticker := time.NewTicker(progressInterval)
defer ticker.Stop()
for range ticker.C {
processed := atomic.LoadInt32(&stats.CurrentIndex)
success := atomic.LoadInt32(&stats.Success)
failed := atomic.LoadInt32(&stats.Failed)
skipped := atomic.LoadInt32(&stats.Skipped)
elapsed := time.Since(stats.StartTime)
rate := float64(processed) / elapsed.Seconds()
// 计算预估剩余时间
var eta time.Duration
if processed > 0 && rate > 0 {
remaining := float64(stats.Total - processed)
eta = time.Duration(remaining/rate) * time.Second
}
fmt.Printf("[进度] 已处理: %d/%d (成功: %d, 失败: %d, 跳过: %d) | 速率: %.2f 条/秒 | 运行: %v | ETA: %v\n",
processed, stats.Total, success, failed, skipped, rate, elapsed.Round(time.Second), eta.Round(time.Second))
if processed >= stats.Total {
break
}
}
}
// 打印最终统计
func printFinalStatistics(results []ProcessResult) {
var success, failed, skipped int
var totalFilesGenerated int
var totalURLsUploaded int
// 失败原因分类
failureReasons := make(map[string]int)
for _, result := range results {
if result.Success {
success++
totalFilesGenerated += len(result.LocalPaths)
totalURLsUploaded += len(result.PDDURLs)
} else if result.Error != nil && strings.Contains(result.Error.Error(), "无效记录") {
skipped++
failureReasons["无效记录(ISBN或URL为空)"]++
} else {
failed++
// 即使是失败的情况,也可能生成了部分文件
totalFilesGenerated += len(result.LocalPaths)
totalURLsUploaded += len(result.PDDURLs)
// 分类失败原因
errMsg := result.Error.Error()
switch {
case strings.Contains(errMsg, "ES记录未找到"):
failureReasons["ES记录未找到"]++
case strings.Contains(errMsg, "查询ES中ID失败"):
failureReasons["ES查询失败"]++
case strings.Contains(errMsg, "下载图片失败"):
failureReasons["图片下载失败"]++
case strings.Contains(errMsg, "处理图片失败"):
failureReasons["图片处理失败"]++
case strings.Contains(errMsg, "上传PNG图片失败"):
failureReasons["PNG上传失败"]++
case strings.Contains(errMsg, "上传JPG图片失败"):
failureReasons["JPG上传失败"]++
case strings.Contains(errMsg, "更新ES数据失败"):
failureReasons["ES更新失败"]++
default:
failureReasons["其他错误"]++
}
}
}
fmt.Printf("\n=== 处理完成 ===\n")
fmt.Printf("总记录数: %d\n", len(results))
fmt.Printf("成功: %d\n", success)
fmt.Printf("失败: %d\n", failed)
fmt.Printf("跳过: %d\n", skipped)
fmt.Printf("成功率: %.2f%%\n", float64(success)/float64(len(results))*100)
fmt.Printf("生成文件总数: %d (平均每条记录 %.1f 个文件)\n", totalFilesGenerated, float64(totalFilesGenerated)/float64(len(results)))
fmt.Printf("上传URL总数: %d (平均每条记录 %.1f 个URL)\n", totalURLsUploaded, float64(totalURLsUploaded)/float64(len(results)))
// 显示失败原因统计
if len(failureReasons) > 0 {
fmt.Printf("\n=== 失败原因统计 ===\n")
for reason, count := range failureReasons {
fmt.Printf(" %s: %d\n", reason, count)
}
}
// 显示处理详情示例
fmt.Printf("\n=== 处理详情示例 ===\n")
successCount := 0
failedCount := 0
for _, result := range results {
if result.Success && successCount < 3 {
fmt.Printf("✅ 成功: ISBN %s -> 文件: %d 个, URL: %d 个\n",
result.Record.BookISBN.String,
len(result.LocalPaths),
len(result.PDDURLs))
successCount++
} else if !result.Success && failedCount < 3 && !strings.Contains(result.Error.Error(), "无效记录") {
fmt.Printf("❌ 失败: ISBN %s -> 错误: %v\n",
result.Record.BookISBN.String,
result.Error)
failedCount++
}
if successCount >= 3 && failedCount >= 3 {
break
}
}
}
// 处理单条记录
func processSingleRecord(token string, record CrawlerRecord, imageDir string, es *ESClient) ([]string, []string, error) {
// 更新ES
ids, err := es.FindIDsByISBN(esIndex, record.BookISBN.String)
if err != nil {
return nil, nil, fmt.Errorf("查询ES中ID失败: %v", err)
}
var pngImageUrl string
var jpgImageUrl string
var localPaths []string
var pddURLs []string
if ids != "" {
// 下载并处理图片
pngPath, jpgPath, err := processAndSaveImage(record, imageDir)
if err != nil {
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
if err != nil {
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
} else {
log.Printf("未找到ISBN %s 对应的ES记录已保存到文件", record.BookISBN.String)
}
return nil, nil, fmt.Errorf("处理图片失败: %v", err)
}
localPaths = []string{pngPath, jpgPath}
// 上传到PDD
pngImageUrl, err = uploadToPDD(token, pngPath)
if err != nil {
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
if err != nil {
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
} else {
log.Printf("未找到ISBN %s 对应的ES记录已保存到文件", record.BookISBN.String)
}
return nil, nil, fmt.Errorf("上传PNG图片失败: %v", err)
}
// 上传到PDD
jpgImageUrl, err = uploadToPDD(token, jpgPath)
if err != nil {
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
if err != nil {
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
} else {
log.Printf("未找到ISBN %s 对应的ES记录已保存到文件", record.BookISBN.String)
}
return nil, nil, fmt.Errorf("上传JPG图片失败: %v", err)
}
pddURLs = []string{pngImageUrl, jpgImageUrl}
err = es.UpdateBookPicsByID(esIndex, ids, "", pngImageUrl, jpgImageUrl)
if err != nil {
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
if err != nil {
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
} else {
log.Printf("未找到ISBN %s 对应的ES记录已保存到文件", record.BookISBN.String)
}
return nil, nil, fmt.Errorf("更新ES数据失败: %v", err)
}
for _, path := range localPaths {
// ES更新成功后删除本地图片
if removeErr := os.Remove(path); removeErr == nil {
log.Printf("ES更新成功已删除本地图片: %s", path)
} else {
log.Printf("警告: 无法删除本地图片 %s: %v", path, removeErr)
}
}
} else {
// ids为空将ISBN存储到txt文件
err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String)
if err != nil {
log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err)
} else {
log.Printf("未找到ISBN %s 对应的ES记录已保存到文件", record.BookISBN.String)
}
return nil, nil, fmt.Errorf("未找到ISBN %s 对应的ES记录", record.BookISBN.String)
}
return localPaths, pddURLs, nil
}
// 保存未找到的ISBN和图片URL到txt文件CSV格式带去重
func saveISBNToFile(isbn string, imageUrl string) error {
filename := "cmd/update_es_gt/xgy_not_found_isbns.txt"
// 读取现有内容检查是否已存在
existingRecords := make(map[string]bool)
if content, err := os.ReadFile(filename); err == nil {
lines := strings.Split(string(content), "\n")
for _, line := range lines {
if line != "" && !strings.HasPrefix(line, "#") {
parts := strings.Split(line, ",")
if len(parts) > 0 {
existingRecords[parts[0]] = true // 以ISBN作为去重依据
}
}
}
}
// 如果已存在,则不重复添加
if existingRecords[isbn] {
return nil
}
// 以追加模式打开文件
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return fmt.Errorf("打开文件失败: %v", err)
}
defer file.Close()
// 如果是空文件先写入CSV表头
stat, err := file.Stat()
if err == nil && stat.Size() == 0 {
header := "# ISBN,ImageURL\n"
if _, err := file.WriteString(header); err != nil {
return fmt.Errorf("写入表头失败: %v", err)
}
}
// 写入ISBN和图片URL用逗号分隔并添加换行符
line := fmt.Sprintf("%s,%s\n", isbn, imageUrl)
_, err = file.WriteString(line)
if err != nil {
return fmt.Errorf("写入文件失败: %v", err)
}
return nil
}
// 下载并处理图片
func processAndSaveImage(record CrawlerRecord, saveDir string) (string, string, error) {
// 下载图片
img, originalFormat, err := downloadImage(record.BookPicture.String)
if err != nil {
return "", "", fmt.Errorf("下载图片失败: %v", err)
}
fmt.Printf("下载成功,原始格式: %s\n", originalFormat)
// 调整图片高度为600等比例缩放
//resizedImg := resizeImageToHeight(img, 600)
// 使用高质量缩放调整图片高度为600等比例缩放
resizedImg := resizeToHeightHighQuality(img, 600)
fmt.Printf("缩放后尺寸: %dx%d\n", resizedImg.Bounds().Dx(), resizedImg.Bounds().Dy())
// 创建800x800的透明背景
finalImg := createCenteredImage(resizedImg, 800, 800, true)
// 创建800x800的白色背景用于JPG
whiteImg := createCenteredImage(resizedImg, 800, 800, false)
// 生成文件名
filename := fmt.Sprintf("%s", record.BookISBN.String)
// 清理文件名中的非法字符
filename = sanitizeFilename(filename)
// PNG文件路径
pngPath := filepath.Join(saveDir, filename+".png")
// JPG文件路径
jpgPath := filepath.Join(saveDir, filename+".jpg")
// 保存为PNG图片
err = savePNG(finalImg, pngPath)
if err != nil {
return "", "", fmt.Errorf("保存图片失败: %v", err)
}
// 保存为JPG图片白色背景
err = saveJPG(whiteImg, jpgPath, 95) // 95%质量
if err != nil {
return "", "", fmt.Errorf("保存JPG图片失败: %v", err)
}
fmt.Printf("转换成功: %s -> %s, 保存路径: %s\n", originalFormat, "PNG", pngPath)
fmt.Printf("转换成功: %s -> %s, 保存路径: %s\n", originalFormat, "JPG", jpgPath)
return pngPath, jpgPath, nil
}
// 下载图片
func downloadImage(url string) (image.Image, string, error) {
// 创建HTTP客户端设置超时等参数
client := &http.Client{
Timeout: 30 * time.Second,
}
resp, err := client.Get(url)
if err != nil {
return nil, "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, "", fmt.Errorf("HTTP请求失败状态码: %d", resp.StatusCode)
}
// 读取响应体前几个字节来判断图片格式
peekBytes := make([]byte, 512)
n, err := resp.Body.Read(peekBytes)
if err != nil && err != io.EOF {
return nil, "", err
}
// 创建一个新的Reader包含已读取的数据和剩余数据
reader := io.MultiReader(strings.NewReader(string(peekBytes[:n])), resp.Body)
// 根据文件头识别图片格式
contentType := http.DetectContentType(peekBytes[:n])
fmt.Printf("检测到的Content-Type: %s\n", contentType)
var img image.Image
var format string
// 根据Content-Type或文件扩展名选择解码器
switch {
case strings.Contains(contentType, "jpeg") || strings.HasSuffix(strings.ToLower(url), ".jpg") || strings.HasSuffix(strings.ToLower(url), ".jpeg"):
img, err = jpeg.Decode(reader)
format = "JPEG"
case strings.Contains(contentType, "png") || strings.HasSuffix(strings.ToLower(url), ".png"):
img, err = png.Decode(reader)
format = "PNG"
case strings.Contains(contentType, "webp") || strings.HasSuffix(strings.ToLower(url), ".webp"):
img, err = webp.Decode(reader)
format = "WEBP"
case strings.Contains(contentType, "bmp") || strings.HasSuffix(strings.ToLower(url), ".bmp"):
img, err = bmp.Decode(reader)
format = "BMP"
case strings.Contains(contentType, "tiff") || strings.HasSuffix(strings.ToLower(url), ".tiff") || strings.HasSuffix(strings.ToLower(url), ".tif"):
img, err = tiff.Decode(reader)
format = "TIFF"
default:
// 尝试通用解码
img, format, err = image.Decode(reader)
if err != nil {
return nil, "", fmt.Errorf("不支持的图片格式: %s, 错误: %v", contentType, err)
}
}
if err != nil {
return nil, "", fmt.Errorf("解码图片失败: %v", err)
}
return img, format, nil
}
// 高质量等比例缩放到指定高度
func resizeToHeightHighQuality(src image.Image, targetHeight int) image.Image {
bounds := src.Bounds()
srcWidth := bounds.Dx()
srcHeight := bounds.Dy()
// 如果原图高度已经小于等于目标高度,且宽度合适,可以直接返回
//if srcHeight <= targetHeight {
// return src
//}
// 计算等比例缩放后的宽度
targetWidth := uint(float64(srcWidth) * float64(targetHeight) / float64(srcHeight))
// 使用 Lanczos3 插值算法进行高质量缩放
return resize.Resize(targetWidth, uint(targetHeight), src, resize.Lanczos3)
}
// 创建居中图片(将原图放在指定大小的透明背景中央)
func createCenteredImage(src image.Image, width, height int, transparent bool) *image.RGBA {
// 创建透明背景
dst := image.NewRGBA(image.Rect(0, 0, width, height))
// 设置背景颜色
var bgColor color.Color
if transparent {
bgColor = color.RGBA{0, 0, 0, 0} // 透明
} else {
bgColor = color.RGBA{255, 255, 255, 255} // 白色
}
// 填充透明背景
//transparent := color.RGBA{0, 0, 0, 0}
draw.Draw(dst, dst.Bounds(), &image.Uniform{bgColor}, image.Point{}, draw.Src)
// 计算居中位置
srcBounds := src.Bounds()
srcWidth := srcBounds.Dx()
srcHeight := srcBounds.Dy()
x := (width - srcWidth) / 2
y := (height - srcHeight) / 2
// 将原图绘制到中央
draw.Draw(dst, image.Rect(x, y, x+srcWidth, y+srcHeight), src, image.Point{}, draw.Over)
return dst
}
// 保存为PNG图片
func savePNG(img image.Image, filename string) error {
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
return png.Encode(file, img)
}
// 保存为JPG图片
func saveJPG(img image.Image, filename string, quality int) error {
file, err := os.Create(filename)
if err != nil {
return err
}
defer file.Close()
// 设置JPEG编码选项
options := &jpeg.Options{
Quality: quality, // 1-100越高质量越好
}
return jpeg.Encode(file, img, options)
}
// 清理文件名中的非法字符
func sanitizeFilename(filename string) string {
// 替换Windows文件名中不允许的字符
invalidChars := []string{"\\", "/", ":", "*", "?", "\"", "<", ">", "|"}
for _, char := range invalidChars {
filename = strings.ReplaceAll(filename, char, "_")
}
// 移除或替换其他可能的问题字符
filename = strings.TrimSpace(filename)
if filename == "" {
filename = "unknown"
}
return filename
}
// 从数据库获取记录
func getRecords(db *sql.DB) ([]CrawlerRecord, error) {
// 查询所有记录,包括 NULL 值
query := "SELECT book_isbn, book_picture FROM dk_crawler_record_info"
rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
var records []CrawlerRecord
for rows.Next() {
var record CrawlerRecord
// 使用 sql.NullString 来接收可能为 NULL 的字段
err := rows.Scan(&record.BookISBN, &record.BookPicture)
if err != nil {
fmt.Printf("扫描记录失败: %v\n", err)
continue
}
records = append(records, record)
}
// 检查遍历过程中是否有错误
if err = rows.Err(); err != nil {
return nil, err
}
return records, nil
}
// FindIDsByISBN 根据 ISBN 查询文档 ID 列表
func (es *ESClient) FindIDsByISBN(index, isbn string) (string, error) {
q := map[string]interface{}{
"query": map[string]interface{}{
"term": map[string]interface{}{"isbn": isbn},
},
"_source": false,
"size": 1000,
}
b, _ := json.Marshal(q)
res, err := es.client.Search(
es.client.Search.WithIndex(index),
es.client.Search.WithBody(strings.NewReader(string(b))),
es.client.Search.WithContext(context.Background()),
)
if err != nil {
return "", err
}
defer res.Body.Close()
if res.IsError() {
return "", fmt.Errorf("搜索失败: %s", res.String())
}
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
return "", err
}
hits, _ := r["hits"].(map[string]interface{})
arr, _ := hits["hits"].([]interface{})
var ids string
for _, h := range arr {
m, _ := h.(map[string]interface{})
id, _ := m["_id"].(string)
if id != "" {
//ids = append(ids, id)
ids = id
}
}
return ids, nil
}
func (es *ESClient) UpdateBookPicsByID(index, id, localImageS, pngImageUrl, jpgImageUrl string) error {
bookPicJSON, err := json.Marshal(map[string]string{
"localPath": localImageS,
"pddPath": jpgImageUrl,
})
if err != nil {
return fmt.Errorf("序列化 book_pic_w 失败: %w", err)
}
bookPicBJSON, err := json.Marshal(map[string]string{
"localPath": localImageS,
"pddResponse": pngImageUrl,
})
if err != nil {
return fmt.Errorf("序列化 book_pic_b 失败: %w", err)
}
// 构建更新文档
payload := map[string]interface{}{
"doc": map[string]string{
"book_pic": string(bookPicJSON),
"book_pic_b": string(bookPicBJSON),
},
}
// JSON 序列化整个更新请求
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("序列化更新请求失败: %w", err)
}
req := esapi.UpdateRequest{
Index: index,
DocumentID: id,
Body: strings.NewReader(string(body)),
}
res, err := req.Do(context.Background(), es.client)
if err != nil {
return err
}
defer res.Body.Close()
if res.IsError() {
data, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES 更新失败: %s", data)
}
return nil
}
// 从 sql.NullString 获取字符串值
func getStringValue(nullString sql.NullString) string {
if nullString.Valid {
return nullString.String
}
return "NULL"
}
func main() {
//// 获取token
//token, err := GetPddToken()
//if err != nil {
// fmt.Errorf("获取拼多多token失败: %v", err)
//}
//fmt.Println("token=", token)
//// 数据源名称格式
//dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
// "root",
// "123456",
// "localhost",
// 3306,
// "book_image")
//db, err := sql.Open("mysql", dsn)
//if err != nil {
// fmt.Printf("打开数据库连接失败: %v", err)
//}
//// 设置连接池参数
//db.SetMaxOpenConns(20) // 最大打开连接数
//db.SetMaxIdleConns(10)
//err = db.Ping()
//if err != nil {
// fmt.Printf("数据库连接测试失败: %v", err)
//}
//
//// 查询数据
//records, err := getRecords(db)
//if err != nil {
// fmt.Printf("查询失败: %v", err)
//}
//imageDir := "D:\\image"
//err = os.MkdirAll(imageDir, 0755)
//if err != nil {
// fmt.Sprintf("创建目录失败: %v", err)
//}
//fmt.Printf("找到 %d 条记录需要处理\n", len(records))
//
//es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
//if err != nil {
// log.Fatalf("ES 连接失败: %v", err)
//}
//if err := es.CheckHealth(); err != nil {
// log.Fatalf("ES 健康检查失败: %v", err)
//}
//// 启动并发处理
//results := processRecordsConcurrently(records, imageDir, es, maxWorkers, token)
//
//// 输出最终统计
//printFinalStatistics(results)
//mainQuerySaleISBNs()
//mainFindESOnlyISBNs()
mainQuerySaleISBNsWithEmptyPic()
}
// 查询并导出有销售记录且book_pic字符串中pddPath为空的ISBN
func queryAndExportSaleISBNs(es *ESClient, outputFile string) error {
log.Printf("开始查询有销售记录且book_pic字符串中pddPath为空的ISBN...")
// 使用其他字段排序,比如 isbn 字段或者时间字段
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"bool": map[string]interface{}{
"should": []map[string]interface{}{
{"range": map[string]interface{}{"day_sale_7": map[string]interface{}{"gt": 0}}},
{"range": map[string]interface{}{"day_sale_15": map[string]interface{}{"gt": 0}}},
{"range": map[string]interface{}{"day_sale_30": map[string]interface{}{"gt": 0}}},
{"range": map[string]interface{}{"day_sale_60": map[string]interface{}{"gt": 0}}},
},
"minimum_should_match": 1,
},
},
{
"bool": map[string]interface{}{
"should": []map[string]interface{}{
// 匹配 pddPath:"" 的JSON字符串
{"regexp": map[string]interface{}{"book_pic": ".*\"pddPath\":\"\".*"}},
// 匹配 pddPath: "" (带空格的)
{"regexp": map[string]interface{}{"book_pic": ".*\"pddPath\":\\s*\"\".*"}},
// 匹配整个book_pic字段为空
{"term": map[string]interface{}{"book_pic": ""}},
// 匹配book_pic字段不存在
{
"bool": map[string]interface{}{
"must_not": map[string]interface{}{
"exists": map[string]interface{}{"field": "book_pic"},
},
},
},
},
},
},
},
},
},
"_source": []string{"isbn"},
"sort": []map[string]interface{}{
{"isbn": "asc"}, // 使用 isbn 字段排序,或者使用其他可排序字段
},
"size": 10000,
}
// 打印查询条件用于验证
queryJSON, _ := json.MarshalIndent(query, "", " ")
log.Printf("查询条件:\n%s", string(queryJSON))
var allISBNs []string
var searchAfter interface{}
totalCount := 0
page := 1
for {
// 复制基础查询
currentQuery := make(map[string]interface{})
for k, v := range query {
currentQuery[k] = v
}
// 添加游标
if searchAfter != nil {
currentQuery["search_after"] = searchAfter
}
body, err := json.Marshal(currentQuery)
if err != nil {
return fmt.Errorf("序列化查询失败: %w", err)
}
log.Printf("执行第 %d 页查询...", page)
// 执行搜索
res, err := es.client.Search(
es.client.Search.WithIndex(esIndex),
es.client.Search.WithBody(strings.NewReader(string(body))),
es.client.Search.WithContext(context.Background()),
)
if err != nil {
return fmt.Errorf("ES搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
}
// 读取并解析响应体
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("读取响应体失败: %w", err)
}
var result map[string]interface{}
if err := json.Unmarshal(bodyBytes, &result); err != nil {
return fmt.Errorf("解析ES响应失败: %w", err)
}
// 检查是否有错误
if errMsg, exists := result["error"]; exists {
return fmt.Errorf("ES返回错误: %v", errMsg)
}
hits, ok := result["hits"].(map[string]interface{})
if !ok {
return fmt.Errorf("无法解析hits字段")
}
// 获取总命中数
if totalHits, exists := hits["total"].(map[string]interface{}); exists {
if totalValue, exists := totalHits["value"]; exists {
log.Printf("ES返回总命中数: %.0f", totalValue)
}
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
log.Printf("第 %d 页没有数据,查询完成", page)
break // 没有更多数据
}
// 处理当前批次的数据
batchCount := 0
for _, hit := range hitList {
hitMap, ok := hit.(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析hit数据")
continue
}
source, ok := hitMap["_source"].(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析_source字段")
continue
}
isbn, ok := source["isbn"].(string)
if ok && isbn != "" {
allISBNs = append(allISBNs, isbn)
batchCount++
} else {
log.Printf("警告: 跳过空的ISBN字段")
}
// 更新游标(使用最后一个文档的排序值)
sortValues, ok := hitMap["sort"].([]interface{})
if ok && len(sortValues) > 0 {
searchAfter = sortValues
}
}
totalCount += batchCount
log.Printf("第 %d 页: 获取 %d 条ISBN记录总计: %d", page, batchCount, totalCount)
page++
// 如果返回的数量小于请求的数量,说明已经是最后一页
if len(hitList) < 10000 {
log.Printf("最后一页数据量 %d < 10000查询完成", len(hitList))
break
}
// 添加短暂延迟避免对ES造成过大压力
time.Sleep(100 * time.Millisecond)
}
if len(allISBNs) == 0 {
return fmt.Errorf("没有找到符合条件的ISBN记录")
}
// 去重
isbnSet := make(map[string]bool)
uniqueISBNs := make([]string, 0)
for _, isbn := range allISBNs {
if !isbnSet[isbn] {
isbnSet[isbn] = true
uniqueISBNs = append(uniqueISBNs, isbn)
}
}
log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs))
// 确保输出目录存在
outputDir := filepath.Dir(outputFile)
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("创建输出目录失败: %w", err)
}
// 写入文件
file, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("创建文件失败: %w", err)
}
defer file.Close()
// 写入文件头信息
header := fmt.Sprintf(`# 有销售记录且book_pic字符串中pddPath为空的ISBN列表
# 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0 OR day_sale_60 > 0) AND (book_pic包含"pddPath":"" 或 book_pic为空 或 book_pic字段不存在)
# 索引: %s
# 统计时间: %s
# 总记录数: %d
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs))
if _, err := file.WriteString(header); err != nil {
return fmt.Errorf("写入文件头失败: %w", err)
}
// 按字母顺序排序后写入
sort.Strings(uniqueISBNs)
successCount := 0
for _, isbn := range uniqueISBNs {
if _, err := file.WriteString(isbn + "\n"); err != nil {
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
continue
}
successCount++
}
log.Printf("成功导出 %d/%d 个有销售记录且book_pic字符串中pddPath为空的ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile)
return nil
}
// 查询并导出销售ISBN的主函数
func mainQuerySaleISBNs() {
// 初始化ES客户端
es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
if err != nil {
log.Fatalf("ES连接失败: %v", err)
}
// 检查ES健康状态
if err := es.CheckHealth(); err != nil {
log.Fatalf("ES健康检查失败: %v", err)
}
// 输出文件路径
outputFile := "cmd/update_es_gt/all_isbns.txt"
// 查询并导出ISBN
startTime := time.Now()
if err := exportAllISBNs(es, outputFile); err != nil {
log.Fatalf("导出销售ISBN失败: %v", err)
}
elapsed := time.Since(startTime)
log.Printf("任务完成!耗时: %vISBN已导出到: %s", elapsed.Round(time.Millisecond), outputFile)
}
// 导出所有ISBN到txt文件
func exportAllISBNs(es *ESClient, outputFile string) error {
log.Printf("开始导出所有ISBN...")
// 查询所有包含isbn字段的文档
query := map[string]interface{}{
"query": map[string]interface{}{
"exists": map[string]interface{}{
"field": "isbn",
},
},
"_source": []string{"isbn"},
"sort": []map[string]interface{}{
{"isbn": "asc"}, // 按ISBN排序
},
"size": 10000,
}
var allISBNs []string
var searchAfter interface{}
totalCount := 0
page := 1
for {
// 复制基础查询
currentQuery := make(map[string]interface{})
for k, v := range query {
currentQuery[k] = v
}
// 添加游标
if searchAfter != nil {
currentQuery["search_after"] = searchAfter
}
body, err := json.Marshal(currentQuery)
if err != nil {
return fmt.Errorf("序列化查询失败: %w", err)
}
log.Printf("执行第 %d 页查询...", page)
// 执行搜索
res, err := es.client.Search(
es.client.Search.WithIndex(esIndex),
es.client.Search.WithBody(strings.NewReader(string(body))),
es.client.Search.WithContext(context.Background()),
)
if err != nil {
return fmt.Errorf("ES搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
}
// 读取并解析响应体
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("读取响应体失败: %w", err)
}
var result map[string]interface{}
if err := json.Unmarshal(bodyBytes, &result); err != nil {
return fmt.Errorf("解析ES响应失败: %w", err)
}
// 检查是否有错误
if errMsg, exists := result["error"]; exists {
return fmt.Errorf("ES返回错误: %v", errMsg)
}
hits, ok := result["hits"].(map[string]interface{})
if !ok {
return fmt.Errorf("无法解析hits字段")
}
// 获取总命中数
if totalHits, exists := hits["total"].(map[string]interface{}); exists {
if totalValue, exists := totalHits["value"]; exists {
if page == 1 {
log.Printf("ES索引中共有 %.0f 条包含ISBN的记录", totalValue)
}
}
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
log.Printf("第 %d 页没有数据,查询完成", page)
break // 没有更多数据
}
// 处理当前批次的数据
batchCount := 0
for _, hit := range hitList {
hitMap, ok := hit.(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析hit数据")
continue
}
source, ok := hitMap["_source"].(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析_source字段")
continue
}
isbn, ok := source["isbn"].(string)
if ok && isbn != "" {
allISBNs = append(allISBNs, isbn)
batchCount++
} else {
log.Printf("警告: 跳过空的ISBN字段")
}
// 更新游标(使用最后一个文档的排序值)
sortValues, ok := hitMap["sort"].([]interface{})
if ok && len(sortValues) > 0 {
searchAfter = sortValues
}
}
totalCount += batchCount
log.Printf("第 %d 页: 获取 %d 条ISBN记录总计: %d", page, batchCount, totalCount)
page++
// 如果返回的数量小于请求的数量,说明已经是最后一页
if len(hitList) < 10000 {
log.Printf("最后一页数据量 %d < 10000查询完成", len(hitList))
break
}
// 添加短暂延迟避免对ES造成过大压力
time.Sleep(100 * time.Millisecond)
}
if len(allISBNs) == 0 {
return fmt.Errorf("没有找到包含ISBN字段的记录")
}
// 去重
isbnSet := make(map[string]bool)
uniqueISBNs := make([]string, 0)
for _, isbn := range allISBNs {
if !isbnSet[isbn] {
isbnSet[isbn] = true
uniqueISBNs = append(uniqueISBNs, isbn)
}
}
log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs))
// 确保输出目录存在
outputDir := filepath.Dir(outputFile)
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("创建输出目录失败: %w", err)
}
// 写入文件
file, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("创建文件失败: %w", err)
}
defer file.Close()
// 写入文件头信息
header := fmt.Sprintf(`# 所有ISBN列表
# 索引: %s
# 导出时间: %s
# 总记录数: %d
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs))
if _, err := file.WriteString(header); err != nil {
return fmt.Errorf("写入文件头失败: %w", err)
}
// 按字母顺序排序后写入
sort.Strings(uniqueISBNs)
successCount := 0
for _, isbn := range uniqueISBNs {
if _, err := file.WriteString(isbn + "\n"); err != nil {
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
continue
}
successCount++
}
log.Printf("成功导出 %d/%d 个ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile)
return nil
}
// 从ES获取所有ISBN
func getAllISBNsFromES(es *ESClient) ([]string, error) {
log.Printf("开始从ES索引 %s 获取所有ISBN...", esIndex)
var allISBNs []string
var searchAfter interface{}
totalCount := 0
page := 1
for {
query := map[string]interface{}{
"query": map[string]interface{}{
"exists": map[string]interface{}{
"field": "isbn",
},
},
"_source": []string{"isbn"},
"sort": []map[string]interface{}{
{"isbn": "asc"},
},
"size": 10000,
}
// 添加游标
if searchAfter != nil {
query["search_after"] = searchAfter
}
body, err := json.Marshal(query)
if err != nil {
return nil, fmt.Errorf("序列化查询失败: %w", err)
}
log.Printf("执行第 %d 页ES查询...", page)
// 执行搜索
res, err := es.client.Search(
es.client.Search.WithIndex(esIndex),
es.client.Search.WithBody(strings.NewReader(string(body))),
es.client.Search.WithContext(context.Background()),
)
if err != nil {
return nil, fmt.Errorf("ES搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return nil, fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
}
// 解析响应
var result map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("解析ES响应失败: %w", err)
}
hits, ok := result["hits"].(map[string]interface{})
if !ok {
return nil, fmt.Errorf("无法解析hits字段")
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
log.Printf("第 %d 页没有数据,查询完成", page)
break
}
// 处理当前批次的数据
batchCount := 0
for _, hit := range hitList {
hitMap, ok := hit.(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析hit数据")
continue
}
source, ok := hitMap["_source"].(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析_source字段")
continue
}
isbn, ok := source["isbn"].(string)
if ok && isbn != "" {
allISBNs = append(allISBNs, isbn)
batchCount++
}
// 更新游标
sortValues, ok := hitMap["sort"].([]interface{})
if ok && len(sortValues) > 0 {
searchAfter = sortValues
}
}
totalCount += batchCount
log.Printf("第 %d 页: 获取 %d 条ISBN记录总计: %d", page, batchCount, totalCount)
page++
// 如果返回的数量小于请求的数量,说明已经是最后一页
if len(hitList) < 10000 {
log.Printf("最后一页数据量 %d < 10000查询完成", len(hitList))
break
}
time.Sleep(100 * time.Millisecond)
}
if len(allISBNs) == 0 {
return nil, fmt.Errorf("ES中没有找到包含ISBN字段的记录")
}
log.Printf("从ES中获取到 %d 个ISBN", len(allISBNs))
return allISBNs, nil
}
// 批量检查ISBN在数据库中是否存在
func checkISBNsInDB(db *sql.DB, isbns []string) (map[string]bool, error) {
log.Printf("开始检查 %d 个ISBN在数据库中的存在情况...", len(isbns))
existsMap := make(map[string]bool)
// 分批处理避免SQL语句过长
batchSize := 1000
totalBatches := (len(isbns) + batchSize - 1) / batchSize
for batch := 0; batch < totalBatches; batch++ {
start := batch * batchSize
end := start + batchSize
if end > len(isbns) {
end = len(isbns)
}
batchISBNs := isbns[start:end]
log.Printf("处理数据库批次 %d/%d: ISBN范围 %d-%d", batch+1, totalBatches, start+1, end)
// 构建IN查询的占位符
placeholders := make([]string, len(batchISBNs))
args := make([]interface{}, len(batchISBNs))
for i, isbn := range batchISBNs {
placeholders[i] = "?"
args[i] = isbn
}
query := fmt.Sprintf(
"SELECT isbn FROM xgy_base_item WHERE isbn IN (%s)",
strings.Join(placeholders, ","),
)
rows, err := db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("数据库查询失败: %w", err)
}
// 读取存在的ISBN
for rows.Next() {
var isbn string
if err := rows.Scan(&isbn); err != nil {
rows.Close()
return nil, fmt.Errorf("扫描ISBN失败: %w", err)
}
existsMap[isbn] = true
}
rows.Close()
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("遍历数据库行时出错: %w", err)
}
// 添加延迟避免对数据库造成压力
if batch < totalBatches-1 {
time.Sleep(50 * time.Millisecond)
}
}
log.Printf("数据库中存在 %d 个匹配的ISBN", len(existsMap))
return existsMap, nil
}
// 找出数据库中不存在的ISBNES中有但数据库中没有
func findESOnlyISBNs(esISBNs []string, dbExistsMap map[string]bool) []string {
var esOnlyISBNs []string
for _, isbn := range esISBNs {
if !dbExistsMap[isbn] {
esOnlyISBNs = append(esOnlyISBNs, isbn)
}
}
log.Printf("ES中有 %d 个ISBN在数据库中不存在", len(esOnlyISBNs))
return esOnlyISBNs
}
// 导出ES独有ISBN到txt文件
func exportESOnlyISBNs(esOnlyISBNs []string, outputFile string) error {
if len(esOnlyISBNs) == 0 {
log.Printf("没有ES独有的ISBN需要导出")
return nil
}
// 确保输出目录存在
outputDir := filepath.Dir(outputFile)
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("创建输出目录失败: %w", err)
}
// 写入文件
file, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("创建文件失败: %w", err)
}
defer file.Close()
// 写入文件头信息
header := fmt.Sprintf(`# ES中有但数据库中没有的ISBN列表
# 数据库表: xgy_base_item
# ES索引: %s
# 导出时间: %s
# 记录数: %d
# 说明: 这些ISBN在ES索引中存在但在数据库表中不存在
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(esOnlyISBNs))
if _, err := file.WriteString(header); err != nil {
return fmt.Errorf("写入文件头失败: %w", err)
}
// 按字母顺序排序后写入
sort.Strings(esOnlyISBNs)
successCount := 0
for _, isbn := range esOnlyISBNs {
if _, err := file.WriteString(isbn + "\n"); err != nil {
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
continue
}
successCount++
}
log.Printf("成功导出 %d/%d 个ES独有ISBN到文件: %s", successCount, len(esOnlyISBNs), outputFile)
return nil
}
// 主函数查询ES中有但数据库中没有的ISBN
func mainFindESOnlyISBNs() {
// 初始化数据库连接
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
"root",
"123456",
"localhost",
3306,
"book_image") // 请根据实际情况修改数据库名
db, err := sql.Open("mysql", dsn)
if err != nil {
log.Fatalf("打开数据库连接失败: %v", err)
}
defer db.Close()
// 设置连接池参数
db.SetMaxOpenConns(20)
db.SetMaxIdleConns(10)
// 测试数据库连接
if err := db.Ping(); err != nil {
log.Fatalf("数据库连接测试失败: %v", err)
}
// 初始化ES客户端
es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
if err != nil {
log.Fatalf("ES连接失败: %v", err)
}
// 检查ES健康状态
if err := es.CheckHealth(); err != nil {
log.Fatalf("ES健康检查失败: %v", err)
}
startTime := time.Now()
log.Printf("开始处理ES与数据库的ISBN匹配...")
// 步骤1: 从ES获取所有ISBN
esISBNs, err := getAllISBNsFromES(es)
if err != nil {
log.Fatalf("获取ES ISBN失败: %v", err)
}
// 步骤2: 检查ISBN在数据库中的存在情况
dbExistsMap, err := checkISBNsInDB(db, esISBNs)
if err != nil {
log.Fatalf("检查数据库中ISBN存在情况失败: %v", err)
}
// 步骤3: 找出ES独有ISBNES中有但数据库中没有
esOnlyISBNs := findESOnlyISBNs(esISBNs, dbExistsMap)
// 步骤4: 导出ES独有ISBN
outputFile := "cmd/update_es_gt/missing_isbns.txt"
if err := exportESOnlyISBNs(esOnlyISBNs, outputFile); err != nil {
log.Fatalf("导出ES独有ISBN失败: %v", err)
}
elapsed := time.Since(startTime)
// 输出统计信息
fmt.Printf("\n=== 处理完成 ===\n")
fmt.Printf("ES中ISBN总数: %d\n", len(esISBNs))
fmt.Printf("数据库中匹配的ISBN数: %d\n", len(dbExistsMap))
fmt.Printf("ES独有ISBN数数据库中没有的: %d\n", len(esOnlyISBNs))
fmt.Printf("独有比例: %.2f%%\n", float64(len(esOnlyISBNs))/float64(len(esISBNs))*100)
fmt.Printf("耗时: %v\n", elapsed.Round(time.Millisecond))
fmt.Printf("输出文件: %s\n", outputFile)
// 显示部分ES独有ISBN示例
if len(esOnlyISBNs) > 0 {
fmt.Printf("\nES独有ISBN示例 (前10个):\n")
for i := 0; i < 10 && i < len(esOnlyISBNs); i++ {
fmt.Printf(" %s\n", esOnlyISBNs[i])
}
if len(esOnlyISBNs) > 10 {
fmt.Printf(" ... 还有 %d 个\n", len(esOnlyISBNs)-10)
}
}
}
// 查询并导出有销售记录且book_pic为空的ISBN
func queryAndExportSaleISBNsWithEmptyPic(es *ESClient, outputFile string) error {
log.Printf("开始查询有销售记录且book_pic为空的ISBN...")
// 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0) AND book_pic为空
query := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"bool": map[string]interface{}{
"should": []map[string]interface{}{
{"range": map[string]interface{}{"day_sale_7": map[string]interface{}{"gt": 0}}},
{"range": map[string]interface{}{"day_sale_15": map[string]interface{}{"gt": 0}}},
{"range": map[string]interface{}{"day_sale_30": map[string]interface{}{"gt": 0}}},
},
"minimum_should_match": 1,
},
},
{
"bool": map[string]interface{}{
"should": []map[string]interface{}{
// 匹配book_pic字段为空
{"term": map[string]interface{}{"book_pic": ""}},
// 匹配book_pic字段不存在
{
"bool": map[string]interface{}{
"must_not": map[string]interface{}{
"exists": map[string]interface{}{"field": "book_pic"},
},
},
},
},
},
},
},
},
},
"_source": []string{"isbn"},
"sort": []map[string]interface{}{
{"isbn": "asc"}, // 按ISBN排序
},
"size": 10000,
}
// 打印查询条件用于验证
queryJSON, _ := json.MarshalIndent(query, "", " ")
log.Printf("查询条件:\n%s", string(queryJSON))
var allISBNs []string
var searchAfter interface{}
totalCount := 0
page := 1
for {
// 复制基础查询
currentQuery := make(map[string]interface{})
for k, v := range query {
currentQuery[k] = v
}
// 添加游标
if searchAfter != nil {
currentQuery["search_after"] = searchAfter
}
body, err := json.Marshal(currentQuery)
if err != nil {
return fmt.Errorf("序列化查询失败: %w", err)
}
log.Printf("执行第 %d 页查询...", page)
// 执行搜索
res, err := es.client.Search(
es.client.Search.WithIndex(esIndex),
es.client.Search.WithBody(strings.NewReader(string(body))),
es.client.Search.WithContext(context.Background()),
)
if err != nil {
return fmt.Errorf("ES搜索失败: %w", err)
}
defer res.Body.Close()
if res.IsError() {
bodyBytes, _ := io.ReadAll(res.Body)
return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes))
}
// 读取并解析响应体
bodyBytes, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("读取响应体失败: %w", err)
}
var result map[string]interface{}
if err := json.Unmarshal(bodyBytes, &result); err != nil {
return fmt.Errorf("解析ES响应失败: %w", err)
}
// 检查是否有错误
if errMsg, exists := result["error"]; exists {
return fmt.Errorf("ES返回错误: %v", errMsg)
}
hits, ok := result["hits"].(map[string]interface{})
if !ok {
return fmt.Errorf("无法解析hits字段")
}
// 获取总命中数
if totalHits, exists := hits["total"].(map[string]interface{}); exists {
if totalValue, exists := totalHits["value"]; exists {
log.Printf("ES返回总命中数: %.0f", totalValue)
}
}
hitList, ok := hits["hits"].([]interface{})
if !ok || len(hitList) == 0 {
log.Printf("第 %d 页没有数据,查询完成", page)
break // 没有更多数据
}
// 处理当前批次的数据
batchCount := 0
for _, hit := range hitList {
hitMap, ok := hit.(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析hit数据")
continue
}
source, ok := hitMap["_source"].(map[string]interface{})
if !ok {
log.Printf("警告: 无法解析_source字段")
continue
}
isbn, ok := source["isbn"].(string)
if ok && isbn != "" {
allISBNs = append(allISBNs, isbn)
batchCount++
} else {
log.Printf("警告: 跳过空的ISBN字段")
}
// 更新游标(使用最后一个文档的排序值)
sortValues, ok := hitMap["sort"].([]interface{})
if ok && len(sortValues) > 0 {
searchAfter = sortValues
}
}
totalCount += batchCount
log.Printf("第 %d 页: 获取 %d 条ISBN记录总计: %d", page, batchCount, totalCount)
page++
// 如果返回的数量小于请求的数量,说明已经是最后一页
if len(hitList) < 10000 {
log.Printf("最后一页数据量 %d < 10000查询完成", len(hitList))
break
}
// 添加短暂延迟避免对ES造成过大压力
time.Sleep(100 * time.Millisecond)
}
if len(allISBNs) == 0 {
return fmt.Errorf("没有找到符合条件的ISBN记录")
}
// 去重
isbnSet := make(map[string]bool)
uniqueISBNs := make([]string, 0)
for _, isbn := range allISBNs {
if !isbnSet[isbn] {
isbnSet[isbn] = true
uniqueISBNs = append(uniqueISBNs, isbn)
}
}
log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs))
// 确保输出目录存在
outputDir := filepath.Dir(outputFile)
if err := os.MkdirAll(outputDir, 0755); err != nil {
return fmt.Errorf("创建输出目录失败: %w", err)
}
// 写入文件
file, err := os.Create(outputFile)
if err != nil {
return fmt.Errorf("创建文件失败: %w", err)
}
defer file.Close()
// 写入文件头信息
header := fmt.Sprintf(`# 有销售记录且book_pic为空的ISBN列表
# 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0) AND (book_pic为空 或 book_pic字段不存在)
# 索引: %s
# 查询时间: %s
# 总记录数: %d
`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs))
if _, err := file.WriteString(header); err != nil {
return fmt.Errorf("写入文件头失败: %w", err)
}
// 按字母顺序排序后写入
sort.Strings(uniqueISBNs)
successCount := 0
for _, isbn := range uniqueISBNs {
if _, err := file.WriteString(isbn + "\n"); err != nil {
log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err)
continue
}
successCount++
}
log.Printf("成功导出 %d/%d 个符合条件的ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile)
return nil
}
// 查询并导出有销售记录且book_pic为空的ISBN主函数
func mainQuerySaleISBNsWithEmptyPic() {
// 初始化ES客户端
es, err := NewESClient([]string{esAddress}, esUsername, esPassword)
if err != nil {
log.Fatalf("ES连接失败: %v", err)
}
// 检查ES健康状态
if err := es.CheckHealth(); err != nil {
log.Fatalf("ES健康检查失败: %v", err)
}
// 输出文件路径
outputFile := "es/sale_isbns_empty_pic.txt"
// 查询并导出ISBN
startTime := time.Now()
if err := queryAndExportSaleISBNsWithEmptyPic(es, outputFile); err != nil {
log.Fatalf("导出有销售记录且book_pic为空的ISBN失败: %v", err)
}
elapsed := time.Since(startTime)
log.Printf("任务完成!耗时: %vISBN已导出到: %s", elapsed.Round(time.Millisecond), outputFile)
}