775 lines
19 KiB
Go
775 lines
19 KiB
Go
package monitor
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"github.com/elastic/go-elasticsearch/v8"
|
||
"github.com/elastic/go-elasticsearch/v8/esapi"
|
||
"io"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-redis/redis/v8"
|
||
)
|
||
|
||
// APIMonitor API 监控器 - 监控特定接口的 ES 和 Redis 调用
|
||
type APIMonitor struct {
|
||
endpoint string
|
||
esCalls []APICallRecord
|
||
redisCalls []APICallRecord
|
||
mutex sync.RWMutex
|
||
//maxRecords int
|
||
nextID int64
|
||
qpsWindow time.Duration // QPS 统计时间窗口
|
||
}
|
||
|
||
// APICallRecord API 调用记录
|
||
type APICallRecord struct {
|
||
ID int64 `json:"id"`
|
||
Timestamp time.Time `json:"timestamp"`
|
||
Duration int64 `json:"duration_ms"`
|
||
Success bool `json:"success"`
|
||
Error string `json:"error,omitempty"`
|
||
Operation string `json:"operation"` // ES: search/get, Redis: GET/SET/HGET
|
||
KeyOrIndex string `json:"key_or_index"` // ES 索引或 Redis Key
|
||
Query string `json:"query,omitempty"` // 查询内容
|
||
Request string `json:"request,omitempty"` // 请求内容
|
||
Response string `json:"response,omitempty"` // 响应内容
|
||
}
|
||
|
||
// APIStats API 统计数据
|
||
type APIStats struct {
|
||
Endpoint string `json:"endpoint"`
|
||
TotalCalls int64 `json:"total_calls"`
|
||
Escalls int64 `json:"es_calls"`
|
||
RedisCalls int64 `json:"redis_calls"`
|
||
ESQPS float64 `json:"es_qps"` // ES QPS
|
||
RedisQPS float64 `json:"redis_qps"` // Redis QPS
|
||
AvgDuration int64 `json:"avg_duration_ms"`
|
||
SuccessRate float64 `json:"success_rate"`
|
||
LastUpdate string `json:"last_update"`
|
||
}
|
||
|
||
// 全局监控器映射表:endpoint -> APIMonitor
|
||
var (
|
||
apiMonitors = make(map[string]*APIMonitor)
|
||
apiMonitorsMu sync.RWMutex
|
||
)
|
||
|
||
// GetOrCreateAPIMonitor 获取或创建 API 监控器
|
||
func GetOrCreateAPIMonitor(endpoint string, qpsWindow time.Duration) *APIMonitor {
|
||
apiMonitorsMu.Lock()
|
||
defer apiMonitorsMu.Unlock()
|
||
|
||
if monitor, exists := apiMonitors[endpoint]; exists {
|
||
return monitor
|
||
}
|
||
|
||
monitor := &APIMonitor{
|
||
endpoint: endpoint,
|
||
esCalls: make([]APICallRecord, 0),
|
||
redisCalls: make([]APICallRecord, 0),
|
||
//maxRecords: maxRecords,
|
||
nextID: 1,
|
||
qpsWindow: qpsWindow,
|
||
}
|
||
apiMonitors[endpoint] = monitor
|
||
return monitor
|
||
}
|
||
|
||
// RecordESCall 记录 ES 调用
|
||
func (am *APIMonitor) RecordESCall(operation, keyOrIndex, query string, duration time.Duration, err error) {
|
||
am.mutex.Lock()
|
||
defer am.mutex.Unlock()
|
||
|
||
record := APICallRecord{
|
||
ID: am.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: operation,
|
||
KeyOrIndex: keyOrIndex,
|
||
Query: query,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
am.esCalls = append(am.esCalls, record)
|
||
//if len(am.esCalls) > am.maxRecords {
|
||
// am.esCalls = am.esCalls[1:]
|
||
//}
|
||
}
|
||
|
||
// RecordRedisCall 记录 Redis 调用
|
||
func (am *APIMonitor) RecordRedisCall(operation, keyOrIndex, query string, duration time.Duration, err error) {
|
||
am.mutex.Lock()
|
||
defer am.mutex.Unlock()
|
||
|
||
record := APICallRecord{
|
||
ID: am.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: operation,
|
||
KeyOrIndex: keyOrIndex,
|
||
Query: query,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
am.redisCalls = append(am.redisCalls, record)
|
||
//if len(am.redisCalls) > am.maxRecords {
|
||
// am.redisCalls = am.redisCalls[1:]
|
||
//}
|
||
}
|
||
|
||
// getNextID 获取下一个 ID
|
||
func (am *APIMonitor) getNextID() int64 {
|
||
id := am.nextID
|
||
am.nextID++
|
||
return id
|
||
}
|
||
|
||
// GetStats 获取统计数据
|
||
func (am *APIMonitor) GetStats() *APIStats {
|
||
am.mutex.RLock()
|
||
defer am.mutex.RUnlock()
|
||
|
||
now := time.Now()
|
||
windowStart := now.Add(-am.qpsWindow)
|
||
|
||
// 统计 ES 调用
|
||
var esRecentCalls, esSuccessCalls int64
|
||
var esTotalDuration int64
|
||
for _, call := range am.esCalls {
|
||
if call.Timestamp.After(windowStart) {
|
||
esRecentCalls++
|
||
}
|
||
esTotalDuration += call.Duration
|
||
if call.Success {
|
||
esSuccessCalls++
|
||
}
|
||
}
|
||
|
||
// 统计 Redis 调用
|
||
var redisRecentCalls, redisSuccessCalls int64
|
||
var redisTotalDuration int64
|
||
for _, call := range am.redisCalls {
|
||
if call.Timestamp.After(windowStart) {
|
||
redisRecentCalls++
|
||
}
|
||
redisTotalDuration += call.Duration
|
||
if call.Success {
|
||
redisSuccessCalls++
|
||
}
|
||
}
|
||
|
||
totalCalls := int64(len(am.esCalls)) + int64(len(am.redisCalls))
|
||
successCalls := esSuccessCalls + redisSuccessCalls
|
||
totalDuration := esTotalDuration + redisTotalDuration
|
||
|
||
// 计算 QPS(每秒调用数)
|
||
windowSeconds := am.qpsWindow.Seconds()
|
||
esQPS := float64(esRecentCalls) / windowSeconds
|
||
redisQPS := float64(redisRecentCalls) / windowSeconds
|
||
|
||
var avgDuration int64
|
||
if totalCalls > 0 {
|
||
avgDuration = totalDuration / totalCalls
|
||
}
|
||
|
||
var successRate float64
|
||
if totalCalls > 0 {
|
||
successRate = float64(successCalls) / float64(totalCalls) * 100
|
||
}
|
||
|
||
return &APIStats{
|
||
Endpoint: am.endpoint,
|
||
TotalCalls: totalCalls,
|
||
Escalls: int64(len(am.esCalls)),
|
||
RedisCalls: int64(len(am.redisCalls)),
|
||
ESQPS: esQPS,
|
||
RedisQPS: redisQPS,
|
||
AvgDuration: avgDuration,
|
||
SuccessRate: successRate,
|
||
LastUpdate: now.Format("2006-01-02 15:04:05"),
|
||
}
|
||
}
|
||
|
||
// GetRecentESCalls 获取最近的 ES 调用记录(支持分页)
|
||
func (am *APIMonitor) GetRecentESCalls(page, pageSize int) ([]APICallRecord, int) {
|
||
am.mutex.RLock()
|
||
defer am.mutex.RUnlock()
|
||
|
||
total := len(am.esCalls)
|
||
if total == 0 {
|
||
return []APICallRecord{}, 0
|
||
}
|
||
|
||
if page <= 0 {
|
||
page = 1
|
||
}
|
||
if pageSize <= 0 {
|
||
pageSize = 50
|
||
}
|
||
if pageSize > 500 {
|
||
pageSize = 500
|
||
}
|
||
|
||
startIndex := (page - 1) * pageSize
|
||
endIndex := startIndex + pageSize
|
||
|
||
if startIndex >= total {
|
||
return []APICallRecord{}, total
|
||
}
|
||
if endIndex > total {
|
||
endIndex = total
|
||
}
|
||
|
||
result := make([]APICallRecord, endIndex-startIndex)
|
||
copy(result, am.esCalls[startIndex:endIndex])
|
||
|
||
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
|
||
result[i], result[j] = result[j], result[i]
|
||
}
|
||
|
||
return result, total
|
||
}
|
||
|
||
// GetRecentRedisCalls 获取最近的 Redis 调用记录(支持分页)
|
||
func (am *APIMonitor) GetRecentRedisCalls(page, pageSize int) ([]APICallRecord, int) {
|
||
am.mutex.RLock()
|
||
defer am.mutex.RUnlock()
|
||
|
||
total := len(am.redisCalls)
|
||
if total == 0 {
|
||
return []APICallRecord{}, 0
|
||
}
|
||
|
||
if page <= 0 {
|
||
page = 1
|
||
}
|
||
if pageSize <= 0 {
|
||
pageSize = 50
|
||
}
|
||
if pageSize > 500 {
|
||
pageSize = 500
|
||
}
|
||
|
||
startIndex := (page - 1) * pageSize
|
||
endIndex := startIndex + pageSize
|
||
|
||
if startIndex >= total {
|
||
return []APICallRecord{}, total
|
||
}
|
||
if endIndex > total {
|
||
endIndex = total
|
||
}
|
||
|
||
result := make([]APICallRecord, endIndex-startIndex)
|
||
copy(result, am.redisCalls[startIndex:endIndex])
|
||
|
||
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
|
||
result[i], result[j] = result[j], result[i]
|
||
}
|
||
|
||
return result, total
|
||
}
|
||
|
||
// GetRedisCallByID 根据 ID 获取 Redis 调用记录
|
||
func (am *APIMonitor) GetRedisCallByID(callID int64) *APICallRecord {
|
||
am.mutex.RLock()
|
||
defer am.mutex.RUnlock()
|
||
|
||
for i := len(am.redisCalls) - 1; i >= 0; i-- {
|
||
if am.redisCalls[i].ID == callID {
|
||
return &am.redisCalls[i]
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
/*----------------------------------------ES-----------------------------------*/
|
||
// MonitoredESClient 带监控的 ES 客户端
|
||
type MonitoredESClient struct {
|
||
client *elasticsearch.Client
|
||
monitor *APIMonitor
|
||
endpoint string
|
||
}
|
||
|
||
// NewMonitoredESClient 创建带监控的 ES 客户端
|
||
func NewMonitoredESClient(client *elasticsearch.Client, endpoint string) *MonitoredESClient {
|
||
//monitor := GetOrCreateAPIMonitor(endpoint, 1000, 1*time.Minute)
|
||
monitor := GetOrCreateAPIMonitor(endpoint, 1*time.Minute)
|
||
return &MonitoredESClient{
|
||
client: client,
|
||
monitor: monitor,
|
||
endpoint: endpoint,
|
||
}
|
||
}
|
||
|
||
// Search 带监控的 ES 搜索(使用 esapi.SearchRequest)
|
||
func (m *MonitoredESClient) Search(ctx context.Context, req *esapi.SearchRequest) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
indexName := ""
|
||
if len(req.Index) > 0 {
|
||
indexName = strings.Join(req.Index, ",")
|
||
}
|
||
|
||
queryBody := ""
|
||
if req.Body != nil {
|
||
buf := new(bytes.Buffer)
|
||
buf.ReadFrom(req.Body)
|
||
queryBody = buf.String()
|
||
// 重新设置 Body,因为 ReadFrom 已经读取了
|
||
req.Body = io.NopCloser(buf)
|
||
}
|
||
|
||
req.Pretty = true
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "search",
|
||
KeyOrIndex: indexName,
|
||
Query: queryBody,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
//m.monitor.RecordAPICall(record, "es")
|
||
m.monitor.RecordESCall("search", indexName, queryBody, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// RecordAPICall 通用记录方法
|
||
func (am *APIMonitor) RecordAPICall(record APICallRecord, callType string) {
|
||
am.mutex.Lock()
|
||
defer am.mutex.Unlock()
|
||
|
||
if callType == "es" {
|
||
am.esCalls = append(am.esCalls, record)
|
||
//if len(am.esCalls) > am.maxRecords {
|
||
// am.esCalls = am.esCalls[1:]
|
||
//}
|
||
} else if callType == "redis" {
|
||
am.redisCalls = append(am.redisCalls, record)
|
||
//if len(am.redisCalls) > am.maxRecords {
|
||
// am.redisCalls = am.redisCalls[1:]
|
||
//}
|
||
}
|
||
}
|
||
|
||
// SearchWithQuery 带监控的 ES 搜索(使用查询字符串)
|
||
func (m *MonitoredESClient) SearchWithQuery(ctx context.Context, index, query string) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
req := esapi.SearchRequest{
|
||
Index: []string{index},
|
||
Body: strings.NewReader(query),
|
||
Pretty: true,
|
||
}
|
||
|
||
req.Pretty = true
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
m.monitor.RecordESCall("search", index, query, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// Get 带监控的 ES Get 文档
|
||
func (m *MonitoredESClient) Get(ctx context.Context, index, docID string) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
req := esapi.GetRequest{
|
||
Index: index,
|
||
DocumentID: docID,
|
||
}
|
||
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
m.monitor.RecordESCall("get", index, "docID: "+docID, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// Index 带监控的 ES Index 文档
|
||
func (m *MonitoredESClient) Index(ctx context.Context, index, docID string, body io.Reader) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
req := esapi.IndexRequest{
|
||
Index: index,
|
||
DocumentID: docID,
|
||
Body: body,
|
||
}
|
||
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
bodyStr := ""
|
||
if body != nil {
|
||
buf := new(bytes.Buffer)
|
||
buf.ReadFrom(body)
|
||
bodyStr = buf.String()
|
||
}
|
||
|
||
m.monitor.RecordESCall("index", index, bodyStr, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// Delete 带监控的 ES Delete 文档
|
||
func (m *MonitoredESClient) Delete(ctx context.Context, index, docID string) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
req := esapi.DeleteRequest{
|
||
Index: index,
|
||
DocumentID: docID,
|
||
}
|
||
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
m.monitor.RecordESCall("delete", index, "docID: "+docID, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// Update 带监控的 ES Update 文档
|
||
func (m *MonitoredESClient) Update(ctx context.Context, index, docID string, body io.Reader) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
req := esapi.UpdateRequest{
|
||
Index: index,
|
||
DocumentID: docID,
|
||
Body: body,
|
||
}
|
||
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
bodyStr := ""
|
||
if body != nil {
|
||
buf := new(bytes.Buffer)
|
||
buf.ReadFrom(body)
|
||
bodyStr = buf.String()
|
||
}
|
||
|
||
m.monitor.RecordESCall("update", index, bodyStr, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// Count 带监控的 ES Count
|
||
func (m *MonitoredESClient) Count(ctx context.Context, index string, query string) (*esapi.Response, time.Duration, error) {
|
||
startTime := time.Now()
|
||
|
||
req := esapi.CountRequest{
|
||
Index: []string{index},
|
||
}
|
||
|
||
if query != "" {
|
||
req.Body = strings.NewReader(query)
|
||
}
|
||
|
||
resp, err := req.Do(ctx, m.client)
|
||
duration := time.Since(startTime)
|
||
|
||
m.monitor.RecordESCall("count", index, query, duration, err)
|
||
|
||
return resp, duration, err
|
||
}
|
||
|
||
// GetESCallByID 根据 ID 获取 ES 调用记录
|
||
func (am *APIMonitor) GetESCallByID(callID int64) *APICallRecord {
|
||
am.mutex.RLock()
|
||
defer am.mutex.RUnlock()
|
||
|
||
for i := len(am.esCalls) - 1; i >= 0; i-- {
|
||
if am.esCalls[i].ID == callID {
|
||
return &am.esCalls[i]
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
/*-----------------------------------REDIS-----------------------------------*/
|
||
// MonitoredRedisClient 带监控的 Redis 客户端
|
||
type MonitoredRedisClient struct {
|
||
client *redis.Client
|
||
monitor *APIMonitor
|
||
endpoint string
|
||
}
|
||
|
||
// NewMonitoredRedisClient 创建带监控的 Redis 客户端
|
||
func NewMonitoredRedisClient(client *redis.Client, endpoint string) *MonitoredRedisClient {
|
||
//monitor := GetOrCreateAPIMonitor(endpoint, 1000, 1*time.Minute)
|
||
monitor := GetOrCreateAPIMonitor(endpoint, 1*time.Minute)
|
||
return &MonitoredRedisClient{
|
||
client: client,
|
||
monitor: monitor,
|
||
endpoint: endpoint,
|
||
}
|
||
}
|
||
|
||
// Get 带监控的 Redis Get 操作
|
||
func (m *MonitoredRedisClient) Get(ctx context.Context, key string) (string, time.Duration, error) {
|
||
startTime := time.Now()
|
||
val, err := m.client.Get(ctx, key).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "GET",
|
||
KeyOrIndex: key,
|
||
Query: "",
|
||
Response: val,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
//m.monitor.RecordAPICall(record, "redis")
|
||
m.monitor.RecordRedisCall("GET", key, "", duration, err)
|
||
return val, duration, err
|
||
}
|
||
|
||
// Set 带监控的 Redis Set 操作
|
||
func (m *MonitoredRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, time.Duration, error) {
|
||
startTime := time.Now()
|
||
err := m.client.Set(ctx, key, value, expiration).Err()
|
||
duration := time.Since(startTime)
|
||
|
||
query := ""
|
||
if str, ok := value.(string); ok {
|
||
query = str
|
||
} else if data, err := json.Marshal(value); err == nil {
|
||
query = string(data)
|
||
}
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "SET",
|
||
KeyOrIndex: key,
|
||
Query: query,
|
||
Response: "OK",
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
//m.monitor.RecordAPICall(record, "redis")
|
||
m.monitor.RecordRedisCall("SET", key, query, duration, err)
|
||
|
||
return "OK", duration, err
|
||
}
|
||
|
||
// ... existing code ...
|
||
|
||
// HGet 带监控的 Redis HGet 操作
|
||
func (m *MonitoredRedisClient) HGet(ctx context.Context, key, field string) (string, time.Duration, error) {
|
||
startTime := time.Now()
|
||
val, err := m.client.HGet(ctx, key, field).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "HGET",
|
||
KeyOrIndex: key,
|
||
Query: "field: " + field,
|
||
Response: val,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
m.monitor.RecordAPICall(record, "redis")
|
||
|
||
return val, duration, err
|
||
}
|
||
|
||
// ... existing code ...
|
||
|
||
// HSet 带监控的 Redis HSet 操作
|
||
func (m *MonitoredRedisClient) HSet(ctx context.Context, key, field string, value interface{}) (int64, time.Duration, error) {
|
||
startTime := time.Now()
|
||
result, err := m.client.HSet(ctx, key, field, value).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
queryValue := ""
|
||
if str, ok := value.(string); ok {
|
||
queryValue = str
|
||
} else if data, err := json.Marshal(value); err == nil {
|
||
queryValue = string(data)
|
||
}
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "HSET",
|
||
KeyOrIndex: key,
|
||
Query: fmt.Sprintf("field: %s, value: %v", field, queryValue),
|
||
Response: fmt.Sprintf("%d", result),
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
m.monitor.RecordAPICall(record, "redis")
|
||
|
||
return result, duration, err
|
||
}
|
||
|
||
// ... existing code ...
|
||
|
||
// Exists 带监控的 Redis Exists 操作
|
||
func (m *MonitoredRedisClient) Exists(ctx context.Context, keys ...string) (int64, time.Duration, error) {
|
||
startTime := time.Now()
|
||
result, err := m.client.Exists(ctx, keys...).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "EXISTS",
|
||
KeyOrIndex: strings.Join(keys, ", "),
|
||
Query: "",
|
||
Response: fmt.Sprintf("%d", result),
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
m.monitor.RecordAPICall(record, "redis")
|
||
|
||
return result, duration, err
|
||
}
|
||
|
||
// ... existing code ...
|
||
|
||
// Del 带监控的 Redis Del 操作
|
||
func (m *MonitoredRedisClient) Del(ctx context.Context, keys ...string) (int64, time.Duration, error) {
|
||
startTime := time.Now()
|
||
result, err := m.client.Del(ctx, keys...).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "DEL",
|
||
KeyOrIndex: strings.Join(keys, ", "),
|
||
Query: "",
|
||
Response: fmt.Sprintf("%d", result),
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
m.monitor.RecordAPICall(record, "redis")
|
||
|
||
return result, duration, err
|
||
}
|
||
|
||
// ... existing code ...
|
||
|
||
// MGet 带监控的 Redis MGet 操作
|
||
func (m *MonitoredRedisClient) MGet(ctx context.Context, keys ...string) ([]interface{}, time.Duration, error) {
|
||
startTime := time.Now()
|
||
result, err := m.client.MGet(ctx, keys...).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
resultJSON := ""
|
||
if data, marshalErr := json.Marshal(result); marshalErr == nil {
|
||
resultJSON = string(data)
|
||
}
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "MGET",
|
||
KeyOrIndex: strings.Join(keys, ", "),
|
||
Query: "",
|
||
Response: resultJSON,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
m.monitor.RecordAPICall(record, "redis")
|
||
|
||
return result, duration, err
|
||
}
|
||
|
||
// ... existing code ...
|
||
|
||
// Ping 带监控的 Redis Ping 操作
|
||
func (m *MonitoredRedisClient) Ping(ctx context.Context) (string, time.Duration, error) {
|
||
startTime := time.Now()
|
||
val, err := m.client.Ping(ctx).Result()
|
||
duration := time.Since(startTime)
|
||
|
||
record := APICallRecord{
|
||
ID: m.monitor.getNextID(),
|
||
Timestamp: time.Now(),
|
||
Duration: duration.Milliseconds(),
|
||
Success: err == nil,
|
||
Operation: "PING",
|
||
KeyOrIndex: "",
|
||
Query: "PING",
|
||
Response: val,
|
||
}
|
||
|
||
if err != nil {
|
||
record.Error = err.Error()
|
||
}
|
||
|
||
m.monitor.RecordAPICall(record, "redis")
|
||
|
||
return val, duration, err
|
||
}
|