// Package monitor records the Elasticsearch and Redis calls made on behalf of
// individual API endpoints and derives simple statistics from those records.
package monitor

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/esapi"
	"github.com/go-redis/redis/v8"
)

// APIMonitor collects the ES and Redis call records of one API endpoint.
//
// All fields below mutex are guarded by it.
//
// NOTE(review): the record slices are never trimmed (the former maxRecords cap
// is disabled), so memory use grows with the number of recorded calls.
type APIMonitor struct {
	endpoint   string
	esCalls    []APICallRecord
	redisCalls []APICallRecord
	mutex      sync.RWMutex
	nextID     int64
	qpsWindow  time.Duration // time window used when computing QPS
}

// APICallRecord describes a single recorded ES or Redis call.
type APICallRecord struct {
	ID         int64     `json:"id"`
	Timestamp  time.Time `json:"timestamp"`
	Duration   int64     `json:"duration_ms"`
	Success    bool      `json:"success"`
	Error      string    `json:"error,omitempty"`
	Operation  string    `json:"operation"`          // ES: search/get, Redis: GET/SET/HGET
	KeyOrIndex string    `json:"key_or_index"`       // ES index or Redis key
	Query      string    `json:"query,omitempty"`    // query content
	Request    string    `json:"request,omitempty"`  // request content
	Response   string    `json:"response,omitempty"` // response content
}

// APIStats is the aggregated view returned by APIMonitor.GetStats.
type APIStats struct {
	Endpoint    string  `json:"endpoint"`
	TotalCalls  int64   `json:"total_calls"`
	Escalls     int64   `json:"es_calls"`
	RedisCalls  int64   `json:"redis_calls"`
	ESQPS       float64 `json:"es_qps"`    // ES calls per second within the QPS window
	RedisQPS    float64 `json:"redis_qps"` // Redis calls per second within the QPS window
	AvgDuration int64   `json:"avg_duration_ms"`
	SuccessRate float64 `json:"success_rate"`
	LastUpdate  string  `json:"last_update"`
}

// Global monitor registry: endpoint -> APIMonitor.
var (
	apiMonitors   = make(map[string]*APIMonitor)
	apiMonitorsMu sync.RWMutex
)

// GetOrCreateAPIMonitor returns the monitor registered for endpoint, creating
// and registering one when none exists yet. qpsWindow is only applied when a
// new monitor is created; an existing monitor keeps its original window.
func GetOrCreateAPIMonitor(endpoint string, qpsWindow time.Duration) *APIMonitor {
	apiMonitorsMu.Lock()
	defer apiMonitorsMu.Unlock()

	if existing, ok := apiMonitors[endpoint]; ok {
		return existing
	}

	created := &APIMonitor{
		endpoint:   endpoint,
		esCalls:    make([]APICallRecord, 0),
		redisCalls: make([]APICallRecord, 0),
		nextID:     1,
		qpsWindow:  qpsWindow,
	}
	apiMonitors[endpoint] = created
	return created
}

// newRecordLocked builds a record stamped with the current time and a fresh
// ID. The caller must hold am.mutex for writing.
func (am *APIMonitor) newRecordLocked(operation, keyOrIndex, query string, duration time.Duration, err error) APICallRecord {
	record := APICallRecord{
		ID:         am.getNextID(),
		Timestamp:  time.Now(),
		Duration:   duration.Milliseconds(),
		Success:    err == nil,
		Operation:  operation,
		KeyOrIndex: keyOrIndex,
		Query:      query,
	}
	if err != nil {
		record.Error = err.Error()
	}
	return record
}

// RecordESCall appends a record for one ES call. A nil err marks the call as
// successful; otherwise the error text is stored in the record.
func (am *APIMonitor) RecordESCall(operation, keyOrIndex, query string, duration time.Duration, err error) {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	am.esCalls = append(am.esCalls, am.newRecordLocked(operation, keyOrIndex, query, duration, err))
}

// RecordRedisCall appends a record for one Redis call. A nil err marks the
// call as successful; otherwise the error text is stored in the record.
func (am *APIMonitor) RecordRedisCall(operation, keyOrIndex, query string, duration time.Duration, err error) {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	am.redisCalls = append(am.redisCalls, am.newRecordLocked(operation, keyOrIndex, query, duration, err))
}

// getNextID returns the next record ID and advances the counter. It does no
// locking of its own: the caller must hold am.mutex for writing.
func (am *APIMonitor) getNextID() int64 {
	id := am.nextID
	am.nextID++
	return id
}

