package main import ( "database/sql" "fmt" "log" "sync" "time" ) // SQLExecutionRecord SQL执行记录 type SQLExecutionRecord struct { ID int64 `json:"id"` Query string `json:"query"` Duration int64 `json:"duration_ms"` // 执行时间(毫秒) Timestamp time.Time `json:"timestamp"` // 执行时间戳 Success bool `json:"success"` // 是否成功 Error string `json:"error,omitempty"` // 错误信息 Endpoint string `json:"endpoint"` // 调用的接口 RowsAffected int64 `json:"rows_affected"` // 影响的行数 } // SQLMonitor SQL监控器 type SQLMonitor struct { records []SQLExecutionRecord mutex sync.RWMutex maxSize int nextID int64 } // 全局SQL监控器实例 var globalSQLMonitor *SQLMonitor // InitSQLMonitor 初始化SQL监控器 func InitSQLMonitor(maxSize int) { globalSQLMonitor = &SQLMonitor{ records: make([]SQLExecutionRecord, 0, maxSize), maxSize: maxSize, nextID: 1, } log.Printf("SQL监控器已初始化,最大记录数: %d", maxSize) } // ExecuteWithMonitor 执行SQL并监控性能 func (sm *SQLMonitor) ExecuteWithMonitor(db *sql.DB, query string, endpoint string, args ...interface{}) (*sql.Rows, error) { startTime := time.Now() rows, err := db.Query(query, args...) duration := time.Since(startTime) // 记录执行信息 record := SQLExecutionRecord{ ID: sm.getNextID(), Query: query, Duration: duration.Milliseconds(), Timestamp: startTime, Success: err == nil, Endpoint: endpoint, } if err != nil { record.Error = err.Error() } sm.addRecord(record) // 打印SQL执行日志 if err != nil { log.Printf("[SQL监控] 执行失败 - 接口: %s, 耗时: %dms, 错误: %v", endpoint, duration.Milliseconds(), err) } else { log.Printf("[SQL监控] 执行成功 - 接口: %s, 耗时: %dms", endpoint, duration.Milliseconds()) } return rows, err } // ExecuteRowWithMonitor 执行单行查询并监控性能 func (sm *SQLMonitor) ExecuteRowWithMonitor(db *sql.DB, query string, endpoint string, args ...interface{}) *sql.Row { startTime := time.Now() row := db.QueryRow(query, args...) duration := time.Since(startTime) // 记录执行信息 record := SQLExecutionRecord{ ID: sm.getNextID(), Query: query, Duration: duration.Milliseconds(), Timestamp: startTime, Success: true, // QueryRow总是成功,错误在Scan时才出现 Endpoint: endpoint, } sm.addRecord(record) log.Printf("[SQL监控] QueryRow执行 - 接口: %s, 耗时: %dms", endpoint, duration.Milliseconds()) return row } // ExecuteExecWithMonitor 执行更新/插入/删除并监控性能 func (sm *SQLMonitor) ExecuteExecWithMonitor(db *sql.DB, query string, endpoint string, args ...interface{}) (sql.Result, error) { startTime := time.Now() result, err := db.Exec(query, args...) duration := time.Since(startTime) // 记录执行信息 record := SQLExecutionRecord{ ID: sm.getNextID(), Query: query, Duration: duration.Milliseconds(), Timestamp: startTime, Success: err == nil, Endpoint: endpoint, } if err != nil { record.Error = err.Error() } else if result != nil { if rowsAffected, rowErr := result.RowsAffected(); rowErr == nil { record.RowsAffected = rowsAffected } } sm.addRecord(record) // 打印SQL执行日志 if err != nil { log.Printf("[SQL监控] Exec执行失败 - 接口: %s, 耗时: %dms, 错误: %v", endpoint, duration.Milliseconds(), err) } else { log.Printf("[SQL监控] Exec执行成功 - 接口: %s, 耗时: %dms, 影响行数: %d", endpoint, duration.Milliseconds(), record.RowsAffected) } return result, err } // getNextID 获取下一个ID func (sm *SQLMonitor) getNextID() int64 { sm.mutex.Lock() defer sm.mutex.Unlock() id := sm.nextID sm.nextID++ return id } // addRecord 添加记录 func (sm *SQLMonitor) addRecord(record SQLExecutionRecord) { sm.mutex.Lock() defer sm.mutex.Unlock() sm.records = append(sm.records, record) // 保持最大记录数限制 if len(sm.records) > sm.maxSize { sm.records = sm.records[1:] } } // GetRecentRecords 获取最近的记录 func (sm *SQLMonitor) GetRecentRecords(limit int) []SQLExecutionRecord { sm.mutex.RLock() defer sm.mutex.RUnlock() if limit <= 0 || limit > len(sm.records) { limit = len(sm.records) } start := len(sm.records) - limit result := make([]SQLExecutionRecord, limit) copy(result, sm.records[start:]) // 反转数组,最新的在前面 for i, j := 0, len(result)-1; i < j; i, j = i+1, j-1 { result[i], result[j] = result[j], result[i] } return result } // GetStats 获取统计信息 func (sm *SQLMonitor) GetStats() map[string]interface{} { sm.mutex.RLock() defer sm.mutex.RUnlock() if len(sm.records) == 0 { return map[string]interface{}{ "total_queries": 0, "success_queries": 0, "failed_queries": 0, "success_rate": 0.0, "avg_duration_ms": 0, "max_duration_ms": 0, "min_duration_ms": 0, } } var totalDuration int64 var successCount, failedCount int64 var maxDuration, minDuration int64 maxDuration = sm.records[0].Duration minDuration = sm.records[0].Duration for _, record := range sm.records { totalDuration += record.Duration if record.Success { successCount++ } else { failedCount++ } if record.Duration > maxDuration { maxDuration = record.Duration } if record.Duration < minDuration { minDuration = record.Duration } } totalQueries := int64(len(sm.records)) avgDuration := totalDuration / totalQueries successRate := float64(successCount) / float64(totalQueries) * 100 return map[string]interface{}{ "total_queries": totalQueries, "success_queries": successCount, "failed_queries": failedCount, "success_rate": fmt.Sprintf("%.2f", successRate), "avg_duration_ms": avgDuration, "max_duration_ms": maxDuration, "min_duration_ms": minDuration, "last_update": time.Now().Format("2006-01-02 15:04:05"), } } // ClearRecords 清空记录 func (sm *SQLMonitor) ClearRecords() { sm.mutex.Lock() defer sm.mutex.Unlock() sm.records = make([]SQLExecutionRecord, 0, sm.maxSize) sm.nextID = 1 log.Println("[SQL监控] 记录已清空") } // GetSlowQueries 获取慢查询(超过指定时间的查询) func (sm *SQLMonitor) GetSlowQueries(thresholdMs int64) []SQLExecutionRecord { sm.mutex.RLock() defer sm.mutex.RUnlock() var slowQueries []SQLExecutionRecord for _, record := range sm.records { if record.Duration > thresholdMs { slowQueries = append(slowQueries, record) } } return slowQueries } // GetFailedQueries 获取失败的查询 func (sm *SQLMonitor) GetFailedQueries() []SQLExecutionRecord { sm.mutex.RLock() defer sm.mutex.RUnlock() var failedQueries []SQLExecutionRecord for _, record := range sm.records { if !record.Success { failedQueries = append(failedQueries, record) } } return failedQueries } // 便捷方法:直接使用全局监控器 func MonitorQuery(db *sql.DB, query string, endpoint string, args ...interface{}) (*sql.Rows, error) { if globalSQLMonitor == nil { return db.Query(query, args...) } return globalSQLMonitor.ExecuteWithMonitor(db, query, endpoint, args...) } func MonitorQueryRow(db *sql.DB, query string, endpoint string, args ...interface{}) *sql.Row { if globalSQLMonitor == nil { return db.QueryRow(query, args...) } return globalSQLMonitor.ExecuteRowWithMonitor(db, query, endpoint, args...) } func MonitorExec(db *sql.DB, query string, endpoint string, args ...interface{}) (sql.Result, error) { if globalSQLMonitor == nil { return db.Exec(query, args...) } return globalSQLMonitor.ExecuteExecWithMonitor(db, query, endpoint, args...) }