package service import ( "encoding/json" "fmt" "log" "net/http" "strconv" "strings" "sync" "time" "kfz-goods-pricing/internal/model" "kfz-goods-pricing/internal/repository" "github.com/parnurzeal/gorequest" ) // GoodsService 商品服务 type GoodsService struct { proxy string apiRateLimitSeconds int rateLimitMu sync.Mutex lastAPICall time.Time goodsRepository *repository.GoodsRepository callbackURL string tokenRepository *repository.TokenRepository currentTokenIndex int tokenMu sync.Mutex badPasswords map[string]bool badPWMu sync.Mutex alertStartedMu sync.Mutex alertStarted bool } // NewGoodsService 创建商品服务实例 func NewGoodsService(proxy string, apiRateLimitSeconds int, callbackURL string, tokenRepo *repository.TokenRepository) *GoodsService { return &GoodsService{ proxy: proxy, apiRateLimitSeconds: apiRateLimitSeconds, goodsRepository: repository.NewGoodsRepository(), callbackURL: callbackURL, tokenRepository: tokenRepo, currentTokenIndex: 0, badPasswords: make(map[string]bool), } } // QueryRequest 查询请求参数 {"isbn":"9787802204461","out_id":"132456","quality":"100"} type QueryRequest struct { ISBN string `json:"isbn"` // isbn BookName string `json:"book_name"` // 书名 Author string `json:"author"` // 作者 Publishing string `json:"publishing"` // 出版社 OutID string `json:"out_id"` // 输出ID Quality string `json:"quality"` // 品相 QueryIndex int `json:"query_index"` // 排第几位 UserID string `json:"user_id"` // 用户ID PlaceholderDownPrice float64 `json:"placeholder_down_price"` // 占位降价 MinShippingFee float64 `json:"min_shipping_fee"` // 最低运费 MinPrice float64 `json:"min_price"` // 最低书价 } // QueryResponse 查询响应 type QueryResponse struct { Code int `json:"code"` Message string `json:"message"` ID int64 `json:"id,omitempty"` Data interface{} `json:"data,omitempty"` } // QueryGoods 查询商品信息 func (s *GoodsService) QueryGoods(req *QueryRequest) *QueryResponse { if req.ISBN == "0" { log.Printf("[QueryGoods] 收到请求 isbn=0") return &QueryResponse{ Code: 200, Message: "success", } } // 插入入参记录到数据库 id, err := s.goodsRepository.Insert(req.ISBN, req.BookName, req.Author, req.Publishing, req.OutID, req.Quality, req.UserID, req.QueryIndex, req.PlaceholderDownPrice, req.MinShippingFee, req.MinPrice) if err != nil { log.Printf("[QueryGoods] 保存记录失败: isbn=%s, out_id=%s, user_id=%s, 错误=%v", req.ISBN, req.OutID, req.UserID, err) return &QueryResponse{ Code: 500, Message: fmt.Sprintf("保存记录失败: %v", err), } } log.Printf("[QueryGoods] 记录保存成功: id=%d, isbn=%s, book_name=%s, out_id=%s, user_id=%s", id, req.ISBN, req.BookName, req.OutID, req.UserID) return &QueryResponse{ Code: 200, Message: "success", ID: id, } } // rateLimitWait 限流等待 func (s *GoodsService) rateLimitWait() { if s.apiRateLimitSeconds <= 0 { return } s.rateLimitMu.Lock() defer s.rateLimitMu.Unlock() elapsed := time.Since(s.lastAPICall) if elapsed < time.Duration(s.apiRateLimitSeconds)*time.Second { time.Sleep(time.Duration(s.apiRateLimitSeconds)*time.Second - elapsed) } s.lastAPICall = time.Now() } // sendCallback 发送回调请求 func (s *GoodsService) sendCallback(outID, userID string, price, shippingFee float64, minPrice float64) { if s.callbackURL == "" { return } // price*100 转int, shipping_fee*100 转int salePrice := int(price * 100) cost := int(shippingFee * 100) request := gorequest.New() if s.proxy != "" { request.Proxy(s.proxy) } _, _, errs := request.Post(s.callbackURL). Set("Content-Type", "application/x-www-form-urlencoded"). Send(fmt.Sprintf("product_id=%s&user_id=%s&sale_price=%d&cost=%d", outID, userID, salePrice, cost)). Timeout(30 * time.Second). End() if len(errs) > 0 { log.Printf("回调失败: errs=%v", errs) } else { log.Printf("最低书价: min_price=%v", minPrice) log.Printf("回调成功: product_id=%s, user_id=%s, sale_price=%d, cost=%d", outID, userID, salePrice, cost) } } // reportBadPassword 记录密码错误并启动每3秒输出一次提示 func (s *GoodsService) reportBadPassword(username string) { s.badPWMu.Lock() if s.badPasswords[username] { s.badPWMu.Unlock() return } s.badPasswords[username] = true s.badPWMu.Unlock() log.Printf("[BadPassword] 孔网账号密码错误: username=%s, 已禁用该token, 每3秒输出一次提示", username) s.alertStartedMu.Lock() if s.alertStarted { s.alertStartedMu.Unlock() return } s.alertStarted = true s.alertStartedMu.Unlock() go func() { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() for range ticker.C { s.badPWMu.Lock() if len(s.badPasswords) == 0 { s.badPWMu.Unlock() s.alertStartedMu.Lock() s.alertStarted = false s.alertStartedMu.Unlock() return } users := make([]string, 0, len(s.badPasswords)) for u := range s.badPasswords { users = append(users, u) } s.badPWMu.Unlock() for _, u := range users { log.Printf("[BadPassword] 孔网账号或密码错误: username=%s, 请更新密码后重新登录", u) } } }() } // StartTimerScheduler 启动定时器 func (s *GoodsService) StartTimerScheduler(intervalSeconds int) { log.Printf("[TimerScheduler] 定时器启动, 间隔=%d秒", intervalSeconds) ticker := time.NewTicker(time.Duration(intervalSeconds) * time.Second) go func() { for range ticker.C { s.syncGoodsPricing() } }() } // syncGoodsPricing 定时同步商品价格 func (s *GoodsService) syncGoodsPricing() { // 查询数据库数据 kfzConfig, err := repository.GetKfzConfig() if err != nil { log.Printf("[syncGoodsPricing] 获取config数据库数据失败: err=%v", err) return } if kfzConfig == nil { log.Printf("[syncGoodsPricing] kfz_config表中无配置数据, 跳过本次同步。请到进销存系统中设置核价器配置") return } // 查询一条记录,按fail_count升序、updated_at倒序 record, err := s.goodsRepository.GetAllOrderByUpdatedAt() if err != nil { log.Printf("[syncGoodsPricing] 查询待处理记录失败: err=%v", err) return } if record == nil { return } // 限流等待 s.rateLimitWait() var price float64 var shippingFee float64 // 最终书价 var finalPrice float64 // 查询孔网数据 log.Printf("[syncGoodsPricing] 开始查询孔网数据: id=%d, isbn=%s, book_name=%s", record.ID, record.ISBN, record.BookName) bookInfo, err := s.outGetAllGoods(record.ISBN, record.BookName, record.Author, record.Publishing, record.Quality, record.QueryIndex) if err != nil { log.Printf("[syncGoodsPricing] 查询孔网失败: id=%d, 错误=%v", record.ID, err) if kfzConfig.NewPrice == 0 { s.goodsRepository.MarkFailed(record.ID) log.Printf("[syncGoodsPricing] 标记失败: id=%d, fail_count=%d, new_price=0不启用兜底", record.ID, record.FailCount+1) return } finalPrice = kfzConfig.NewPrice log.Printf("[syncGoodsPricing] 查询失败, 启用兜底价格: finalPrice=%.2f (new_price)", finalPrice) } else { // 查询成功,更新价格 price, _ = strconv.ParseFloat(bookInfo.Price, 64) shippingFee, _ = strconv.ParseFloat(bookInfo.ShippingFee, 64) totalPrice := price + shippingFee finalPrice = totalPrice - record.PlaceholderDownPrice - record.MinShippingFee if finalPrice < kfzConfig.MinPrice { finalPrice = kfzConfig.MinPrice } // 保留两位小数 finalPrice, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", finalPrice), 64) } if err := s.goodsRepository.UpdatePrice(record.ID, price, shippingFee, finalPrice); err != nil { log.Printf("[syncGoodsPricing] 更新价格失败: id=%d, 错误=%v", record.ID, err) return } log.Printf("[syncGoodsPricing] 更新成功: id=%d, price=%.2f, shipping_fee=%.2f, final_price=%.2f", record.ID, price, shippingFee, finalPrice) // 调用回调 s.sendCallback(record.OutID, record.UserID, finalPrice, record.MinShippingFee, kfzConfig.MinPrice) } // outGetAllGoods 爬取孔网所有商品页面 func (s *GoodsService) outGetAllGoods(isbn string, bookName string, author string, publishing string, quality string, queryIndex int) (*model.BookInfo, error) { var actionPathList []string // 从数据库获取启用的token列表 tokens, err := s.tokenRepository.GetEnabledTokens() if err != nil || len(tokens) == 0 { log.Printf("[outGetAllGoods] 没有可用的token, 查询终止") return nil, fmt.Errorf("没有可用的token") } log.Printf("[outGetAllGoods] 可用token数量: count=%d", len(tokens)) // 轮询选择token s.tokenMu.Lock() s.currentTokenIndex = s.currentTokenIndex % len(tokens) currentIdx := s.currentTokenIndex s.currentTokenIndex++ s.tokenMu.Unlock() token := tokens[currentIdx].Token log.Printf("[outGetAllGoods] 使用token索引: index=%d, total=%d, username=%s", currentIdx, len(tokens), tokens[currentIdx].Username) kfzUrl := "https://search.kongfz.com/pc-gw/search-web/client/pc/product/keyword/list?dataType=0&page=1&sortType=7&userArea=13003000000&quaSelect=2" // 整理查询参数-加上排序 actionPathList = append(actionPathList, "sortType") // isbn if isbn != "" { kfzUrl = kfzUrl + "&keyword=" + isbn } // 书名 if bookName != "" { kfzUrl = kfzUrl + "&keyword=" + bookName } // 作者 if author != "" { kfzUrl = kfzUrl + "&author=" + author } // 出版社 if publishing != "" { kfzUrl = kfzUrl + "&press=" + publishing } // 品相 if quality != "" { kfzUrl = kfzUrl + "&quality=" + quality + "~" actionPathList = append(actionPathList, "quality") } // 参数进行分割 actionPath := strings.Join(actionPathList, ",") // 加入查询参数 kfzUrl = kfzUrl + "&actionPath=" + actionPath log.Printf("[outGetAllGoods] 请求孔网URL: url=%s", kfzUrl) // 执行搜索请求,最多重试2次(首次失败+自动刷新token后重试1次) bookInfo, err := s.doKfzSearch(kfzUrl, token, tokens[currentIdx], queryIndex) return bookInfo, err } // doKfzSearch 执行孔网搜索请求,token失效时自动重新登录重试 func (s *GoodsService) doKfzSearch(kfzUrl, token string, tokenRecord *repository.KfzToken, queryIndex int) (*model.BookInfo, error) { for attempt := 1; attempt <= 2; attempt++ { bookInfo, err := s.doKfzSearchOnce(kfzUrl, token, queryIndex) if err == nil { return bookInfo, nil } // 检查是否是token失效错误(需要登录) errMsg := err.Error() if strings.Contains(errMsg, "请登录") || strings.Contains(errMsg, "GO_LOGIN") || strings.Contains(errMsg, "errType=102") { if attempt == 1 && tokenRecord.Password != "" { log.Printf("[outGetAllGoods] Token已失效, 尝试自动重新登录: username=%s, id=%d", tokenRecord.Username, tokenRecord.ID) newToken, refreshErr := s.TryRefreshToken(tokenRecord.ID, tokenRecord.Username, tokenRecord.Password) if refreshErr != nil { if refreshErr == ErrPasswordWrong { s.goodsRepository.DisableToken(tokenRecord.ID) s.reportBadPassword(tokenRecord.Username) return nil, refreshErr } log.Printf("[outGetAllGoods] 自动重新登录失败: err=%v", refreshErr) return nil, err } token = newToken log.Printf("[outGetAllGoods] 使用新Token重试请求") continue } log.Printf("[outGetAllGoods] Token失效但无密码(未保存), 无法自动重新登录") } return nil, err } return nil, fmt.Errorf("重试次数已用完") } // doKfzSearchOnce 执行一次孔网搜索请求 func (s *GoodsService) doKfzSearchOnce(kfzUrl, token string, queryIndex int) (*model.BookInfo, error) { // 创建HTTP客户端 requestSpt := gorequest.New() // 设置代理(如果有提供代理URL) if s.proxy != "" { requestSpt.Proxy(s.proxy) } // 发送请求 respSpt, bodySpt, errsSpt := requestSpt.Get(kfzUrl). Set("Cookie", fmt.Sprintf("PHPSESSID=%s", token)). Set("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"). Set("Accept", "application/json, text/plain, */*"). Set("Accept-Language", "zh-CN,zh;q=0.9,en;q=0.8"). Set("Referer", "https://item.kongfz.com/"). Timeout(30 * time.Second). End() // 错误处理 if len(errsSpt) > 0 { log.Printf("[outGetAllGoods] 孔网请求失败: errs=%v", errsSpt) return nil, fmt.Errorf("请求失败: %v", errsSpt) } // 检查HTTP状态码 if respSpt.StatusCode != http.StatusOK { log.Printf("[outGetAllGoods] 孔网HTTP错误: status=%s", respSpt.Status) return nil, fmt.Errorf("HTTP错误: %s", respSpt.Status) } log.Printf("[outGetAllGoods] 孔网响应数据: body=%s", bodySpt) // 解析响应 var apiSptResp struct { Status int `json:"status"` ErrType string `json:"errType"` Message string `json:"message"` SystemTime int64 `json:"systemTime"` Data struct { ItemResponse struct { Total int `json:"total"` List []struct { Title string `json:"title"` Author string `json:"author"` Press string `json:"press"` PubDateText string `json:"pubDateText"` Isbn string `json:"isbn"` Price float64 `json:"price"` Postage struct { ShippingList []struct { ShippingFee float64 `json:"shippingFee"` } `json:"shippingList"` } `json:"postage"` } `json:"list"` } `json:"itemResponse"` } `json:"data"` } // 解析JSON if err := json.Unmarshal([]byte(bodySpt), &apiSptResp); err != nil { log.Printf("[outGetAllGoods] 解析孔网响应JSON失败: err=%v", err) return nil, fmt.Errorf("解析JSON失败: %w", err) } if apiSptResp.Status != 1 { log.Printf("[outGetAllGoods] 孔网API返回错误: message=%s, errType=%s", apiSptResp.Message, apiSptResp.ErrType) return nil, fmt.Errorf("错误信息: %v,状态码: %s, errType=%s", apiSptResp.Message, apiSptResp.ErrType) } bookInfo := &model.BookInfo{} if apiSptResp.Data.ItemResponse.Total > 0 && len(apiSptResp.Data.ItemResponse.List) > 0 { log.Printf("[outGetAllGoods] 孔网搜索成功: 总共%d条结果, query_index=%d", apiSptResp.Data.ItemResponse.Total, queryIndex) if queryIndex > 0 && queryIndex <= len(apiSptResp.Data.ItemResponse.List) { goodsInfo := apiSptResp.Data.ItemResponse.List[queryIndex-1] bookInfo.BookName = goodsInfo.Title bookInfo.ISBN = goodsInfo.Isbn bookInfo.Author = goodsInfo.Author bookInfo.Publisher = goodsInfo.Press bookInfo.PubDate = goodsInfo.PubDateText bookInfo.Price = fmt.Sprintf("%.2f", goodsInfo.Price) for _, shipping := range goodsInfo.Postage.ShippingList { bookInfo.ShippingFee = fmt.Sprintf("%.2f", shipping.ShippingFee) } log.Printf("[outGetAllGoods] 取第%d条结果: title=%s, price=%s, shipping=%s", queryIndex, bookInfo.BookName, bookInfo.Price, bookInfo.ShippingFee) } else { goodsInfo := apiSptResp.Data.ItemResponse.List[len(apiSptResp.Data.ItemResponse.List)-1] bookInfo.BookName = goodsInfo.Title bookInfo.ISBN = goodsInfo.Isbn bookInfo.Author = goodsInfo.Author bookInfo.Publisher = goodsInfo.Press bookInfo.PubDate = goodsInfo.PubDateText bookInfo.Price = fmt.Sprintf("%.2f", goodsInfo.Price) for _, shipping := range goodsInfo.Postage.ShippingList { bookInfo.ShippingFee = fmt.Sprintf("%.2f", shipping.ShippingFee) } log.Printf("[outGetAllGoods] query_index无效, 取最后一条: title=%s, price=%s, shipping=%s", bookInfo.BookName, bookInfo.Price, bookInfo.ShippingFee) } return bookInfo, nil } log.Printf("[outGetAllGoods] 孔网搜索无结果: total=%d", apiSptResp.Data.ItemResponse.Total) return nil, fmt.Errorf("查询失败,没有数据!") }