// summarizeCalls walks records once and returns how many are newer than
// windowStart, how many succeeded, and the sum of their durations in
// milliseconds.
func summarizeCalls(records []APICallRecord, windowStart time.Time) (recent, succeeded, totalDuration int64) {
	for i := range records {
		call := &records[i]
		if call.Timestamp.After(windowStart) {
			recent++
		}
		if call.Success {
			succeeded++
		}
		totalDuration += call.Duration
	}
	return recent, succeeded, totalDuration
}

// GetStats returns a snapshot of the monitor's statistics.
//
// QPS values only count calls inside the last qpsWindow; the average duration
// and success rate (a percentage) are computed over every stored record.
func (am *APIMonitor) GetStats() *APIStats {
	am.mutex.RLock()
	defer am.mutex.RUnlock()

	now := time.Now()
	windowStart := now.Add(-am.qpsWindow)

	esRecent, esSucceeded, esDuration := summarizeCalls(am.esCalls, windowStart)
	redisRecent, redisSucceeded, redisDuration := summarizeCalls(am.redisCalls, windowStart)

	esCount := int64(len(am.esCalls))
	redisCount := int64(len(am.redisCalls))
	totalCalls := esCount + redisCount

	// A zero or negative window would produce NaN/Inf, which encoding/json
	// refuses to marshal, so QPS stays 0 in that case.
	var esQPS, redisQPS float64
	if windowSeconds := am.qpsWindow.Seconds(); windowSeconds > 0 {
		esQPS = float64(esRecent) / windowSeconds
		redisQPS = float64(redisRecent) / windowSeconds
	}

	var avgDuration int64
	var successRate float64
	if totalCalls > 0 {
		avgDuration = (esDuration + redisDuration) / totalCalls
		successRate = float64(esSucceeded+redisSucceeded) / float64(totalCalls) * 100
	}

	return &APIStats{
		Endpoint:    am.endpoint,
		TotalCalls:  totalCalls,
		Escalls:     esCount,
		RedisCalls:  redisCount,
		ESQPS:       esQPS,
		RedisQPS:    redisQPS,
		AvgDuration: avgDuration,
		SuccessRate: successRate,
		LastUpdate:  now.Format("2006-01-02 15:04:05"),
	}
}

// pageNewestFirst returns one page of records ordered newest first, where
// page 1 holds the most recently appended records. page defaults to 1 when
// not positive; pageSize defaults to 50 when not positive and is capped at
// 500. The result is always a non-nil copy, so it is safe to use after the
// monitor lock has been released.
func pageNewestFirst(records []APICallRecord, page, pageSize int) []APICallRecord {
	if page <= 0 {
		page = 1
	}
	if pageSize <= 0 {
		pageSize = 50
	}
	if pageSize > 500 {
		pageSize = 500
	}

	total := len(records)
	// Reject far-out pages by division first so the multiplication below
	// cannot overflow for very large page numbers.
	if page-1 > total/pageSize {
		return []APICallRecord{}
	}
	skip := (page - 1) * pageSize
	if skip >= total {
		return []APICallRecord{}
	}

	// records is in append (chronological) order, so the page is the window
	// [low, high) counted back from the end, emitted in reverse.
	high := total - skip
	low := high - pageSize
	if low < 0 {
		low = 0
	}

	result := make([]APICallRecord, 0, high-low)
	for i := high - 1; i >= low; i-- {
		result = append(result, records[i])
	}
	return result
}

// GetRecentESCalls returns one page of ES call records, newest first, along
// with the total number of stored ES records. See pageNewestFirst for the
// paging defaults and limits.
func (am *APIMonitor) GetRecentESCalls(page, pageSize int) ([]APICallRecord, int) {
	am.mutex.RLock()
	defer am.mutex.RUnlock()

	return pageNewestFirst(am.esCalls, page, pageSize), len(am.esCalls)
}

// GetRecentRedisCalls returns one page of Redis call records, newest first,
// along with the total number of stored Redis records. See pageNewestFirst
// for the paging defaults and limits.
func (am *APIMonitor) GetRecentRedisCalls(page, pageSize int) ([]APICallRecord, int) {
	am.mutex.RLock()
	defer am.mutex.RUnlock()

	return pageNewestFirst(am.redisCalls, page, pageSize), len(am.redisCalls)
}

// findCallByID searches records from newest to oldest and returns a copy of
// the record whose ID equals callID, or nil when there is none. A copy is
// returned so callers never hold a pointer into the monitor's own slice.
func findCallByID(records []APICallRecord, callID int64) *APICallRecord {
	for i := len(records) - 1; i >= 0; i-- {
		if records[i].ID == callID {
			found := records[i]
			return &found
		}
	}
	return nil
}

