269 lines
7.3 KiB
Go
269 lines
7.3 KiB
Go
package main
|
||
|
||
import (
|
||
"database/sql"
|
||
"fmt"
|
||
"log"
|
||
"strings"
|
||
"time"
|
||
|
||
_ "github.com/go-sql-driver/mysql"
|
||
)
|
||
|
||
// Connection-pool limits. main applies the same four settings to both the
// source and the target *sql.DB handle.
const (
	// MaxOpenConns caps the number of simultaneously open connections per pool.
	MaxOpenConns = 20
	// MaxIdleConns caps the number of idle connections kept per pool.
	MaxIdleConns = 10
	// ConnMaxLifetime is the maximum age of a pooled connection before it is
	// closed and replaced.
	ConnMaxLifetime = 10 * time.Minute
	// ConnMaxIdleTime is how long a connection may sit idle before it is closed.
	ConnMaxIdleTime = 5 * time.Minute
)
// Snowflake is a minimal snowflake-style ID generator. It remembers the
// timestamp of the most recently issued ID and the sequence number used within
// that millisecond.
//
// A Snowflake is not safe for concurrent use: GenerateID reads and writes both
// fields without any synchronization.
type Snowflake struct {
	sequence      int64 // counter within lastTimestamp, masked to 12 bits (0..4095)
	lastTimestamp int64 // milliseconds since the generator epoch of the last issued ID
}

// snowflake is the package-level generator instance.
var snowflake = &Snowflake{}

