package utils import ( "context" "crypto/md5" "database/sql" "fmt" "getErpSendPublishing/utils/redisConnectUtil" "log" "time" _ "github.com/go-sql-driver/mysql" ) func WriteToRedis(shopID int64) { // 在开头调用judgISPdd函数 if !judgISPdd(shopID) { log.Println("此店铺不是pdd店铺") return } tableName := "t_running_task_" + fmt.Sprintf("%d", shopID) dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", "root", "Long6166@@", "nj-cynosdbmysql-grp-1v6vxn5f.sql.tencentcdb.com", 26247, "task") taskDb, err := sql.Open("mysql", dsn) if err != nil { log.Printf("Task数据库连接失败: %v", err) return } defer taskDb.Close() // 查询success_data字段中的isbn query := fmt.Sprintf(` SELECT isbn FROM %s WHERE isbn IS NOT NULL AND isbn != '' AND shop_id = ? ORDER BY id`, tableName) // 建议:在表 %s 上创建索引:CREATE INDEX idx_shop_id_isbn ON %s (shop_id, isbn); rows, err := taskDb.Query(query, shopID) if err != nil { log.Printf("查询ISBN失败: %v", err) return } defer rows.Close() // 存储ISBN列表 var isbnList []string for rows.Next() { var isbn string if err := rows.Scan(&isbn); err != nil { log.Printf("扫描ISBN失败: %v", err) continue } isbnList = append(isbnList, isbn) } if err := rows.Err(); err != nil { log.Printf("遍历行时发生错误: %v", err) } // 如果judgISPdd返回true,继续执行Redis相关操作 err = redisConnectUtil.InitRedis("36.212.20.113", "j8nZ4jra2E", 7963, 13) if err != nil { log.Printf("Redis连接失败: %v", err) return } // 创建Redis key,使用shopID redisKey := fmt.Sprintf("%d", shopID) // 执行时间 execTime := time.Now().Format("2006-01-02 15:04:05") // 并行处理MD5加密,提高性能 type encryptedISBN struct { value string } encryptedIsbnChan := make(chan encryptedISBN, len(isbnList)) // 启动多个goroutine并行处理MD5加密 for i := 0; i < 10; i++ { // 使用10个goroutine go func(start int) { for j := start; j < len(isbnList); j += 10 { isbn := isbnList[j] // 加密ISBN(使用MD5) hash := md5.Sum([]byte(isbn)) // 将MD5 hash转换为十六进制字符串 encryptedIsbn := fmt.Sprintf("%x", hash) encryptedIsbnChan <- encryptedISBN{value: encryptedIsbn} } }(i) } // 收集加密结果 encryptedIsbnList := make([]string, 0, len(isbnList)) for i := 0; i < len(isbnList); i++ { encryptedIsbn := <-encryptedIsbnChan encryptedIsbnList = append(encryptedIsbnList, encryptedIsbn.value) } close(encryptedIsbnChan) // 使用Redis管道批量执行HSet操作,减少网络往返 pipe := redisConnectUtil.RedisClient.Pipeline() // 循环加密后的ISBN列表,执行Redis写入操作 for i := 0; i < len(isbnList); i++ { encryptedIsbn := encryptedIsbnList[i] // 以加密后的ISBN作为子key,存储执行时间 // 这里使用哈希表存储,key是shopid,field是加密后的ISBN,value是执行时间 pipe.HSet(context.Background(), redisKey, encryptedIsbn, execTime) } // 执行管道中的所有命令 _, err = pipe.Exec(context.Background()) if err != nil { log.Printf("批量存储执行时间失败: %v", err) } log.Printf("成功处理店铺 %d,存储 %d 条数据到Redis", shopID, len(isbnList)) } func judgISPdd(shopID int64) bool { dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local", "zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306, "zhishu") db, err := sql.Open("mysql", dsn) if err != nil { log.Printf("42db连接失败: %v", err) return false } defer db.Close() // 测试数据库连接 err = db.Ping() if err != nil { log.Printf("42db连接测试失败: %v", err) return false } query := `SELECT CASE WHEN shop_type = '1' THEN true ELSE false END AS result FROM t_shop WHERE id = ? AND del_flag = '0';` var result bool err = db.QueryRow(query, shopID).Scan(&result) if err != nil { if err == sql.ErrNoRows { log.Printf("未找到对应的店铺记录,ID: %d", shopID) } else { log.Printf("查询失败: %v", err) } return false } log.Printf("judgISPdd查询结果: %v", result) return result }