// GetRedisCallByID returns a copy of the Redis call record with the given ID,
// or nil when no such record exists.
func (am *APIMonitor) GetRedisCallByID(callID int64) *APICallRecord {
	am.mutex.RLock()
	defer am.mutex.RUnlock()

	return findCallByID(am.redisCalls, callID)
}

/*----------------------------------------ES-----------------------------------*/

// MonitoredESClient wraps an Elasticsearch client and records every call made
// through it on the monitor of its endpoint.
type MonitoredESClient struct {
	client   *elasticsearch.Client
	monitor  *APIMonitor
	endpoint string
}

// NewMonitoredESClient wraps client so that its calls are recorded on the
// monitor registered for endpoint (created with a one-minute QPS window when
// it does not exist yet).
func NewMonitoredESClient(client *elasticsearch.Client, endpoint string) *MonitoredESClient {
	return &MonitoredESClient{
		client:   client,
		monitor:  GetOrCreateAPIMonitor(endpoint, 1*time.Minute),
		endpoint: endpoint,
	}
}

// snapshotBody drains body and returns its content as a string together with
// a fresh reader over the same bytes, so the content can be both recorded and
// still sent with the request. A nil body yields ("", nil, nil).
func snapshotBody(body io.Reader) (string, io.Reader, error) {
	if body == nil {
		return "", nil, nil
	}
	data, err := io.ReadAll(body)
	if err != nil {
		return "", nil, err
	}
	return string(data), bytes.NewReader(data), nil
}

// Search runs req against ES and records the call as a "search" operation.
//
// The request body is read up front so it can be stored in the record, then
// replaced on req with a fresh reader over the same bytes. req.Pretty is
// forced to true. If the body cannot be read, the request is not sent and the
// failure is recorded and returned.
//
// NOTE(review): success is derived from the error returned by req.Do only;
// HTTP error statuses carried in the response are presumably not counted as
// failures — confirm whether that is intended.
func (m *MonitoredESClient) Search(ctx context.Context, req *esapi.SearchRequest) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()
	indexName := strings.Join(req.Index, ",")

	queryBody, replay, readErr := snapshotBody(req.Body)
	if readErr != nil {
		readErr = fmt.Errorf("reading search request body: %w", readErr)
		duration := time.Since(startTime)
		m.monitor.RecordESCall("search", indexName, "", duration, readErr)
		return nil, duration, readErr
	}
	if replay != nil {
		req.Body = replay
	}
	req.Pretty = true

	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("search", indexName, queryBody, duration, err)
	return resp, duration, err
}

// RecordAPICall appends a caller-built record to the ES list when callType is
// "es" or to the Redis list when it is "redis". Any other callType is ignored.
// The record is stored as given; its ID is not reassigned.
func (am *APIMonitor) RecordAPICall(record APICallRecord, callType string) {
	am.mutex.Lock()
	defer am.mutex.Unlock()

	switch callType {
	case "es":
		am.esCalls = append(am.esCalls, record)
	case "redis":
		am.redisCalls = append(am.redisCalls, record)
	}
}

// SearchWithQuery searches index with the given query string as the request
// body and records the call as a "search" operation.
func (m *MonitoredESClient) SearchWithQuery(ctx context.Context, index, query string) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()

	req := esapi.SearchRequest{
		Index:  []string{index},
		Body:   strings.NewReader(query),
		Pretty: true,
	}
	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("search", index, query, duration, err)
	return resp, duration, err
}

// Get fetches document docID from index and records the call as a "get"
// operation.
func (m *MonitoredESClient) Get(ctx context.Context, index, docID string) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()

	req := esapi.GetRequest{
		Index:      index,
		DocumentID: docID,
	}
	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("get", index, "docID: "+docID, duration, err)
	return resp, duration, err
}

// Index indexes a document and records the call as an "index" operation.
//
// body is read before the request is sent (a reader can only be consumed
// once) so its content can be stored in the record; the request then sends a
// fresh reader over the same bytes. If body cannot be read, the request is
// not sent and the failure is recorded and returned.
func (m *MonitoredESClient) Index(ctx context.Context, index, docID string, body io.Reader) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()

	bodyStr, replay, readErr := snapshotBody(body)
	if readErr != nil {
		readErr = fmt.Errorf("reading index request body: %w", readErr)
		duration := time.Since(startTime)
		m.monitor.RecordESCall("index", index, "", duration, readErr)
		return nil, duration, readErr
	}

	req := esapi.IndexRequest{
		Index:      index,
		DocumentID: docID,
		Body:       replay,
	}
	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("index", index, bodyStr, duration, err)
	return resp, duration, err
}

