package main import ( "fmt" "io" "net/http" "os" "strconv" "sync" "time" ) // ============================== // 事件发射器 (Event Emitter) // ============================== // 事件类型 type EventType string const ( EventProgress EventType = "progress" EventComplete EventType = "complete" EventError EventType = "error" EventStart EventType = "start" EventPause EventType = "pause" EventResume EventType = "resume" EventCancel EventType = "cancel" ) // 事件数据 type EventData struct { Type EventType Downloaded int64 Total int64 Percentage float64 Speed float64 // KB/s Message string Timestamp time.Time Filename string Error error } // 事件监听器函数类型 type EventListener func(data EventData) // 事件发射器 type EventEmitter struct { listeners map[EventType][]EventListener mu sync.RWMutex } // 创建新的事件发射器 func NewEventEmitter() *EventEmitter { return &EventEmitter{ listeners: make(map[EventType][]EventListener), } } // 添加事件监听器 func (ee *EventEmitter) On(eventType EventType, listener EventListener) { ee.mu.Lock() defer ee.mu.Unlock() ee.listeners[eventType] = append(ee.listeners[eventType], listener) } // 移除事件监听器 func (ee *EventEmitter) Off(eventType EventType, listener EventListener) { ee.mu.Lock() defer ee.mu.Unlock() listeners := ee.listeners[eventType] for i, l := range listeners { if &l == &listener { ee.listeners[eventType] = append(listeners[:i], listeners[i+1:]...) break } } } // 发射事件 (类似 runtime.EventsEmit) func (ee *EventEmitter) Emit(eventType EventType, data EventData) { ee.mu.RLock() listeners := ee.listeners[eventType] ee.mu.RUnlock() data.Type = eventType data.Timestamp = time.Now() for _, listener := range listeners { listener(data) } } // ============================== // CSV下载器 // ============================== // CSV下载器 type CSVDownloader struct { emitter *EventEmitter interval time.Duration isPaused bool isCanceled bool mu sync.RWMutex } // 创建新的CSV下载器 func NewCSVDownloader() *CSVDownloader { return &CSVDownloader{ emitter: NewEventEmitter(), interval: 100 * time.Millisecond, } } // 设置进度报告间隔 func (d *CSVDownloader) SetInterval(interval time.Duration) { d.interval = interval } // 监听事件 func (d *CSVDownloader) On(eventType EventType, listener EventListener) { d.emitter.On(eventType, listener) } // 暂停下载 func (d *CSVDownloader) Pause() { d.mu.Lock() d.isPaused = true d.mu.Unlock() d.emitter.Emit(EventPause, EventData{Message: "下载已暂停"}) } // 恢复下载 func (d *CSVDownloader) Resume() { d.mu.Lock() d.isPaused = false d.mu.Unlock() d.emitter.Emit(EventResume, EventData{Message: "下载已恢复"}) } // 取消下载 func (d *CSVDownloader) Cancel() { d.mu.Lock() d.isCanceled = true d.mu.Unlock() d.emitter.Emit(EventCancel, EventData{Message: "下载已取消"}) } // 下载CSV文件 func (d *CSVDownloader) Download(url, filename string) error { // 检查是否已取消 d.mu.RLock() if d.isCanceled { d.mu.RUnlock() return fmt.Errorf("下载已取消") } d.mu.RUnlock() // 开始下载事件 d.emitter.Emit(EventStart, EventData{ Filename: filename, Message: "开始下载文件", }) // 创建HTTP请求 resp, err := http.Get(url) if err != nil { errorData := EventData{ Filename: filename, Message: fmt.Sprintf("无法开始下载: %v", err), Error: err, } d.emitter.Emit(EventError, errorData) return err } defer resp.Body.Close() // 获取文件总大小 totalSize, _ := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) if totalSize <= 0 { totalSize = 0 // 未知大小 } // 创建本地文件 file, err := os.Create(filename) if err != nil { errorData := EventData{ Filename: filename, Message: fmt.Sprintf("无法创建文件: %v", err), Error: err, } d.emitter.Emit(EventError, errorData) return err } defer file.Close() // 下载缓冲区 buffer := make([]byte, 32*1024) // 32KB var downloaded int64 = 0 var lastReport time.Time var lastDownloaded int64 = 0 startTime := time.Now() // 开始下载循环 for { // 检查是否暂停 d.mu.RLock() if d.isPaused { d.mu.RUnlock() for { time.Sleep(100 * time.Millisecond) d.mu.RLock() if !d.isPaused || d.isCanceled { d.mu.RUnlock() break } d.mu.RUnlock() } } // 检查是否取消 if d.isCanceled { d.mu.RUnlock() file.Close() os.Remove(filename) return fmt.Errorf("下载已取消") } d.mu.RUnlock() // 读取数据 n, err := resp.Body.Read(buffer) if n > 0 { // 写入文件 _, writeErr := file.Write(buffer[:n]) if writeErr != nil { errorData := EventData{ Filename: filename, Message: fmt.Sprintf("写入文件失败: %v", writeErr), Error: writeErr, } d.emitter.Emit(EventError, errorData) return writeErr } // 更新下载量 downloaded += int64(n) // 计算下载速度 now := time.Now() if lastReport.IsZero() { lastReport = now lastDownloaded = downloaded } // 定时报告进度 if time.Since(lastReport) >= d.interval || err == io.EOF { elapsed := time.Since(lastReport).Seconds() speed := 0.0 if elapsed > 0 { speed = float64(downloaded-lastDownloaded) / elapsed / 1024 // KB/s } // 计算百分比 percentage := 0.0 if totalSize > 0 { percentage = float64(downloaded) / float64(totalSize) * 100 } // 发射进度事件 d.emitter.Emit(EventProgress, EventData{ Downloaded: downloaded, Total: totalSize, Percentage: percentage, Speed: speed, Filename: filename, Message: fmt.Sprintf("下载中: %.2f%%", percentage), }) // 重置计时器 lastReport = now lastDownloaded = downloaded } } // 检查是否完成 if err != nil { if err == io.EOF { // 下载完成 totalTime := time.Since(startTime).Seconds() averageSpeed := 0.0 if totalTime > 0 { averageSpeed = float64(downloaded) / totalTime / 1024 } d.emitter.Emit(EventComplete, EventData{ Downloaded: downloaded, Total: totalSize, Percentage: 100.0, Speed: averageSpeed, Filename: filename, Message: fmt.Sprintf("下载完成! 总共用时: %.2f秒", totalTime), }) return nil } // 发生错误 errorData := EventData{ Filename: filename, Message: fmt.Sprintf("下载错误: %v", err), Error: err, } d.emitter.Emit(EventError, errorData) return err } } } // ============================== // 模拟文件生成器(用于测试) // ============================== type MockCSVGenerator struct { emitter *EventEmitter } func NewMockCSVGenerator() *MockCSVGenerator { return &MockCSVGenerator{ emitter: NewEventEmitter(), } } func (g *MockCSVGenerator) On(eventType EventType, listener EventListener) { g.emitter.On(eventType, listener) } func (g *MockCSVGenerator) Generate(filename string, totalRows int, columns []string) error { // 开始事件 g.emitter.Emit(EventStart, EventData{ Filename: filename, Message: "开始生成CSV文件", }) file, err := os.Create(filename) if err != nil { g.emitter.Emit(EventError, EventData{ Filename: filename, Error: err, Message: "无法创建文件", }) return err } defer file.Close() // 写入头部 header := "" for i, col := range columns { if i > 0 { header += "," } header += col } file.WriteString(header + "\n") startTime := time.Now() // 生成数据行 for i := 0; i < totalRows; i++ { // 生成一行数据 row := "" for j := 0; j < len(columns); j++ { if j > 0 { row += "," } row += fmt.Sprintf("data%d_%d", i, j) } file.WriteString(row + "\n") // 模拟处理时间 time.Sleep(5 * time.Millisecond) // 每100行报告一次进度 if (i+1)%100 == 0 || i == totalRows-1 { percentage := float64(i+1) / float64(totalRows) * 100 elapsed := time.Since(startTime).Seconds() rowsPerSecond := 0.0 if elapsed > 0 { rowsPerSecond = float64(i+1) / elapsed } g.emitter.Emit(EventProgress, EventData{ Downloaded: int64(i + 1), Total: int64(totalRows), Percentage: percentage, Speed: rowsPerSecond, Filename: filename, Message: fmt.Sprintf("已生成 %d/%d 行", i+1, totalRows), }) } } // 完成事件 g.emitter.Emit(EventComplete, EventData{ Downloaded: int64(totalRows), Total: int64(totalRows), Percentage: 100.0, Filename: filename, Message: "CSV文件生成完成", }) return nil } // ============================== // 主程序 // ============================== func main() { fmt.Println("=== CSV文件下载器 (事件监听模式) ===\n") // 创建控制台日志监听器 consoleLogger := func(data EventData) { timestamp := data.Timestamp.Format("15:04:05") switch data.Type { case EventStart: fmt.Printf("[%s] 🚀 %s: %s\n", timestamp, data.Filename, data.Message) case EventProgress: if data.Total > 0 { fmt.Printf("[%s] 📥 %s: %s (%.2f%%) - %.2f KB/s\n", timestamp, data.Filename, data.Message, data.Percentage, data.Speed) } else { fmt.Printf("[%s] 📥 %s: 已下载 %d bytes - %.2f KB/s\n", timestamp, data.Filename, data.Downloaded, data.Speed) } case EventComplete: fmt.Printf("[%s] ✅ %s: %s\n", timestamp, data.Filename, data.Message) case EventError: fmt.Printf("[%s] ❌ %s: %s (错误: %v)\n", timestamp, data.Filename, data.Message, data.Error) case EventPause: fmt.Printf("[%s] ⏸️ %s\n", timestamp, data.Message) case EventResume: fmt.Printf("[%s] ▶️ %s\n", timestamp, data.Message) case EventCancel: fmt.Printf("[%s] ⏹️ %s\n", timestamp, data.Message) } } // 创建进度条监听器(简单的文本进度条) progressBar := func(data EventData) { if data.Type == EventProgress && data.Total > 0 { barWidth := 50 completed := int(float64(barWidth) * data.Percentage / 100) remaining := barWidth - completed bar := "" for i := 0; i < completed; i++ { bar += "█" } for i := 0; i < remaining; i++ { bar += "░" } fmt.Printf("\r进度: [%s] %.2f%% | 速度: %.2f KB/s", bar, data.Percentage, data.Speed) if data.Percentage >= 100 { fmt.Println() } } } // 创建统计信息监听器 statsTracker := func(data EventData) { // 在实际应用中,这里可以记录统计数据到数据库或文件 if data.Type == EventComplete { fmt.Printf("[统计] 文件: %s, 大小: %d bytes, 平均速度: %.2f KB/s\n", data.Filename, data.Downloaded, data.Speed) } } // 示例1:模拟CSV文件生成 fmt.Println("示例1: 模拟生成CSV文件") fmt.Println("======================") generator := NewMockCSVGenerator() // 注册事件监听器 generator.On(EventStart, consoleLogger) generator.On(EventProgress, progressBar) generator.On(EventProgress, consoleLogger) generator.On(EventComplete, consoleLogger) generator.On(EventComplete, statsTracker) generator.On(EventError, consoleLogger) // 生成CSV文件 columns := []string{"ID", "Name", "Age", "Email", "City", "Salary", "Department"} err := generator.Generate("employees.csv", 500, columns) if err != nil { fmt.Printf("生成失败: %v\n", err) } fmt.Println("\n示例2: 模拟文件下载") fmt.Println("======================") // 创建下载器 downloader := NewCSVDownloader() downloader.SetInterval(200 * time.Millisecond) // 注册事件监听器 downloader.On(EventStart, consoleLogger) downloader.On(EventProgress, progressBar) downloader.On(EventProgress, consoleLogger) downloader.On(EventComplete, consoleLogger) downloader.On(EventComplete, statsTracker) downloader.On(EventError, consoleLogger) downloader.On(EventPause, consoleLogger) downloader.On(EventResume, consoleLogger) downloader.On(EventCancel, consoleLogger) // 模拟下载控制(在真实场景中,这可能是用户界面操作) go func() { time.Sleep(500 * time.Millisecond) downloader.Pause() time.Sleep(1 * time.Second) downloader.Resume() }() // 注意:这里使用了一个公开的测试CSV文件URL // 在实际使用中,请替换为真实的CSV文件URL testURL := "https://raw.githubusercontent.com/datasets/covid-19/main/data/time-series-19-covid-combined.csv" // 由于网络请求可能需要时间,这里我们使用goroutine来演示 go func() { err := downloader.Download(testURL, "covid-data.csv") if err != nil { fmt.Printf("下载失败: %v\n", err) } }() // 等待演示完成 time.Sleep(5 * time.Second) // 演示文件处理进度监控 fmt.Println("\n示例3: 文件处理进度监控") fmt.Println("======================") processor := NewEventEmitter() // 模拟文件处理 go func() { processor.Emit(EventStart, EventData{ Filename: "employees.csv", Message: "开始处理文件", }) totalRows := 500 for i := 0; i < totalRows; i++ { time.Sleep(10 * time.Millisecond) if (i+1)%50 == 0 { percentage := float64(i+1) / float64(totalRows) * 100 processor.Emit(EventProgress, EventData{ Downloaded: int64(i + 1), Total: int64(totalRows), Percentage: percentage, Filename: "employees.csv", Message: fmt.Sprintf("已处理 %d/%d 行", i+1, totalRows), }) } } processor.Emit(EventComplete, EventData{ Downloaded: int64(totalRows), Total: int64(totalRows), Percentage: 100.0, Filename: "employees.csv", Message: "文件处理完成", }) }() // 监听处理进度 processor.On(EventStart, consoleLogger) processor.On(EventProgress, func(data EventData) { barWidth := 30 completed := int(float64(barWidth) * data.Percentage / 100) remaining := barWidth - completed bar := "" for i := 0; i < completed; i++ { bar += "▊" } for i := 0; i < remaining; i++ { bar += "░" } fmt.Printf("\r处理: [%s] %.2f%%", bar, data.Percentage) if data.Percentage >= 100 { fmt.Println() } }) processor.On(EventComplete, consoleLogger) // 等待所有演示完成 time.Sleep(6 * time.Second) fmt.Println("\n✅ 所有演示完成!") fmt.Println("生成的文件:") fmt.Println(" - employees.csv (示例数据)") fmt.Println(" - covid-data.csv (从网络下载的数据)") }