daShangDao_centerBook/monitor/api_monitor.go
2026-03-17 18:01:50 +08:00

775 lines
19 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package monitor
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
"io"
"strings"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
// APIMonitor API 监控器 - 监控特定接口的 ES 和 Redis 调用
type APIMonitor struct {
endpoint string
esCalls []APICallRecord
redisCalls []APICallRecord
mutex sync.RWMutex
//maxRecords int
nextID int64
qpsWindow time.Duration // QPS 统计时间窗口
}
// APICallRecord API 调用记录
type APICallRecord struct {
ID int64 `json:"id"`
Timestamp time.Time `json:"timestamp"`
Duration int64 `json:"duration_ms"`
Success bool `json:"success"`
Error string `json:"error,omitempty"`
Operation string `json:"operation"` // ES: search/get, Redis: GET/SET/HGET
KeyOrIndex string `json:"key_or_index"` // ES 索引或 Redis Key
Query string `json:"query,omitempty"` // 查询内容
Request string `json:"request,omitempty"` // 请求内容
Response string `json:"response,omitempty"` // 响应内容
}
// APIStats API 统计数据
type APIStats struct {
Endpoint string `json:"endpoint"`
TotalCalls int64 `json:"total_calls"`
Escalls int64 `json:"es_calls"`
RedisCalls int64 `json:"redis_calls"`
ESQPS float64 `json:"es_qps"` // ES QPS
RedisQPS float64 `json:"redis_qps"` // Redis QPS
AvgDuration int64 `json:"avg_duration_ms"`
SuccessRate float64 `json:"success_rate"`
LastUpdate string `json:"last_update"`
}
// 全局监控器映射表endpoint -> APIMonitor
var (
apiMonitors = make(map[string]*APIMonitor)
apiMonitorsMu sync.RWMutex
)
// GetOrCreateAPIMonitor 获取或创建 API 监控器
func GetOrCreateAPIMonitor(endpoint string, qpsWindow time.Duration) *APIMonitor {
apiMonitorsMu.Lock()
defer apiMonitorsMu.Unlock()
if monitor, exists := apiMonitors[endpoint]; exists {
return monitor
}
monitor := &APIMonitor{
endpoint: endpoint,
esCalls: make([]APICallRecord, 0),
redisCalls: make([]APICallRecord, 0),
//maxRecords: maxRecords,
nextID: 1,
qpsWindow: qpsWindow,
}
apiMonitors[endpoint] = monitor
return monitor
}
// RecordESCall 记录 ES 调用
func (am *APIMonitor) RecordESCall(operation, keyOrIndex, query string, duration time.Duration, err error) {
am.mutex.Lock()
defer am.mutex.Unlock()
record := APICallRecord{
ID: am.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: operation,
KeyOrIndex: keyOrIndex,
Query: query,
}
if err != nil {
record.Error = err.Error()
}
am.esCalls = append(am.esCalls, record)
//if len(am.esCalls) > am.maxRecords {
// am.esCalls = am.esCalls[1:]
//}
}
// RecordRedisCall 记录 Redis 调用
func (am *APIMonitor) RecordRedisCall(operation, keyOrIndex, query string, duration time.Duration, err error) {
am.mutex.Lock()
defer am.mutex.Unlock()
record := APICallRecord{
ID: am.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: operation,
KeyOrIndex: keyOrIndex,
Query: query,
}
if err != nil {
record.Error = err.Error()
}
am.redisCalls = append(am.redisCalls, record)
//if len(am.redisCalls) > am.maxRecords {
// am.redisCalls = am.redisCalls[1:]
//}
}
// getNextID 获取下一个 ID
func (am *APIMonitor) getNextID() int64 {
id := am.nextID
am.nextID++
return id
}
// GetStats 获取统计数据
func (am *APIMonitor) GetStats() *APIStats {
am.mutex.RLock()
defer am.mutex.RUnlock()
now := time.Now()
windowStart := now.Add(-am.qpsWindow)
// 统计 ES 调用
var esRecentCalls, esSuccessCalls int64
var esTotalDuration int64
for _, call := range am.esCalls {
if call.Timestamp.After(windowStart) {
esRecentCalls++
}
esTotalDuration += call.Duration
if call.Success {
esSuccessCalls++
}
}
// 统计 Redis 调用
var redisRecentCalls, redisSuccessCalls int64
var redisTotalDuration int64
for _, call := range am.redisCalls {
if call.Timestamp.After(windowStart) {
redisRecentCalls++
}
redisTotalDuration += call.Duration
if call.Success {
redisSuccessCalls++
}
}
totalCalls := int64(len(am.esCalls)) + int64(len(am.redisCalls))
successCalls := esSuccessCalls + redisSuccessCalls
totalDuration := esTotalDuration + redisTotalDuration
// 计算 QPS每秒调用数
windowSeconds := am.qpsWindow.Seconds()
esQPS := float64(esRecentCalls) / windowSeconds
redisQPS := float64(redisRecentCalls) / windowSeconds
var avgDuration int64
if totalCalls > 0 {
avgDuration = totalDuration / totalCalls
}
var successRate float64
if totalCalls > 0 {
successRate = float64(successCalls) / float64(totalCalls) * 100
}
return &APIStats{
Endpoint: am.endpoint,
TotalCalls: totalCalls,
Escalls: int64(len(am.esCalls)),
RedisCalls: int64(len(am.redisCalls)),
ESQPS: esQPS,
RedisQPS: redisQPS,
AvgDuration: avgDuration,
SuccessRate: successRate,
LastUpdate: now.Format("2006-01-02 15:04:05"),
}
}
// GetRecentESCalls 获取最近的 ES 调用记录(支持分页)
func (am *APIMonitor) GetRecentESCalls(page, pageSize int) ([]APICallRecord, int) {
am.mutex.RLock()
defer am.mutex.RUnlock()
total := len(am.esCalls)
if total == 0 {
return []APICallRecord{}, 0
}
if page <= 0 {
page = 1
}
if pageSize <= 0 {
pageSize = 50
}
if pageSize > 500 {
pageSize = 500
}
startIndex := (page - 1) * pageSize
endIndex := startIndex + pageSize
if startIndex >= total {
return []APICallRecord{}, total
}
if endIndex > total {
endIndex = total
}
result := make([]APICallRecord, endIndex-startIndex)
copy(result, am.esCalls[startIndex:endIndex])
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
return result, total
}
// GetRecentRedisCalls 获取最近的 Redis 调用记录(支持分页)
func (am *APIMonitor) GetRecentRedisCalls(page, pageSize int) ([]APICallRecord, int) {
am.mutex.RLock()
defer am.mutex.RUnlock()
total := len(am.redisCalls)
if total == 0 {
return []APICallRecord{}, 0
}
if page <= 0 {
page = 1
}
if pageSize <= 0 {
pageSize = 50
}
if pageSize > 500 {
pageSize = 500
}
startIndex := (page - 1) * pageSize
endIndex := startIndex + pageSize
if startIndex >= total {
return []APICallRecord{}, total
}
if endIndex > total {
endIndex = total
}
result := make([]APICallRecord, endIndex-startIndex)
copy(result, am.redisCalls[startIndex:endIndex])
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
return result, total
}
// GetRedisCallByID 根据 ID 获取 Redis 调用记录
func (am *APIMonitor) GetRedisCallByID(callID int64) *APICallRecord {
am.mutex.RLock()
defer am.mutex.RUnlock()
for i := len(am.redisCalls) - 1; i >= 0; i-- {
if am.redisCalls[i].ID == callID {
return &am.redisCalls[i]
}
}
return nil
}
/*----------------------------------------ES-----------------------------------*/
// MonitoredESClient 带监控的 ES 客户端
type MonitoredESClient struct {
client *elasticsearch.Client
monitor *APIMonitor
endpoint string
}
// NewMonitoredESClient 创建带监控的 ES 客户端
func NewMonitoredESClient(client *elasticsearch.Client, endpoint string) *MonitoredESClient {
//monitor := GetOrCreateAPIMonitor(endpoint, 1000, 1*time.Minute)
monitor := GetOrCreateAPIMonitor(endpoint, 1*time.Minute)
return &MonitoredESClient{
client: client,
monitor: monitor,
endpoint: endpoint,
}
}
// Search 带监控的 ES 搜索(使用 esapi.SearchRequest
func (m *MonitoredESClient) Search(ctx context.Context, req *esapi.SearchRequest) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
indexName := ""
if len(req.Index) > 0 {
indexName = strings.Join(req.Index, ",")
}
queryBody := ""
if req.Body != nil {
buf := new(bytes.Buffer)
buf.ReadFrom(req.Body)
queryBody = buf.String()
// 重新设置 Body因为 ReadFrom 已经读取了
req.Body = io.NopCloser(buf)
}
req.Pretty = true
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "search",
KeyOrIndex: indexName,
Query: queryBody,
}
if err != nil {
record.Error = err.Error()
}
if err != nil {
record.Error = err.Error()
}
//m.monitor.RecordAPICall(record, "es")
m.monitor.RecordESCall("search", indexName, queryBody, duration, err)
return resp, duration, err
}
// RecordAPICall 通用记录方法
func (am *APIMonitor) RecordAPICall(record APICallRecord, callType string) {
am.mutex.Lock()
defer am.mutex.Unlock()
if callType == "es" {
am.esCalls = append(am.esCalls, record)
//if len(am.esCalls) > am.maxRecords {
// am.esCalls = am.esCalls[1:]
//}
} else if callType == "redis" {
am.redisCalls = append(am.redisCalls, record)
//if len(am.redisCalls) > am.maxRecords {
// am.redisCalls = am.redisCalls[1:]
//}
}
}
// SearchWithQuery 带监控的 ES 搜索(使用查询字符串)
func (m *MonitoredESClient) SearchWithQuery(ctx context.Context, index, query string) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
req := esapi.SearchRequest{
Index: []string{index},
Body: strings.NewReader(query),
Pretty: true,
}
req.Pretty = true
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
m.monitor.RecordESCall("search", index, query, duration, err)
return resp, duration, err
}
// Get 带监控的 ES Get 文档
func (m *MonitoredESClient) Get(ctx context.Context, index, docID string) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
req := esapi.GetRequest{
Index: index,
DocumentID: docID,
}
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
m.monitor.RecordESCall("get", index, "docID: "+docID, duration, err)
return resp, duration, err
}
// Index 带监控的 ES Index 文档
func (m *MonitoredESClient) Index(ctx context.Context, index, docID string, body io.Reader) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
req := esapi.IndexRequest{
Index: index,
DocumentID: docID,
Body: body,
}
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
bodyStr := ""
if body != nil {
buf := new(bytes.Buffer)
buf.ReadFrom(body)
bodyStr = buf.String()
}
m.monitor.RecordESCall("index", index, bodyStr, duration, err)
return resp, duration, err
}
// Delete 带监控的 ES Delete 文档
func (m *MonitoredESClient) Delete(ctx context.Context, index, docID string) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
req := esapi.DeleteRequest{
Index: index,
DocumentID: docID,
}
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
m.monitor.RecordESCall("delete", index, "docID: "+docID, duration, err)
return resp, duration, err
}
// Update 带监控的 ES Update 文档
func (m *MonitoredESClient) Update(ctx context.Context, index, docID string, body io.Reader) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
req := esapi.UpdateRequest{
Index: index,
DocumentID: docID,
Body: body,
}
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
bodyStr := ""
if body != nil {
buf := new(bytes.Buffer)
buf.ReadFrom(body)
bodyStr = buf.String()
}
m.monitor.RecordESCall("update", index, bodyStr, duration, err)
return resp, duration, err
}
// Count 带监控的 ES Count
func (m *MonitoredESClient) Count(ctx context.Context, index string, query string) (*esapi.Response, time.Duration, error) {
startTime := time.Now()
req := esapi.CountRequest{
Index: []string{index},
}
if query != "" {
req.Body = strings.NewReader(query)
}
resp, err := req.Do(ctx, m.client)
duration := time.Since(startTime)
m.monitor.RecordESCall("count", index, query, duration, err)
return resp, duration, err
}
// GetESCallByID 根据 ID 获取 ES 调用记录
func (am *APIMonitor) GetESCallByID(callID int64) *APICallRecord {
am.mutex.RLock()
defer am.mutex.RUnlock()
for i := len(am.esCalls) - 1; i >= 0; i-- {
if am.esCalls[i].ID == callID {
return &am.esCalls[i]
}
}
return nil
}
/*-----------------------------------REDIS-----------------------------------*/
// MonitoredRedisClient 带监控的 Redis 客户端
type MonitoredRedisClient struct {
client *redis.Client
monitor *APIMonitor
endpoint string
}
// NewMonitoredRedisClient 创建带监控的 Redis 客户端
func NewMonitoredRedisClient(client *redis.Client, endpoint string) *MonitoredRedisClient {
//monitor := GetOrCreateAPIMonitor(endpoint, 1000, 1*time.Minute)
monitor := GetOrCreateAPIMonitor(endpoint, 1*time.Minute)
return &MonitoredRedisClient{
client: client,
monitor: monitor,
endpoint: endpoint,
}
}
// Get 带监控的 Redis Get 操作
func (m *MonitoredRedisClient) Get(ctx context.Context, key string) (string, time.Duration, error) {
startTime := time.Now()
val, err := m.client.Get(ctx, key).Result()
duration := time.Since(startTime)
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "GET",
KeyOrIndex: key,
Query: "",
Response: val,
}
if err != nil {
record.Error = err.Error()
}
//m.monitor.RecordAPICall(record, "redis")
m.monitor.RecordRedisCall("GET", key, "", duration, err)
return val, duration, err
}
// Set 带监控的 Redis Set 操作
func (m *MonitoredRedisClient) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) (string, time.Duration, error) {
startTime := time.Now()
err := m.client.Set(ctx, key, value, expiration).Err()
duration := time.Since(startTime)
query := ""
if str, ok := value.(string); ok {
query = str
} else if data, err := json.Marshal(value); err == nil {
query = string(data)
}
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "SET",
KeyOrIndex: key,
Query: query,
Response: "OK",
}
if err != nil {
record.Error = err.Error()
}
//m.monitor.RecordAPICall(record, "redis")
m.monitor.RecordRedisCall("SET", key, query, duration, err)
return "OK", duration, err
}
// ... existing code ...
// HGet 带监控的 Redis HGet 操作
func (m *MonitoredRedisClient) HGet(ctx context.Context, key, field string) (string, time.Duration, error) {
startTime := time.Now()
val, err := m.client.HGet(ctx, key, field).Result()
duration := time.Since(startTime)
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "HGET",
KeyOrIndex: key,
Query: "field: " + field,
Response: val,
}
if err != nil {
record.Error = err.Error()
}
m.monitor.RecordAPICall(record, "redis")
return val, duration, err
}
// ... existing code ...
// HSet 带监控的 Redis HSet 操作
func (m *MonitoredRedisClient) HSet(ctx context.Context, key, field string, value interface{}) (int64, time.Duration, error) {
startTime := time.Now()
result, err := m.client.HSet(ctx, key, field, value).Result()
duration := time.Since(startTime)
queryValue := ""
if str, ok := value.(string); ok {
queryValue = str
} else if data, err := json.Marshal(value); err == nil {
queryValue = string(data)
}
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "HSET",
KeyOrIndex: key,
Query: fmt.Sprintf("field: %s, value: %v", field, queryValue),
Response: fmt.Sprintf("%d", result),
}
if err != nil {
record.Error = err.Error()
}
m.monitor.RecordAPICall(record, "redis")
return result, duration, err
}
// ... existing code ...
// Exists 带监控的 Redis Exists 操作
func (m *MonitoredRedisClient) Exists(ctx context.Context, keys ...string) (int64, time.Duration, error) {
startTime := time.Now()
result, err := m.client.Exists(ctx, keys...).Result()
duration := time.Since(startTime)
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "EXISTS",
KeyOrIndex: strings.Join(keys, ", "),
Query: "",
Response: fmt.Sprintf("%d", result),
}
if err != nil {
record.Error = err.Error()
}
m.monitor.RecordAPICall(record, "redis")
return result, duration, err
}
// ... existing code ...
// Del 带监控的 Redis Del 操作
func (m *MonitoredRedisClient) Del(ctx context.Context, keys ...string) (int64, time.Duration, error) {
startTime := time.Now()
result, err := m.client.Del(ctx, keys...).Result()
duration := time.Since(startTime)
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "DEL",
KeyOrIndex: strings.Join(keys, ", "),
Query: "",
Response: fmt.Sprintf("%d", result),
}
if err != nil {
record.Error = err.Error()
}
m.monitor.RecordAPICall(record, "redis")
return result, duration, err
}
// ... existing code ...
// MGet 带监控的 Redis MGet 操作
func (m *MonitoredRedisClient) MGet(ctx context.Context, keys ...string) ([]interface{}, time.Duration, error) {
startTime := time.Now()
result, err := m.client.MGet(ctx, keys...).Result()
duration := time.Since(startTime)
resultJSON := ""
if data, marshalErr := json.Marshal(result); marshalErr == nil {
resultJSON = string(data)
}
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "MGET",
KeyOrIndex: strings.Join(keys, ", "),
Query: "",
Response: resultJSON,
}
if err != nil {
record.Error = err.Error()
}
m.monitor.RecordAPICall(record, "redis")
return result, duration, err
}
// ... existing code ...
// Ping 带监控的 Redis Ping 操作
func (m *MonitoredRedisClient) Ping(ctx context.Context) (string, time.Duration, error) {
startTime := time.Now()
val, err := m.client.Ping(ctx).Result()
duration := time.Since(startTime)
record := APICallRecord{
ID: m.monitor.getNextID(),
Timestamp: time.Now(),
Duration: duration.Milliseconds(),
Success: err == nil,
Operation: "PING",
KeyOrIndex: "",
Query: "PING",
Response: val,
}
if err != nil {
record.Error = err.Error()
}
m.monitor.RecordAPICall(record, "redis")
return val, duration, err
}