// Delete removes document docID from index and records the call as a "delete"
// operation.
func (m *MonitoredESClient) Delete(ctx context.Context, index, docID string) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()

	req := esapi.DeleteRequest{
		Index:      index,
		DocumentID: docID,
	}
	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("delete", index, "docID: "+docID, duration, err)
	return resp, duration, err
}

// Update updates a document and records the call as an "update" operation.
//
// body is read before the request is sent (a reader can only be consumed
// once) so its content can be stored in the record; the request then sends a
// fresh reader over the same bytes. If body cannot be read, the request is
// not sent and the failure is recorded and returned.
func (m *MonitoredESClient) Update(ctx context.Context, index, docID string, body io.Reader) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()

	bodyStr, replay, readErr := snapshotBody(body)
	if readErr != nil {
		readErr = fmt.Errorf("reading update request body: %w", readErr)
		duration := time.Since(startTime)
		m.monitor.RecordESCall("update", index, "", duration, readErr)
		return nil, duration, readErr
	}

	req := esapi.UpdateRequest{
		Index:      index,
		DocumentID: docID,
		Body:       replay,
	}
	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("update", index, bodyStr, duration, err)
	return resp, duration, err
}

// Count counts documents in index and records the call as a "count"
// operation. A non-empty query is sent as the request body.
func (m *MonitoredESClient) Count(ctx context.Context, index string, query string) (*esapi.Response, time.Duration, error) {
	startTime := time.Now()

	req := esapi.CountRequest{
		Index: []string{index},
	}
	if query != "" {
		req.Body = strings.NewReader(query)
	}
	resp, err := req.Do(ctx, m.client)
	duration := time.Since(startTime)

	m.monitor.RecordESCall("count", index, query, duration, err)
	return resp, duration, err
}

// GetESCallByID returns a copy of the ES call record with the given ID, or
// nil when no such record exists.
func (am *APIMonitor) GetESCallByID(callID int64) *APICallRecord {
	am.mutex.RLock()
	defer am.mutex.RUnlock()

	return findCallByID(am.esCalls, callID)
}

/*-----------------------------------REDIS-----------------------------------*/

// MonitoredRedisClient wraps a Redis client and records every call made
// through it on the monitor of its endpoint.
type MonitoredRedisClient struct {
	client   *redis.Client
	monitor  *APIMonitor
	endpoint string
}

// NewMonitoredRedisClient wraps client so that its calls are recorded on the
// monitor registered for endpoint (created with a one-minute QPS window when
// it does not exist yet).
func NewMonitoredRedisClient(client *redis.Client, endpoint string) *MonitoredRedisClient {
	return &MonitoredRedisClient{
		client:   client,
		monitor:  GetOrCreateAPIMonitor(endpoint, 1*time.Minute),
		endpoint: endpoint,
	}
}

// valueToQuery renders a Redis value for storage in a record: strings are
// used as is, anything else is JSON-encoded, and values that cannot be
// encoded yield "".
func valueToQuery(value interface{}) string {
	if str, ok := value.(string); ok {
		return str
	}
	data, marshalErr := json.Marshal(value)
	if marshalErr != nil {
		return ""
	}
	return string(data)
}

// Get reads key from Redis and records the call as a "GET" operation. The
// fetched value is returned to the caller but not stored in the record.
func (m *MonitoredRedisClient) Get(ctx context.Context, key string) (string, time.Duration, error) {
	startTime := time.Now()
	val, err := m.client.Get(ctx, key).Result()
	duration := time.Since(startTime)

	m.monitor.RecordRedisCall("GET", key, "", duration, err)
	return val, duration, err
}

// Set writes value under key with the given expiration and records the call
// as a "SET" operation, storing the value (see valueToQuery) as the query.
//
// The returned string is always "OK", including when err is non-nil; callers
// must check err to learn whether the write succeeded.
func (m *MonitoredRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, time.Duration, error) {
	startTime := time.Now()
	err := m.client.Set(ctx, key, value, expiration).Err()
	duration := time.Since(startTime)

	m.monitor.RecordRedisCall("SET", key, valueToQuery(value), duration, err)
	return "OK", duration, err
}