// GenerateID returns the next ID from s. IDs returned by one Snowflake are
// strictly increasing.
//
// Layout, from the most significant bit down:
//
//	timestamp (ms since 2024-01-01 00:00:00 UTC) << 22 | machineID << 12 | sequence
//
// The machine ID is fixed at 1, so IDs are only unique within one running
// generator. The number of decimal digits is not fixed: with this epoch the
// value has 18 digits until roughly mid-2031 and 19 afterwards.
//
// When all 4096 sequence values of a millisecond are used up, GenerateID
// sleeps in 100µs steps until the clock reaches the next millisecond.
func (s *Snowflake) GenerateID() int64 {
	const (
		epoch          int64 = 1704067200000 // 2024-01-01 00:00:00 UTC in Unix milliseconds
		machineID      int64 = 1
		sequenceMask   int64 = 4095 // 12 sequence bits
		timestampShift       = 22
		machineIDShift       = 12
	)

	timestamp := time.Now().UnixNano()/1e6 - epoch

	// If the wall clock stepped backwards, keep issuing IDs inside the last
	// used millisecond. Restarting the sequence at an earlier timestamp could
	// reproduce IDs that were already handed out.
	if timestamp < s.lastTimestamp {
		timestamp = s.lastTimestamp
	}

	if timestamp == s.lastTimestamp {
		s.sequence = (s.sequence + 1) & sequenceMask
		if s.sequence == 0 {
			// Sequence wrapped: this millisecond is exhausted, wait for the next.
			for timestamp <= s.lastTimestamp {
				time.Sleep(100 * time.Microsecond)
				timestamp = time.Now().UnixNano()/1e6 - epoch
			}
		}
	} else {
		s.sequence = 0
	}

	s.lastTimestamp = timestamp

	return (timestamp << timestampShift) | (machineID << machineIDShift) | s.sequence
}
func main() {
|
||
fmt.Println("========================================")
|
||
fmt.Println(" ISBN数据迁移工具")
|
||
fmt.Println("========================================")
|
||
fmt.Println()
|
||
|
||
// 连接源数据库 (Master)
|
||
fmt.Println("正在连接源数据库 (Master)...")
|
||
dsnMaster := "zhishu:XsRR4K3ATizyc5BK@tcp(146.56.227.42:3306)/zhishu?charset=utf8mb4&parseTime=True&loc=Local&timeout=5s"
|
||
dbMaster, err := sql.Open("mysql", dsnMaster)
|
||
if err != nil {
|
||
log.Fatalf("❌ 打开源数据库连接失败: %v", err)
|
||
}
|
||
defer dbMaster.Close()
|
||
|
||
dbMaster.SetMaxOpenConns(MaxOpenConns)
|
||
dbMaster.SetMaxIdleConns(MaxIdleConns)
|
||
dbMaster.SetConnMaxLifetime(ConnMaxLifetime)
|
||
dbMaster.SetConnMaxIdleTime(ConnMaxIdleTime)
|
||
|
||
err = dbMaster.Ping()
|
||
if err != nil {
|
||
log.Fatalf("❌ 连接源数据库失败: %v", err)
|
||
}
|
||
fmt.Println("✅ 源数据库连接成功")
|
||
|
||
// 连接目标数据库 (Slave)
|
||
fmt.Println("正在连接目标数据库 (Slave)...")
|
||
dsnSlave := "zhishu_slave:7DpixiEdCs5p3PEr@tcp(36.212.12.247:3306)/zhishu_slave?charset=utf8mb4&parseTime=True&loc=Local&timeout=5s"
|
||
dbSlave, err := sql.Open("mysql", dsnSlave)
|
||
if err != nil {
|
||
log.Fatalf("❌ 打开目标数据库连接失败: %v", err)
|
||
}
|
||
defer dbSlave.Close()
|
||
|
||
dbSlave.SetMaxOpenConns(MaxOpenConns)
|
||
dbSlave.SetMaxIdleConns(MaxIdleConns)
|
||
dbSlave.SetConnMaxLifetime(ConnMaxLifetime)
|
||
dbSlave.SetConnMaxIdleTime(ConnMaxIdleTime)
|
||
|
||
err = dbSlave.Ping()
|
||
if err != nil {
|
||
log.Fatalf("❌ 连接目标数据库失败: %v", err)
|
||
}
|
||
fmt.Println("✅ 目标数据库连接成功\n")
|
||
|
||
// 从源数据库查询数据
|
||
fmt.Println("========================================")
|
||
fmt.Println(" 步骤1: 从源数据库读取数据")
|
||
fmt.Println("========================================")
|
||
|
||
query := `SELECT isbn, create_time FROM shop_goods_rejection
|
||
WHERE rejection_reason = '您发布的商品涉嫌违规,请您检查相关商品并立即整改'
|
||
ORDER BY create_time ASC`
|
||
|
||
startTime := time.Now()
|
||
|
||
rows, err := dbMaster.Query(query)
|
||
if err != nil {
|
||
log.Fatalf("❌ 查询源数据失败: %v", err)
|
||
}
|
||
defer rows.Close()
|
||
|
||
type Record struct {
|
||
ISBN string
|
||
CreateTime time.Time
|
||
}
|
||
|
||
var records []Record
|
||
for rows.Next() {
|
||
var r Record
|
||
var isbn sql.NullString
|
||
var createTime sql.NullTime
|
||
|
||
if err := rows.Scan(&isbn, &createTime); err != nil {
|
||
log.Printf("读取数据失败: %v", err)
|
||
continue
|
||
}
|
||
|
||
if isbn.Valid && isbn.String != "" {
|
||
r.ISBN = isbn.String
|
||
if createTime.Valid {
|
||
r.CreateTime = createTime.Time
|
||
}
|
||
records = append(records, r)
|
||
}
|
||
}
|
||
|
||
elapsed := time.Since(startTime)
|
||
fmt.Printf("查询完成,耗时: %v\n", elapsed)
|
||
fmt.Printf("共读取到 %d 条记录\n\n", len(records))
|
||
|
||
if len(records) == 0 {
|
||
fmt.Println("没有需要迁移的数据")
|
||
return
|
||
}
|
||
|
||
// 查询目标表中已有的ISBN(去重)
|
||
fmt.Println("========================================")
|
||
fmt.Println(" 步骤2: 检查目标表已有数据")
|
||
fmt.Println("========================================")
|
||
|
||
existingISBNs := make(map[string]bool)
|
||
existingRows, err := dbSlave.Query("SELECT add_txt FROM t_filter_set WHERE limitation_type = '0'")
|
||
if err != nil {
|
||
log.Printf("⚠️ 查询已有数据失败(表可能为空): %v", err)
|
||
} else {
|
||
for existingRows.Next() {
|
||
var isbn string
|
||
if err := existingRows.Scan(&isbn); err == nil {
|
||
existingISBNs[isbn] = true
|
||
}
|
||
}
|
||
existingRows.Close()
|
||
}
|
||
fmt.Printf("目标表中已有 %d 条ISBN记录\n\n", len(existingISBNs))
|
||
|
||
// 过滤掉已存在的ISBN
|
||
fmt.Println("========================================")
|
||
fmt.Println(" 步骤3: 过滤重复数据")
|
||
fmt.Println("========================================")
|
||
|
||
var newRecords []Record
|
||
duplicateCount := 0
|
||
for _, r := range records {
|
||
if !existingISBNs[r.ISBN] {
|
||
newRecords = append(newRecords, r)
|
||
} else {
|
||
duplicateCount++
|
||
}
|
||
}
|
||
|
||
fmt.Printf("过滤完成\n")
|
||
fmt.Printf("需要插入的新记录: %d 条\n", len(newRecords))
|
||
fmt.Printf("已存在的重复记录: %d 条\n\n", duplicateCount)
|
||
|
||
if len(newRecords) == 0 {
|
||
fmt.Println("没有新数据需要插入")
|
||
return
|
||
}
|
||
|
||
// 批量插入数据
|
||
fmt.Println("========================================")
|
||
fmt.Println(" 步骤4: 批量插入数据")
|
||
fmt.Println("========================================")
|
||
|
||
batchSize := 100
|
||
totalInserted := 0
|
||
now := time.Now()
|
||
|
||
for i := 0; i < len(newRecords); i += batchSize {
|
||
end := i + batchSize
|
||
if end > len(newRecords) {
|
||
end = len(newRecords)
|
||
}
|
||
|
||
batch := newRecords[i:end]
|
||
|
||
// 构建批量插入SQL
|
||
var valueStrings []string
|
||
var args []interface{}
|
||
|
||
for _, r := range batch {
|
||
// 这里不需要单独生成id,后面会重新构建
|
||
createTime := now.Format("2006-01-02 15:04:05")
|
||
valueStrings = append(valueStrings, "(?, '1', '0', '0', ?, 1, ?, 1, ?, '0', '0', '000000', NULL, NULL, '0', NULL)")
|
||
args = append(args, r.ISBN, createTime, createTime)
|
||
}
|
||
|
||
|
||
|
||
// 重新构建包含ID的SQL
|
||
valueStrings = make([]string, 0)
|
||
args = make([]interface{}, 0)
|
||
|
||
for _, r := range batch {
|
||
id := snowflake.GenerateID()
|
||
createTime := now.Format("2006-01-02 15:04:05")
|
||
|
||
valueStrings = append(valueStrings, "(?, '1', '0', '0', ?, 1, ?, 1, ?, '0', '0', '000000', NULL, NULL, '0', NULL)")
|
||
args = append(args, id, r.ISBN, createTime, createTime)
|
||
}
|
||
|
||
insertSQL := fmt.Sprintf("INSERT INTO t_filter_set (id, filter_type, limitation_type, add_way, add_txt, create_by, create_time, update_by, update_time, status, del_flag, tenant_id, create_dept, reason, sort, shop_type) VALUES %s",
|
||
strings.Join(valueStrings, ", "))
|
||
|
||
result, err := dbSlave.Exec(insertSQL, args...)
|
||
if err != nil {
|
||
log.Printf("❌ 批次 %d-%d 插入失败: %v", i+1, end, err)
|
||
continue
|
||
}
|
||
|
||
rowsAffected, _ := result.RowsAffected()
|
||
totalInserted += int(rowsAffected)
|
||
|
||
fmt.Printf("进度: %d/%d ( %.1f%% )\n", totalInserted, len(newRecords), float64(totalInserted)/float64(len(newRecords))*100)
|
||
}
|
||
|
||
totalElapsed := time.Since(startTime)
|
||
|
||
fmt.Println("\n========================================")
|
||
fmt.Println(" 迁移完成")
|
||
fmt.Println("========================================")
|
||
fmt.Printf("源数据总数: %d 条\n", len(records))
|
||
fmt.Printf("重复过滤: %d 条\n", duplicateCount)
|
||
fmt.Printf("成功插入: %d 条\n", totalInserted)
|
||
fmt.Printf("总耗时: %v\n", totalElapsed)
|
||
fmt.Println("\n✅ 数据迁移完成!")
|
||
}
|