daShangDao_centerBook/sql_monitor.go
2026-02-28 14:27:33 +08:00

302 lines
7.6 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"database/sql"
"fmt"
"log"
"sync"
"time"
)
// SQLExecutionRecord SQL执行记录
type SQLExecutionRecord struct {
ID int64 `json:"id"`
Query string `json:"query"`
Duration int64 `json:"duration_ms"` // 执行时间(毫秒)
Timestamp time.Time `json:"timestamp"` // 执行时间戳
Success bool `json:"success"` // 是否成功
Error string `json:"error,omitempty"` // 错误信息
Endpoint string `json:"endpoint"` // 调用的接口
RowsAffected int64 `json:"rows_affected"` // 影响的行数
}
// SQLMonitor SQL监控器
type SQLMonitor struct {
records []SQLExecutionRecord
mutex sync.RWMutex
maxSize int
nextID int64
}
// 全局SQL监控器实例
var globalSQLMonitor *SQLMonitor
// InitSQLMonitor 初始化SQL监控器
func InitSQLMonitor(maxSize int) {
globalSQLMonitor = &SQLMonitor{
records: make([]SQLExecutionRecord, 0, maxSize),
maxSize: maxSize,
nextID: 1,
}
log.Printf("SQL监控器已初始化最大记录数: %d", maxSize)
}
// ExecuteWithMonitor 执行SQL并监控性能
func (sm *SQLMonitor) ExecuteWithMonitor(db *sql.DB, query string, endpoint string, args ...interface{}) (*sql.Rows, error) {
startTime := time.Now()
rows, err := db.Query(query, args...)
duration := time.Since(startTime)
// 记录执行信息
record := SQLExecutionRecord{
ID: sm.getNextID(),
Query: query,
Duration: duration.Milliseconds(),
Timestamp: startTime,
Success: err == nil,
Endpoint: endpoint,
}
if err != nil {
record.Error = err.Error()
}
sm.addRecord(record)
// 打印SQL执行日志
if err != nil {
log.Printf("[SQL监控] 执行失败 - 接口: %s, 耗时: %dms, 错误: %v", endpoint, duration.Milliseconds(), err)
} else {
log.Printf("[SQL监控] 执行成功 - 接口: %s, 耗时: %dms", endpoint, duration.Milliseconds())
}
return rows, err
}
// ExecuteRowWithMonitor 执行单行查询并监控性能
func (sm *SQLMonitor) ExecuteRowWithMonitor(db *sql.DB, query string, endpoint string, args ...interface{}) *sql.Row {
startTime := time.Now()
row := db.QueryRow(query, args...)
duration := time.Since(startTime)
// 记录执行信息
record := SQLExecutionRecord{
ID: sm.getNextID(),
Query: query,
Duration: duration.Milliseconds(),
Timestamp: startTime,
Success: true, // QueryRow总是成功错误在Scan时才出现
Endpoint: endpoint,
}
sm.addRecord(record)
log.Printf("[SQL监控] QueryRow执行 - 接口: %s, 耗时: %dms", endpoint, duration.Milliseconds())
return row
}
// ExecuteExecWithMonitor 执行更新/插入/删除并监控性能
func (sm *SQLMonitor) ExecuteExecWithMonitor(db *sql.DB, query string, endpoint string, args ...interface{}) (sql.Result, error) {
startTime := time.Now()
result, err := db.Exec(query, args...)
duration := time.Since(startTime)
// 记录执行信息
record := SQLExecutionRecord{
ID: sm.getNextID(),
Query: query,
Duration: duration.Milliseconds(),
Timestamp: startTime,
Success: err == nil,
Endpoint: endpoint,
}
if err != nil {
record.Error = err.Error()
} else if result != nil {
if rowsAffected, rowErr := result.RowsAffected(); rowErr == nil {
record.RowsAffected = rowsAffected
}
}
sm.addRecord(record)
// 打印SQL执行日志
if err != nil {
log.Printf("[SQL监控] Exec执行失败 - 接口: %s, 耗时: %dms, 错误: %v", endpoint, duration.Milliseconds(), err)
} else {
log.Printf("[SQL监控] Exec执行成功 - 接口: %s, 耗时: %dms, 影响行数: %d", endpoint, duration.Milliseconds(), record.RowsAffected)
}
return result, err
}
// getNextID 获取下一个ID
func (sm *SQLMonitor) getNextID() int64 {
sm.mutex.Lock()
defer sm.mutex.Unlock()
id := sm.nextID
sm.nextID++
return id
}
// addRecord 添加记录
func (sm *SQLMonitor) addRecord(record SQLExecutionRecord) {
sm.mutex.Lock()
defer sm.mutex.Unlock()
sm.records = append(sm.records, record)
// 保持最大记录数限制
if len(sm.records) > sm.maxSize {
sm.records = sm.records[1:]
}
}
// GetRecentRecords 获取最近的记录
func (sm *SQLMonitor) GetRecentRecords(limit int) []SQLExecutionRecord {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
if limit <= 0 || limit > len(sm.records) {
limit = len(sm.records)
}
start := len(sm.records) - limit
result := make([]SQLExecutionRecord, limit)
copy(result, sm.records[start:])
// 反转数组,最新的在前面
for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 {
result[i], result[j] = result[j], result[i]
}
return result
}
// GetStats 获取统计信息
func (sm *SQLMonitor) GetStats() map[string]interface{} {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
if len(sm.records) == 0 {
return map[string]interface{}{
"total_queries": 0,
"success_queries": 0,
"failed_queries": 0,
"success_rate": 0.0,
"avg_duration_ms": 0,
"max_duration_ms": 0,
"min_duration_ms": 0,
}
}
var totalDuration int64
var successCount, failedCount int64
var maxDuration, minDuration int64
maxDuration = sm.records[0].Duration
minDuration = sm.records[0].Duration
for _, record := range sm.records {
totalDuration += record.Duration
if record.Success {
successCount++
} else {
failedCount++
}
if record.Duration > maxDuration {
maxDuration = record.Duration
}
if record.Duration < minDuration {
minDuration = record.Duration
}
}
totalQueries := int64(len(sm.records))
avgDuration := totalDuration / totalQueries
successRate := float64(successCount) / float64(totalQueries) * 100
return map[string]interface{}{
"total_queries": totalQueries,
"success_queries": successCount,
"failed_queries": failedCount,
"success_rate": fmt.Sprintf("%.2f", successRate),
"avg_duration_ms": avgDuration,
"max_duration_ms": maxDuration,
"min_duration_ms": minDuration,
"last_update": time.Now().Format("2006-01-02 15:04:05"),
}
}
// ClearRecords 清空记录
func (sm *SQLMonitor) ClearRecords() {
sm.mutex.Lock()
defer sm.mutex.Unlock()
sm.records = make([]SQLExecutionRecord, 0, sm.maxSize)
sm.nextID = 1
log.Println("[SQL监控] 记录已清空")
}
// GetSlowQueries 获取慢查询(超过指定时间的查询)
func (sm *SQLMonitor) GetSlowQueries(thresholdMs int64) []SQLExecutionRecord {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
var slowQueries []SQLExecutionRecord
for _, record := range sm.records {
if record.Duration > thresholdMs {
slowQueries = append(slowQueries, record)
}
}
return slowQueries
}
// GetFailedQueries 获取失败的查询
func (sm *SQLMonitor) GetFailedQueries() []SQLExecutionRecord {
sm.mutex.RLock()
defer sm.mutex.RUnlock()
var failedQueries []SQLExecutionRecord
for _, record := range sm.records {
if !record.Success {
failedQueries = append(failedQueries, record)
}
}
return failedQueries
}
// 便捷方法:直接使用全局监控器
func MonitorQuery(db *sql.DB, query string, endpoint string, args ...interface{}) (*sql.Rows, error) {
if globalSQLMonitor == nil {
return db.Query(query, args...)
}
return globalSQLMonitor.ExecuteWithMonitor(db, query, endpoint, args...)
}
func MonitorQueryRow(db *sql.DB, query string, endpoint string, args ...interface{}) *sql.Row {
if globalSQLMonitor == nil {
return db.QueryRow(query, args...)
}
return globalSQLMonitor.ExecuteRowWithMonitor(db, query, endpoint, args...)
}
func MonitorExec(db *sql.DB, query string, endpoint string, args ...interface{}) (sql.Result, error) {
if globalSQLMonitor == nil {
return db.Exec(query, args...)
}
return globalSQLMonitor.ExecuteExecWithMonitor(db, query, endpoint, args...)
}