package main //import ( // "context" // "crypto/md5" // "crypto/tls" // "database/sql" // "encoding/hex" // "encoding/json" // "fmt" // "github.com/elastic/go-elasticsearch/v8" // "github.com/elastic/go-elasticsearch/v8/esapi" // "golang.org/x/image/bmp" // "golang.org/x/image/draw" // "golang.org/x/image/tiff" // "golang.org/x/image/webp" // "image" // "image/color" // "image/jpeg" // "image/png" // "io" // "io/ioutil" // "log" // "mime/multipart" // "net/http" // "os" // "path/filepath" // "sort" // "strings" // "sync" // "sync/atomic" // "time" // // _ "github.com/go-sql-driver/mysql" // "github.com/nfnt/resize" //) // //// ES 配置 //const ( // esAddress = "http://103.236.91.138:9200" // esUsername = "elastic" // esPassword = "5mRDIUg52VC0fp14nw-F" // esIndex = "books-from-mysql" // ClientID = "203c5a7ba8bd4b8488d5e26f93052642" // 拼多多开放平台配置 // ClientSecret = "892ffaa86e12b7a3d8d2942b669d9aa520ad8179" // PDDApiURL = "https://gw-upload.pinduoduo.com/api/upload" //) // //// 配置参数 //const ( // maxWorkers = 15 // 最大并发worker数量 // maxRetries = 3 // 最大重试次数 // retryDelay = 2 * time.Second // 重试延迟 // progressInterval = 5 * time.Second // 进度报告间隔 //) // //// ES 客户端封装 //type ESClient struct { // client *elasticsearch.Client //} // //// 数据库记录结构体 //type CrawlerRecord struct { // BookISBN sql.NullString // BookPicture sql.NullString //} // //// 处理结果结构体 //type ProcessResult struct { // Record CrawlerRecord // Success bool // LocalPaths []string // PDDURLs []string // Error error // WorkerID int // ProcessedAt time.Time //} // //// 全局统计 //type Statistics struct { // Total int32 // Success int32 // Failed int32 // Skipped int32 // CurrentIndex int32 // StartTime time.Time //} // //// NewESClient 初始化 ES 客户端 //// 说明:保持一致的连接方式(禁用证书校验、设置超时和连接池参数) //func NewESClient(addresses []string, username, password string) (*ESClient, error) { // cfg := elasticsearch.Config{ // Addresses: addresses, // Username: username, // Password: password, // Transport: &http.Transport{ // TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, // MaxIdleConnsPerHost: 100, // ResponseHeaderTimeout: 60 * time.Second, // }, // } // cli, err := elasticsearch.NewClient(cfg) // if err != nil { // return nil, err // } // return &ESClient{client: cli}, nil //} // //// CheckHealth 检查 ES 集群健康 //// 行为:等待状态至少为 yellow,输出基本信息 //func (es *ESClient) CheckHealth() error { // res, err := es.client.Cluster.Health( // es.client.Cluster.Health.WithWaitForStatus("yellow"), // es.client.Cluster.Health.WithTimeout(30*time.Second), // ) // if err != nil { // return err // } // defer res.Body.Close() // if res.IsError() { // return fmt.Errorf("Elasticsearch 健康检查失败: %s", res.String()) // } // var m map[string]interface{} // if err := json.NewDecoder(res.Body).Decode(&m); err == nil { // log.Printf("ES status=%v nodes=%v cluster=%v", m["status"], m["number_of_nodes"], m["cluster_name"]) // } // return nil //} // //// PDDImageProcessor 实现图片处理器 //// pdd上传图片官方接口 //// 上传图片到拼多多 //func uploadToPDD(token, imagePath string) (string, error) { // // 检查token是否有效 // if len(token) == 0 { // return "", fmt.Errorf("获取到的token为空") // } // result, err := ProcessAndUploadImage(imagePath, token) // if err != nil { // return "", fmt.Errorf("拼多多图片上传失败: %v", err) // } // // // 解析JSON响应获取URL // var response struct { // RequestID string `json:"request_id"` // URL string `json:"url"` // } // // err = json.Unmarshal([]byte(result), &response) // if err != nil { // return "", fmt.Errorf("解析上传响应失败: %v", err) // } // // if response.URL == "" { // return "", fmt.Errorf("上传响应中未找到URL") // } // // return response.URL, nil //} //func ProcessAndUploadImage(imagePath, token string) (string, error) { // // 打开图片文件 // file, err := os.Open(imagePath) // if err != nil { // return "", fmt.Errorf("failed to open image file: %v", err) // } // defer file.Close() // // // 准备参数 - 不包含文件路径 // params := map[string]string{ // "access_token": token, // "data_type": "JSON", // "type": "pdd.goods.img.upload", // "client_id": ClientID, // "timestamp": fmt.Sprintf("%d", time.Now().Unix()), // } // // // 生成签名(不包含文件路径) // params["sign"] = generateSign(params) // // // 创建multipart表单 // body := &strings.Builder{} // writer := multipart.NewWriter(body) // // // 写入文本参数 // for key, value := range params { // if err := writer.WriteField(key, value); err != nil { // return "", fmt.Errorf("failed to write field %s: %v", key, err) // } // } // // // 写入文件流 - 使用正确的字段名 "file" // part, err := writer.CreateFormFile("file", filepath.Base(imagePath)) // if err != nil { // return "", fmt.Errorf("failed to create form file: %v", err) // } // // if _, err := io.Copy(part, file); err != nil { // return "", fmt.Errorf("failed to copy file data: %v", err) // } // // // 关闭writer // if err := writer.Close(); err != nil { // return "", fmt.Errorf("failed to close writer: %v", err) // } // // // 创建请求 // req, err := http.NewRequest("POST", PDDApiURL, strings.NewReader(body.String())) // if err != nil { // return "", fmt.Errorf("failed to create request: %v", err) // } // // // 设置请求头 // req.Header.Set("Content-Type", writer.FormDataContentType()) // req.Header.Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36") // // // 发送请求 // client := &http.Client{Timeout: 30 * time.Second} // resp, err := client.Do(req) // if err != nil { // return "", fmt.Errorf("failed to send request: %v", err) // } // defer resp.Body.Close() // // // 读取响应 // respBody, err := io.ReadAll(resp.Body) // if err != nil { // return "", fmt.Errorf("failed to read response: %v", err) // } // // log.Printf("拼多多API响应状态: %d", resp.StatusCode) // log.Printf("拼多多API响应内容: %s", string(respBody)) // // if resp.StatusCode != http.StatusOK { // return "", fmt.Errorf("API returned error status: %d, body: %s", resp.StatusCode, string(respBody)) // } // // // 解析响应 // var result map[string]interface{} // if err := json.Unmarshal(respBody, &result); err != nil { // return "", fmt.Errorf("failed to parse response: %v", err) // } // // // 检查API返回的错误 // if errorResponse, exists := result["error_response"]; exists { // errorMsg, _ := json.Marshal(errorResponse) // return "", fmt.Errorf("API returned error: %s", string(errorMsg)) // } // // // 查找成功的响应 // for key, value := range result { // if key != "error_response" { // successResponse, _ := json.Marshal(value) // return string(successResponse), nil // } // } // // return string(respBody), nil //} // //// generateSign 生成拼多多API签名 //func generateSign(params map[string]string) string { // // 按参数名排序 // var keys []string // for k := range params { // keys = append(keys, k) // } // sort.Strings(keys) // // 拼接参数字符串 // var signStr string // for _, k := range keys { // signStr += k + params[k] // } // signStr = ClientSecret + signStr + ClientSecret // // 计算MD5并转为大写 // hasher := md5.New() // hasher.Write([]byte(signStr)) // result := strings.ToUpper(hex.EncodeToString(hasher.Sum(nil))) // return result //} // //// GetPddToken 获取PDD token(简化版) //func GetPddToken() (string, error) { // url := "https://api.buzhiyushu.cn/huidiao/pdd/getPddChildrenBooksToken" // // resp, err := http.Get(url) // if err != nil { // return "", err // } // defer resp.Body.Close() // // body, err := ioutil.ReadAll(resp.Body) // if err != nil { // return "", err // } // // // 使用map解析JSON // var result map[string]interface{} // json.Unmarshal(body, &result) // // // 检查业务状态 // if code, ok := result["code"].(float64); !ok || code != 200 { // return "", fmt.Errorf("API错误: %v", result["msg"]) // } // // // 提取token // token := result["data"].(string) // return token, nil //} // //// 并发处理记录的主要函数 //func processRecordsConcurrently(records []CrawlerRecord, imageDir string, es *ESClient, maxWorkers int, token string) []ProcessResult { // var stats Statistics // stats.Total = int32(len(records)) // stats.StartTime = time.Now() // // // 创建通道 // recordChan := make(chan CrawlerRecord, len(records)) // resultChan := make(chan ProcessResult, len(records)) // // // 启动worker // var wg sync.WaitGroup // for i := 0; i < maxWorkers; i++ { // wg.Add(1) // go worker(token, i, &wg, recordChan, resultChan, imageDir, es, &stats) // } // // // 发送任务到通道 // go func() { // for _, record := range records { // recordChan <- record // } // close(recordChan) // }() // // // 启动进度报告 // go progressReporter(&stats) // // // 收集结果 // var results []ProcessResult // go func() { // for result := range resultChan { // results = append(results, result) // } // }() // // // 等待所有worker完成 // wg.Wait() // close(resultChan) // // return results //} // //// worker 处理函数 //func worker(token string, id int, wg *sync.WaitGroup, recordChan <-chan CrawlerRecord, resultChan chan<- ProcessResult, imageDir string, es *ESClient, stats *Statistics) { // defer wg.Done() // // for record := range recordChan { // currentIndex := atomic.AddInt32(&stats.CurrentIndex, 1) // // result := ProcessResult{ // Record: record, // WorkerID: id, // ProcessedAt: time.Now(), // } // // // 检查记录有效性 // if !isRecordValid(record) { // atomic.AddInt32(&stats.Skipped, 1) // result.Success = false // result.Error = fmt.Errorf("无效记录: ISBN或图片URL为空") // resultChan <- result // continue // } // // // 处理记录(带重试机制) // var localPaths, pddURLs []string // var err error // // for attempt := 1; attempt <= maxRetries; attempt++ { // localPaths, pddURLs, err = processSingleRecord(token, record, imageDir, es) // if err == nil { // break // } // // // 如果是ES记录未找到的错误,不需要重试 // if strings.Contains(err.Error(), "ES记录未找到") { // break // } // // if attempt < maxRetries { // log.Printf("Worker %d: 第 %d 次尝试处理 ISBN %s 失败, %d 秒后重试: %v", // id, attempt, record.BookISBN.String, retryDelay/time.Second, err) // time.Sleep(retryDelay) // } // } // // if err != nil { // atomic.AddInt32(&stats.Failed, 1) // result.Success = false // result.Error = err // // 即使失败,也记录已处理的本地路径(如果有) // result.LocalPaths = localPaths // result.PDDURLs = pddURLs // // // 根据错误类型记录不同的日志 // if strings.Contains(err.Error(), "ES记录未找到") { // log.Printf("Worker %d: ES记录未找到 [%d/%d] ISBN: %s", // id, currentIndex, stats.Total, record.BookISBN.String) // } else { // log.Printf("Worker %d: 处理失败 [%d/%d] ISBN: %s, 错误: %v", // id, currentIndex, stats.Total, record.BookISBN.String, err) // } // } else { // atomic.AddInt32(&stats.Success, 1) // result.Success = true // result.LocalPaths = localPaths // result.PDDURLs = pddURLs // // log.Printf("Worker %d: 成功处理 [%d/%d] ISBN: %s, 生成 %d 个文件, 上传 %d 个URL", // id, currentIndex, stats.Total, record.BookISBN.String, len(localPaths), len(pddURLs)) // } // // resultChan <- result // } //} // //// 检查记录有效性 //func isRecordValid(record CrawlerRecord) bool { // if !record.BookISBN.Valid || record.BookISBN.String == "" { // return false // } // if !record.BookPicture.Valid || record.BookPicture.String == "" { // return false // } // return true //} // //// 进度报告器 //func progressReporter(stats *Statistics) { // ticker := time.NewTicker(progressInterval) // defer ticker.Stop() // // for range ticker.C { // processed := atomic.LoadInt32(&stats.CurrentIndex) // success := atomic.LoadInt32(&stats.Success) // failed := atomic.LoadInt32(&stats.Failed) // skipped := atomic.LoadInt32(&stats.Skipped) // // elapsed := time.Since(stats.StartTime) // rate := float64(processed) / elapsed.Seconds() // // // 计算预估剩余时间 // var eta time.Duration // if processed > 0 && rate > 0 { // remaining := float64(stats.Total - processed) // eta = time.Duration(remaining/rate) * time.Second // } // // fmt.Printf("[进度] 已处理: %d/%d (成功: %d, 失败: %d, 跳过: %d) | 速率: %.2f 条/秒 | 运行: %v | ETA: %v\n", // processed, stats.Total, success, failed, skipped, rate, elapsed.Round(time.Second), eta.Round(time.Second)) // // if processed >= stats.Total { // break // } // } //} // //// 打印最终统计 //func printFinalStatistics(results []ProcessResult) { // var success, failed, skipped int // var totalFilesGenerated int // var totalURLsUploaded int // // // 失败原因分类 // failureReasons := make(map[string]int) // // for _, result := range results { // if result.Success { // success++ // totalFilesGenerated += len(result.LocalPaths) // totalURLsUploaded += len(result.PDDURLs) // } else if result.Error != nil && strings.Contains(result.Error.Error(), "无效记录") { // skipped++ // failureReasons["无效记录(ISBN或URL为空)"]++ // } else { // failed++ // // 即使是失败的情况,也可能生成了部分文件 // totalFilesGenerated += len(result.LocalPaths) // totalURLsUploaded += len(result.PDDURLs) // // // 分类失败原因 // errMsg := result.Error.Error() // switch { // case strings.Contains(errMsg, "ES记录未找到"): // failureReasons["ES记录未找到"]++ // case strings.Contains(errMsg, "查询ES中ID失败"): // failureReasons["ES查询失败"]++ // case strings.Contains(errMsg, "下载图片失败"): // failureReasons["图片下载失败"]++ // case strings.Contains(errMsg, "处理图片失败"): // failureReasons["图片处理失败"]++ // case strings.Contains(errMsg, "上传PNG图片失败"): // failureReasons["PNG上传失败"]++ // case strings.Contains(errMsg, "上传JPG图片失败"): // failureReasons["JPG上传失败"]++ // case strings.Contains(errMsg, "更新ES数据失败"): // failureReasons["ES更新失败"]++ // default: // failureReasons["其他错误"]++ // } // } // } // // fmt.Printf("\n=== 处理完成 ===\n") // fmt.Printf("总记录数: %d\n", len(results)) // fmt.Printf("成功: %d\n", success) // fmt.Printf("失败: %d\n", failed) // fmt.Printf("跳过: %d\n", skipped) // fmt.Printf("成功率: %.2f%%\n", float64(success)/float64(len(results))*100) // fmt.Printf("生成文件总数: %d (平均每条记录 %.1f 个文件)\n", totalFilesGenerated, float64(totalFilesGenerated)/float64(len(results))) // fmt.Printf("上传URL总数: %d (平均每条记录 %.1f 个URL)\n", totalURLsUploaded, float64(totalURLsUploaded)/float64(len(results))) // // // 显示失败原因统计 // if len(failureReasons) > 0 { // fmt.Printf("\n=== 失败原因统计 ===\n") // for reason, count := range failureReasons { // fmt.Printf(" %s: %d\n", reason, count) // } // } // // // 显示处理详情示例 // fmt.Printf("\n=== 处理详情示例 ===\n") // successCount := 0 // failedCount := 0 // for _, result := range results { // if result.Success && successCount < 3 { // fmt.Printf("✅ 成功: ISBN %s -> 文件: %d 个, URL: %d 个\n", // result.Record.BookISBN.String, // len(result.LocalPaths), // len(result.PDDURLs)) // successCount++ // } else if !result.Success && failedCount < 3 && !strings.Contains(result.Error.Error(), "无效记录") { // fmt.Printf("❌ 失败: ISBN %s -> 错误: %v\n", // result.Record.BookISBN.String, // result.Error) // failedCount++ // } // if successCount >= 3 && failedCount >= 3 { // break // } // } //} // //// 处理单条记录 //func processSingleRecord(token string, record CrawlerRecord, imageDir string, es *ESClient) ([]string, []string, error) { // // 更新ES // ids, err := es.FindIDsByISBN(esIndex, record.BookISBN.String) // if err != nil { // return nil, nil, fmt.Errorf("查询ES中ID失败: %v", err) // } // var pngImageUrl string // var jpgImageUrl string // var localPaths []string // var pddURLs []string // if ids != "" { // // 下载并处理图片 // pngPath, jpgPath, err := processAndSaveImage(record, imageDir) // if err != nil { // err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String) // if err != nil { // log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err) // } else { // log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String) // } // return nil, nil, fmt.Errorf("处理图片失败: %v", err) // } // localPaths = []string{pngPath, jpgPath} // // 上传到PDD // pngImageUrl, err = uploadToPDD(token, pngPath) // if err != nil { // err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String) // if err != nil { // log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err) // } else { // log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String) // } // return nil, nil, fmt.Errorf("上传PNG图片失败: %v", err) // } // // 上传到PDD // jpgImageUrl, err = uploadToPDD(token, jpgPath) // if err != nil { // err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String) // if err != nil { // log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err) // } else { // log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String) // } // return nil, nil, fmt.Errorf("上传JPG图片失败: %v", err) // } // pddURLs = []string{pngImageUrl, jpgImageUrl} // err = es.UpdateBookPicsByID(esIndex, ids, "", pngImageUrl, jpgImageUrl) // if err != nil { // err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String) // if err != nil { // log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err) // } else { // log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String) // } // return nil, nil, fmt.Errorf("更新ES数据失败: %v", err) // } // // for _, path := range localPaths { // // ES更新成功后删除本地图片 // if removeErr := os.Remove(path); removeErr == nil { // log.Printf("ES更新成功,已删除本地图片: %s", path) // } else { // log.Printf("警告: 无法删除本地图片 %s: %v", path, removeErr) // } // } // } else { // // ids为空,将ISBN存储到txt文件 // err := saveISBNToFile(record.BookISBN.String, record.BookPicture.String) // if err != nil { // log.Printf("警告: 无法保存未找到的ISBN到文件: %v", err) // } else { // log.Printf("未找到ISBN %s 对应的ES记录,已保存到文件", record.BookISBN.String) // } // return nil, nil, fmt.Errorf("未找到ISBN %s 对应的ES记录", record.BookISBN.String) // } // return localPaths, pddURLs, nil //} // //// 保存未找到的ISBN和图片URL到txt文件(CSV格式,带去重) //func saveISBNToFile(isbn string, imageUrl string) error { // filename := "cmd/update_es_gt/xgy_not_found_isbns.txt" // // // 读取现有内容检查是否已存在 // existingRecords := make(map[string]bool) // if content, err := os.ReadFile(filename); err == nil { // lines := strings.Split(string(content), "\n") // for _, line := range lines { // if line != "" && !strings.HasPrefix(line, "#") { // parts := strings.Split(line, ",") // if len(parts) > 0 { // existingRecords[parts[0]] = true // 以ISBN作为去重依据 // } // } // } // } // // // 如果已存在,则不重复添加 // if existingRecords[isbn] { // return nil // } // // 以追加模式打开文件 // file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) // if err != nil { // return fmt.Errorf("打开文件失败: %v", err) // } // defer file.Close() // // 如果是空文件,先写入CSV表头 // stat, err := file.Stat() // if err == nil && stat.Size() == 0 { // header := "# ISBN,ImageURL\n" // if _, err := file.WriteString(header); err != nil { // return fmt.Errorf("写入表头失败: %v", err) // } // } // // // 写入ISBN和图片URL,用逗号分隔,并添加换行符 // line := fmt.Sprintf("%s,%s\n", isbn, imageUrl) // _, err = file.WriteString(line) // if err != nil { // return fmt.Errorf("写入文件失败: %v", err) // } // // return nil //} // //// 下载并处理图片 //func processAndSaveImage(record CrawlerRecord, saveDir string) (string, string, error) { // // 下载图片 // img, originalFormat, err := downloadImage(record.BookPicture.String) // if err != nil { // return "", "", fmt.Errorf("下载图片失败: %v", err) // } // // fmt.Printf("下载成功,原始格式: %s\n", originalFormat) // // // 调整图片高度为600,等比例缩放 // //resizedImg := resizeImageToHeight(img, 600) // // 使用高质量缩放调整图片高度为600,等比例缩放 // resizedImg := resizeToHeightHighQuality(img, 600) // fmt.Printf("缩放后尺寸: %dx%d\n", resizedImg.Bounds().Dx(), resizedImg.Bounds().Dy()) // // // 创建800x800的透明背景 // finalImg := createCenteredImage(resizedImg, 800, 800, true) // // // 创建800x800的白色背景(用于JPG) // whiteImg := createCenteredImage(resizedImg, 800, 800, false) // // // 生成文件名 // filename := fmt.Sprintf("%s", record.BookISBN.String) // // 清理文件名中的非法字符 // filename = sanitizeFilename(filename) // // // PNG文件路径 // pngPath := filepath.Join(saveDir, filename+".png") // // JPG文件路径 // jpgPath := filepath.Join(saveDir, filename+".jpg") // // // 保存为PNG图片 // err = savePNG(finalImg, pngPath) // if err != nil { // return "", "", fmt.Errorf("保存图片失败: %v", err) // } // // // 保存为JPG图片(白色背景) // err = saveJPG(whiteImg, jpgPath, 95) // 95%质量 // if err != nil { // return "", "", fmt.Errorf("保存JPG图片失败: %v", err) // } // // fmt.Printf("转换成功: %s -> %s, 保存路径: %s\n", originalFormat, "PNG", pngPath) // fmt.Printf("转换成功: %s -> %s, 保存路径: %s\n", originalFormat, "JPG", jpgPath) // return pngPath, jpgPath, nil //} // //// 下载图片 //func downloadImage(url string) (image.Image, string, error) { // // 创建HTTP客户端,设置超时等参数 // client := &http.Client{ // Timeout: 30 * time.Second, // } // // resp, err := client.Get(url) // if err != nil { // return nil, "", err // } // defer resp.Body.Close() // // if resp.StatusCode != http.StatusOK { // return nil, "", fmt.Errorf("HTTP请求失败,状态码: %d", resp.StatusCode) // } // // // 读取响应体前几个字节来判断图片格式 // peekBytes := make([]byte, 512) // n, err := resp.Body.Read(peekBytes) // if err != nil && err != io.EOF { // return nil, "", err // } // // // 创建一个新的Reader,包含已读取的数据和剩余数据 // reader := io.MultiReader(strings.NewReader(string(peekBytes[:n])), resp.Body) // // // 根据文件头识别图片格式 // contentType := http.DetectContentType(peekBytes[:n]) // fmt.Printf("检测到的Content-Type: %s\n", contentType) // // var img image.Image // var format string // // // 根据Content-Type或文件扩展名选择解码器 // switch { // case strings.Contains(contentType, "jpeg") || strings.HasSuffix(strings.ToLower(url), ".jpg") || strings.HasSuffix(strings.ToLower(url), ".jpeg"): // img, err = jpeg.Decode(reader) // format = "JPEG" // case strings.Contains(contentType, "png") || strings.HasSuffix(strings.ToLower(url), ".png"): // img, err = png.Decode(reader) // format = "PNG" // case strings.Contains(contentType, "webp") || strings.HasSuffix(strings.ToLower(url), ".webp"): // img, err = webp.Decode(reader) // format = "WEBP" // case strings.Contains(contentType, "bmp") || strings.HasSuffix(strings.ToLower(url), ".bmp"): // img, err = bmp.Decode(reader) // format = "BMP" // case strings.Contains(contentType, "tiff") || strings.HasSuffix(strings.ToLower(url), ".tiff") || strings.HasSuffix(strings.ToLower(url), ".tif"): // img, err = tiff.Decode(reader) // format = "TIFF" // default: // // 尝试通用解码 // img, format, err = image.Decode(reader) // if err != nil { // return nil, "", fmt.Errorf("不支持的图片格式: %s, 错误: %v", contentType, err) // } // } // // if err != nil { // return nil, "", fmt.Errorf("解码图片失败: %v", err) // } // // return img, format, nil //} // //// 高质量等比例缩放到指定高度 //func resizeToHeightHighQuality(src image.Image, targetHeight int) image.Image { // bounds := src.Bounds() // srcWidth := bounds.Dx() // srcHeight := bounds.Dy() // // // 如果原图高度已经小于等于目标高度,且宽度合适,可以直接返回 // //if srcHeight <= targetHeight { // // return src // //} // // // 计算等比例缩放后的宽度 // targetWidth := uint(float64(srcWidth) * float64(targetHeight) / float64(srcHeight)) // // // 使用 Lanczos3 插值算法进行高质量缩放 // return resize.Resize(targetWidth, uint(targetHeight), src, resize.Lanczos3) //} // //// 创建居中图片(将原图放在指定大小的透明背景中央) //func createCenteredImage(src image.Image, width, height int, transparent bool) *image.RGBA { // // 创建透明背景 // dst := image.NewRGBA(image.Rect(0, 0, width, height)) // // // 设置背景颜色 // var bgColor color.Color // if transparent { // bgColor = color.RGBA{0, 0, 0, 0} // 透明 // } else { // bgColor = color.RGBA{255, 255, 255, 255} // 白色 // } // // // 填充透明背景 // //transparent := color.RGBA{0, 0, 0, 0} // draw.Draw(dst, dst.Bounds(), &image.Uniform{bgColor}, image.Point{}, draw.Src) // // // 计算居中位置 // srcBounds := src.Bounds() // srcWidth := srcBounds.Dx() // srcHeight := srcBounds.Dy() // // x := (width - srcWidth) / 2 // y := (height - srcHeight) / 2 // // // 将原图绘制到中央 // draw.Draw(dst, image.Rect(x, y, x+srcWidth, y+srcHeight), src, image.Point{}, draw.Over) // // return dst //} // //// 保存为PNG图片 //func savePNG(img image.Image, filename string) error { // file, err := os.Create(filename) // if err != nil { // return err // } // defer file.Close() // // return png.Encode(file, img) //} // //// 保存为JPG图片 //func saveJPG(img image.Image, filename string, quality int) error { // file, err := os.Create(filename) // if err != nil { // return err // } // defer file.Close() // // // 设置JPEG编码选项 // options := &jpeg.Options{ // Quality: quality, // 1-100,越高质量越好 // } // // return jpeg.Encode(file, img, options) //} // //// 清理文件名中的非法字符 //func sanitizeFilename(filename string) string { // // 替换Windows文件名中不允许的字符 // invalidChars := []string{"\\", "/", ":", "*", "?", "\"", "<", ">", "|"} // for _, char := range invalidChars { // filename = strings.ReplaceAll(filename, char, "_") // } // // 移除或替换其他可能的问题字符 // filename = strings.TrimSpace(filename) // if filename == "" { // filename = "unknown" // } // return filename //} // //// 从数据库获取记录 //func getRecords(db *sql.DB) ([]CrawlerRecord, error) { // // 查询所有记录,包括 NULL 值 // query := "SELECT book_isbn, book_picture FROM dk_crawler_record_info" // rows, err := db.Query(query) // if err != nil { // return nil, err // } // defer rows.Close() // // var records []CrawlerRecord // for rows.Next() { // var record CrawlerRecord // // 使用 sql.NullString 来接收可能为 NULL 的字段 // err := rows.Scan(&record.BookISBN, &record.BookPicture) // if err != nil { // fmt.Printf("扫描记录失败: %v\n", err) // continue // } // records = append(records, record) // } // // // 检查遍历过程中是否有错误 // if err = rows.Err(); err != nil { // return nil, err // } // // return records, nil //} // //// FindIDsByISBN 根据 ISBN 查询文档 ID 列表 //func (es *ESClient) FindIDsByISBN(index, isbn string) (string, error) { // q := map[string]interface{}{ // "query": map[string]interface{}{ // "term": map[string]interface{}{"isbn": isbn}, // }, // "_source": false, // "size": 1000, // } // b, _ := json.Marshal(q) // res, err := es.client.Search( // es.client.Search.WithIndex(index), // es.client.Search.WithBody(strings.NewReader(string(b))), // es.client.Search.WithContext(context.Background()), // ) // if err != nil { // return "", err // } // defer res.Body.Close() // if res.IsError() { // return "", fmt.Errorf("搜索失败: %s", res.String()) // } // var r map[string]interface{} // if err := json.NewDecoder(res.Body).Decode(&r); err != nil { // return "", err // } // hits, _ := r["hits"].(map[string]interface{}) // arr, _ := hits["hits"].([]interface{}) // var ids string // for _, h := range arr { // m, _ := h.(map[string]interface{}) // id, _ := m["_id"].(string) // if id != "" { // //ids = append(ids, id) // ids = id // } // } // return ids, nil //} // //func (es *ESClient) UpdateBookPicsByID(index, id, localImageS, pngImageUrl, jpgImageUrl string) error { // bookPicJSON, err := json.Marshal(map[string]string{ // "localPath": localImageS, // "pddPath": jpgImageUrl, // }) // if err != nil { // return fmt.Errorf("序列化 book_pic_w 失败: %w", err) // } // // bookPicBJSON, err := json.Marshal(map[string]string{ // "localPath": localImageS, // "pddResponse": pngImageUrl, // }) // if err != nil { // return fmt.Errorf("序列化 book_pic_b 失败: %w", err) // } // // 构建更新文档 // payload := map[string]interface{}{ // "doc": map[string]string{ // "book_pic": string(bookPicJSON), // "book_pic_b": string(bookPicBJSON), // }, // } // // JSON 序列化整个更新请求 // body, err := json.Marshal(payload) // if err != nil { // return fmt.Errorf("序列化更新请求失败: %w", err) // } // req := esapi.UpdateRequest{ // Index: index, // DocumentID: id, // Body: strings.NewReader(string(body)), // } // res, err := req.Do(context.Background(), es.client) // if err != nil { // return err // } // defer res.Body.Close() // if res.IsError() { // data, _ := io.ReadAll(res.Body) // return fmt.Errorf("ES 更新失败: %s", data) // } // return nil //} // //// 从 sql.NullString 获取字符串值 //func getStringValue(nullString sql.NullString) string { // if nullString.Valid { // return nullString.String // } // return "NULL" //} // //func main() { // //// 获取token // //token, err := GetPddToken() // //if err != nil { // // fmt.Errorf("获取拼多多token失败: %v", err) // //} // //fmt.Println("token=", token) // //// 数据源名称格式 // //dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", // // "root", // // "123456", // // "localhost", // // 3306, // // "book_image") // //db, err := sql.Open("mysql", dsn) // //if err != nil { // // fmt.Printf("打开数据库连接失败: %v", err) // //} // //// 设置连接池参数 // //db.SetMaxOpenConns(20) // 最大打开连接数 // //db.SetMaxIdleConns(10) // //err = db.Ping() // //if err != nil { // // fmt.Printf("数据库连接测试失败: %v", err) // //} // // // //// 查询数据 // //records, err := getRecords(db) // //if err != nil { // // fmt.Printf("查询失败: %v", err) // //} // //imageDir := "D:\\image" // //err = os.MkdirAll(imageDir, 0755) // //if err != nil { // // fmt.Sprintf("创建目录失败: %v", err) // //} // //fmt.Printf("找到 %d 条记录需要处理\n", len(records)) // // // //es, err := NewESClient([]string{esAddress}, esUsername, esPassword) // //if err != nil { // // log.Fatalf("ES 连接失败: %v", err) // //} // //if err := es.CheckHealth(); err != nil { // // log.Fatalf("ES 健康检查失败: %v", err) // //} // //// 启动并发处理 // //results := processRecordsConcurrently(records, imageDir, es, maxWorkers, token) // // // //// 输出最终统计 // //printFinalStatistics(results) // // //mainQuerySaleISBNs() // //mainFindESOnlyISBNs() // mainQuerySaleISBNsWithEmptyPic() //} // //// 查询并导出有销售记录且book_pic字符串中pddPath为空的ISBN //func queryAndExportSaleISBNs(es *ESClient, outputFile string) error { // log.Printf("开始查询有销售记录且book_pic字符串中pddPath为空的ISBN...") // // // 使用其他字段排序,比如 isbn 字段或者时间字段 // query := map[string]interface{}{ // "query": map[string]interface{}{ // "bool": map[string]interface{}{ // "must": []map[string]interface{}{ // { // "bool": map[string]interface{}{ // "should": []map[string]interface{}{ // {"range": map[string]interface{}{"day_sale_7": map[string]interface{}{"gt": 0}}}, // {"range": map[string]interface{}{"day_sale_15": map[string]interface{}{"gt": 0}}}, // {"range": map[string]interface{}{"day_sale_30": map[string]interface{}{"gt": 0}}}, // {"range": map[string]interface{}{"day_sale_60": map[string]interface{}{"gt": 0}}}, // }, // "minimum_should_match": 1, // }, // }, // { // "bool": map[string]interface{}{ // "should": []map[string]interface{}{ // // 匹配 pddPath:"" 的JSON字符串 // {"regexp": map[string]interface{}{"book_pic": ".*\"pddPath\":\"\".*"}}, // // 匹配 pddPath: "" (带空格的) // {"regexp": map[string]interface{}{"book_pic": ".*\"pddPath\":\\s*\"\".*"}}, // // 匹配整个book_pic字段为空 // {"term": map[string]interface{}{"book_pic": ""}}, // // 匹配book_pic字段不存在 // { // "bool": map[string]interface{}{ // "must_not": map[string]interface{}{ // "exists": map[string]interface{}{"field": "book_pic"}, // }, // }, // }, // }, // }, // }, // }, // }, // }, // "_source": []string{"isbn"}, // "sort": []map[string]interface{}{ // {"isbn": "asc"}, // 使用 isbn 字段排序,或者使用其他可排序字段 // }, // "size": 10000, // } // // // 打印查询条件用于验证 // queryJSON, _ := json.MarshalIndent(query, "", " ") // log.Printf("查询条件:\n%s", string(queryJSON)) // // var allISBNs []string // var searchAfter interface{} // totalCount := 0 // page := 1 // // for { // // 复制基础查询 // currentQuery := make(map[string]interface{}) // for k, v := range query { // currentQuery[k] = v // } // // // 添加游标 // if searchAfter != nil { // currentQuery["search_after"] = searchAfter // } // // body, err := json.Marshal(currentQuery) // if err != nil { // return fmt.Errorf("序列化查询失败: %w", err) // } // // log.Printf("执行第 %d 页查询...", page) // // // 执行搜索 // res, err := es.client.Search( // es.client.Search.WithIndex(esIndex), // es.client.Search.WithBody(strings.NewReader(string(body))), // es.client.Search.WithContext(context.Background()), // ) // if err != nil { // return fmt.Errorf("ES搜索失败: %w", err) // } // defer res.Body.Close() // // if res.IsError() { // bodyBytes, _ := io.ReadAll(res.Body) // return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes)) // } // // // 读取并解析响应体 // bodyBytes, err := io.ReadAll(res.Body) // if err != nil { // return fmt.Errorf("读取响应体失败: %w", err) // } // // var result map[string]interface{} // if err := json.Unmarshal(bodyBytes, &result); err != nil { // return fmt.Errorf("解析ES响应失败: %w", err) // } // // // 检查是否有错误 // if errMsg, exists := result["error"]; exists { // return fmt.Errorf("ES返回错误: %v", errMsg) // } // // hits, ok := result["hits"].(map[string]interface{}) // if !ok { // return fmt.Errorf("无法解析hits字段") // } // // // 获取总命中数 // if totalHits, exists := hits["total"].(map[string]interface{}); exists { // if totalValue, exists := totalHits["value"]; exists { // log.Printf("ES返回总命中数: %.0f", totalValue) // } // } // // hitList, ok := hits["hits"].([]interface{}) // if !ok || len(hitList) == 0 { // log.Printf("第 %d 页没有数据,查询完成", page) // break // 没有更多数据 // } // // // 处理当前批次的数据 // batchCount := 0 // for _, hit := range hitList { // hitMap, ok := hit.(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析hit数据") // continue // } // // source, ok := hitMap["_source"].(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析_source字段") // continue // } // // isbn, ok := source["isbn"].(string) // if ok && isbn != "" { // allISBNs = append(allISBNs, isbn) // batchCount++ // } else { // log.Printf("警告: 跳过空的ISBN字段") // } // // // 更新游标(使用最后一个文档的排序值) // sortValues, ok := hitMap["sort"].([]interface{}) // if ok && len(sortValues) > 0 { // searchAfter = sortValues // } // } // // totalCount += batchCount // log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount) // page++ // // // 如果返回的数量小于请求的数量,说明已经是最后一页 // if len(hitList) < 10000 { // log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList)) // break // } // // // 添加短暂延迟,避免对ES造成过大压力 // time.Sleep(100 * time.Millisecond) // } // // if len(allISBNs) == 0 { // return fmt.Errorf("没有找到符合条件的ISBN记录") // } // // // 去重 // isbnSet := make(map[string]bool) // uniqueISBNs := make([]string, 0) // for _, isbn := range allISBNs { // if !isbnSet[isbn] { // isbnSet[isbn] = true // uniqueISBNs = append(uniqueISBNs, isbn) // } // } // // log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs)) // // // 确保输出目录存在 // outputDir := filepath.Dir(outputFile) // if err := os.MkdirAll(outputDir, 0755); err != nil { // return fmt.Errorf("创建输出目录失败: %w", err) // } // // // 写入文件 // file, err := os.Create(outputFile) // if err != nil { // return fmt.Errorf("创建文件失败: %w", err) // } // defer file.Close() // // // 写入文件头信息 // header := fmt.Sprintf(`# 有销售记录且book_pic字符串中pddPath为空的ISBN列表 //# 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0 OR day_sale_60 > 0) AND (book_pic包含"pddPath":"" 或 book_pic为空 或 book_pic字段不存在) //# 索引: %s //# 统计时间: %s //# 总记录数: %d // //`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs)) // // if _, err := file.WriteString(header); err != nil { // return fmt.Errorf("写入文件头失败: %w", err) // } // // // 按字母顺序排序后写入 // sort.Strings(uniqueISBNs) // successCount := 0 // for _, isbn := range uniqueISBNs { // if _, err := file.WriteString(isbn + "\n"); err != nil { // log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err) // continue // } // successCount++ // } // // log.Printf("成功导出 %d/%d 个有销售记录且book_pic字符串中pddPath为空的ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile) // return nil //} // //// 查询并导出销售ISBN的主函数 //func mainQuerySaleISBNs() { // // 初始化ES客户端 // es, err := NewESClient([]string{esAddress}, esUsername, esPassword) // if err != nil { // log.Fatalf("ES连接失败: %v", err) // } // // // 检查ES健康状态 // if err := es.CheckHealth(); err != nil { // log.Fatalf("ES健康检查失败: %v", err) // } // // // 输出文件路径 // outputFile := "cmd/update_es_gt/all_isbns.txt" // // // 查询并导出ISBN // startTime := time.Now() // if err := exportAllISBNs(es, outputFile); err != nil { // log.Fatalf("导出销售ISBN失败: %v", err) // } // // elapsed := time.Since(startTime) // log.Printf("任务完成!耗时: %v,ISBN已导出到: %s", elapsed.Round(time.Millisecond), outputFile) //} // //// 导出所有ISBN到txt文件 //func exportAllISBNs(es *ESClient, outputFile string) error { // log.Printf("开始导出所有ISBN...") // // // 查询所有包含isbn字段的文档 // query := map[string]interface{}{ // "query": map[string]interface{}{ // "exists": map[string]interface{}{ // "field": "isbn", // }, // }, // "_source": []string{"isbn"}, // "sort": []map[string]interface{}{ // {"isbn": "asc"}, // 按ISBN排序 // }, // "size": 10000, // } // // var allISBNs []string // var searchAfter interface{} // totalCount := 0 // page := 1 // // for { // // 复制基础查询 // currentQuery := make(map[string]interface{}) // for k, v := range query { // currentQuery[k] = v // } // // // 添加游标 // if searchAfter != nil { // currentQuery["search_after"] = searchAfter // } // // body, err := json.Marshal(currentQuery) // if err != nil { // return fmt.Errorf("序列化查询失败: %w", err) // } // // log.Printf("执行第 %d 页查询...", page) // // // 执行搜索 // res, err := es.client.Search( // es.client.Search.WithIndex(esIndex), // es.client.Search.WithBody(strings.NewReader(string(body))), // es.client.Search.WithContext(context.Background()), // ) // if err != nil { // return fmt.Errorf("ES搜索失败: %w", err) // } // defer res.Body.Close() // // if res.IsError() { // bodyBytes, _ := io.ReadAll(res.Body) // return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes)) // } // // // 读取并解析响应体 // bodyBytes, err := io.ReadAll(res.Body) // if err != nil { // return fmt.Errorf("读取响应体失败: %w", err) // } // // var result map[string]interface{} // if err := json.Unmarshal(bodyBytes, &result); err != nil { // return fmt.Errorf("解析ES响应失败: %w", err) // } // // // 检查是否有错误 // if errMsg, exists := result["error"]; exists { // return fmt.Errorf("ES返回错误: %v", errMsg) // } // // hits, ok := result["hits"].(map[string]interface{}) // if !ok { // return fmt.Errorf("无法解析hits字段") // } // // // 获取总命中数 // if totalHits, exists := hits["total"].(map[string]interface{}); exists { // if totalValue, exists := totalHits["value"]; exists { // if page == 1 { // log.Printf("ES索引中共有 %.0f 条包含ISBN的记录", totalValue) // } // } // } // // hitList, ok := hits["hits"].([]interface{}) // if !ok || len(hitList) == 0 { // log.Printf("第 %d 页没有数据,查询完成", page) // break // 没有更多数据 // } // // // 处理当前批次的数据 // batchCount := 0 // for _, hit := range hitList { // hitMap, ok := hit.(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析hit数据") // continue // } // // source, ok := hitMap["_source"].(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析_source字段") // continue // } // // isbn, ok := source["isbn"].(string) // if ok && isbn != "" { // allISBNs = append(allISBNs, isbn) // batchCount++ // } else { // log.Printf("警告: 跳过空的ISBN字段") // } // // // 更新游标(使用最后一个文档的排序值) // sortValues, ok := hitMap["sort"].([]interface{}) // if ok && len(sortValues) > 0 { // searchAfter = sortValues // } // } // // totalCount += batchCount // log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount) // page++ // // // 如果返回的数量小于请求的数量,说明已经是最后一页 // if len(hitList) < 10000 { // log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList)) // break // } // // // 添加短暂延迟,避免对ES造成过大压力 // time.Sleep(100 * time.Millisecond) // } // // if len(allISBNs) == 0 { // return fmt.Errorf("没有找到包含ISBN字段的记录") // } // // // 去重 // isbnSet := make(map[string]bool) // uniqueISBNs := make([]string, 0) // for _, isbn := range allISBNs { // if !isbnSet[isbn] { // isbnSet[isbn] = true // uniqueISBNs = append(uniqueISBNs, isbn) // } // } // // log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs)) // // // 确保输出目录存在 // outputDir := filepath.Dir(outputFile) // if err := os.MkdirAll(outputDir, 0755); err != nil { // return fmt.Errorf("创建输出目录失败: %w", err) // } // // // 写入文件 // file, err := os.Create(outputFile) // if err != nil { // return fmt.Errorf("创建文件失败: %w", err) // } // defer file.Close() // // // 写入文件头信息 // header := fmt.Sprintf(`# 所有ISBN列表 //# 索引: %s //# 导出时间: %s //# 总记录数: %d // //`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs)) // // if _, err := file.WriteString(header); err != nil { // return fmt.Errorf("写入文件头失败: %w", err) // } // // // 按字母顺序排序后写入 // sort.Strings(uniqueISBNs) // successCount := 0 // for _, isbn := range uniqueISBNs { // if _, err := file.WriteString(isbn + "\n"); err != nil { // log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err) // continue // } // successCount++ // } // // log.Printf("成功导出 %d/%d 个ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile) // return nil //} // //// 从ES获取所有ISBN //func getAllISBNsFromES(es *ESClient) ([]string, error) { // log.Printf("开始从ES索引 %s 获取所有ISBN...", esIndex) // // var allISBNs []string // var searchAfter interface{} // totalCount := 0 // page := 1 // // for { // query := map[string]interface{}{ // "query": map[string]interface{}{ // "exists": map[string]interface{}{ // "field": "isbn", // }, // }, // "_source": []string{"isbn"}, // "sort": []map[string]interface{}{ // {"isbn": "asc"}, // }, // "size": 10000, // } // // // 添加游标 // if searchAfter != nil { // query["search_after"] = searchAfter // } // // body, err := json.Marshal(query) // if err != nil { // return nil, fmt.Errorf("序列化查询失败: %w", err) // } // // log.Printf("执行第 %d 页ES查询...", page) // // // 执行搜索 // res, err := es.client.Search( // es.client.Search.WithIndex(esIndex), // es.client.Search.WithBody(strings.NewReader(string(body))), // es.client.Search.WithContext(context.Background()), // ) // if err != nil { // return nil, fmt.Errorf("ES搜索失败: %w", err) // } // defer res.Body.Close() // // if res.IsError() { // bodyBytes, _ := io.ReadAll(res.Body) // return nil, fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes)) // } // // // 解析响应 // var result map[string]interface{} // if err := json.NewDecoder(res.Body).Decode(&result); err != nil { // return nil, fmt.Errorf("解析ES响应失败: %w", err) // } // // hits, ok := result["hits"].(map[string]interface{}) // if !ok { // return nil, fmt.Errorf("无法解析hits字段") // } // // hitList, ok := hits["hits"].([]interface{}) // if !ok || len(hitList) == 0 { // log.Printf("第 %d 页没有数据,查询完成", page) // break // } // // // 处理当前批次的数据 // batchCount := 0 // for _, hit := range hitList { // hitMap, ok := hit.(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析hit数据") // continue // } // // source, ok := hitMap["_source"].(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析_source字段") // continue // } // // isbn, ok := source["isbn"].(string) // if ok && isbn != "" { // allISBNs = append(allISBNs, isbn) // batchCount++ // } // // // 更新游标 // sortValues, ok := hitMap["sort"].([]interface{}) // if ok && len(sortValues) > 0 { // searchAfter = sortValues // } // } // // totalCount += batchCount // log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount) // page++ // // // 如果返回的数量小于请求的数量,说明已经是最后一页 // if len(hitList) < 10000 { // log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList)) // break // } // // time.Sleep(100 * time.Millisecond) // } // // if len(allISBNs) == 0 { // return nil, fmt.Errorf("ES中没有找到包含ISBN字段的记录") // } // // log.Printf("从ES中获取到 %d 个ISBN", len(allISBNs)) // return allISBNs, nil //} // //// 批量检查ISBN在数据库中是否存在 //func checkISBNsInDB(db *sql.DB, isbns []string) (map[string]bool, error) { // log.Printf("开始检查 %d 个ISBN在数据库中的存在情况...", len(isbns)) // // existsMap := make(map[string]bool) // // // 分批处理,避免SQL语句过长 // batchSize := 1000 // totalBatches := (len(isbns) + batchSize - 1) / batchSize // // for batch := 0; batch < totalBatches; batch++ { // start := batch * batchSize // end := start + batchSize // if end > len(isbns) { // end = len(isbns) // } // // batchISBNs := isbns[start:end] // log.Printf("处理数据库批次 %d/%d: ISBN范围 %d-%d", batch+1, totalBatches, start+1, end) // // // 构建IN查询的占位符 // placeholders := make([]string, len(batchISBNs)) // args := make([]interface{}, len(batchISBNs)) // for i, isbn := range batchISBNs { // placeholders[i] = "?" // args[i] = isbn // } // // query := fmt.Sprintf( // "SELECT isbn FROM xgy_base_item WHERE isbn IN (%s)", // strings.Join(placeholders, ","), // ) // // rows, err := db.Query(query, args...) // if err != nil { // return nil, fmt.Errorf("数据库查询失败: %w", err) // } // // // 读取存在的ISBN // for rows.Next() { // var isbn string // if err := rows.Scan(&isbn); err != nil { // rows.Close() // return nil, fmt.Errorf("扫描ISBN失败: %w", err) // } // existsMap[isbn] = true // } // rows.Close() // // if err = rows.Err(); err != nil { // return nil, fmt.Errorf("遍历数据库行时出错: %w", err) // } // // // 添加延迟避免对数据库造成压力 // if batch < totalBatches-1 { // time.Sleep(50 * time.Millisecond) // } // } // // log.Printf("数据库中存在 %d 个匹配的ISBN", len(existsMap)) // return existsMap, nil //} // //// 找出数据库中不存在的ISBN(ES中有但数据库中没有) //func findESOnlyISBNs(esISBNs []string, dbExistsMap map[string]bool) []string { // var esOnlyISBNs []string // // for _, isbn := range esISBNs { // if !dbExistsMap[isbn] { // esOnlyISBNs = append(esOnlyISBNs, isbn) // } // } // // log.Printf("ES中有 %d 个ISBN在数据库中不存在", len(esOnlyISBNs)) // return esOnlyISBNs //} // //// 导出ES独有ISBN到txt文件 //func exportESOnlyISBNs(esOnlyISBNs []string, outputFile string) error { // if len(esOnlyISBNs) == 0 { // log.Printf("没有ES独有的ISBN需要导出") // return nil // } // // // 确保输出目录存在 // outputDir := filepath.Dir(outputFile) // if err := os.MkdirAll(outputDir, 0755); err != nil { // return fmt.Errorf("创建输出目录失败: %w", err) // } // // // 写入文件 // file, err := os.Create(outputFile) // if err != nil { // return fmt.Errorf("创建文件失败: %w", err) // } // defer file.Close() // // // 写入文件头信息 // header := fmt.Sprintf(`# ES中有但数据库中没有的ISBN列表 //# 数据库表: xgy_base_item //# ES索引: %s //# 导出时间: %s //# 记录数: %d //# 说明: 这些ISBN在ES索引中存在但在数据库表中不存在 // //`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(esOnlyISBNs)) // // if _, err := file.WriteString(header); err != nil { // return fmt.Errorf("写入文件头失败: %w", err) // } // // // 按字母顺序排序后写入 // sort.Strings(esOnlyISBNs) // successCount := 0 // for _, isbn := range esOnlyISBNs { // if _, err := file.WriteString(isbn + "\n"); err != nil { // log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err) // continue // } // successCount++ // } // // log.Printf("成功导出 %d/%d 个ES独有ISBN到文件: %s", successCount, len(esOnlyISBNs), outputFile) // return nil //} // //// 主函数:查询ES中有但数据库中没有的ISBN //func mainFindESOnlyISBNs() { // // 初始化数据库连接 // dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", // "root", // "123456", // "localhost", // 3306, // "book_image") // 请根据实际情况修改数据库名 // // db, err := sql.Open("mysql", dsn) // if err != nil { // log.Fatalf("打开数据库连接失败: %v", err) // } // defer db.Close() // // // 设置连接池参数 // db.SetMaxOpenConns(20) // db.SetMaxIdleConns(10) // // // 测试数据库连接 // if err := db.Ping(); err != nil { // log.Fatalf("数据库连接测试失败: %v", err) // } // // // 初始化ES客户端 // es, err := NewESClient([]string{esAddress}, esUsername, esPassword) // if err != nil { // log.Fatalf("ES连接失败: %v", err) // } // // // 检查ES健康状态 // if err := es.CheckHealth(); err != nil { // log.Fatalf("ES健康检查失败: %v", err) // } // // startTime := time.Now() // log.Printf("开始处理ES与数据库的ISBN匹配...") // // // 步骤1: 从ES获取所有ISBN // esISBNs, err := getAllISBNsFromES(es) // if err != nil { // log.Fatalf("获取ES ISBN失败: %v", err) // } // // // 步骤2: 检查ISBN在数据库中的存在情况 // dbExistsMap, err := checkISBNsInDB(db, esISBNs) // if err != nil { // log.Fatalf("检查数据库中ISBN存在情况失败: %v", err) // } // // // 步骤3: 找出ES独有ISBN(ES中有但数据库中没有) // esOnlyISBNs := findESOnlyISBNs(esISBNs, dbExistsMap) // // // 步骤4: 导出ES独有ISBN // outputFile := "cmd/update_es_gt/missing_isbns.txt" // if err := exportESOnlyISBNs(esOnlyISBNs, outputFile); err != nil { // log.Fatalf("导出ES独有ISBN失败: %v", err) // } // // elapsed := time.Since(startTime) // // // 输出统计信息 // fmt.Printf("\n=== 处理完成 ===\n") // fmt.Printf("ES中ISBN总数: %d\n", len(esISBNs)) // fmt.Printf("数据库中匹配的ISBN数: %d\n", len(dbExistsMap)) // fmt.Printf("ES独有ISBN数(数据库中没有的): %d\n", len(esOnlyISBNs)) // fmt.Printf("独有比例: %.2f%%\n", float64(len(esOnlyISBNs))/float64(len(esISBNs))*100) // fmt.Printf("耗时: %v\n", elapsed.Round(time.Millisecond)) // fmt.Printf("输出文件: %s\n", outputFile) // // // 显示部分ES独有ISBN示例 // if len(esOnlyISBNs) > 0 { // fmt.Printf("\nES独有ISBN示例 (前10个):\n") // for i := 0; i < 10 && i < len(esOnlyISBNs); i++ { // fmt.Printf(" %s\n", esOnlyISBNs[i]) // } // if len(esOnlyISBNs) > 10 { // fmt.Printf(" ... 还有 %d 个\n", len(esOnlyISBNs)-10) // } // } //} // //// 查询并导出有销售记录且book_pic为空的ISBN //func queryAndExportSaleISBNsWithEmptyPic(es *ESClient, outputFile string) error { // log.Printf("开始查询有销售记录且book_pic为空的ISBN...") // // // 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0) AND book_pic为空 // query := map[string]interface{}{ // "query": map[string]interface{}{ // "bool": map[string]interface{}{ // "must": []map[string]interface{}{ // { // "bool": map[string]interface{}{ // "should": []map[string]interface{}{ // {"range": map[string]interface{}{"day_sale_7": map[string]interface{}{"gt": 0}}}, // {"range": map[string]interface{}{"day_sale_15": map[string]interface{}{"gt": 0}}}, // {"range": map[string]interface{}{"day_sale_30": map[string]interface{}{"gt": 0}}}, // }, // "minimum_should_match": 1, // }, // }, // { // "bool": map[string]interface{}{ // "should": []map[string]interface{}{ // // 匹配book_pic字段为空 // {"term": map[string]interface{}{"book_pic": ""}}, // // 匹配book_pic字段不存在 // { // "bool": map[string]interface{}{ // "must_not": map[string]interface{}{ // "exists": map[string]interface{}{"field": "book_pic"}, // }, // }, // }, // }, // }, // }, // }, // }, // }, // "_source": []string{"isbn"}, // "sort": []map[string]interface{}{ // {"isbn": "asc"}, // 按ISBN排序 // }, // "size": 10000, // } // // // 打印查询条件用于验证 // queryJSON, _ := json.MarshalIndent(query, "", " ") // log.Printf("查询条件:\n%s", string(queryJSON)) // // var allISBNs []string // var searchAfter interface{} // totalCount := 0 // page := 1 // // for { // // 复制基础查询 // currentQuery := make(map[string]interface{}) // for k, v := range query { // currentQuery[k] = v // } // // // 添加游标 // if searchAfter != nil { // currentQuery["search_after"] = searchAfter // } // // body, err := json.Marshal(currentQuery) // if err != nil { // return fmt.Errorf("序列化查询失败: %w", err) // } // // log.Printf("执行第 %d 页查询...", page) // // // 执行搜索 // res, err := es.client.Search( // es.client.Search.WithIndex(esIndex), // es.client.Search.WithBody(strings.NewReader(string(body))), // es.client.Search.WithContext(context.Background()), // ) // if err != nil { // return fmt.Errorf("ES搜索失败: %w", err) // } // defer res.Body.Close() // // if res.IsError() { // bodyBytes, _ := io.ReadAll(res.Body) // return fmt.Errorf("ES搜索返回错误: %s, 响应: %s", res.String(), string(bodyBytes)) // } // // // 读取并解析响应体 // bodyBytes, err := io.ReadAll(res.Body) // if err != nil { // return fmt.Errorf("读取响应体失败: %w", err) // } // // var result map[string]interface{} // if err := json.Unmarshal(bodyBytes, &result); err != nil { // return fmt.Errorf("解析ES响应失败: %w", err) // } // // // 检查是否有错误 // if errMsg, exists := result["error"]; exists { // return fmt.Errorf("ES返回错误: %v", errMsg) // } // // hits, ok := result["hits"].(map[string]interface{}) // if !ok { // return fmt.Errorf("无法解析hits字段") // } // // // 获取总命中数 // if totalHits, exists := hits["total"].(map[string]interface{}); exists { // if totalValue, exists := totalHits["value"]; exists { // log.Printf("ES返回总命中数: %.0f", totalValue) // } // } // // hitList, ok := hits["hits"].([]interface{}) // if !ok || len(hitList) == 0 { // log.Printf("第 %d 页没有数据,查询完成", page) // break // 没有更多数据 // } // // // 处理当前批次的数据 // batchCount := 0 // for _, hit := range hitList { // hitMap, ok := hit.(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析hit数据") // continue // } // // source, ok := hitMap["_source"].(map[string]interface{}) // if !ok { // log.Printf("警告: 无法解析_source字段") // continue // } // // isbn, ok := source["isbn"].(string) // if ok && isbn != "" { // allISBNs = append(allISBNs, isbn) // batchCount++ // } else { // log.Printf("警告: 跳过空的ISBN字段") // } // // // 更新游标(使用最后一个文档的排序值) // sortValues, ok := hitMap["sort"].([]interface{}) // if ok && len(sortValues) > 0 { // searchAfter = sortValues // } // } // // totalCount += batchCount // log.Printf("第 %d 页: 获取 %d 条ISBN记录,总计: %d", page, batchCount, totalCount) // page++ // // // 如果返回的数量小于请求的数量,说明已经是最后一页 // if len(hitList) < 10000 { // log.Printf("最后一页数据量 %d < 10000,查询完成", len(hitList)) // break // } // // // 添加短暂延迟,避免对ES造成过大压力 // time.Sleep(100 * time.Millisecond) // } // // if len(allISBNs) == 0 { // return fmt.Errorf("没有找到符合条件的ISBN记录") // } // // // 去重 // isbnSet := make(map[string]bool) // uniqueISBNs := make([]string, 0) // for _, isbn := range allISBNs { // if !isbnSet[isbn] { // isbnSet[isbn] = true // uniqueISBNs = append(uniqueISBNs, isbn) // } // } // // log.Printf("去重前: %d 条, 去重后: %d 条", len(allISBNs), len(uniqueISBNs)) // // // 确保输出目录存在 // outputDir := filepath.Dir(outputFile) // if err := os.MkdirAll(outputDir, 0755); err != nil { // return fmt.Errorf("创建输出目录失败: %w", err) // } // // // 写入文件 // file, err := os.Create(outputFile) // if err != nil { // return fmt.Errorf("创建文件失败: %w", err) // } // defer file.Close() // // // 写入文件头信息 // header := fmt.Sprintf(`# 有销售记录且book_pic为空的ISBN列表 //# 查询条件: (day_sale_7 > 0 OR day_sale_15 > 0 OR day_sale_30 > 0) AND (book_pic为空 或 book_pic字段不存在) //# 索引: %s //# 查询时间: %s //# 总记录数: %d // //`, esIndex, time.Now().Format("2006-01-02 15:04:05"), len(uniqueISBNs)) // // if _, err := file.WriteString(header); err != nil { // return fmt.Errorf("写入文件头失败: %w", err) // } // // // 按字母顺序排序后写入 // sort.Strings(uniqueISBNs) // successCount := 0 // for _, isbn := range uniqueISBNs { // if _, err := file.WriteString(isbn + "\n"); err != nil { // log.Printf("警告: 写入ISBN失败 %s: %v", isbn, err) // continue // } // successCount++ // } // // log.Printf("成功导出 %d/%d 个符合条件的ISBN到文件: %s", successCount, len(uniqueISBNs), outputFile) // return nil //} // //// 查询并导出有销售记录且book_pic为空的ISBN主函数 //func mainQuerySaleISBNsWithEmptyPic() { // // 初始化ES客户端 // es, err := NewESClient([]string{esAddress}, esUsername, esPassword) // if err != nil { // log.Fatalf("ES连接失败: %v", err) // } // // // 检查ES健康状态 // if err := es.CheckHealth(); err != nil { // log.Fatalf("ES健康检查失败: %v", err) // } // // // 输出文件路径 // outputFile := "es/sale_isbns_empty_pic.txt" // // // 查询并导出ISBN // startTime := time.Now() // if err := queryAndExportSaleISBNsWithEmptyPic(es, outputFile); err != nil { // log.Fatalf("导出有销售记录且book_pic为空的ISBN失败: %v", err) // } // // elapsed := time.Since(startTime) // log.Printf("任务完成!耗时: %v,ISBN已导出到: %s", elapsed.Round(time.Millisecond), outputFile) //}