package main import ( "bytes" "database/sql" "encoding/json" "fmt" "io" "log" "net/http" "os" "os/signal" "strings" "syscall" "time" _ "github.com/go-sql-driver/mysql" ) // ============ 配置 ============ // 源库 const ( srcHost = "36.134.185.139" srcPort = 6603 srcUser = "tao_zhuang_bo" srcPassword = "Bi54NTUi0i" srcDatabase = "kong_book_set" ) // 目标MySQL库 const ( dstHost = "175.27.224.66" dstPort = 3306 dstUser = "suit_book" dstPassword = "2X8b8cBE6SmEECnm" dstDatabase = "suit_book" ) // 目标ES const ( esURL = "http://36.212.12.92:9527" esUser = "elastic" esPassword = "+Tz5qR_KushZ-bPgZ_H-" esIndexV2 = "books-from-mysql-v2" esIndexV3 = "books-from-mysql-v3" ) // 批量大小 const batchSize = 2000 // 增量同步状态文件 const syncStateFile = "sync_state.json" // ============ 数据结构 ============ type BookRow struct { KongID int64 ISBN string BookName string Author string Press string PressTime interface{} Binding string Price float64 ImgURL string Edition string Format string Pages int64 WordCount int64 Category string CreateTime interface{} UpdateTime interface{} } type ESDoc struct { ID int64 `json:"id"` Fid int64 `json:"fid"` ISBN string `json:"isbn"` BookName string `json:"book_name"` Author string `json:"author"` Publisher string `json:"publisher"` PublicationTime string `json:"publication_time"` BindingLayout string `json:"binding_layout"` FixPrice float64 `json:"fix_price"` BookPic ESBookPic `json:"book_pic"` BookPicS ESBookPicS `json:"book_pic_s"` Edition string `json:"edition"` BookFormat string `json:"book_format"` PageCount int64 `json:"page_count"` WordCount int64 `json:"word_count"` Category string `json:"category"` CatID ESCatID `json:"cat_id"` UpdateTime string `json:"update_time"` IsSuit int64 `json:"is_suit"` } type ESBookPic struct { LocalPath string `json:"localPath"` PddPath string `json:"pddPath"` } type ESBookPicS struct { LocalPath string `json:"localPath"` PddResponse string `json:"pddResponse"` } type ESCatID struct { KongFuZiCatID string `json:"kong_fu_zi_cat_id"` } type ImgURLJSON struct { BookPic ImgBookPic `json:"book_pic"` BookPicS ImgBookPicS `json:"book_pic_s"` } type ImgBookPic struct { LocalPath string `json:"localPath"` PddPath string `json:"pddPath"` } type ImgBookPicS struct { LocalPath string `json:"localPath"` PddResponse string `json:"pddResponse"` } type SyncState struct { LastSyncTime string `json:"last_sync_time"` } // ============ 辅助函数 ============ func buildImgURLJSON(raw string) string { if raw == "" { b, _ := json.Marshal(ImgURLJSON{ BookPic: ImgBookPic{LocalPath: "", PddPath: ""}, BookPicS: ImgBookPicS{LocalPath: "", PddResponse: ""}, }) return string(b) } b, _ := json.Marshal(ImgURLJSON{ BookPic: ImgBookPic{LocalPath: "", PddPath: ""}, BookPicS: ImgBookPicS{LocalPath: "", PddResponse: raw}, }) return string(b) } func rowToESDoc(r BookRow) ESDoc { var imgJSON ImgURLJSON json.Unmarshal([]byte(r.ImgURL), &imgJSON) var pubTimeStr string if r.PressTime != nil { if t, ok := r.PressTime.(string); ok { if parsed, err := time.Parse("2006-01-02 15:04:05", t); err == nil { pubTimeStr = fmt.Sprintf("%d", parsed.Unix()+5364000000) } else if parsed, err := time.Parse("2006-01-02", t); err == nil { pubTimeStr = fmt.Sprintf("%d", parsed.Unix()+5364000000) } else { pubTimeStr = t } } } var updateTimeStr string if r.UpdateTime != nil { if t, ok := r.UpdateTime.(string); ok { if parsed, err := time.Parse("2006-01-02 15:04:05", t); err == nil { updateTimeStr = fmt.Sprintf("%d", parsed.Unix()) } else { updateTimeStr = t } } } return ESDoc{ ID: r.KongID, Fid: 0, ISBN: r.ISBN, BookName: r.BookName, Author: r.Author, Publisher: r.Press, PublicationTime: pubTimeStr, BindingLayout: r.Binding, FixPrice: r.Price, BookPic: ESBookPic{LocalPath: "", PddPath: ""}, BookPicS: ESBookPicS{LocalPath: "", PddResponse: imgJSON.BookPicS.PddResponse}, Edition: r.Edition, BookFormat: r.Format, PageCount: r.Pages, WordCount: r.WordCount, Category: r.Category, CatID: ESCatID{KongFuZiCatID: r.Category}, UpdateTime: updateTimeStr, } } // ============ 同步状态读写 ============ func loadSyncState() *SyncState { data, err := os.ReadFile(syncStateFile) if err != nil { return nil } var state SyncState if json.Unmarshal(data, &state) != nil { return nil } return &state } func saveSyncState(t time.Time) { state := SyncState{ LastSyncTime: t.Format("2006-01-02 15:04:05"), } data, _ := json.MarshalIndent(state, "", " ") os.WriteFile(syncStateFile, data, 0644) } // ============ ES操作 ============ func queryV2ByISBNs(isbns []string) (map[string]bool, error) { if len(isbns) == 0 { return nil, nil } maxTerms := 500 result := make(map[string]bool) for i := 0; i < len(isbns); i += maxTerms { end := i + maxTerms if end > len(isbns) { end = len(isbns) } batch := isbns[i:end] query := map[string]interface{}{ "query": map[string]interface{}{ "terms": map[string]interface{}{ "isbn": batch, }, }, "_source": []string{"isbn"}, "size": len(batch), } queryBytes, _ := json.Marshal(query) req, err := http.NewRequest("POST", esURL+"/"+esIndexV2+"/_search", bytes.NewReader(queryBytes)) if err != nil { return nil, err } req.Header.Set("Content-Type", "application/json") req.SetBasicAuth(esUser, esPassword) resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() if resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return nil, fmt.Errorf("ES v2查询失败 HTTP %d: %s", resp.StatusCode, string(body)) } var searchResp struct { Hits struct { Hits []struct { Source struct { ISBN string `json:"isbn"` } `json:"_source"` } `json:"hits"` } `json:"hits"` } body, _ := io.ReadAll(resp.Body) json.Unmarshal(body, &searchResp) for _, hit := range searchResp.Hits.Hits { result[hit.Source.ISBN] = true } } return result, nil } func esBulkUpdateIsSuit(docs []ESDoc) (int, error) { if len(docs) == 0 { return 0, nil } var buf bytes.Buffer for _, doc := range docs { meta := fmt.Sprintf(`{"update":{"_index":"%s","_id":"%d"}}`, esIndexV3, doc.ID) buf.WriteString(meta) buf.WriteByte('\n') buf.WriteString(`{"script":{"source":"ctx._source.is_suit=params.val","params":{"val":1}}}`) buf.WriteByte('\n') } req, err := http.NewRequest("POST", esURL+"/_bulk", &buf) if err != nil { return 0, err } req.Header.Set("Content-Type", "application/x-ndjson") req.SetBasicAuth(esUser, esPassword) resp, err := http.DefaultClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() if resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return 0, fmt.Errorf("ES bulk更新失败 HTTP %d: %s", resp.StatusCode, string(body)) } var bulkResp struct { Items []struct { Update struct { Status int `json:"status"` } `json:"update"` } `json:"items"` } body, _ := io.ReadAll(resp.Body) json.Unmarshal(body, &bulkResp) success := 0 for _, item := range bulkResp.Items { if item.Update.Status >= 200 && item.Update.Status < 300 { success++ } } return success, nil } func esBulkInsert(docs []ESDoc, index string) (int, error) { if len(docs) == 0 { return 0, nil } var buf bytes.Buffer for _, doc := range docs { meta := fmt.Sprintf(`{"index":{"_index":"%s","_id":"%d"}}`, index, doc.ID) buf.WriteString(meta) buf.WriteByte('\n') docBytes, _ := json.Marshal(doc) buf.Write(docBytes) buf.WriteByte('\n') } req, err := http.NewRequest("POST", esURL+"/_bulk", &buf) if err != nil { return 0, err } req.Header.Set("Content-Type", "application/x-ndjson") req.SetBasicAuth(esUser, esPassword) resp, err := http.DefaultClient.Do(req) if err != nil { return 0, err } defer resp.Body.Close() if resp.StatusCode >= 300 { body, _ := io.ReadAll(resp.Body) return 0, fmt.Errorf("ES bulk失败 HTTP %d: %s", resp.StatusCode, string(body)) } var bulkResp struct { Items []struct { Index struct { Status int `json:"status"` } `json:"index"` } `json:"items"` } body, _ := io.ReadAll(resp.Body) json.Unmarshal(body, &bulkResp) success := 0 for _, item := range bulkResp.Items { if item.Index.Status >= 200 && item.Index.Status < 300 { success++ } } return success, nil } // ============ MySQL批量操作 ============ func batchUpsertMySQL(db *sql.DB, rows []BookRow) (int, error) { if len(rows) == 0 { return 0, nil } baseSQL := `REPLACE INTO suit_book_lib_merged (kong_id, fid, isbn, book_name, subtitle, author, press, press_time, binding, price, img_url, edition, format, pages, word_count, category, create_time, update_time) VALUES ` rowPlaceholder := "(?, 0, ?, ?, '', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" placeholders := make([]string, len(rows)) for i := range placeholders { placeholders[i] = rowPlaceholder } fullSQL := baseSQL + strings.Join(placeholders, ", ") args := make([]interface{}, 0, len(rows)*18) for _, r := range rows { args = append(args, r.KongID, r.ISBN, r.BookName, r.Author, r.Press, r.PressTime, r.Binding, r.Price, r.ImgURL, r.Edition, r.Format, r.Pages, r.WordCount, r.Category, r.CreateTime, r.UpdateTime, ) } result, err := db.Exec(fullSQL, args...) if err != nil { return 0, err } affected, _ := result.RowsAffected() return int(affected), nil } // ============ NULL处理 ============ func nullStr(ns sql.NullString) string { if ns.Valid { return ns.String } return "" } func nullInt64(ns sql.NullInt64) int64 { if ns.Valid { return ns.Int64 } return 0 } func nullFloat64(ns sql.NullFloat64) float64 { if ns.Valid { return ns.Float64 } return 0 } func nullTimeInterface(ns sql.NullTime) interface{} { if ns.Valid { return ns.Time.Format("2006-01-02 15:04:05") } return nil } // ============ 同步任务函数 ============ func runSync() error { state := loadSyncState() var sinceTime string if state != nil && state.LastSyncTime != "" { sinceTime = state.LastSyncTime log.Printf("📅 增量同步:从 %s 开始", sinceTime) } else { log.Println("🚀 首次运行:全量同步") } syncStart := time.Now() srcDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&timeout=60s&parseTime=true&readTimeout=300s", srcUser, srcPassword, srcHost, srcPort, srcDatabase) srcDB, err := sql.Open("mysql", srcDSN) if err != nil { return fmt.Errorf("连接源库失败: %v", err) } defer srcDB.Close() srcDB.SetMaxOpenConns(2) srcDB.SetMaxIdleConns(2) dstDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&timeout=60s&parseTime=true&writeTimeout=300s", dstUser, dstPassword, dstHost, dstPort, dstDatabase) dstDB, err := sql.Open("mysql", dstDSN) if err != nil { return fmt.Errorf("连接目标库失败: %v", err) } defer dstDB.Close() dstDB.SetMaxOpenConns(10) dstDB.SetMaxIdleConns(10) if err := srcDB.Ping(); err != nil { return fmt.Errorf("源库连接测试失败: %v", err) } log.Println("✅ 源MySQL连接成功") if err := dstDB.Ping(); err != nil { return fmt.Errorf("目标MySQL连接测试失败: %v", err) } log.Println("✅ 目标MySQL连接成功") req, _ := http.NewRequest("GET", esURL, nil) req.SetBasicAuth(esUser, esPassword) resp, err := http.DefaultClient.Do(req) if err != nil || resp.StatusCode >= 300 { return fmt.Errorf("ES连接测试失败: %v", err) } resp.Body.Close() log.Println("✅ ES连接成功") query := ` SELECT b.id, b.isbn, b.book_name, b.author, b.press, b.press_time, b.binding, b.price, b.img_url, b.create_time, b.update_time, d.kong_id, d.edition, d.format, d.pages, d.word_count, d.category FROM book_lib b INNER JOIN book_lib_details d ON b.isbn = d.isbn ` var rows *sql.Rows if sinceTime != "" { query += " WHERE b.update_time > ?" rows, err = srcDB.Query(query, sinceTime) } else { rows, err = srcDB.Query(query) } if err != nil { return fmt.Errorf("查询源库失败: %v", err) } defer rows.Close() startTime := time.Now() if sinceTime != "" { log.Println("✅ 查询增量数据成功,开始同步 ...") } else { log.Println("✅ 查询全量数据成功,开始同步 ...") } totalMySQL := 0 totalESV3 := 0 totalIsSuit := 0 failCount := 0 batch := make([]BookRow, 0, batchSize) for rows.Next() { var ( id, kongID, pages, wordCount sql.NullInt64 isbn, bookName, author, press, binding, imgURL, edition, format, category sql.NullString price sql.NullFloat64 pressTime, createTime, updateTime sql.NullTime ) err := rows.Scan( &id, &isbn, &bookName, &author, &press, &pressTime, &binding, &price, &imgURL, &createTime, &updateTime, &kongID, &edition, &format, &pages, &wordCount, &category, ) if err != nil { failCount++ continue } row := BookRow{ KongID: nullInt64(kongID), ISBN: nullStr(isbn), BookName: nullStr(bookName), Author: nullStr(author), Press: nullStr(press), PressTime: nullTimeInterface(pressTime), Binding: nullStr(binding), Price: nullFloat64(price), ImgURL: buildImgURLJSON(nullStr(imgURL)), Edition: nullStr(edition), Format: nullStr(format), Pages: nullInt64(pages), WordCount: nullInt64(wordCount), Category: nullStr(category), CreateTime: nullTimeInterface(createTime), UpdateTime: nullTimeInterface(updateTime), } batch = append(batch, row) if len(batch) >= batchSize { mysqlAffected, err := batchUpsertMySQL(dstDB, batch) if err != nil { log.Printf("MySQL批量写入失败: %v", err) } totalMySQL += mysqlAffected esDocsV3 := make([]ESDoc, len(batch)) isbns := make([]string, len(batch)) for i, r := range batch { esDocsV3[i] = rowToESDoc(r) isbns[i] = r.ISBN } v3Count, err := esBulkInsert(esDocsV3, esIndexV3) if err != nil { log.Printf("ES v3 bulk失败: %v", err) } totalESV3 += v3Count v2Matches, err := queryV2ByISBNs(isbns) if err != nil { log.Printf("查询v2失败: %v", err) } else { var needUpdate []ESDoc for _, doc := range esDocsV3 { if v2Matches[doc.ISBN] { doc.IsSuit = 1 needUpdate = append(needUpdate, doc) } } if len(needUpdate) > 0 { updateCount, err := esBulkUpdateIsSuit(needUpdate) if err != nil { log.Printf("更新is_suit失败: %v", err) } totalIsSuit += updateCount } } elapsed := time.Since(startTime) log.Printf("MySQL: %d | ES-v3: %d | is_suit: %d | 耗时 %v", totalMySQL, totalESV3, totalIsSuit, elapsed.Round(time.Second)) batch = batch[:0] } } if len(batch) > 0 { mysqlAffected, _ := batchUpsertMySQL(dstDB, batch) totalMySQL += mysqlAffected esDocsV3 := make([]ESDoc, len(batch)) isbns := make([]string, len(batch)) for i, r := range batch { esDocsV3[i] = rowToESDoc(r) isbns[i] = r.ISBN } v3Count, _ := esBulkInsert(esDocsV3, esIndexV3) totalESV3 += v3Count v2Matches, _ := queryV2ByISBNs(isbns) if v2Matches != nil { var needUpdate []ESDoc for _, doc := range esDocsV3 { if v2Matches[doc.ISBN] { doc.IsSuit = 1 needUpdate = append(needUpdate, doc) } } if len(needUpdate) > 0 { updateCount, _ := esBulkUpdateIsSuit(needUpdate) totalIsSuit += updateCount } } } totalElapsed := time.Since(startTime) log.Printf("🎉 同步完成! MySQL: %d | ES v3: %d | is_suit: %d | 跳过: %d | 耗时 %v", totalMySQL, totalESV3, totalIsSuit, failCount, totalElapsed.Round(time.Millisecond)) saveSyncState(syncStart) log.Printf("📝 已记录同步时间点: %s", syncStart.Format("2006-01-02 15:04:05")) return nil } // ============ 主流程 ============ func main() { // 启动时先执行一次 log.Println("======== 启动同步任务 ========") log.Println("任务计划:每天凌晨 01:00 执行") log.Printf("当前时间:%s\n", time.Now().Format("2006-01-02 15:04:05")) // 先执行一次同步 if err := runSync(); err != nil { log.Printf("❌ 同步任务执行失败: %v", err) } else { log.Println("✅ 启动同步完成") } // 计算下一次执行时间(每天凌晨1点) nextRun := func() time.Time { now := time.Now() next := time.Date(now.Year(), now.Month(), now.Day()+1, 1, 0, 0, 0, now.Location()) if now.Hour() < 1 { next = time.Date(now.Year(), now.Month(), now.Day(), 1, 0, 0, 0, now.Location()) } return next } nextTime := nextRun() log.Printf("⏰ 下一次执行时间:%s (等待 %v)", nextTime.Format("2006-01-02 15:04:05"), time.Until(nextTime).Round(time.Second)) // 定时器 timer := time.NewTimer(time.Until(nextTime)) defer timer.Stop() // 信号处理 sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) for { select { case <-timer.C: log.Println("\n======== 开始执行定时同步 ========") log.Printf("执行时间:%s", time.Now().Format("2006-01-02 15:04:05")) if err := runSync(); err != nil { log.Printf("❌ 同步任务执行失败: %v", err) } else { log.Println("✅ 定时同步完成") } // 计算下一次执行时间 nextTime = nextRun() log.Printf("⏰ 下一次执行时间:%s (等待 %v)", nextTime.Format("2006-01-02 15:04:05"), time.Until(nextTime).Round(time.Second)) timer.Reset(time.Until(nextTime)) case sig := <-sigChan: log.Printf("\n⚠️ 收到信号 %v,程序退出", sig) return } } }