// HGet reads field of the hash stored at key and records the call as an
// "HGET" operation, including the fetched value as the response.
func (m *MonitoredRedisClient) HGet(ctx context.Context, key, field string) (string, time.Duration, error) {
	startTime := time.Now()
	val, err := m.client.HGet(ctx, key, field).Result()
	duration := time.Since(startTime)

	// getNextID does no locking itself, so take the monitor lock while the
	// ID is allocated.
	m.monitor.mutex.Lock()
	id := m.monitor.getNextID()
	m.monitor.mutex.Unlock()

	record := APICallRecord{
		ID:         id,
		Timestamp:  time.Now(),
		Duration:   duration.Milliseconds(),
		Success:    err == nil,
		Operation:  "HGET",
		KeyOrIndex: key,
		Query:      "field: " + field,
		Response:   val,
	}
	if err != nil {
		record.Error = err.Error()
	}
	m.monitor.RecordAPICall(record, "redis")

	return val, duration, err
}
// HSet 带监控的 Redis HSet 操作 func (m *MonitoredRedisClient) HSet(ctx context.Context, key, field string, value interface{}) (int64, time.Duration, error) { startTime := time.Now() result, err := m.client.HSet(ctx, key, field, value).Result() duration := time.Since(startTime) queryValue := "" if str, ok := value.(string); ok { queryValue = str } else if data, err := json.Marshal(value); err == nil { queryValue = string(data) } record := APICallRecord{ ID: m.monitor.getNextID(), Timestamp: time.Now(), Duration: duration.Milliseconds(), Success: err == nil, Operation: "HSET", KeyOrIndex: key, Query: fmt.Sprintf("field: %s, value: %v", field, queryValue), Response: fmt.Sprintf("%d", result), } if err != nil { record.Error = err.Error() } m.monitor.RecordAPICall(record, "redis") return result, duration, err } // ... existing code ... // Exists 带监控的 Redis Exists 操作 func (m *MonitoredRedisClient) Exists(ctx context.Context, keys ...string) (int64, time.Duration, error) { startTime := time.Now() result, err := m.client.Exists(ctx, keys...).Result() duration := time.Since(startTime) record := APICallRecord{ ID: m.monitor.getNextID(), Timestamp: time.Now(), Duration: duration.Milliseconds(), Success: err == nil, Operation: "EXISTS", KeyOrIndex: strings.Join(keys, ", "), Query: "", Response: fmt.Sprintf("%d", result), } if err != nil { record.Error = err.Error() } m.monitor.RecordAPICall(record, "redis") return result, duration, err } // ... existing code ... 
// Del 带监控的 Redis Del 操作 func (m *MonitoredRedisClient) Del(ctx context.Context, keys ...string) (int64, time.Duration, error) { startTime := time.Now() result, err := m.client.Del(ctx, keys...).Result() duration := time.Since(startTime) record := APICallRecord{ ID: m.monitor.getNextID(), Timestamp: time.Now(), Duration: duration.Milliseconds(), Success: err == nil, Operation: "DEL", KeyOrIndex: strings.Join(keys, ", "), Query: "", Response: fmt.Sprintf("%d", result), } if err != nil { record.Error = err.Error() } m.monitor.RecordAPICall(record, "redis") return result, duration, err } // ... existing code ... // MGet 带监控的 Redis MGet 操作 func (m *MonitoredRedisClient) MGet(ctx context.Context, keys ...string) ([]interface{}, time.Duration, error) { startTime := time.Now() result, err := m.client.MGet(ctx, keys...).Result() duration := time.Since(startTime) resultJSON := "" if data, marshalErr := json.Marshal(result); marshalErr == nil { resultJSON = string(data) } record := APICallRecord{ ID: m.monitor.getNextID(), Timestamp: time.Now(), Duration: duration.Milliseconds(), Success: err == nil, Operation: "MGET", KeyOrIndex: strings.Join(keys, ", "), Query: "", Response: resultJSON, } if err != nil { record.Error = err.Error() } m.monitor.RecordAPICall(record, "redis") return result, duration, err } // ... existing code ... // Ping 带监控的 Redis Ping 操作 func (m *MonitoredRedisClient) Ping(ctx context.Context) (string, time.Duration, error) { startTime := time.Now() val, err := m.client.Ping(ctx).Result() duration := time.Since(startTime) record := APICallRecord{ ID: m.monitor.getNextID(), Timestamp: time.Now(), Duration: duration.Milliseconds(), Success: err == nil, Operation: "PING", KeyOrIndex: "", Query: "PING", Response: val, } if err != nil { record.Error = err.Error() } m.monitor.RecordAPICall(record, "redis") return val, duration, err }