daShangDao_kfz_goods_pricing/internal/service/goods_service.go

480 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
"kfz-goods-pricing/internal/model"
"kfz-goods-pricing/internal/repository"
"github.com/parnurzeal/gorequest"
)
// GoodsService 商品服务
type GoodsService struct {
proxy string
apiRateLimitSeconds int
rateLimitMu sync.Mutex
lastAPICall time.Time
goodsRepository *repository.GoodsRepository
callbackURL string
tokenRepository *repository.TokenRepository
currentTokenIndex int
tokenMu sync.Mutex
badPasswords map[string]bool
badPWMu sync.Mutex
alertStartedMu sync.Mutex
alertStarted bool
}
// NewGoodsService 创建商品服务实例
func NewGoodsService(proxy string, apiRateLimitSeconds int, callbackURL string, tokenRepo *repository.TokenRepository) *GoodsService {
return &GoodsService{
proxy: proxy,
apiRateLimitSeconds: apiRateLimitSeconds,
goodsRepository: repository.NewGoodsRepository(),
callbackURL: callbackURL,
tokenRepository: tokenRepo,
currentTokenIndex: 0,
badPasswords: make(map[string]bool),
}
}
// QueryRequest 查询请求参数 {"isbn":"9787802204461","out_id":"132456","quality":"100"}
type QueryRequest struct {
ISBN string `json:"isbn"` // isbn
BookName string `json:"book_name"` // 书名
Author string `json:"author"` // 作者
Publishing string `json:"publishing"` // 出版社
OutID string `json:"out_id"` // 输出ID
Quality string `json:"quality"` // 品相
QueryIndex int `json:"query_index"` // 排第几位
UserID string `json:"user_id"` // 用户ID
PlaceholderDownPrice float64 `json:"placeholder_down_price"` // 占位降价
MinShippingFee float64 `json:"min_shipping_fee"` // 最低运费
MinPrice float64 `json:"min_price"` // 最低书价
}
// QueryResponse 查询响应
type QueryResponse struct {
Code int `json:"code"`
Message string `json:"message"`
ID int64 `json:"id,omitempty"`
Data interface{} `json:"data,omitempty"`
}
// QueryGoods 查询商品信息
func (s *GoodsService) QueryGoods(req *QueryRequest) *QueryResponse {
if req.ISBN == "0" {
log.Printf("[QueryGoods] 收到请求 isbn=0")
return &QueryResponse{
Code: 200,
Message: "success",
}
}
// 插入入参记录到数据库
id, err := s.goodsRepository.Insert(req.ISBN, req.BookName, req.Author, req.Publishing, req.OutID, req.Quality, req.UserID, req.QueryIndex, req.PlaceholderDownPrice, req.MinShippingFee, req.MinPrice)
if err != nil {
log.Printf("[QueryGoods] 保存记录失败: isbn=%s, out_id=%s, user_id=%s, 错误=%v", req.ISBN, req.OutID, req.UserID, err)
return &QueryResponse{
Code: 500,
Message: fmt.Sprintf("保存记录失败: %v", err),
}
}
log.Printf("[QueryGoods] 记录保存成功: id=%d, isbn=%s, book_name=%s, out_id=%s, user_id=%s", id, req.ISBN, req.BookName, req.OutID, req.UserID)
return &QueryResponse{
Code: 200,
Message: "success",
ID: id,
}
}
// rateLimitWait 限流等待
func (s *GoodsService) rateLimitWait() {
if s.apiRateLimitSeconds <= 0 {
return
}
s.rateLimitMu.Lock()
defer s.rateLimitMu.Unlock()
elapsed := time.Since(s.lastAPICall)
if elapsed < time.Duration(s.apiRateLimitSeconds)*time.Second {
time.Sleep(time.Duration(s.apiRateLimitSeconds)*time.Second - elapsed)
}
s.lastAPICall = time.Now()
}
// sendCallback 发送回调请求
func (s *GoodsService) sendCallback(outID, userID string, price, shippingFee float64, minPrice float64) {
if s.callbackURL == "" {
return
}
// price*100 转int, shipping_fee*100 转int
salePrice := int(price * 100)
cost := int(shippingFee * 100)
request := gorequest.New()
if s.proxy != "" {
request.Proxy(s.proxy)
}
_, _, errs := request.Post(s.callbackURL).
Set("Content-Type", "application/x-www-form-urlencoded").
Send(fmt.Sprintf("product_id=%s&user_id=%s&sale_price=%d&cost=%d", outID, userID, salePrice, cost)).
Timeout(30 * time.Second).
End()
if len(errs) > 0 {
log.Printf("回调失败: errs=%v", errs)
} else {
log.Printf("最低书价: min_price=%v", minPrice)
log.Printf("回调成功: product_id=%s, user_id=%s, sale_price=%d, cost=%d", outID, userID, salePrice, cost)
}
}
// reportBadPassword 记录密码错误并启动每3秒输出一次提示
func (s *GoodsService) reportBadPassword(username string) {
s.badPWMu.Lock()
if s.badPasswords[username] {
s.badPWMu.Unlock()
return
}
s.badPasswords[username] = true
s.badPWMu.Unlock()
log.Printf("[BadPassword] 孔网账号密码错误: username=%s, 已禁用该token, 每3秒输出一次提示", username)
s.alertStartedMu.Lock()
if s.alertStarted {
s.alertStartedMu.Unlock()
return
}
s.alertStarted = true
s.alertStartedMu.Unlock()
go func() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for range ticker.C {
s.badPWMu.Lock()
if len(s.badPasswords) == 0 {
s.badPWMu.Unlock()
s.alertStartedMu.Lock()
s.alertStarted = false
s.alertStartedMu.Unlock()
return
}
users := make([]string, 0, len(s.badPasswords))
for u := range s.badPasswords {
users = append(users, u)
}
s.badPWMu.Unlock()
for _, u := range users {
log.Printf("[BadPassword] 孔网账号或密码错误: username=%s, 请更新密码后重新登录", u)
}
}
}()
}
// StartTimerScheduler 启动定时器
func (s *GoodsService) StartTimerScheduler(intervalSeconds int) {
log.Printf("[TimerScheduler] 定时器启动, 间隔=%d秒", intervalSeconds)
ticker := time.NewTicker(time.Duration(intervalSeconds) * time.Second)
go func() {
for range ticker.C {
s.syncGoodsPricing()
}
}()
}
// syncGoodsPricing 定时同步商品价格
func (s *GoodsService) syncGoodsPricing() {
// 查询数据库数据
kfzConfig, err := repository.GetKfzConfig()
if err != nil {
log.Printf("[syncGoodsPricing] 获取config数据库数据失败: err=%v", err)
return
}
if kfzConfig == nil {
log.Printf("[syncGoodsPricing] kfz_config表中无配置数据, 跳过本次同步。请到进销存系统中设置核价器配置")
return
}
// 查询一条记录按fail_count升序、updated_at倒序
record, err := s.goodsRepository.GetAllOrderByUpdatedAt()
if err != nil {
log.Printf("[syncGoodsPricing] 查询待处理记录失败: err=%v", err)
return
}
if record == nil {
return
}
// 限流等待
s.rateLimitWait()
var price float64
var shippingFee float64
// 最终书价
var finalPrice float64
// 查询孔网数据
log.Printf("[syncGoodsPricing] 开始查询孔网数据: id=%d, isbn=%s, book_name=%s", record.ID, record.ISBN, record.BookName)
bookInfo, err := s.outGetAllGoods(record.ISBN, record.BookName, record.Author, record.Publishing, record.Quality, record.QueryIndex)
if err != nil {
log.Printf("[syncGoodsPricing] 查询孔网失败: id=%d, 错误=%v", record.ID, err)
if kfzConfig.NewPrice == 0 {
s.goodsRepository.MarkFailed(record.ID)
log.Printf("[syncGoodsPricing] 标记失败: id=%d, fail_count=%d, new_price=0不启用兜底", record.ID, record.FailCount+1)
return
}
finalPrice = kfzConfig.NewPrice
log.Printf("[syncGoodsPricing] 查询失败, 启用兜底价格: finalPrice=%.2f (new_price)", finalPrice)
} else {
// 查询成功,更新价格
price, _ = strconv.ParseFloat(bookInfo.Price, 64)
shippingFee, _ = strconv.ParseFloat(bookInfo.ShippingFee, 64)
totalPrice := price + shippingFee
finalPrice = totalPrice - record.PlaceholderDownPrice - record.MinShippingFee
if finalPrice < kfzConfig.MinPrice {
finalPrice = kfzConfig.MinPrice
}
// 保留两位小数
finalPrice, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", finalPrice), 64)
}
if err := s.goodsRepository.UpdatePrice(record.ID, price, shippingFee, finalPrice); err != nil {
log.Printf("[syncGoodsPricing] 更新价格失败: id=%d, 错误=%v", record.ID, err)
return
}
log.Printf("[syncGoodsPricing] 更新成功: id=%d, price=%.2f, shipping_fee=%.2f, final_price=%.2f", record.ID, price, shippingFee, finalPrice)
// 调用回调
s.sendCallback(record.OutID, record.UserID, finalPrice, record.MinShippingFee, kfzConfig.MinPrice)
}
// outGetAllGoods 爬取孔网所有商品页面
func (s *GoodsService) outGetAllGoods(isbn string, bookName string, author string, publishing string, quality string, queryIndex int) (*model.BookInfo, error) {
var actionPathList []string
// 从数据库获取启用的token列表
tokens, err := s.tokenRepository.GetEnabledTokens()
if err != nil || len(tokens) == 0 {
log.Printf("[outGetAllGoods] 没有可用的token, 查询终止")
return nil, fmt.Errorf("没有可用的token")
}
log.Printf("[outGetAllGoods] 可用token数量: count=%d", len(tokens))
// 轮询选择token
s.tokenMu.Lock()
s.currentTokenIndex = s.currentTokenIndex % len(tokens)
currentIdx := s.currentTokenIndex
s.currentTokenIndex++
s.tokenMu.Unlock()
token := tokens[currentIdx].Token
log.Printf("[outGetAllGoods] 使用token索引: index=%d, total=%d, login_name=%s, nickname=%s", currentIdx, len(tokens), tokens[currentIdx].LoginName, tokens[currentIdx].Username)
kfzUrl := "https://search.kongfz.com/pc-gw/search-web/client/pc/product/keyword/list?dataType=0&page=1&sortType=7&userArea=13003000000&quaSelect=2"
// 整理查询参数-加上排序
actionPathList = append(actionPathList, "sortType")
// isbn
if isbn != "" {
kfzUrl = kfzUrl + "&keyword=" + isbn
}
// 书名
if bookName != "" {
kfzUrl = kfzUrl + "&keyword=" + bookName
}
// 作者
if author != "" {
kfzUrl = kfzUrl + "&author=" + author
}
// 出版社
if publishing != "" {
kfzUrl = kfzUrl + "&press=" + publishing
}
// 品相
if quality != "" {
kfzUrl = kfzUrl + "&quality=" + quality + "~"
actionPathList = append(actionPathList, "quality")
}
// 参数进行分割
actionPath := strings.Join(actionPathList, ",")
// 加入查询参数
kfzUrl = kfzUrl + "&actionPath=" + actionPath
log.Printf("[outGetAllGoods] 请求孔网URL: url=%s", kfzUrl)
// 执行搜索请求最多重试2次首次失败+自动刷新token后重试1次
bookInfo, err := s.doKfzSearch(kfzUrl, token, tokens[currentIdx], queryIndex)
return bookInfo, err
}
// doKfzSearch 执行孔网搜索请求token失效时自动重新登录重试
func (s *GoodsService) doKfzSearch(kfzUrl, token string, tokenRecord *repository.KfzToken, queryIndex int) (*model.BookInfo, error) {
for attempt := 1; attempt <= 2; attempt++ {
bookInfo, err := s.doKfzSearchOnce(kfzUrl, token, queryIndex)
if err == nil {
return bookInfo, nil
}
// 检查是否是token失效错误需要登录
errMsg := err.Error()
if strings.Contains(errMsg, "请登录") || strings.Contains(errMsg, "GO_LOGIN") || strings.Contains(errMsg, "errType=102") {
if attempt == 1 && tokenRecord.Password != "" {
loginName := tokenRecord.LoginName
if loginName == "" {
loginName = tokenRecord.Username
}
log.Printf("[outGetAllGoods] Token已失效, 尝试自动重新登录: login_name=%s, nickname=%s, id=%d", loginName, tokenRecord.Username, tokenRecord.ID)
newToken, refreshErr := s.TryRefreshToken(tokenRecord.ID, loginName, tokenRecord.Password)
if refreshErr != nil {
if refreshErr == ErrPasswordWrong {
s.goodsRepository.DisableToken(tokenRecord.ID)
s.reportBadPassword(loginName)
return nil, refreshErr
}
log.Printf("[outGetAllGoods] 自动重新登录失败: err=%v", refreshErr)
return nil, err
}
token = newToken
log.Printf("[outGetAllGoods] 使用新Token重试请求")
continue
}
log.Printf("[outGetAllGoods] Token失效但无密码(未保存), 无法自动重新登录")
}
return nil, err
}
return nil, fmt.Errorf("重试次数已用完")
}
// doKfzSearchOnce 执行一次孔网搜索请求
func (s *GoodsService) doKfzSearchOnce(kfzUrl, token string, queryIndex int) (*model.BookInfo, error) {
// 创建HTTP客户端
requestSpt := gorequest.New()
// 设置代理(如果有提供代理URL)
if s.proxy != "" {
requestSpt.Proxy(s.proxy)
}
// 发送请求
respSpt, bodySpt, errsSpt := requestSpt.Get(kfzUrl).
Set("Cookie", fmt.Sprintf("PHPSESSID=%s", token)).
Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36").
Set("Accept", "application/json, text/plain, */*").
Set("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8").
Set("Referer", "https://item.kongfz.com/").
Timeout(30 * time.Second).
End()
// 错误处理
if len(errsSpt) > 0 {
log.Printf("[outGetAllGoods] 孔网请求失败: errs=%v", errsSpt)
return nil, fmt.Errorf("请求失败: %v", errsSpt)
}
// 检查HTTP状态码
if respSpt.StatusCode != http.StatusOK {
log.Printf("[outGetAllGoods] 孔网HTTP错误: status=%s", respSpt.Status)
return nil, fmt.Errorf("HTTP错误: %s", respSpt.Status)
}
log.Printf("[outGetAllGoods] 孔网响应数据: body=%s", bodySpt)
// 解析响应
var apiSptResp struct {
Status int `json:"status"`
ErrType string `json:"errType"`
Message string `json:"message"`
SystemTime int64 `json:"systemTime"`
Data struct {
ItemResponse struct {
Total int `json:"total"`
List []struct {
Title string `json:"title"`
Author string `json:"author"`
Press string `json:"press"`
PubDateText string `json:"pubDateText"`
Isbn string `json:"isbn"`
Price float64 `json:"price"`
Postage struct {
ShippingList []struct {
ShippingFee float64 `json:"shippingFee"`
} `json:"shippingList"`
} `json:"postage"`
} `json:"list"`
} `json:"itemResponse"`
} `json:"data"`
}
// 解析JSON
if err := json.Unmarshal([]byte(bodySpt), &apiSptResp); err != nil {
log.Printf("[outGetAllGoods] 解析孔网响应JSON失败: err=%v", err)
return nil, fmt.Errorf("解析JSON失败: %w", err)
}
if apiSptResp.Status != 1 {
log.Printf("[outGetAllGoods] 孔网API返回错误: message=%s, errType=%s", apiSptResp.Message, apiSptResp.ErrType)
return nil, fmt.Errorf("错误信息: %v状态码: %s, errType=%s", apiSptResp.Message, apiSptResp.ErrType)
}
bookInfo := &model.BookInfo{}
if apiSptResp.Data.ItemResponse.Total > 0 && len(apiSptResp.Data.ItemResponse.List) > 0 {
log.Printf("[outGetAllGoods] 孔网搜索成功: 总共%d条结果, query_index=%d", apiSptResp.Data.ItemResponse.Total, queryIndex)
if queryIndex > 0 && queryIndex <= len(apiSptResp.Data.ItemResponse.List) {
goodsInfo := apiSptResp.Data.ItemResponse.List[queryIndex-1]
bookInfo.BookName = goodsInfo.Title
bookInfo.ISBN = goodsInfo.Isbn
bookInfo.Author = goodsInfo.Author
bookInfo.Publisher = goodsInfo.Press
bookInfo.PubDate = goodsInfo.PubDateText
bookInfo.Price = fmt.Sprintf("%.2f", goodsInfo.Price)
for _, shipping := range goodsInfo.Postage.ShippingList {
bookInfo.ShippingFee = fmt.Sprintf("%.2f", shipping.ShippingFee)
}
log.Printf("[outGetAllGoods] 取第%d条结果: title=%s, price=%s, shipping=%s", queryIndex, bookInfo.BookName, bookInfo.Price, bookInfo.ShippingFee)
} else {
goodsInfo := apiSptResp.Data.ItemResponse.List[len(apiSptResp.Data.ItemResponse.List)-1]
bookInfo.BookName = goodsInfo.Title
bookInfo.ISBN = goodsInfo.Isbn
bookInfo.Author = goodsInfo.Author
bookInfo.Publisher = goodsInfo.Press
bookInfo.PubDate = goodsInfo.PubDateText
bookInfo.Price = fmt.Sprintf("%.2f", goodsInfo.Price)
for _, shipping := range goodsInfo.Postage.ShippingList {
bookInfo.ShippingFee = fmt.Sprintf("%.2f", shipping.ShippingFee)
}
log.Printf("[outGetAllGoods] query_index无效, 取最后一条: title=%s, price=%s, shipping=%s", bookInfo.BookName, bookInfo.Price, bookInfo.ShippingFee)
}
return bookInfo, nil
}
log.Printf("[outGetAllGoods] 孔网搜索无结果: total=%d", apiSptResp.Data.ItemResponse.Total)
return nil, fmt.Errorf("查询失败,没有数据!")
}