168 lines
4.2 KiB
Go
168 lines
4.2 KiB
Go
package utils
|
||
|
||
import (
|
||
"context"
|
||
"crypto/md5"
|
||
"database/sql"
|
||
"fmt"
|
||
"getErpSendPublishing/utils/redisConnectUtil"
|
||
"log"
|
||
"time"
|
||
|
||
_ "github.com/go-sql-driver/mysql"
|
||
)
|
||
|
||
func WriteToRedis(shopID int64) {
|
||
// 在开头调用judgISPdd函数
|
||
if !judgISPdd(shopID) {
|
||
log.Println("此店铺不是pdd店铺")
|
||
return
|
||
}
|
||
|
||
tableName := "t_running_task_" + fmt.Sprintf("%d", shopID)
|
||
|
||
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
|
||
"root", "Long6166@@", "nj-cynosdbmysql-grp-1v6vxn5f.sql.tencentcdb.com", 26247, "task")
|
||
|
||
taskDb, err := sql.Open("mysql", dsn)
|
||
if err != nil {
|
||
log.Printf("Task数据库连接失败: %v", err)
|
||
return
|
||
}
|
||
defer taskDb.Close()
|
||
|
||
// 查询success_data字段中的isbn
|
||
query := fmt.Sprintf(`
|
||
SELECT isbn
|
||
FROM %s
|
||
WHERE isbn IS NOT NULL
|
||
AND isbn != ''
|
||
AND shop_id = ?
|
||
ORDER BY id`, tableName)
|
||
// 建议:在表 %s 上创建索引:CREATE INDEX idx_shop_id_isbn ON %s (shop_id, isbn);
|
||
|
||
rows, err := taskDb.Query(query, shopID)
|
||
if err != nil {
|
||
log.Printf("查询ISBN失败: %v", err)
|
||
return
|
||
}
|
||
defer rows.Close()
|
||
|
||
// 存储ISBN列表
|
||
var isbnList []string
|
||
for rows.Next() {
|
||
var isbn string
|
||
if err := rows.Scan(&isbn); err != nil {
|
||
log.Printf("扫描ISBN失败: %v", err)
|
||
continue
|
||
}
|
||
isbnList = append(isbnList, isbn)
|
||
}
|
||
|
||
if err := rows.Err(); err != nil {
|
||
log.Printf("遍历行时发生错误: %v", err)
|
||
}
|
||
|
||
// 如果judgISPdd返回true,继续执行Redis相关操作
|
||
err = redisConnectUtil.InitRedis("36.212.20.113", "j8nZ4jra2E", 7963, 13)
|
||
if err != nil {
|
||
log.Printf("Redis连接失败: %v", err)
|
||
return
|
||
}
|
||
|
||
// 创建Redis key,使用shopID
|
||
redisKey := fmt.Sprintf("%d", shopID)
|
||
|
||
// 执行时间
|
||
execTime := time.Now().Format("2006-01-02 15:04:05")
|
||
|
||
// 并行处理MD5加密,提高性能
|
||
type encryptedISBN struct {
|
||
value string
|
||
}
|
||
|
||
encryptedIsbnChan := make(chan encryptedISBN, len(isbnList))
|
||
|
||
// 启动多个goroutine并行处理MD5加密
|
||
for i := 0; i < 10; i++ { // 使用10个goroutine
|
||
go func(start int) {
|
||
for j := start; j < len(isbnList); j += 10 {
|
||
isbn := isbnList[j]
|
||
// 加密ISBN(使用MD5)
|
||
hash := md5.Sum([]byte(isbn))
|
||
// 将MD5 hash转换为十六进制字符串
|
||
encryptedIsbn := fmt.Sprintf("%x", hash)
|
||
encryptedIsbnChan <- encryptedISBN{value: encryptedIsbn}
|
||
}
|
||
}(i)
|
||
}
|
||
|
||
// 收集加密结果
|
||
encryptedIsbnList := make([]string, 0, len(isbnList))
|
||
for i := 0; i < len(isbnList); i++ {
|
||
encryptedIsbn := <-encryptedIsbnChan
|
||
encryptedIsbnList = append(encryptedIsbnList, encryptedIsbn.value)
|
||
}
|
||
close(encryptedIsbnChan)
|
||
|
||
// 使用Redis管道批量执行HSet操作,减少网络往返
|
||
pipe := redisConnectUtil.RedisClient.Pipeline()
|
||
|
||
// 循环加密后的ISBN列表,执行Redis写入操作
|
||
for i := 0; i < len(isbnList); i++ {
|
||
encryptedIsbn := encryptedIsbnList[i]
|
||
// 以加密后的ISBN作为子key,存储执行时间
|
||
// 这里使用哈希表存储,key是shopid,field是加密后的ISBN,value是执行时间
|
||
pipe.HSet(context.Background(), redisKey, encryptedIsbn, execTime)
|
||
}
|
||
|
||
// 执行管道中的所有命令
|
||
_, err = pipe.Exec(context.Background())
|
||
if err != nil {
|
||
log.Printf("批量存储执行时间失败: %v", err)
|
||
}
|
||
|
||
log.Printf("成功处理店铺 %d,存储 %d 条数据到Redis", shopID, len(isbnList))
|
||
}
|
||
|
||
func judgISPdd(shopID int64) bool {
|
||
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4&parseTime=True&loc=Local",
|
||
"zhishu", "XsRR4K3ATizyc5BK", "146.56.227.42", 3306, "zhishu")
|
||
|
||
db, err := sql.Open("mysql", dsn)
|
||
if err != nil {
|
||
log.Printf("42db连接失败: %v", err)
|
||
return false
|
||
}
|
||
defer db.Close()
|
||
|
||
// 测试数据库连接
|
||
err = db.Ping()
|
||
if err != nil {
|
||
log.Printf("42db连接测试失败: %v", err)
|
||
return false
|
||
}
|
||
|
||
query := `SELECT
|
||
CASE
|
||
WHEN shop_type = '1' THEN true
|
||
ELSE false
|
||
END AS result
|
||
FROM t_shop
|
||
WHERE id = ? AND del_flag = '0';`
|
||
|
||
var result bool
|
||
err = db.QueryRow(query, shopID).Scan(&result)
|
||
if err != nil {
|
||
if err == sql.ErrNoRows {
|
||
log.Printf("未找到对应的店铺记录,ID: %d", shopID)
|
||
} else {
|
||
log.Printf("查询失败: %v", err)
|
||
}
|
||
return false
|
||
}
|
||
|
||
log.Printf("judgISPdd查询结果: %v", result)
|
||
return result
|
||
}
|