daShangDao_kfz_goods_pricing/internal/service/goods_service.go

391 lines
14 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package service
import (
"encoding/json"
"fmt"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
"kfz-goods-pricing/internal/model"
"kfz-goods-pricing/internal/repository"
"github.com/parnurzeal/gorequest"
)
// GoodsService 商品服务
type GoodsService struct {
proxy string
apiRateLimitSeconds int
rateLimitMu sync.Mutex
lastAPICall time.Time
goodsRepository *repository.GoodsRepository
callbackURL string
tokenRepository *repository.TokenRepository
currentTokenIndex int
tokenMu sync.Mutex
}
// NewGoodsService 创建商品服务实例
func NewGoodsService(proxy string, apiRateLimitSeconds int, callbackURL string, tokenRepo *repository.TokenRepository) *GoodsService {
return &GoodsService{
proxy: proxy,
apiRateLimitSeconds: apiRateLimitSeconds,
goodsRepository: repository.NewGoodsRepository(),
callbackURL: callbackURL,
tokenRepository: tokenRepo,
currentTokenIndex: 0,
}
}
// QueryRequest 查询请求参数 {"isbn":"9787802204461","out_id":"132456","quality":"100"}
type QueryRequest struct {
ISBN string `json:"isbn"` // isbn
BookName string `json:"book_name"` // 书名
Author string `json:"author"` // 作者
Publishing string `json:"publishing"` // 出版社
OutID string `json:"out_id"` // 输出ID
Quality string `json:"quality"` // 品相
QueryIndex int `json:"query_index"` // 排第几位
UserID string `json:"user_id"` // 用户ID
PlaceholderDownPrice float64 `json:"placeholder_down_price"` // 占位降价
MinShippingFee float64 `json:"min_shipping_fee"` // 最低运费
MinPrice float64 `json:"min_price"` // 最低书价
}
// QueryResponse 查询响应
type QueryResponse struct {
Code int `json:"code"`
Message string `json:"message"`
ID int64 `json:"id,omitempty"`
Data interface{} `json:"data,omitempty"`
}
// QueryGoods 查询商品信息
func (s *GoodsService) QueryGoods(req *QueryRequest) *QueryResponse {
if req.ISBN == "0" {
log.Printf("[QueryGoods] ISBN为0, 跳过处理")
return &QueryResponse{
Code: 200,
Message: "success",
}
}
// 插入入参记录到数据库
id, err := s.goodsRepository.Insert(req.ISBN, req.BookName, req.Author, req.Publishing, req.OutID, req.Quality, req.UserID, req.QueryIndex, req.PlaceholderDownPrice, req.MinShippingFee, req.MinPrice)
if err != nil {
log.Printf("[QueryGoods] 保存记录失败: isbn=%s, out_id=%s, user_id=%s, 错误=%v", req.ISBN, req.OutID, req.UserID, err)
return &QueryResponse{
Code: 500,
Message: fmt.Sprintf("保存记录失败: %v", err),
}
}
log.Printf("[QueryGoods] 记录保存成功: id=%d, isbn=%s, book_name=%s, out_id=%s, user_id=%s", id, req.ISBN, req.BookName, req.OutID, req.UserID)
return &QueryResponse{
Code: 200,
Message: "success",
ID: id,
}
}
// rateLimitWait 限流等待
func (s *GoodsService) rateLimitWait() {
if s.apiRateLimitSeconds <= 0 {
return
}
s.rateLimitMu.Lock()
defer s.rateLimitMu.Unlock()
elapsed := time.Since(s.lastAPICall)
if elapsed < time.Duration(s.apiRateLimitSeconds)*time.Second {
time.Sleep(time.Duration(s.apiRateLimitSeconds)*time.Second - elapsed)
}
s.lastAPICall = time.Now()
}
// sendCallback 发送回调请求
func (s *GoodsService) sendCallback(outID, userID string, price, shippingFee float64, minPrice float64) {
if s.callbackURL == "" {
return
}
// price*100 转int, shipping_fee*100 转int
salePrice := int(price * 100)
cost := int(shippingFee * 100)
request := gorequest.New()
if s.proxy != "" {
request.Proxy(s.proxy)
}
_, _, errs := request.Post(s.callbackURL).
Set("Content-Type", "application/x-www-form-urlencoded").
Send(fmt.Sprintf("product_id=%s&user_id=%s&sale_price=%d&cost=%d", outID, userID, salePrice, cost)).
Timeout(30 * time.Second).
End()
if len(errs) > 0 {
log.Printf("回调失败: %v", errs)
} else {
log.Printf("最低书价:%v", minPrice)
log.Printf("回调成功: product_id=%s, user_id=%s, sale_price=%d, cost=%d", outID, userID, salePrice, cost)
}
}
// StartTimerScheduler 启动定时器
func (s *GoodsService) StartTimerScheduler(intervalSeconds int) {
log.Printf("[TimerScheduler] 定时器启动, 间隔=%d秒", intervalSeconds)
ticker := time.NewTicker(time.Duration(intervalSeconds) * time.Second)
go func() {
for range ticker.C {
s.syncGoodsPricing()
}
}()
}
// syncGoodsPricing 定时同步商品价格
func (s *GoodsService) syncGoodsPricing() {
log.Printf("[syncGoodsPricing] ===== 开始执行定时同步任务 =====")
// 查询数据库数据
kfzConfig, err := repository.GetKfzConfig()
if err != nil {
log.Printf("[syncGoodsPricing] 获取config数据库数据失败: %v", err)
return
}
log.Printf("[syncGoodsPricing] 全局配置: new_price=%.2f, placeholder_down_price=%.2f, min_shipping_fee=%.2f, min_price=%.2f, query_index=%d",
kfzConfig.NewPrice, kfzConfig.PlaceholderDownPrice, kfzConfig.MinShippingFee, kfzConfig.MinPrice, kfzConfig.QueryIndex)
// 查询一条记录按fail_count升序、updated_at倒序
record, err := s.goodsRepository.GetAllOrderByUpdatedAt()
if err != nil {
log.Printf("[syncGoodsPricing] 查询待处理记录失败: %v", err)
return
}
if record == nil {
log.Printf("[syncGoodsPricing] 没有待处理的记录, 跳过本次同步")
return
}
log.Printf("[syncGoodsPricing] 获取到待处理记录: id=%d, isbn=%s, book_name=%s, out_id=%s, fail_count=%d",
record.ID, record.ISBN, record.BookName, record.OutID, record.FailCount)
// 限流等待
s.rateLimitWait()
var price float64
var shippingFee float64
// 最终书价
var finalPrice float64
// 查询孔网数据
log.Printf("[syncGoodsPricing] 开始查询孔网数据: id=%d, isbn=%s, book_name=%s", record.ID, record.ISBN, record.BookName)
bookInfo, err := s.outGetAllGoods(record.ISBN, record.BookName, record.Author, record.Publishing, record.Quality, record.QueryIndex)
if err != nil {
log.Printf("[syncGoodsPricing] 查询孔网失败: id=%d, 错误=%v", record.ID, err)
if kfzConfig.NewPrice == 0 {
s.goodsRepository.MarkFailed(record.ID)
log.Printf("[syncGoodsPricing] 标记失败: id=%d, fail_count=%d, new_price=0不启用兜底", record.ID, record.FailCount+1)
return
}
finalPrice = kfzConfig.NewPrice
log.Printf("[syncGoodsPricing] 查询失败, 启用兜底价格: finalPrice=%.2f (new_price)", finalPrice)
} else {
// 查询成功,更新价格
price, _ = strconv.ParseFloat(bookInfo.Price, 64)
shippingFee, _ = strconv.ParseFloat(bookInfo.ShippingFee, 64)
totalPrice := price + shippingFee
log.Printf("[syncGoodsPricing] 孔网数据: id=%d, price=%.2f, shipping_fee=%.2f, total=%.2f", record.ID, price, shippingFee, totalPrice)
log.Printf("[syncGoodsPricing] 计算参数: placeholder_down_price=%.2f, min_shipping_fee=%.2f, kfzConfig.min_price=%.2f",
record.PlaceholderDownPrice, record.MinShippingFee, kfzConfig.MinPrice)
finalPrice = totalPrice - record.PlaceholderDownPrice - record.MinShippingFee
log.Printf("[syncGoodsPricing] 减去占位降价和运费后: finalPrice=%.2f", finalPrice)
if finalPrice < kfzConfig.MinPrice {
log.Printf("[syncGoodsPricing] 价格低于最低书价, 使用最低书价: %.2f -> %.2f", finalPrice, kfzConfig.MinPrice)
finalPrice = kfzConfig.MinPrice
}
// 保留两位小数
beforeRound := finalPrice
finalPrice, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", finalPrice), 64)
log.Printf("[syncGoodsPricing] 最终价格取两位小数: %.2f -> %.2f", beforeRound, finalPrice)
}
if err := s.goodsRepository.UpdatePrice(record.ID, price, shippingFee, finalPrice); err != nil {
log.Printf("[syncGoodsPricing] 更新价格失败: id=%d, 错误=%v", record.ID, err)
return
}
log.Printf("[syncGoodsPricing] 更新成功: id=%d, price=%.2f, shipping_fee=%.2f, final_price=%.2f", record.ID, price, shippingFee, finalPrice)
// 调用回调
s.sendCallback(record.OutID, record.UserID, finalPrice, record.MinShippingFee, kfzConfig.MinPrice)
}
// outGetAllGoods 爬取孔网所有商品页面
func (s *GoodsService) outGetAllGoods(isbn string, bookName string, author string, publishing string, quality string, queryIndex int) (*model.BookInfo, error) {
var actionPathList []string
// 从数据库获取启用的token列表
tokens, err := s.tokenRepository.GetEnabledTokens()
if err != nil || len(tokens) == 0 {
log.Printf("[outGetAllGoods] 没有可用的token, 查询终止")
return nil, fmt.Errorf("没有可用的token")
}
log.Printf("[outGetAllGoods] 可用token数量: %d", len(tokens))
// 轮询选择token
s.tokenMu.Lock()
s.currentTokenIndex = s.currentTokenIndex % len(tokens)
currentIdx := s.currentTokenIndex
s.currentTokenIndex++
s.tokenMu.Unlock()
token := tokens[currentIdx].Token
log.Printf("[outGetAllGoods] 使用token索引: %d/%d, username=%s", currentIdx, len(tokens), tokens[currentIdx].Username)
kfzUrl := "https://search.kongfz.com/pc-gw/search-web/client/pc/product/keyword/list?dataType=0&page=1&sortType=7&userArea=13003000000&quaSelect=2"
// 整理查询参数-加上排序
actionPathList = append(actionPathList, "sortType")
// isbn
if isbn != "" {
kfzUrl = kfzUrl + "&keyword=" + isbn
}
// 书名
if bookName != "" {
kfzUrl = kfzUrl + "&keyword=" + bookName
}
// 作者
if author != "" {
kfzUrl = kfzUrl + "&author=" + author
}
// 出版社
if publishing != "" {
kfzUrl = kfzUrl + "&press=" + publishing
}
// 品相
if quality != "" {
kfzUrl = kfzUrl + "&quality=" + quality + "~"
actionPathList = append(actionPathList, "quality")
}
// 参数进行分割
actionPath := strings.Join(actionPathList, ",")
// 加入查询参数
kfzUrl = kfzUrl + "&actionPath=" + actionPath
log.Printf("[outGetAllGoods] 请求孔网URL: %s", kfzUrl)
// 创建HTTP客户端
requestSpt := gorequest.New()
// 设置代理(如果有提供代理URL)
if s.proxy != "" {
requestSpt.Proxy(s.proxy)
}
// 发送请求
respSpt, bodySpt, errsSpt := requestSpt.Get(kfzUrl).
Set("Cookie", fmt.Sprintf("PHPSESSID=%s", token)).
Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36").
Set("Accept", "application/json, text/plain, */*").
Set("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8").
Set("Referer", "https://item.kongfz.com/").
Timeout(30 * time.Second).
End()
// 错误处理
if len(errsSpt) > 0 {
log.Printf("[outGetAllGoods] 孔网请求失败: %v", errsSpt)
return nil, fmt.Errorf("请求失败: %v", errsSpt)
}
// 检查HTTP状态码
if respSpt.StatusCode != http.StatusOK {
log.Printf("[outGetAllGoods] 孔网HTTP错误: %s", respSpt.Status)
return nil, fmt.Errorf("HTTP错误: %s", respSpt.Status)
}
// 解析响应
var apiSptResp struct {
Status int `json:"status"`
ErrType string `json:"errType"`
Message string `json:"message"`
SystemTime int64 `json:"systemTime"`
Data struct {
ItemResponse struct {
Total int `json:"total"`
List []struct {
Title string `json:"title"`
Author string `json:"author"`
Press string `json:"press"`
PubDateText string `json:"pubDateText"`
Isbn string `json:"isbn"`
Price float64 `json:"price"`
Postage struct {
ShippingList []struct {
ShippingFee float64 `json:"shippingFee"`
} `json:"shippingList"`
} `json:"postage"`
} `json:"list"`
} `json:"itemResponse"`
} `json:"data"`
}
// 解析JSON
if err := json.Unmarshal([]byte(bodySpt), &apiSptResp); err != nil {
log.Printf("[outGetAllGoods] 解析孔网响应JSON失败: %v", err)
return nil, fmt.Errorf("解析JSON失败: %w", err)
}
if apiSptResp.Status != 1 {
log.Printf("[outGetAllGoods] 孔网API返回错误: message=%s, errType=%s", apiSptResp.Message, apiSptResp.ErrType)
return nil, fmt.Errorf("错误信息: %v状态码: %s", apiSptResp.Message, apiSptResp.ErrType)
}
bookInfo := &model.BookInfo{}
if apiSptResp.Data.ItemResponse.Total > 0 && len(apiSptResp.Data.ItemResponse.List) > 0 {
log.Printf("[outGetAllGoods] 孔网搜索成功: 总共%d条结果, query_index=%d", apiSptResp.Data.ItemResponse.Total, queryIndex)
if queryIndex > 0 && queryIndex <= len(apiSptResp.Data.ItemResponse.List) {
goodsInfo := apiSptResp.Data.ItemResponse.List[queryIndex-1]
bookInfo.BookName = goodsInfo.Title
bookInfo.ISBN = goodsInfo.Isbn
bookInfo.Author = goodsInfo.Author
bookInfo.Publisher = goodsInfo.Press
bookInfo.PubDate = goodsInfo.PubDateText
bookInfo.Price = fmt.Sprintf("%.2f", goodsInfo.Price)
for _, shipping := range goodsInfo.Postage.ShippingList {
bookInfo.ShippingFee = fmt.Sprintf("%.2f", shipping.ShippingFee)
}
log.Printf("[outGetAllGoods] 取第%d条结果: title=%s, price=%s, shipping=%s", queryIndex, bookInfo.BookName, bookInfo.Price, bookInfo.ShippingFee)
} else {
goodsInfo := apiSptResp.Data.ItemResponse.List[len(apiSptResp.Data.ItemResponse.List)-1]
bookInfo.BookName = goodsInfo.Title
bookInfo.ISBN = goodsInfo.Isbn
bookInfo.Author = goodsInfo.Author
bookInfo.Publisher = goodsInfo.Press
bookInfo.PubDate = goodsInfo.PubDateText
bookInfo.Price = fmt.Sprintf("%.2f", goodsInfo.Price)
for _, shipping := range goodsInfo.Postage.ShippingList {
bookInfo.ShippingFee = fmt.Sprintf("%.2f", shipping.ShippingFee)
}
log.Printf("[outGetAllGoods] query_index无效, 取最后一条: title=%s, price=%s, shipping=%s", bookInfo.BookName, bookInfo.Price, bookInfo.ShippingFee)
}
return bookInfo, nil
}
log.Printf("[outGetAllGoods] 孔网搜索无结果: total=%d", apiSptResp.Data.ItemResponse.Total)
return nil, fmt.Errorf("查询失败,没有数据!")
}