package main ///* //#include //*/ //import "C" //import ( // "bufio" // "encoding/csv" // "encoding/json" // "fmt" // "io" // "os" // "path/filepath" // "sync" // "sync/atomic" // "time" // "unsafe" //) // //// ========================== 数据结构定义 ========================== // //// CSVHandle CSV文件句柄 //type CSVHandle struct { // ID int64 // 句柄唯一ID // Filename string // 文件名 // Delimiter rune // 分隔符 // HasHeader bool // 是否有表头 // Header []string // 表头(如果有) // File *os.File // 底层文件句柄 // CSVReader *csv.Reader // CSV阅读器 // CSVWriter *csv.Writer // CSV写入器 // TotalRows int64 // 总行数(如果已计算) // IsOpen bool // 是否已打开 // OpenTime time.Time // 打开时间 // AccessTime time.Time // 最后访问时间 // AccessCount int64 // 访问计数 // mu sync.RWMutex // 读写锁(保护数据结构) // autoCloseTimer *time.Timer // 自动关闭计时器 // cachedRowCount int64 // 缓存的行数(避免重复计算) // rowCountCached bool // 行数是否已缓存 // writeBuffer *bufio.Writer // 写入缓冲区 // // // 新增:引用计数和状态管理 // refCount int64 // 引用计数 // refCountMu sync.RWMutex // 引用计数锁 // closing bool // 正在关闭中 // closed bool // 已关闭 // statusMu sync.RWMutex // 状态锁 // activeOps int64 // 正在进行的操作数 //} // //// CSVManager CSV文件管理器 //type CSVManager struct { // handles sync.Map // map[int64]*CSVHandle // fileLocks sync.Map // map[string]*sync.RWMutex 文件级锁 // nextHandle int64 // 下一个句柄ID // maxOpen int // 最大打开文件数 // semaphore chan struct{} // 信号量控制并发打开 // config ManagerConfig // 管理器配置 //} // //// ManagerConfig 管理器配置 //type ManagerConfig struct { // MaxOpenFiles int // 最大打开文件数 // AutoCloseTimeout time.Duration // 自动关闭超时 // BufferSize int // 缓冲区大小 // UseMMap bool // 是否使用内存映射(大文件) // MMapThreshold int64 // 使用内存映射的阈值(字节) //} // //// MergeOptions 合并选项 //type MergeOptions struct { // AppendMode bool // true=追加模式,false=覆盖模式 // SkipHeader bool // 是否跳过源文件的表头 // SkipDuplicates bool // 是否跳过重复行 // ColumnMapping map[int]int // 列映射:源文件列索引 -> 目标文件列索引 // ConflictResolution ConflictStrategy // 冲突解决策略 // TransformFunc func([]string) ([]string, error) // 数据转换函数 //} // //// ConflictStrategy 冲突解决策略 //type ConflictStrategy int // //const ( // Overwrite ConflictStrategy = iota // 覆盖目标文件的数据 // Skip // 跳过冲突的行 // KeepBoth // 保留两者 // UseSource // 使用源数据 // UseTarget // 使用目标数据 //) // //// CSVResponse CSV响应结构体 //type CSVResponse struct { // Success bool `json:"success"` // Message string `json:"message,omitempty"` // Data interface{} `json:"data,omitempty"` //} // //// DefaultConfig 默认配置 //var DefaultConfig = ManagerConfig{ // MaxOpenFiles: 100, // AutoCloseTimeout: 5 * time.Minute, // BufferSize: 32 * 1024, // 32KB // UseMMap: true, // MMapThreshold: 10 * 1024 * 1024, // 10MB //} // //// 全局管理器实例 //var ( // globalManager *CSVManager // managerInitOnce sync.Once //) // //// ========================== 辅助函数 ========================== // //func min(a, b int) int { // if a < b { // return a // } // return b //} // //// ========================== GetManager 获取全局管理器 ========================== // //func GetManager() *CSVManager { // managerInitOnce.Do(func() { // globalManager = NewCSVManager(DefaultConfig) // }) // return globalManager //} // //// ========================== NewCSVManager 创建新的CSV管理器 ========================== // //func NewCSVManager(config ManagerConfig) *CSVManager { // if config.MaxOpenFiles <= 0 { // config.MaxOpenFiles = DefaultConfig.MaxOpenFiles // } // return &CSVManager{ // handles: sync.Map{}, // fileLocks: sync.Map{}, // nextHandle: 1, // maxOpen: config.MaxOpenFiles, // semaphore: make(chan struct{}, config.MaxOpenFiles), // config: config, // } //} // //// ========================== CSVHandle 引用计数方法 ========================== // //// addRef 增加引用计数 //func (h *CSVHandle) addRef() { // atomic.AddInt64(&h.refCount, 1) // h.AccessTime = time.Now() // atomic.AddInt64(&h.AccessCount, 1) //} // //// releaseRef 减少引用计数,如果为0则返回true表示可以关闭 //func (h *CSVHandle) releaseRef() bool { // h.refCountMu.Lock() // defer h.refCountMu.Unlock() // // oldCount := atomic.AddInt64(&h.refCount, -1) // return oldCount <= 0 //} // //// getRefCount 获取引用计数 //func (h *CSVHandle) getRefCount() int64 { // return atomic.LoadInt64(&h.refCount) //} // //// isValid 检查句柄是否有效 //func (h *CSVHandle) isValid() bool { // h.statusMu.RLock() // defer h.statusMu.RUnlock() // return !h.closing && !h.closed && h.IsOpen //} // //// markClosing 标记为正在关闭 //func (h *CSVHandle) markClosing() { // h.statusMu.Lock() // defer h.statusMu.Unlock() // h.closing = true //} // //// markClosed 标记为已关闭 //func (h *CSVHandle) markClosed() { // h.statusMu.Lock() // defer h.statusMu.Unlock() // h.closing = false // h.closed = true // h.IsOpen = false //} // //// beginOperation 开始一个操作 //func (h *CSVHandle) beginOperation() bool { // h.statusMu.RLock() // if h.closing || h.closed || !h.IsOpen { // h.statusMu.RUnlock() // return false // } // atomic.AddInt64(&h.activeOps, 1) // h.statusMu.RUnlock() // return true //} // //// endOperation 结束一个操作 //func (h *CSVHandle) endOperation() { // atomic.AddInt64(&h.activeOps, -1) //} // //// waitForActiveOps 等待所有活动操作完成 //func (h *CSVHandle) waitForActiveOps(timeout time.Duration) bool { // start := time.Now() // for atomic.LoadInt64(&h.activeOps) > 0 { // if time.Since(start) > timeout { // return false // } // time.Sleep(10 * time.Millisecond) // } // return true //} // //// ========================== CSVHandle 文件操作方法 ========================== // //// close 关闭CSV句柄(内部方法) //func (h *CSVHandle) close() error { // h.mu.Lock() // defer h.mu.Unlock() // // if !h.IsOpen { // return nil // } // // // 停止自动关闭计时器 // if h.autoCloseTimer != nil { // h.autoCloseTimer.Stop() // h.autoCloseTimer = nil // } // // // 确保缓冲区数据写入文件 // if h.CSVWriter != nil { // h.CSVWriter.Flush() // } // if h.writeBuffer != nil { // h.writeBuffer.Flush() // } // // // 关闭文件 // if h.File != nil { // if err := h.File.Close(); err != nil { // return err // } // h.File = nil // } // // h.CSVReader = nil // h.CSVWriter = nil // h.writeBuffer = nil // h.IsOpen = false // h.markClosed() // // return nil //} // //// getHeader 获取表头 //func (h *CSVHandle) getHeader() []string { // h.mu.RLock() // defer h.mu.RUnlock() // return h.Header //} // //// calculateTotalRows 计算CSV文件的总行数 //func (h *CSVHandle) calculateTotalRows() (int64, error) { // h.mu.Lock() // defer h.mu.Unlock() // // // 如果已缓存,直接返回 // if h.rowCountCached { // return h.cachedRowCount, nil // } // // if !h.IsOpen { // return 0, fmt.Errorf("文件未打开") // } // // // 保存当前文件位置 // currentPos, err := h.File.Seek(0, 1) // 当前位置 // if err != nil { // return 0, fmt.Errorf("获取当前文件位置失败: %w", err) // } // // // 重置到文件开始 // if _, err := h.File.Seek(0, 0); err != nil { // return 0, fmt.Errorf("重置文件指针失败: %w", err) // } // // // 重置CSV阅读器 // reader := csv.NewReader(bufio.NewReader(h.File)) // reader.Comma = h.Delimiter // reader.LazyQuotes = true // reader.TrimLeadingSpace = true // reader.FieldsPerRecord = -1 // 允许字段数量可变(不检查每行字段数) // // // 统计行数 // var rowCount int64 = 0 // for { // _, err := reader.Read() // if err != nil { // if err == io.EOF { // break // } // // 恢复文件位置 // h.File.Seek(currentPos, 0) // return 0, fmt.Errorf("读取行失败: %w", err) // } // rowCount++ // } // // // 恢复文件位置 // if _, err := h.File.Seek(currentPos, 0); err != nil { // return 0, fmt.Errorf("恢复文件位置失败: %w", err) // } // // // 更新缓存 // h.cachedRowCount = rowCount // h.rowCountCached = true // h.TotalRows = rowCount // // // 如果有表头,需要减去表头行 // if h.HasHeader && rowCount > 0 { // h.cachedRowCount = rowCount - 1 // h.TotalRows = rowCount - 1 // } // // return h.cachedRowCount, nil //} // //// ========================== CSVManager 文件操作方法 ========================== // //// getFileLock 获取或创建文件锁 //func (mgr *CSVManager) getFileLock(filename string) *sync.RWMutex { // lock, _ := mgr.fileLocks.LoadOrStore(filename, &sync.RWMutex{}) // return lock.(*sync.RWMutex) //} // //// OpenCSVFile 打开CSV文件并返回句柄(线程安全版本) //func (mgr *CSVManager) OpenCSVFile(filename string, delimiter rune, hasHeader bool) (int64, error) { // // 获取文件锁 // fileLock := mgr.getFileLock(filename) // fileLock.Lock() // defer fileLock.Unlock() // // // 首先检查文件是否已经有打开的句柄 // if existingHandleID := mgr.findHandleByFilename(filename); existingHandleID != -1 { // if handle, err := mgr.getHandle(existingHandleID); err == nil { // handle.addRef() // return existingHandleID, nil // } // } // // // 检查文件是否存在,如果不存在则创建 // fileInfo, err := os.Stat(filename) // fileExists := true // if os.IsNotExist(err) { // fileExists = false // // 创建文件 // file, createErr := os.Create(filename) // if createErr != nil { // return -1, fmt.Errorf("创建文件失败: %w", createErr) // } // file.Close() // 关闭文件,后续会重新打开 // // // 重新获取文件信息 // fileInfo, err = os.Stat(filename) // if err != nil { // return -1, fmt.Errorf("获取文件信息失败: %w", err) // } // } else if err != nil { // // 其他错误 // return -1, fmt.Errorf("检查文件状态失败: %w", err) // } // // // 如果是新创建的空文件,需要特殊处理 // if !fileExists && fileInfo.Size() == 0 { // return mgr.createEmptyCSVHandle(filename, delimiter, hasHeader) // } // // // 限制并发打开文件数 // mgr.semaphore <- struct{}{} // defer func() { <-mgr.semaphore }() // // // 生成唯一句柄ID // handleID := atomic.AddInt64(&mgr.nextHandle, 1) // // // 创建CSV句柄 // csvHandle := &CSVHandle{ // ID: handleID, // Filename: filename, // Delimiter: delimiter, // HasHeader: hasHeader, // OpenTime: time.Now(), // AccessTime: time.Now(), // AccessCount: 1, // refCount: 1, // 初始引用计数为1 // } // // // 打开文件 // if err := mgr.openFile(csvHandle); err != nil { // return -1, fmt.Errorf("打开文件失败: %w", err) // } // // // 如果是新创建的文件,可能需要写入表头 // if !fileExists && hasHeader { // // 创建空表头,用户后续可以写入实际表头 // csvHandle.Header = []string{} // } else if hasHeader && fileInfo.Size() > 0 { // // 读取现有文件的表头 // if err := mgr.readHeader(csvHandle); err != nil { // csvHandle.close() // return -1, fmt.Errorf("读取表头失败: %w", err) // } // } // // // 如果有数据行,计算总行数 // if fileInfo.Size() > 0 { // // 计算总行数 // rows, err := csvHandle.calculateTotalRows() // if err != nil { // csvHandle.close() // return -1, fmt.Errorf("计算总行数失败: %w", err) // } // csvHandle.TotalRows = rows // } else { // csvHandle.TotalRows = 0 // csvHandle.cachedRowCount = 0 // csvHandle.rowCountCached = true // } // // // 注册到管理器 // mgr.handles.Store(handleID, csvHandle) // // // 启动自动关闭计时器 // if mgr.config.AutoCloseTimeout > 0 { // csvHandle.startAutoClose(mgr.config.AutoCloseTimeout, mgr) // } // // return handleID, nil //} // //// findHandleByFilename 根据文件名查找已存在的句柄ID //func (mgr *CSVManager) findHandleByFilename(filename string) int64 { // var existingHandleID int64 = -1 // // // 遍历所有句柄,查找相同文件名的句柄 // mgr.handles.Range(func(key, value interface{}) bool { // handle := value.(*CSVHandle) // // // 检查文件名是否相同 // if handle.Filename == filename { // // 检查句柄是否有效 // if handle.isValid() { // existingHandleID = handle.ID // return false // 停止遍历 // } // } // return true // 继续遍历 // }) // // return existingHandleID //} // //// createEmptyCSVHandle 创建空的CSV文件句柄 //func (mgr *CSVManager) createEmptyCSVHandle(filename string, delimiter rune, hasHeader bool) (int64, error) { // // 限制并发打开文件数 // mgr.semaphore <- struct{}{} // defer func() { <-mgr.semaphore }() // // // 生成唯一句柄ID // handleID := atomic.AddInt64(&mgr.nextHandle, 1) // // // 创建CSV句柄 // csvHandle := &CSVHandle{ // ID: handleID, // Filename: filename, // Delimiter: delimiter, // HasHeader: hasHeader, // OpenTime: time.Now(), // AccessTime: time.Now(), // TotalRows: 0, // AccessCount: 1, // refCount: 1, // 初始引用计数为1 // } // // // 以读写模式打开文件 // file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0755) // if err != nil { // return -1, fmt.Errorf("打开文件失败: %w", err) // } // // // 创建CSV写入器 // writer := csv.NewWriter(file) // writer.Comma = delimiter // // // 如果是新创建的文件且有表头,写入空表头占位 // if hasHeader { // // 写入空表头(用户后续需要设置实际表头) // if err := writer.Write([]string{}); err != nil { // file.Close() // return -1, fmt.Errorf("写入空表头失败: %w", err) // } // writer.Flush() // // // 重置到文件开始位置 // if _, err := file.Seek(0, 0); err != nil { // file.Close() // return -1, fmt.Errorf("重置文件指针失败: %w", err) // } // } // // // 创建CSV阅读器 // reader := csv.NewReader(bufio.NewReaderSize(file, mgr.config.BufferSize)) // reader.Comma = delimiter // reader.LazyQuotes = true // reader.TrimLeadingSpace = true // // // 创建写入缓冲区 // writeBuffer := bufio.NewWriterSize(file, mgr.config.BufferSize) // // csvHandle.File = file // csvHandle.CSVReader = reader // csvHandle.CSVWriter = csv.NewWriter(writeBuffer) // csvHandle.CSVWriter.Comma = delimiter // csvHandle.writeBuffer = writeBuffer // csvHandle.IsOpen = true // csvHandle.AccessTime = time.Now() // // // 如果有表头,初始化空表头数组 // if hasHeader { // csvHandle.Header = []string{} // } // // // 注册到管理器 // mgr.handles.Store(handleID, csvHandle) // // // 启动自动关闭计时器 // if mgr.config.AutoCloseTimeout > 0 { // csvHandle.startAutoClose(mgr.config.AutoCloseTimeout, mgr) // } // // return handleID, nil //} // //// getHandle 获取句柄对象(增加引用计数) //func (mgr *CSVManager) getHandle(handleID int64) (*CSVHandle, error) { // value, ok := mgr.handles.Load(handleID) // if !ok { // return nil, fmt.Errorf("句柄不存在: %d", handleID) // } // // handle := value.(*CSVHandle) // // // 检查句柄是否有效 // if !handle.isValid() { // return nil, fmt.Errorf("句柄无效: %d", handleID) // } // // // 增加引用计数 // handle.addRef() // // // 确保文件已打开 // handle.mu.RLock() // isOpen := handle.IsOpen // handle.mu.RUnlock() // // if !isOpen { // // 获取文件锁 // fileLock := mgr.getFileLock(handle.Filename) // fileLock.Lock() // defer fileLock.Unlock() // // // 重新检查,防止其他协程已经重新打开了 // handle.mu.RLock() // isOpen = handle.IsOpen // handle.mu.RUnlock() // // if !isOpen { // if err := mgr.openFile(handle); err != nil { // // 恢复引用计数 // handle.releaseRef() // return nil, fmt.Errorf("重新打开文件失败: %w", err) // } // } // } // // return handle, nil //} // //// releaseHandle 释放句柄引用 //func (mgr *CSVManager) releaseHandle(handleID int64) { // value, ok := mgr.handles.Load(handleID) // if !ok { // return // } // // handle := value.(*CSVHandle) // // 减少引用计数,如果为0则真正关闭 // if handle.releaseRef() { // mgr.closeHandleInternal(handleID, handle) // } //} // //// ========================== 句柄管理方法 ========================== // //// closeHandleInternal 内部关闭句柄方法 //func (mgr *CSVManager) closeHandleInternal(handleID int64, handle *CSVHandle) { // // 获取文件锁 // fileLock := mgr.getFileLock(handle.Filename) // fileLock.Lock() // defer fileLock.Unlock() // // // 再次检查引用计数,防止在获取锁期间有新的引用 // if handle.getRefCount() > 0 { // return // } // // // 标记为正在关闭 // handle.markClosing() // // // 等待所有活动操作完成 // if !handle.waitForActiveOps(30 * time.Second) { // // 超时,强制关闭 // fmt.Printf("警告:句柄 %d 关闭超时,强制关闭\n", handleID) // } // // // 真正关闭文件 // handle.close() // // // 从管理器移除 // mgr.handles.Delete(handleID) //} // //// CloseHandle 关闭指定句柄(外部调用) //func (mgr *CSVManager) CloseHandle(handleID int64) error { // value, ok := mgr.handles.Load(handleID) // if !ok { // return fmt.Errorf("句柄不存在: %d", handleID) // } // // handle := value.(*CSVHandle) // // // 减少引用计数,如果为0则真正关闭 // if handle.releaseRef() { // mgr.closeHandleInternal(handleID, handle) // } // // return nil //} // //// GracefulCloseHandle 优雅关闭句柄(等待所有操作完成) //func (mgr *CSVManager) GracefulCloseHandle(handleID int64, timeout time.Duration) error { // value, ok := mgr.handles.Load(handleID) // if !ok { // return fmt.Errorf("句柄不存在: %d", handleID) // } // // handle := value.(*CSVHandle) // // // 标记为正在关闭,阻止新操作 // handle.markClosing() // // // 等待现有操作完成 // done := make(chan bool, 1) // go func() { // // 等待引用计数降为1(只剩当前引用) // for handle.getRefCount() > 1 { // time.Sleep(100 * time.Millisecond) // } // done <- true // }() // // // 设置超时 // select { // case <-done: // // 真正关闭 // return mgr.ForceCloseHandle(handleID) // case <-time.After(timeout): // return fmt.Errorf("关闭句柄超时,仍有 %d 个引用", handle.getRefCount()) // } //} // //// ForceCloseHandle 强制关闭句柄(无论引用计数如何) //func (mgr *CSVManager) ForceCloseHandle(handleID int64) error { // value, ok := mgr.handles.Load(handleID) // if !ok { // return fmt.Errorf("句柄不存在: %d", handleID) // } // // handle := value.(*CSVHandle) // // // 强制设置引用计数为0 // handle.refCountMu.Lock() // atomic.StoreInt64(&handle.refCount, 0) // handle.refCountMu.Unlock() // // // 获取文件锁 // fileLock := mgr.getFileLock(handle.Filename) // fileLock.Lock() // defer fileLock.Unlock() // // // 标记为正在关闭 // handle.markClosing() // // // 等待所有活动操作完成 // handle.waitForActiveOps(5 * time.Second) // // // 真正关闭文件 // handle.close() // mgr.handles.Delete(handle.ID) // // return nil //} // //// closeAllHandles 关闭所有句柄 //func (mgr *CSVManager) closeAllHandles() { // // 收集所有句柄ID // var handleIDs []int64 // mgr.handles.Range(func(key, value interface{}) bool { // handleIDs = append(handleIDs, key.(int64)) // return true // }) // // // 关闭每个句柄 // for _, handleID := range handleIDs { // mgr.ForceCloseHandle(handleID) // } //} // //// ========================== 文件操作方法 ========================== // //// openFile 打开文件 //func (mgr *CSVManager) openFile(handle *CSVHandle) error { // handle.mu.Lock() // defer handle.mu.Unlock() // // if handle.IsOpen { // return nil // } // // // 获取文件大小 // fileInfo, err := os.Stat(handle.Filename) // if err != nil { // return err // } // // fileSize := fileInfo.Size() // // // 根据文件大小选择打开策略 // if mgr.config.UseMMap && fileSize > mgr.config.MMapThreshold { // return mgr.openFileWithMMap(handle, fileSize) // } // // return mgr.openFileNormal(handle) //} // //// 正常打开文件 //func (mgr *CSVManager) openFileNormal(handle *CSVHandle) error { // // 以读写模式打开文件 // file, err := os.OpenFile(handle.Filename, os.O_RDWR, 0755) // if err != nil { // return err // } // // // 创建带缓冲的CSV阅读器 // reader := csv.NewReader(bufio.NewReaderSize(file, mgr.config.BufferSize)) // reader.Comma = handle.Delimiter // reader.LazyQuotes = true // reader.TrimLeadingSpace = true // // // 创建写入缓冲区 // writeBuffer := bufio.NewWriterSize(file, mgr.config.BufferSize) // // handle.File = file // handle.CSVReader = reader // handle.CSVWriter = csv.NewWriter(writeBuffer) // handle.CSVWriter.Comma = handle.Delimiter // handle.writeBuffer = writeBuffer // handle.IsOpen = true // handle.AccessTime = time.Now() // // return nil //} // //// 使用内存映射打开大文件 //func (mgr *CSVManager) openFileWithMMap(handle *CSVHandle, fileSize int64) error { // // 对于大文件,使用只读模式打开,写入需要特殊处理 // file, err := os.OpenFile(handle.Filename, os.O_RDWR, 0755) // if err != nil { // return err // } // // // 对于大文件,我们可以先只打开,按需读取 // reader := csv.NewReader(bufio.NewReaderSize(file, mgr.config.BufferSize)) // reader.Comma = handle.Delimiter // reader.LazyQuotes = true // // // 创建写入缓冲区 // writeBuffer := bufio.NewWriterSize(file, mgr.config.BufferSize) // // handle.File = file // handle.CSVReader = reader // handle.CSVWriter = csv.NewWriter(writeBuffer) // handle.CSVWriter.Comma = handle.Delimiter // handle.writeBuffer = writeBuffer // handle.IsOpen = true // handle.AccessTime = time.Now() // // return nil //} // //// 读取CSV表头 //func (mgr *CSVManager) readHeader(handle *CSVHandle) error { // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return fmt.Errorf("文件未打开") // } // // // 确保文件指针在开头 // if _, err := handle.File.Seek(0, 0); err != nil { // return err // } // // // 重置CSV阅读器 // handle.CSVReader = csv.NewReader(bufio.NewReader(handle.File)) // handle.CSVReader.Comma = handle.Delimiter // // // 读取表头 // header, err := handle.CSVReader.Read() // if err != nil { // return err // } // // handle.Header = header // return nil //} // //// startAutoClose 启动自动关闭计时器 //func (h *CSVHandle) startAutoClose(timeout time.Duration, mgr *CSVManager) { // h.autoCloseTimer = time.AfterFunc(timeout, func() { // h.mu.Lock() // defer h.mu.Unlock() // // // 检查是否长时间未访问 // if h.IsOpen && time.Since(h.AccessTime) > timeout { // // 检查引用计数 // if h.getRefCount() == 0 { // h.close() // // 从管理器移除 // mgr.handles.Delete(h.ID) // } // } // }) //} // //// ========================== 写入功能 ========================== // //// WriteHeader 写入CSV文件表头 //func (mgr *CSVManager) WriteHeader(handleID int64, header []string) error { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return fmt.Errorf("文件未打开") // } // // // 检查文件是否已经有内容 // if handle.TotalRows > 0 { // return fmt.Errorf("文件已有数据,无法修改表头") // } // // // 移动到文件开头 // if _, err := handle.File.Seek(0, 0); err != nil { // return fmt.Errorf("移动文件指针失败: %w", err) // } // // // 清空文件内容 // if err := handle.File.Truncate(0); err != nil { // return fmt.Errorf("清空文件失败: %w", err) // } // // // 写入表头 // if err := handle.CSVWriter.Write(header); err != nil { // return fmt.Errorf("写入表头失败: %w", err) // } // // handle.CSVWriter.Flush() // if handle.writeBuffer != nil { // handle.writeBuffer.Flush() // } // // // 更新句柄状态 // handle.Header = header // handle.HasHeader = true // // return nil //} // //// WriteRow 写入单行数据到CSV文件 //func (mgr *CSVManager) WriteRow(handleID int64, row []string) error { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return fmt.Errorf("文件未打开") // } // // // 检查是否有表头且表头长度是否匹配 // if handle.HasHeader && len(handle.Header) > 0 && len(row) != len(handle.Header) { // return fmt.Errorf("行数据列数(%d)与表头列数(%d)不匹配", len(row), len(handle.Header)) // } // // // 移动到文件末尾 // if _, err := handle.File.Seek(0, 2); err != nil { // return fmt.Errorf("移动到文件末尾失败: %w", err) // } // // // 写入行数据 // if err := handle.CSVWriter.Write(row); err != nil { // return fmt.Errorf("写入行失败: %w", err) // } // // // 更新行数统计 // handle.TotalRows++ // handle.cachedRowCount = handle.TotalRows // handle.rowCountCached = true // // return nil //} // //// WriteRows 批量写入多行数据到CSV文件 //func (mgr *CSVManager) WriteRows(handleID int64, rows [][]string) (int64, error) { // // 首先记录日志 // if err := mgr.logWriteRows(handleID, rows); err != nil { // // 日志记录失败不影响主流程,但可以打印警告 // fmt.Printf("警告:记录日志失败: %v\n", err) // } // // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return -1, fmt.Errorf("WriteRows 获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return -1, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return -1, fmt.Errorf("文件未打开") // } // // // 检查是否有表头 // if handle.HasHeader && len(handle.Header) > 0 { // // 验证每行的列数 // for i, row := range rows { // if len(row) != len(handle.Header) { // return -1, fmt.Errorf("第%d行列数(%d)与表头列数(%d)不匹配", i+1, len(row), len(handle.Header)) // } // } // } // // // 移动到文件末尾 // if _, err := handle.File.Seek(0, 2); err != nil { // return -1, fmt.Errorf("移动到文件末尾失败: %w", err) // } // // // 批量写入行数据 // for _, row := range rows { // if err := handle.CSVWriter.Write(row); err != nil { // return -1, fmt.Errorf("写入行失败: %w", err) // } // } // // // 更新行数统计 // handle.TotalRows += int64(len(rows)) // handle.cachedRowCount = handle.TotalRows // handle.rowCountCached = true // // return handle.TotalRows, nil //} // //// WriteRowsNum 批量写入多行数据到CSV文件 //// 返回每行数据存储的行号数组(从1开始) //func (mgr *CSVManager) WriteRowsNum(handleID int64, rows [][]string) ([]int64, error) { // // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return nil, fmt.Errorf("WriteRows 获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return nil, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return nil, fmt.Errorf("文件未打开") // } // // // 检查是否有表头 // if handle.HasHeader && len(handle.Header) > 0 { // // 验证每行的列数 // for i, row := range rows { // if len(row) != len(handle.Header) { // return nil, fmt.Errorf("第%d行列数(%d)与表头列数(%d)不匹配", i+1, len(row), len(handle.Header)) // } // } // } // // // 移动到文件末尾 // if _, err := handle.File.Seek(0, 2); err != nil { // return nil, fmt.Errorf("移动到文件末尾失败: %w", err) // } // // // 计算起始行号 - 关键修正部分 // var startRow int64 // // // 先获取当前文件的实际行数(包括表头) // currentLineCount := handle.TotalRows // if handle.HasHeader && currentLineCount == 0 { // // 如果有表头但还没有数据行,表头算第1行,数据从第2行开始 // startRow = 2 // } else if handle.HasHeader { // // 有表头且已有数据行,表头是第1行,TotalRows不包括表头 // // 所以下一行应该是 TotalRows + 2 // startRow = currentLineCount + 2 // } else { // // 没有表头,直接累加 // startRow = currentLineCount + 1 // } // // // 创建行号数组 // rowNumbers := make([]int64, len(rows)) // for i := 0; i < len(rows); i++ { // rowNumbers[i] = startRow + int64(i) // } // // // 批量写入行数据 // for _, row := range rows { // if err := handle.CSVWriter.Write(row); err != nil { // return nil, fmt.Errorf("写入行失败: %w", err) // } // } // // // 更新行数统计 // handle.TotalRows += int64(len(rows)) // handle.cachedRowCount = handle.TotalRows // handle.rowCountCached = true // // return rowNumbers, nil //} // //// logWriteRows 记录行数据到日志文件 //func (mgr *CSVManager) logWriteRows(handleID int64, rows [][]string) error { // // 获取句柄信息(但不锁定,因为我们只是读取元数据) // handle, err := mgr.getHandle(handleID) // if err != nil { // return fmt.Errorf("获取句柄信息失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 创建日志目录 // logDir := filepath.Join(filepath.Dir("csv"), "logs") // if err := os.MkdirAll(logDir, 0755); err != nil { // return fmt.Errorf("创建日志目录失败: %w", err) // } // // // 生成日志文件名(基于CSV文件名和时间) // baseName := filepath.Base("csv") // logFileName := fmt.Sprintf("%s_%s_write.log", // baseName[:len(baseName)-len(filepath.Ext(baseName))], // time.Now().Format("20060102")) // logFilePath := filepath.Join(logDir, logFileName) // // // 打开或创建日志文件 // logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) // if err != nil { // return fmt.Errorf("打开日志文件失败: %w", err) // } // defer logFile.Close() // // // 创建日志条目 // logEntry := map[string]interface{}{ // "timestamp": time.Now().Format("2006-01-02 15:04:05.000"), // "handle_id": handleID, // "filename": "csv", // "operation": "write_rows", // "row_count": len(rows), // "total_rows": handle.TotalRows + int64(len(rows)), // 预计的总行数 // "data": rows, // } // // // 序列化为JSON // logData, err := json.Marshal(logEntry) // if err != nil { // return fmt.Errorf("序列化日志数据失败: %w", err) // } // // // 写入日志(每行一个JSON对象) // if _, err := logFile.Write(append(logData, '\n')); err != nil { // return fmt.Errorf("写入日志文件失败: %w", err) // } // // return nil //} // //// logWriteRows 记录行数据到日志文件 //func (mgr *CSVManager) logWriteRowsNum(handleID int64, rows [][]string, rowsNum []int64) error { // // 获取句柄信息(但不锁定,因为我们只是读取元数据) // handle, err := mgr.getHandle(handleID) // if err != nil { // return fmt.Errorf("获取句柄信息失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 创建日志目录 // logDir := filepath.Join(filepath.Dir("csv"), "logs") // if err := os.MkdirAll(logDir, 0755); err != nil { // return fmt.Errorf("创建日志目录失败: %w", err) // } // // // 生成日志文件名(基于CSV文件名和时间) // baseName := filepath.Base("csv") // logFileName := fmt.Sprintf("%s_%s_write.log", // baseName[:len(baseName)-len(filepath.Ext(baseName))], // time.Now().Format("20060102")) // logFilePath := filepath.Join(logDir, logFileName) // // // 打开或创建日志文件 // logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) // if err != nil { // return fmt.Errorf("打开日志文件失败: %w", err) // } // defer logFile.Close() // // // 创建日志条目 // logEntry := map[string]interface{}{ // "timestamp": time.Now().Format("2006-01-02 15:04:05.000"), // "handle_id": handleID, // "filename": "csv", // "operation": "write_rows", // "row_count": len(rows), // "total_rows": handle.TotalRows + int64(len(rows)), // 预计的总行数 // "data": rows, // "rowsNum": rowsNum, // } // // // 序列化为JSON // logData, err := json.Marshal(logEntry) // if err != nil { // return fmt.Errorf("序列化日志数据失败: %w", err) // } // // // 写入日志(每行一个JSON对象) // if _, err := logFile.Write(append(logData, '\n')); err != nil { // return fmt.Errorf("写入日志文件失败: %w", err) // } // // return nil //} // //// Flush 将缓冲区数据写入文件 //func (mgr *CSVManager) Flush(handleID int64) error { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return fmt.Errorf("Flush 获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return fmt.Errorf("文件未打开") // } // // if handle.CSVWriter == nil { // return fmt.Errorf("CSV写入器未初始化") // } // // // 刷新缓冲区 // handle.CSVWriter.Flush() // if handle.writeBuffer != nil { // if err := handle.writeBuffer.Flush(); err != nil { // return fmt.Errorf("刷新写入缓冲区失败: %w", err) // } // } // // return nil //} // //// AppendRow 追加行数据(自动Flush) //func (mgr *CSVManager) AppendRow(handleID int64, row []string) error { // if err := mgr.WriteRow(handleID, row); err != nil { // return err // } // return mgr.Flush(handleID) //} // //// AppendRows 批量追加行数据(自动Flush) //func (mgr *CSVManager) AppendRows(handleID int64, rows [][]string) (int64, error) { // totalRows, err := mgr.WriteRows(handleID, rows) // if err != nil { // return -1, err // } // err = mgr.Flush(handleID) // if err != nil { // return -1, err // } // return totalRows, nil //} // //// AppendRows 批量追加行数据(自动Flush) //func (mgr *CSVManager) AppendRowsNum(handleID int64, rows [][]string) ([]int64, error) { // rowsNum, err := mgr.WriteRowsNum(handleID, rows) // if err != nil { // return nil, err // } // err = mgr.Flush(handleID) // if err != nil { // return nil, err // } // // 首先记录日志 // if err := mgr.logWriteRowsNum(handleID, rows, rowsNum); err != nil { // // 日志记录失败不影响主流程,但可以打印警告 // fmt.Printf("警告:记录日志失败: %v\n", err) // } // return rowsNum, nil //} // //// GetHeader 获取CSV文件表头 //func (mgr *CSVManager) GetHeader(handleID int64) ([]string, error) { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return nil, fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return nil, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.RLock() // defer handle.mu.RUnlock() // // if !handle.HasHeader { // return nil, fmt.Errorf("文件没有表头") // } // // return handle.Header, nil //} // //// ========================== 获取总行数 ========================== // //// GetTotalRows 快速获取总行数(如果已计算过则直接返回,否则重新计算) //func (mgr *CSVManager) GetTotalRows(handleID int64) (int64, error) { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return 0, fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return 0, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.RLock() // // 如果已缓存,直接返回缓存值 // if handle.rowCountCached { // count := handle.cachedRowCount // handle.mu.RUnlock() // return count, nil // } // handle.mu.RUnlock() // // // 否则计算总行数 // return handle.calculateTotalRows() //} // //// ========================== 修改指定行功能 ========================== // //// UpdateRow 修改指定行的数据 //func (mgr *CSVManager) UpdateRow(handleID int64, rowNumber int64, newData []string) error { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // //handle.mu.Lock() // //defer handle.mu.Unlock() // // if !handle.IsOpen { // return fmt.Errorf("文件未打开") // } // // // 检查行号是否有效 // if rowNumber < 0 { // return fmt.Errorf("行号不能为负数: %d", rowNumber) // } // // // 获取总行数 // totalRows, err := handle.calculateTotalRows() // if err != nil { // return fmt.Errorf("获取总行数失败: %w", err) // } // // if rowNumber >= totalRows { // return fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, totalRows) // } // // // 检查新数据列数是否匹配 // if handle.HasHeader && len(handle.Header) > 0 { // if len(newData) != len(handle.Header) { // return fmt.Errorf("新数据列数(%d)与表头列数(%d)不匹配", len(newData), len(handle.Header)) // } // } // // // 读取整个文件到内存 // allRows, err := mgr.readAllRows(handle) // if err != nil { // return fmt.Errorf("读取文件失败: %w", err) // } // // // 修改指定行的数据 // if handle.HasHeader && len(allRows) > 0 { // // 如果有表头,第一行是表头,实际数据从第二行开始 // if rowNumber+1 >= int64(len(allRows)) { // return fmt.Errorf("行号超出范围: %d, 总数据行数: %d", rowNumber, len(allRows)-1) // } // allRows[rowNumber+1] = newData // } else { // if rowNumber >= int64(len(allRows)) { // return fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, len(allRows)) // } // allRows[rowNumber] = newData // } // // // 写回文件 // if err := mgr.writeAllRows(handle, allRows); err != nil { // return fmt.Errorf("写回文件失败: %w", err) // } // // // 更新缓存的行数 // handle.rowCountCached = false // // // 重新计算总行数 // if _, err := handle.calculateTotalRows(); err != nil { // return fmt.Errorf("重新计算总行数失败: %w", err) // } // // return nil //} // //// UpdateRows 批量修改多行数据 //func (mgr *CSVManager) UpdateRows(handleID int64, updates map[int64][]string) (int, error) { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return 0, fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return 0, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // handle.mu.Lock() // defer handle.mu.Unlock() // // if !handle.IsOpen { // return 0, fmt.Errorf("文件未打开") // } // // // 检查行号是否有效 // for rowNumber := range updates { // if rowNumber < 0 { // return 0, fmt.Errorf("行号不能为负数: %d", rowNumber) // } // } // // // 获取总行数 // totalRows, err := handle.calculateTotalRows() // if err != nil { // return 0, fmt.Errorf("获取总行数失败: %w", err) // } // // // 检查新数据列数是否匹配 // if handle.HasHeader && len(handle.Header) > 0 { // for rowNumber, newData := range updates { // if rowNumber >= totalRows { // return 0, fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, totalRows) // } // if len(newData) != len(handle.Header) { // return 0, fmt.Errorf("行%d的新数据列数(%d)与表头列数(%d)不匹配", rowNumber, len(newData), len(handle.Header)) // } // } // } // // // 读取整个文件到内存 // allRows, err := mgr.readAllRows(handle) // if err != nil { // return 0, fmt.Errorf("读取文件失败: %w", err) // } // // // 修改指定行的数据 // updatedCount := 0 // for rowNumber, newData := range updates { // if handle.HasHeader && len(allRows) > 0 { // // 如果有表头,第一行是表头,实际数据从第二行开始 // if rowNumber+1 >= int64(len(allRows)) { // return 0, fmt.Errorf("行号超出范围: %d, 总数据行数: %d", rowNumber, len(allRows)-1) // } // allRows[rowNumber+1] = newData // updatedCount++ // } else { // if rowNumber >= int64(len(allRows)) { // return 0, fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, len(allRows)) // } // allRows[rowNumber] = newData // updatedCount++ // } // } // // // 写回文件 // if err := mgr.writeAllRows(handle, allRows); err != nil { // return 0, fmt.Errorf("写回文件失败: %w", err) // } // // // 更新缓存的行数 // handle.rowCountCached = false // // // 重新计算总行数 // if _, err := handle.calculateTotalRows(); err != nil { // return 0, fmt.Errorf("重新计算总行数失败: %w", err) // } // // return updatedCount, nil //} // //// readAllRows 读取文件中的所有行 //func (mgr *CSVManager) readAllRows(handle *CSVHandle) ([][]string, error) { // // 保存当前文件位置 // currentPos, err := handle.File.Seek(0, 1) // 当前位置 // if err != nil { // return nil, fmt.Errorf("获取当前文件位置失败: %w", err) // } // // // 重置到文件开始 // if _, err := handle.File.Seek(0, 0); err != nil { // return nil, fmt.Errorf("重置文件指针失败: %w", err) // } // // // 重置CSV阅读器 // reader := csv.NewReader(bufio.NewReader(handle.File)) // reader.Comma = handle.Delimiter // reader.LazyQuotes = true // reader.TrimLeadingSpace = true // // // 读取所有行 // var allRows [][]string // for { // row, err := reader.Read() // if err != nil { // if err == io.EOF { // break // } // // 恢复文件位置 // handle.File.Seek(currentPos, 0) // return nil, fmt.Errorf("读取行失败: %w", err) // } // allRows = append(allRows, row) // } // // // 恢复文件位置 // if _, err := handle.File.Seek(currentPos, 0); err != nil { // return nil, fmt.Errorf("恢复文件位置失败: %w", err) // } // // return allRows, nil //} // //// writeAllRows 将所有行写回文件 //func (mgr *CSVManager) writeAllRows(handle *CSVHandle, rows [][]string) error { // // 清空文件 // if err := handle.File.Truncate(0); err != nil { // return fmt.Errorf("清空文件失败: %w", err) // } // // // 移动到文件开头 // if _, err := handle.File.Seek(0, 0); err != nil { // return fmt.Errorf("移动文件指针失败: %w", err) // } // // // 写入所有行 // for _, row := range rows { // if err := handle.CSVWriter.Write(row); err != nil { // return fmt.Errorf("写入行失败: %w", err) // } // } // // // 刷新缓冲区 // handle.CSVWriter.Flush() // if handle.writeBuffer != nil { // if err := handle.writeBuffer.Flush(); err != nil { // return fmt.Errorf("刷新写入缓冲区失败: %w", err) // } // } // // return nil //} // //// GetRow 获取指定行的数据 //func (mgr *CSVManager) GetRow(handleID int64, rowNumber int64) ([]string, error) { // // 获取句柄 // handle, err := mgr.getHandle(handleID) // if err != nil { // return nil, fmt.Errorf("获取句柄失败: %w", err) // } // defer mgr.releaseHandle(handleID) // // // 检查句柄状态 // if !handle.beginOperation() { // return nil, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID) // } // defer handle.endOperation() // // //handle.mu.RLock() // //defer handle.mu.RUnlock() // // if !handle.IsOpen { // return nil, fmt.Errorf("文件未打开") // } // // // 检查行号是否有效 // if rowNumber < 0 { // return nil, fmt.Errorf("行号不能为负数: %d", rowNumber) // } // // // 获取总行数 // totalRows, err := handle.calculateTotalRows() // if err != nil { // return nil, fmt.Errorf("获取总行数失败: %w", err) // } // // if rowNumber >= totalRows { // return nil, fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, totalRows) // } // // //// 保存当前文件位置 // //currentPos, err := handle.File.Seek(0, 1) // 当前位置 // //if err != nil { // // return nil, fmt.Errorf("获取当前文件位置失败: %w", err) // //} // // //// 重置到文件开始 // //if _, err := handle.File.Seek(0, 0); err != nil { // // return nil, fmt.Errorf("重置文件指针失败: %w", err) // //} // // // 创建文件副本用于读取,避免影响其他goroutine // file, err := os.Open(handle.Filename) // if err != nil { // return nil, fmt.Errorf("打开文件失败: %w", err) // } // defer file.Close() // // // 创建CSV阅读器 // reader := csv.NewReader(file) // reader.Comma = handle.Delimiter // reader.LazyQuotes = true // reader.TrimLeadingSpace = true // // //// 重置CSV阅读器 // //reader := csv.NewReader(bufio.NewReader(handle.File)) // //reader.Comma = handle.Delimiter // //reader.LazyQuotes = true // //reader.TrimLeadingSpace = true // // // 定位到指定行 // var currentRow int64 = 0 // var targetRow []string // for { // row, err := reader.Read() // if err != nil { // if err == io.EOF { // break // } // // 恢复文件位置 // //handle.File.Seek(currentPos, 0) // return nil, fmt.Errorf("读取行失败: %w", err) // } // // // 判断是否到达目标行 // if handle.HasHeader { // // 如果有表头,跳过第一行 // if currentRow == rowNumber+1 { // targetRow = row // break // } // } else { // // 如果没有表头,直接匹配 // if currentRow == rowNumber { // targetRow = row // break // } // } // currentRow++ // } // // //// 恢复文件位置 // //if _, err := handle.File.Seek(currentPos, 0); err != nil { // // return nil, fmt.Errorf("恢复文件位置失败: %w", err) // //} // // if targetRow == nil { // return nil, fmt.Errorf("未找到行号 %d", rowNumber) // } // // return targetRow, nil //} // //// ========================== 文件合并功能 ========================== // //// MergeCSVFiles 合并两个CSV文件 //func (mgr *CSVManager) MergeCSVFiles(srcHandleID, dstHandleID int64, options MergeOptions) (int64, error) { // // 获取源文件句柄 // srcHandle, err := mgr.getHandle(srcHandleID) // if err != nil { // return 0, fmt.Errorf("获取源文件句柄失败: %w", err) // } // defer mgr.releaseHandle(srcHandleID) // // // 获取目标文件句柄 // dstHandle, err := mgr.getHandle(dstHandleID) // if err != nil { // return 0, fmt.Errorf("获取目标文件句柄失败: %w", err) // } // defer mgr.releaseHandle(dstHandleID) // // // 检查句柄状态 // if !srcHandle.beginOperation() { // return 0, fmt.Errorf("源文件句柄已关闭或正在关闭: %d", srcHandleID) // } // defer srcHandle.endOperation() // // if !dstHandle.beginOperation() { // return 0, fmt.Errorf("目标文件句柄已关闭或正在关闭: %d", dstHandleID) // } // defer dstHandle.endOperation() // // srcHandle.mu.RLock() // dstHandle.mu.Lock() // defer func() { // srcHandle.mu.RUnlock() // dstHandle.mu.Unlock() // }() // // if !srcHandle.IsOpen { // return 0, fmt.Errorf("源文件未打开") // } // if !dstHandle.IsOpen { // return 0, fmt.Errorf("目标文件未打开") // } // // // 1. 检查表头兼容性 // if err := mgr.checkHeaderCompatibility(srcHandle, dstHandle, options); err != nil { // return 0, fmt.Errorf("表头不兼容: %w", err) // } // // // 2. 读取源文件的所有行 // srcRows, err := mgr.readAllRows(srcHandle) // if err != nil { // return 0, fmt.Errorf("读取源文件失败: %w", err) // } // // // 3. 合并数据 // mergedRows, err := mgr.mergeDataRows(srcRows, srcHandle, dstHandle, options) // if err != nil { // return 0, fmt.Errorf("合并数据失败: %w", err) // } // // // 4. 写入目标文件 // if options.AppendMode { // // 追加模式:写入到文件末尾 // if err := mgr.appendRowsToFile(dstHandle, mergedRows); err != nil { // return 0, fmt.Errorf("追加数据失败: %w", err) // } // } else { // // 覆盖模式:先读取目标文件的所有行,然后合并后写回 // dstRows, err := mgr.readAllRows(dstHandle) // if err != nil { // return 0, fmt.Errorf("读取目标文件失败: %w", err) // } // // // 如果是第一次合并且目标文件为空,可能需要保留表头 // allRows := mgr.mergeAllRows(dstRows, mergedRows, dstHandle, srcHandle, options) // if err := mgr.writeAllRows(dstHandle, allRows); err != nil { // return 0, fmt.Errorf("写入合并数据失败: %w", err) // } // } // // // 5. 更新目标文件的行数缓存 // dstHandle.rowCountCached = false // newTotalRows, err := dstHandle.calculateTotalRows() // if err != nil { // return 0, fmt.Errorf("重新计算行数失败: %w", err) // } // // return newTotalRows, nil //} // //// 检查表头兼容性 //func (mgr *CSVManager) checkHeaderCompatibility(srcHandle, dstHandle *CSVHandle, options MergeOptions) error { // // 如果两个文件都有表头 // if srcHandle.HasHeader && dstHandle.HasHeader { // srcHeader := srcHandle.Header // dstHeader := dstHandle.Header // // // 如果指定了列映射,跳过表头检查 // if len(options.ColumnMapping) > 0 { // return nil // } // // // 检查列数是否相同 // if len(srcHeader) != len(dstHeader) && !options.SkipHeader { // return fmt.Errorf("源文件列数(%d)与目标文件列数(%d)不一致", len(srcHeader), len(dstHeader)) // } // // // 检查列名是否相同(可选) // for i := 0; i < min(len(srcHeader), len(dstHeader)); i++ { // if srcHeader[i] != dstHeader[i] { // fmt.Printf("警告: 第%d列列名不同: 源='%s', 目标='%s'\n", i+1, srcHeader[i], dstHeader[i]) // } // } // } else if srcHandle.HasHeader != dstHandle.HasHeader { // // 一个有表头,一个没有表头 // if !options.SkipHeader { // return fmt.Errorf("源文件与目标文件的表头设置不一致") // } // } // // return nil //} // //// 合并数据行 //func (mgr *CSVManager) mergeDataRows(srcRows [][]string, srcHandle, dstHandle *CSVHandle, options MergeOptions) ([][]string, error) { // var result [][]string // // startIndex := 0 // // 如果源文件有表头且需要跳过表头 // if srcHandle.HasHeader && options.SkipHeader { // if len(srcRows) > 0 { // startIndex = 1 // } // } // // // 处理列映射 // for i := startIndex; i < len(srcRows); i++ { // srcRow := srcRows[i] // transformedRow, err := mgr.transformRow(srcRow, srcHandle, dstHandle, options) // if err != nil { // return nil, fmt.Errorf("转换第%d行数据失败: %w", i+1, err) // } // // // 跳过空行 // if transformedRow == nil { // continue // } // // // 检查重复行 // if options.SkipDuplicates && mgr.isDuplicateRow(result, transformedRow) { // continue // } // // result = append(result, transformedRow) // } // // return result, nil //} // //// 转换行数据 //func (mgr *CSVManager) transformRow(srcRow []string, srcHandle, dstHandle *CSVHandle, options MergeOptions) ([]string, error) { // // 如果有自定义转换函数,优先使用 // if options.TransformFunc != nil { // return options.TransformFunc(srcRow) // } // // // 如果有列映射,按照映射转换 // if len(options.ColumnMapping) > 0 { // // 确定目标行的最大列数 // maxCol := 0 // for _, dstIndex := range options.ColumnMapping { // if dstIndex > maxCol { // maxCol = dstIndex // } // } // // // 创建目标行 // dstRow := make([]string, maxCol+1) // for srcIndex, dstIndex := range options.ColumnMapping { // if srcIndex < len(srcRow) && dstIndex >= 0 && dstIndex < len(dstRow) { // dstRow[dstIndex] = srcRow[srcIndex] // } // } // return dstRow, nil // } // // // 如果没有列映射,直接复制 // dstRow := make([]string, len(srcRow)) // copy(dstRow, srcRow) // // // 如果目标文件有表头且列数不同,进行调整 // if dstHandle.HasHeader && len(dstRow) != len(dstHandle.Header) { // if len(dstRow) < len(dstHandle.Header) { // // 源列数少,用空字符串填充 // for i := len(dstRow); i < len(dstHandle.Header); i++ { // dstRow = append(dstRow, "") // } // } else { // // 源列数多,截断 // dstRow = dstRow[:len(dstHandle.Header)] // } // } // // return dstRow, nil //} // //// 检查是否为重复行 //func (mgr *CSVManager) isDuplicateRow(rows [][]string, newRow []string) bool { // for _, row := range rows { // if mgr.compareRows(row, newRow) { // return true // } // } // return false //} // //// 比较两行是否相同 //func (mgr *CSVManager) compareRows(row1, row2 []string) bool { // if len(row1) != len(row2) { // return false // } // for i := range row1 { // if row1[i] != row2[i] { // return false // } // } // return true //} // //// 追加行到文件末尾 //func (mgr *CSVManager) appendRowsToFile(handle *CSVHandle, rows [][]string) error { // // 移动到文件末尾 // if _, err := handle.File.Seek(0, 2); err != nil { // return fmt.Errorf("移动到文件末尾失败: %w", err) // } // // // 批量写入行数据 // for _, row := range rows { // if err := handle.CSVWriter.Write(row); err != nil { // return fmt.Errorf("写入行失败: %w", err) // } // } // // // 刷新缓冲区 // handle.CSVWriter.Flush() // if handle.writeBuffer != nil { // if err := handle.writeBuffer.Flush(); err != nil { // return fmt.Errorf("刷新写入缓冲区失败: %w", err) // } // } // // // 更新行数统计 // handle.TotalRows += int64(len(rows)) // handle.cachedRowCount = handle.TotalRows // handle.rowCountCached = true // // return nil //} // //// 合并所有行(用于覆盖模式) //func (mgr *CSVManager) mergeAllRows(dstRows, srcRows [][]string, dstHandle, srcHandle *CSVHandle, options MergeOptions) [][]string { // var result [][]string // // // 处理表头 // if dstHandle.HasHeader && len(dstRows) > 0 { // // 保留目标文件的表头 // result = append(result, dstRows[0]) // dstRows = dstRows[1:] // } // // // 根据冲突解决策略合并数据 // switch options.ConflictResolution { // case Overwrite: // // 覆盖模式:源数据完全替换目标数据 // result = append(result, srcRows...) // case Skip: // // 跳过模式:只保留目标数据中不冲突的行 // result = append(result, dstRows...) // for _, srcRow := range srcRows { // if !mgr.isDuplicateRow(result, srcRow) { // result = append(result, srcRow) // } // } // case KeepBoth: // // 保留两者:先写目标数据,再写源数据 // result = append(result, dstRows...) // result = append(result, srcRows...) // case UseSource: // // 使用源数据:源数据优先级更高 // result = append(result, srcRows...) // case UseTarget: // // 使用目标数据:目标数据优先级更高 // result = append(result, dstRows...) // default: // // 默认:追加模式 // result = append(result, dstRows...) // result = append(result, srcRows...) // } // // return result //} // //// MergeCSVFilesSimple 简单合并CSV文件(默认选项) //func (mgr *CSVManager) MergeCSVFilesSimple(srcHandleID, dstHandleID int64, appendMode bool) (int64, error) { // options := MergeOptions{ // AppendMode: appendMode, // SkipHeader: true, // 跳过源文件表头 // SkipDuplicates: false, // 不跳过重复行 // ConflictResolution: KeepBoth, // 保留两者 // } // return mgr.MergeCSVFiles(srcHandleID, dstHandleID, options) //} // //// ========================== C 导出函数 ========================== // //// OpenCSVFile 打开CSV文件 //// ////export OpenCSVFile //func OpenCSVFile(filename *C.char, delimiter C.char, hasHeader C.int) *C.char { // filenameStr := C.GoString(filename) // delimiterStr := rune(delimiter) // handleID, err := GetManager().OpenCSVFile(filenameStr, delimiterStr, hasHeader != 0) // response := struct { // HandleID int64 `json:"handleID"` // }{ // HandleID: handleID, // } // var csvResponse CSVResponse // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Data: response, // } // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// WriteHeader 写入表头 //// ////export WriteHeader //func WriteHeader(handleID C.int, header *C.char) *C.char { // var csvResponse CSVResponse // goHandleID := int64(handleID) // goHeader := C.GoString(header) // var data []string // err := json.Unmarshal([]byte(goHeader), &data) // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: fmt.Sprintf("header JSON解析失败: %v", err), // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) // } // errs := GetManager().WriteHeader(goHandleID, data) // if errs != nil { // csvResponse = CSVResponse{ // Success: false, // Message: errs.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // } // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// WriteRows 写入/覆盖行数据 //// ////export WriteRows //func WriteRows(handleID C.int, rowsData *C.char) *C.char { // var csvResponse CSVResponse // goHandleID := int64(handleID) // goRowsData := C.GoString(rowsData) // // json解析 // var data [][]string // err := json.Unmarshal([]byte(goRowsData), &data) // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: fmt.Sprintf("rowsData JSON解析失败: %v", err), // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) // } // // 写入数据 // rows, err := GetManager().AppendRows(goHandleID, data) // response := struct { // TotalRows int64 `json:"totalRows"` // }{ // TotalRows: rows, // } // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Data: response, // } // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// WriteRowsNum 写入/覆盖行数据 //// ////export WriteRowsNum //func WriteRowsNum(handleID C.int, rowsData *C.char) *C.char { // var csvResponse CSVResponse // goHandleID := int64(handleID) // goRowsData := C.GoString(rowsData) // // json解析 // var data [][]string // err := json.Unmarshal([]byte(goRowsData), &data) // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: fmt.Sprintf("rowsData JSON解析失败: %v", err), // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) // } // // 写入数据 // rowsNum, err := GetManager().AppendRowsNum(goHandleID, data) // response := struct { // RowsNum []int64 `json:"rowsNum"` // }{ // RowsNum: rowsNum, // } // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Data: response, // } // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// UpdateRow C导出函数 - 修改指定行数据 //// ////export UpdateRow //func UpdateRow(handleID C.int, rowNumber C.int, rowData *C.char) *C.char { // var csvResponse CSVResponse // goHandleID := int64(handleID) // goRowNumber := int64(rowNumber) // goRowData := C.GoString(rowData) // // // JSON解析行数据 // var data []string // err := json.Unmarshal([]byte(goRowData), &data) // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: fmt.Sprintf("rowsData JSON解析失败: %v", err), // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) // } // // 修改行数据 // err = GetManager().UpdateRow(goHandleID, goRowNumber, data) // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Message: fmt.Sprintf("成功修改第%d行数据", goRowNumber), // } // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// GetRow C导出函数 - 获取指定行数据 //// ////export GetRow //func GetRow(handleID C.int, rowNumber C.int) *C.char { // goHandleID := int64(handleID) // goRowNumber := int64(rowNumber) // // // 获取行数据 // rowData, err := GetManager().GetRow(goHandleID, goRowNumber) // var csvResponse CSVResponse // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Data: rowData, // } // } // // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// MergeCSVFilesSimple 合并两个csv文件 //// ////export MergeCSVFilesSimple //func MergeCSVFilesSimple(srcHandleID, dstHandleID, appendMode C.int) *C.char { // var csvResponse CSVResponse // goSrcHandleID := int64(srcHandleID) // goDstHandleID := int64(dstHandleID) // goAppendMode := int(appendMode) // rows, err := GetManager().MergeCSVFilesSimple(goSrcHandleID, goDstHandleID, goAppendMode == 0) // response := struct { // TotalRows int64 `json:"totalRows"` // }{ // TotalRows: rows, // } // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Data: response, // } // } // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// CloseHandles 关闭指定句柄(使用优雅关闭) //// ////export CloseHandles //func CloseHandles(handleID C.int) *C.char { // goHandleID := int64(handleID) // // // 使用优雅关闭 // err := GetManager().CloseHandle(goHandleID) // // var csvResponse CSVResponse // if err != nil { // csvResponse = CSVResponse{ // Success: false, // Message: err.Error(), // } // } else { // csvResponse = CSVResponse{ // Success: true, // Message: fmt.Sprintf("成功关闭句柄 %d", goHandleID), // } // } // // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// CloseAllHandles 关闭所有句柄 //// ////export CloseAllHandles //func CloseAllHandles() *C.char { // GetManager().closeAllHandles() // // csvResponse := CSVResponse{ // Success: true, // Message: "成功关闭所有句柄", // } // // csvResponseStr, _ := json.Marshal(csvResponse) // return C.CString(string(csvResponseStr)) //} // //// FreeCString 释放C字符串内存 //// ////export FreeCString //func FreeCString(str *C.char) { // C.free(unsafe.Pointer(str)) //} // //// 主函数 ////func main() { ////}