daShangDao_kfzgw-info/csv/csv.go
97694732@qq.com ac2a39742d 各种修改
2026-06-11 13:21:55 +08:00

2293 lines
63 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
///*
//#include <stdlib.h>
//*/
//import "C"
//import (
// "bufio"
// "encoding/csv"
// "encoding/json"
// "fmt"
// "io"
// "os"
// "path/filepath"
// "sync"
// "sync/atomic"
// "time"
// "unsafe"
//)
//
//// ========================== 数据结构定义 ==========================
//
//// CSVHandle CSV文件句柄
//type CSVHandle struct {
// ID int64 // 句柄唯一ID
// Filename string // 文件名
// Delimiter rune // 分隔符
// HasHeader bool // 是否有表头
// Header []string // 表头(如果有)
// File *os.File // 底层文件句柄
// CSVReader *csv.Reader // CSV阅读器
// CSVWriter *csv.Writer // CSV写入器
// TotalRows int64 // 总行数(如果已计算)
// IsOpen bool // 是否已打开
// OpenTime time.Time // 打开时间
// AccessTime time.Time // 最后访问时间
// AccessCount int64 // 访问计数
// mu sync.RWMutex // 读写锁(保护数据结构)
// autoCloseTimer *time.Timer // 自动关闭计时器
// cachedRowCount int64 // 缓存的行数(避免重复计算)
// rowCountCached bool // 行数是否已缓存
// writeBuffer *bufio.Writer // 写入缓冲区
//
// // 新增:引用计数和状态管理
// refCount int64 // 引用计数
// refCountMu sync.RWMutex // 引用计数锁
// closing bool // 正在关闭中
// closed bool // 已关闭
// statusMu sync.RWMutex // 状态锁
// activeOps int64 // 正在进行的操作数
//}
//
//// CSVManager CSV文件管理器
//type CSVManager struct {
// handles sync.Map // map[int64]*CSVHandle
// fileLocks sync.Map // map[string]*sync.RWMutex 文件级锁
// nextHandle int64 // 下一个句柄ID
// maxOpen int // 最大打开文件数
// semaphore chan struct{} // 信号量控制并发打开
// config ManagerConfig // 管理器配置
//}
//
//// ManagerConfig 管理器配置
//type ManagerConfig struct {
// MaxOpenFiles int // 最大打开文件数
// AutoCloseTimeout time.Duration // 自动关闭超时
// BufferSize int // 缓冲区大小
// UseMMap bool // 是否使用内存映射(大文件)
// MMapThreshold int64 // 使用内存映射的阈值(字节)
//}
//
//// MergeOptions 合并选项
//type MergeOptions struct {
// AppendMode bool // true=追加模式false=覆盖模式
// SkipHeader bool // 是否跳过源文件的表头
// SkipDuplicates bool // 是否跳过重复行
// ColumnMapping map[int]int // 列映射:源文件列索引 -> 目标文件列索引
// ConflictResolution ConflictStrategy // 冲突解决策略
// TransformFunc func([]string) ([]string, error) // 数据转换函数
//}
//
//// ConflictStrategy 冲突解决策略
//type ConflictStrategy int
//
//const (
// Overwrite ConflictStrategy = iota // 覆盖目标文件的数据
// Skip // 跳过冲突的行
// KeepBoth // 保留两者
// UseSource // 使用源数据
// UseTarget // 使用目标数据
//)
//
//// CSVResponse CSV响应结构体
//type CSVResponse struct {
// Success bool `json:"success"`
// Message string `json:"message,omitempty"`
// Data interface{} `json:"data,omitempty"`
//}
//
//// DefaultConfig 默认配置
//var DefaultConfig = ManagerConfig{
// MaxOpenFiles: 100,
// AutoCloseTimeout: 5 * time.Minute,
// BufferSize: 32 * 1024, // 32KB
// UseMMap: true,
// MMapThreshold: 10 * 1024 * 1024, // 10MB
//}
//
//// 全局管理器实例
//var (
// globalManager *CSVManager
// managerInitOnce sync.Once
//)
//
//// ========================== 辅助函数 ==========================
//
//func min(a, b int) int {
// if a < b {
// return a
// }
// return b
//}
//
//// ========================== GetManager 获取全局管理器 ==========================
//
//func GetManager() *CSVManager {
// managerInitOnce.Do(func() {
// globalManager = NewCSVManager(DefaultConfig)
// })
// return globalManager
//}
//
//// ========================== NewCSVManager 创建新的CSV管理器 ==========================
//
//func NewCSVManager(config ManagerConfig) *CSVManager {
// if config.MaxOpenFiles <= 0 {
// config.MaxOpenFiles = DefaultConfig.MaxOpenFiles
// }
// return &CSVManager{
// handles: sync.Map{},
// fileLocks: sync.Map{},
// nextHandle: 1,
// maxOpen: config.MaxOpenFiles,
// semaphore: make(chan struct{}, config.MaxOpenFiles),
// config: config,
// }
//}
//
//// ========================== CSVHandle 引用计数方法 ==========================
//
//// addRef 增加引用计数
//func (h *CSVHandle) addRef() {
// atomic.AddInt64(&h.refCount, 1)
// h.AccessTime = time.Now()
// atomic.AddInt64(&h.AccessCount, 1)
//}
//
//// releaseRef 减少引用计数如果为0则返回true表示可以关闭
//func (h *CSVHandle) releaseRef() bool {
// h.refCountMu.Lock()
// defer h.refCountMu.Unlock()
//
// oldCount := atomic.AddInt64(&h.refCount, -1)
// return oldCount <= 0
//}
//
//// getRefCount 获取引用计数
//func (h *CSVHandle) getRefCount() int64 {
// return atomic.LoadInt64(&h.refCount)
//}
//
//// isValid 检查句柄是否有效
//func (h *CSVHandle) isValid() bool {
// h.statusMu.RLock()
// defer h.statusMu.RUnlock()
// return !h.closing && !h.closed && h.IsOpen
//}
//
//// markClosing 标记为正在关闭
//func (h *CSVHandle) markClosing() {
// h.statusMu.Lock()
// defer h.statusMu.Unlock()
// h.closing = true
//}
//
//// markClosed 标记为已关闭
//func (h *CSVHandle) markClosed() {
// h.statusMu.Lock()
// defer h.statusMu.Unlock()
// h.closing = false
// h.closed = true
// h.IsOpen = false
//}
//
//// beginOperation 开始一个操作
//func (h *CSVHandle) beginOperation() bool {
// h.statusMu.RLock()
// if h.closing || h.closed || !h.IsOpen {
// h.statusMu.RUnlock()
// return false
// }
// atomic.AddInt64(&h.activeOps, 1)
// h.statusMu.RUnlock()
// return true
//}
//
//// endOperation 结束一个操作
//func (h *CSVHandle) endOperation() {
// atomic.AddInt64(&h.activeOps, -1)
//}
//
//// waitForActiveOps 等待所有活动操作完成
//func (h *CSVHandle) waitForActiveOps(timeout time.Duration) bool {
// start := time.Now()
// for atomic.LoadInt64(&h.activeOps) > 0 {
// if time.Since(start) > timeout {
// return false
// }
// time.Sleep(10 * time.Millisecond)
// }
// return true
//}
//
//// ========================== CSVHandle 文件操作方法 ==========================
//
//// close 关闭CSV句柄内部方法
//func (h *CSVHandle) close() error {
// h.mu.Lock()
// defer h.mu.Unlock()
//
// if !h.IsOpen {
// return nil
// }
//
// // 停止自动关闭计时器
// if h.autoCloseTimer != nil {
// h.autoCloseTimer.Stop()
// h.autoCloseTimer = nil
// }
//
// // 确保缓冲区数据写入文件
// if h.CSVWriter != nil {
// h.CSVWriter.Flush()
// }
// if h.writeBuffer != nil {
// h.writeBuffer.Flush()
// }
//
// // 关闭文件
// if h.File != nil {
// if err := h.File.Close(); err != nil {
// return err
// }
// h.File = nil
// }
//
// h.CSVReader = nil
// h.CSVWriter = nil
// h.writeBuffer = nil
// h.IsOpen = false
// h.markClosed()
//
// return nil
//}
//
//// getHeader 获取表头
//func (h *CSVHandle) getHeader() []string {
// h.mu.RLock()
// defer h.mu.RUnlock()
// return h.Header
//}
//
//// calculateTotalRows 计算CSV文件的总行数
//func (h *CSVHandle) calculateTotalRows() (int64, error) {
// h.mu.Lock()
// defer h.mu.Unlock()
//
// // 如果已缓存,直接返回
// if h.rowCountCached {
// return h.cachedRowCount, nil
// }
//
// if !h.IsOpen {
// return 0, fmt.Errorf("文件未打开")
// }
//
// // 保存当前文件位置
// currentPos, err := h.File.Seek(0, 1) // 当前位置
// if err != nil {
// return 0, fmt.Errorf("获取当前文件位置失败: %w", err)
// }
//
// // 重置到文件开始
// if _, err := h.File.Seek(0, 0); err != nil {
// return 0, fmt.Errorf("重置文件指针失败: %w", err)
// }
//
// // 重置CSV阅读器
// reader := csv.NewReader(bufio.NewReader(h.File))
// reader.Comma = h.Delimiter
// reader.LazyQuotes = true
// reader.TrimLeadingSpace = true
// reader.FieldsPerRecord = -1 // 允许字段数量可变(不检查每行字段数)
//
// // 统计行数
// var rowCount int64 = 0
// for {
// _, err := reader.Read()
// if err != nil {
// if err == io.EOF {
// break
// }
// // 恢复文件位置
// h.File.Seek(currentPos, 0)
// return 0, fmt.Errorf("读取行失败: %w", err)
// }
// rowCount++
// }
//
// // 恢复文件位置
// if _, err := h.File.Seek(currentPos, 0); err != nil {
// return 0, fmt.Errorf("恢复文件位置失败: %w", err)
// }
//
// // 更新缓存
// h.cachedRowCount = rowCount
// h.rowCountCached = true
// h.TotalRows = rowCount
//
// // 如果有表头,需要减去表头行
// if h.HasHeader && rowCount > 0 {
// h.cachedRowCount = rowCount - 1
// h.TotalRows = rowCount - 1
// }
//
// return h.cachedRowCount, nil
//}
//
//// ========================== CSVManager 文件操作方法 ==========================
//
//// getFileLock 获取或创建文件锁
//func (mgr *CSVManager) getFileLock(filename string) *sync.RWMutex {
// lock, _ := mgr.fileLocks.LoadOrStore(filename, &sync.RWMutex{})
// return lock.(*sync.RWMutex)
//}
//
//// OpenCSVFile 打开CSV文件并返回句柄线程安全版本
//func (mgr *CSVManager) OpenCSVFile(filename string, delimiter rune, hasHeader bool) (int64, error) {
// // 获取文件锁
// fileLock := mgr.getFileLock(filename)
// fileLock.Lock()
// defer fileLock.Unlock()
//
// // 首先检查文件是否已经有打开的句柄
// if existingHandleID := mgr.findHandleByFilename(filename); existingHandleID != -1 {
// if handle, err := mgr.getHandle(existingHandleID); err == nil {
// handle.addRef()
// return existingHandleID, nil
// }
// }
//
// // 检查文件是否存在,如果不存在则创建
// fileInfo, err := os.Stat(filename)
// fileExists := true
// if os.IsNotExist(err) {
// fileExists = false
// // 创建文件
// file, createErr := os.Create(filename)
// if createErr != nil {
// return -1, fmt.Errorf("创建文件失败: %w", createErr)
// }
// file.Close() // 关闭文件,后续会重新打开
//
// // 重新获取文件信息
// fileInfo, err = os.Stat(filename)
// if err != nil {
// return -1, fmt.Errorf("获取文件信息失败: %w", err)
// }
// } else if err != nil {
// // 其他错误
// return -1, fmt.Errorf("检查文件状态失败: %w", err)
// }
//
// // 如果是新创建的空文件,需要特殊处理
// if !fileExists && fileInfo.Size() == 0 {
// return mgr.createEmptyCSVHandle(filename, delimiter, hasHeader)
// }
//
// // 限制并发打开文件数
// mgr.semaphore <- struct{}{}
// defer func() { <-mgr.semaphore }()
//
// // 生成唯一句柄ID
// handleID := atomic.AddInt64(&mgr.nextHandle, 1)
//
// // 创建CSV句柄
// csvHandle := &CSVHandle{
// ID: handleID,
// Filename: filename,
// Delimiter: delimiter,
// HasHeader: hasHeader,
// OpenTime: time.Now(),
// AccessTime: time.Now(),
// AccessCount: 1,
// refCount: 1, // 初始引用计数为1
// }
//
// // 打开文件
// if err := mgr.openFile(csvHandle); err != nil {
// return -1, fmt.Errorf("打开文件失败: %w", err)
// }
//
// // 如果是新创建的文件,可能需要写入表头
// if !fileExists && hasHeader {
// // 创建空表头,用户后续可以写入实际表头
// csvHandle.Header = []string{}
// } else if hasHeader && fileInfo.Size() > 0 {
// // 读取现有文件的表头
// if err := mgr.readHeader(csvHandle); err != nil {
// csvHandle.close()
// return -1, fmt.Errorf("读取表头失败: %w", err)
// }
// }
//
// // 如果有数据行,计算总行数
// if fileInfo.Size() > 0 {
// // 计算总行数
// rows, err := csvHandle.calculateTotalRows()
// if err != nil {
// csvHandle.close()
// return -1, fmt.Errorf("计算总行数失败: %w", err)
// }
// csvHandle.TotalRows = rows
// } else {
// csvHandle.TotalRows = 0
// csvHandle.cachedRowCount = 0
// csvHandle.rowCountCached = true
// }
//
// // 注册到管理器
// mgr.handles.Store(handleID, csvHandle)
//
// // 启动自动关闭计时器
// if mgr.config.AutoCloseTimeout > 0 {
// csvHandle.startAutoClose(mgr.config.AutoCloseTimeout, mgr)
// }
//
// return handleID, nil
//}
//
//// findHandleByFilename 根据文件名查找已存在的句柄ID
//func (mgr *CSVManager) findHandleByFilename(filename string) int64 {
// var existingHandleID int64 = -1
//
// // 遍历所有句柄,查找相同文件名的句柄
// mgr.handles.Range(func(key, value interface{}) bool {
// handle := value.(*CSVHandle)
//
// // 检查文件名是否相同
// if handle.Filename == filename {
// // 检查句柄是否有效
// if handle.isValid() {
// existingHandleID = handle.ID
// return false // 停止遍历
// }
// }
// return true // 继续遍历
// })
//
// return existingHandleID
//}
//
//// createEmptyCSVHandle 创建空的CSV文件句柄
//func (mgr *CSVManager) createEmptyCSVHandle(filename string, delimiter rune, hasHeader bool) (int64, error) {
// // 限制并发打开文件数
// mgr.semaphore <- struct{}{}
// defer func() { <-mgr.semaphore }()
//
// // 生成唯一句柄ID
// handleID := atomic.AddInt64(&mgr.nextHandle, 1)
//
// // 创建CSV句柄
// csvHandle := &CSVHandle{
// ID: handleID,
// Filename: filename,
// Delimiter: delimiter,
// HasHeader: hasHeader,
// OpenTime: time.Now(),
// AccessTime: time.Now(),
// TotalRows: 0,
// AccessCount: 1,
// refCount: 1, // 初始引用计数为1
// }
//
// // 以读写模式打开文件
// file, err := os.OpenFile(filename, os.O_RDWR|os.O_CREATE, 0755)
// if err != nil {
// return -1, fmt.Errorf("打开文件失败: %w", err)
// }
//
// // 创建CSV写入器
// writer := csv.NewWriter(file)
// writer.Comma = delimiter
//
// // 如果是新创建的文件且有表头,写入空表头占位
// if hasHeader {
// // 写入空表头(用户后续需要设置实际表头)
// if err := writer.Write([]string{}); err != nil {
// file.Close()
// return -1, fmt.Errorf("写入空表头失败: %w", err)
// }
// writer.Flush()
//
// // 重置到文件开始位置
// if _, err := file.Seek(0, 0); err != nil {
// file.Close()
// return -1, fmt.Errorf("重置文件指针失败: %w", err)
// }
// }
//
// // 创建CSV阅读器
// reader := csv.NewReader(bufio.NewReaderSize(file, mgr.config.BufferSize))
// reader.Comma = delimiter
// reader.LazyQuotes = true
// reader.TrimLeadingSpace = true
//
// // 创建写入缓冲区
// writeBuffer := bufio.NewWriterSize(file, mgr.config.BufferSize)
//
// csvHandle.File = file
// csvHandle.CSVReader = reader
// csvHandle.CSVWriter = csv.NewWriter(writeBuffer)
// csvHandle.CSVWriter.Comma = delimiter
// csvHandle.writeBuffer = writeBuffer
// csvHandle.IsOpen = true
// csvHandle.AccessTime = time.Now()
//
// // 如果有表头,初始化空表头数组
// if hasHeader {
// csvHandle.Header = []string{}
// }
//
// // 注册到管理器
// mgr.handles.Store(handleID, csvHandle)
//
// // 启动自动关闭计时器
// if mgr.config.AutoCloseTimeout > 0 {
// csvHandle.startAutoClose(mgr.config.AutoCloseTimeout, mgr)
// }
//
// return handleID, nil
//}
//
//// getHandle 获取句柄对象(增加引用计数)
//func (mgr *CSVManager) getHandle(handleID int64) (*CSVHandle, error) {
// value, ok := mgr.handles.Load(handleID)
// if !ok {
// return nil, fmt.Errorf("句柄不存在: %d", handleID)
// }
//
// handle := value.(*CSVHandle)
//
// // 检查句柄是否有效
// if !handle.isValid() {
// return nil, fmt.Errorf("句柄无效: %d", handleID)
// }
//
// // 增加引用计数
// handle.addRef()
//
// // 确保文件已打开
// handle.mu.RLock()
// isOpen := handle.IsOpen
// handle.mu.RUnlock()
//
// if !isOpen {
// // 获取文件锁
// fileLock := mgr.getFileLock(handle.Filename)
// fileLock.Lock()
// defer fileLock.Unlock()
//
// // 重新检查,防止其他协程已经重新打开了
// handle.mu.RLock()
// isOpen = handle.IsOpen
// handle.mu.RUnlock()
//
// if !isOpen {
// if err := mgr.openFile(handle); err != nil {
// // 恢复引用计数
// handle.releaseRef()
// return nil, fmt.Errorf("重新打开文件失败: %w", err)
// }
// }
// }
//
// return handle, nil
//}
//
//// releaseHandle 释放句柄引用
//func (mgr *CSVManager) releaseHandle(handleID int64) {
// value, ok := mgr.handles.Load(handleID)
// if !ok {
// return
// }
//
// handle := value.(*CSVHandle)
// // 减少引用计数如果为0则真正关闭
// if handle.releaseRef() {
// mgr.closeHandleInternal(handleID, handle)
// }
//}
//
//// ========================== 句柄管理方法 ==========================
//
//// closeHandleInternal 内部关闭句柄方法
//func (mgr *CSVManager) closeHandleInternal(handleID int64, handle *CSVHandle) {
// // 获取文件锁
// fileLock := mgr.getFileLock(handle.Filename)
// fileLock.Lock()
// defer fileLock.Unlock()
//
// // 再次检查引用计数,防止在获取锁期间有新的引用
// if handle.getRefCount() > 0 {
// return
// }
//
// // 标记为正在关闭
// handle.markClosing()
//
// // 等待所有活动操作完成
// if !handle.waitForActiveOps(30 * time.Second) {
// // 超时,强制关闭
// fmt.Printf("警告:句柄 %d 关闭超时,强制关闭\n", handleID)
// }
//
// // 真正关闭文件
// handle.close()
//
// // 从管理器移除
// mgr.handles.Delete(handleID)
//}
//
//// CloseHandle 关闭指定句柄(外部调用)
//func (mgr *CSVManager) CloseHandle(handleID int64) error {
// value, ok := mgr.handles.Load(handleID)
// if !ok {
// return fmt.Errorf("句柄不存在: %d", handleID)
// }
//
// handle := value.(*CSVHandle)
//
// // 减少引用计数如果为0则真正关闭
// if handle.releaseRef() {
// mgr.closeHandleInternal(handleID, handle)
// }
//
// return nil
//}
//
//// GracefulCloseHandle 优雅关闭句柄(等待所有操作完成)
//func (mgr *CSVManager) GracefulCloseHandle(handleID int64, timeout time.Duration) error {
// value, ok := mgr.handles.Load(handleID)
// if !ok {
// return fmt.Errorf("句柄不存在: %d", handleID)
// }
//
// handle := value.(*CSVHandle)
//
// // 标记为正在关闭,阻止新操作
// handle.markClosing()
//
// // 等待现有操作完成
// done := make(chan bool, 1)
// go func() {
// // 等待引用计数降为1只剩当前引用
// for handle.getRefCount() > 1 {
// time.Sleep(100 * time.Millisecond)
// }
// done <- true
// }()
//
// // 设置超时
// select {
// case <-done:
// // 真正关闭
// return mgr.ForceCloseHandle(handleID)
// case <-time.After(timeout):
// return fmt.Errorf("关闭句柄超时,仍有 %d 个引用", handle.getRefCount())
// }
//}
//
//// ForceCloseHandle 强制关闭句柄(无论引用计数如何)
//func (mgr *CSVManager) ForceCloseHandle(handleID int64) error {
// value, ok := mgr.handles.Load(handleID)
// if !ok {
// return fmt.Errorf("句柄不存在: %d", handleID)
// }
//
// handle := value.(*CSVHandle)
//
// // 强制设置引用计数为0
// handle.refCountMu.Lock()
// atomic.StoreInt64(&handle.refCount, 0)
// handle.refCountMu.Unlock()
//
// // 获取文件锁
// fileLock := mgr.getFileLock(handle.Filename)
// fileLock.Lock()
// defer fileLock.Unlock()
//
// // 标记为正在关闭
// handle.markClosing()
//
// // 等待所有活动操作完成
// handle.waitForActiveOps(5 * time.Second)
//
// // 真正关闭文件
// handle.close()
// mgr.handles.Delete(handle.ID)
//
// return nil
//}
//
//// closeAllHandles 关闭所有句柄
//func (mgr *CSVManager) closeAllHandles() {
// // 收集所有句柄ID
// var handleIDs []int64
// mgr.handles.Range(func(key, value interface{}) bool {
// handleIDs = append(handleIDs, key.(int64))
// return true
// })
//
// // 关闭每个句柄
// for _, handleID := range handleIDs {
// mgr.ForceCloseHandle(handleID)
// }
//}
//
//// ========================== 文件操作方法 ==========================
//
//// openFile 打开文件
//func (mgr *CSVManager) openFile(handle *CSVHandle) error {
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if handle.IsOpen {
// return nil
// }
//
// // 获取文件大小
// fileInfo, err := os.Stat(handle.Filename)
// if err != nil {
// return err
// }
//
// fileSize := fileInfo.Size()
//
// // 根据文件大小选择打开策略
// if mgr.config.UseMMap && fileSize > mgr.config.MMapThreshold {
// return mgr.openFileWithMMap(handle, fileSize)
// }
//
// return mgr.openFileNormal(handle)
//}
//
//// 正常打开文件
//func (mgr *CSVManager) openFileNormal(handle *CSVHandle) error {
// // 以读写模式打开文件
// file, err := os.OpenFile(handle.Filename, os.O_RDWR, 0755)
// if err != nil {
// return err
// }
//
// // 创建带缓冲的CSV阅读器
// reader := csv.NewReader(bufio.NewReaderSize(file, mgr.config.BufferSize))
// reader.Comma = handle.Delimiter
// reader.LazyQuotes = true
// reader.TrimLeadingSpace = true
//
// // 创建写入缓冲区
// writeBuffer := bufio.NewWriterSize(file, mgr.config.BufferSize)
//
// handle.File = file
// handle.CSVReader = reader
// handle.CSVWriter = csv.NewWriter(writeBuffer)
// handle.CSVWriter.Comma = handle.Delimiter
// handle.writeBuffer = writeBuffer
// handle.IsOpen = true
// handle.AccessTime = time.Now()
//
// return nil
//}
//
//// 使用内存映射打开大文件
//func (mgr *CSVManager) openFileWithMMap(handle *CSVHandle, fileSize int64) error {
// // 对于大文件,使用只读模式打开,写入需要特殊处理
// file, err := os.OpenFile(handle.Filename, os.O_RDWR, 0755)
// if err != nil {
// return err
// }
//
// // 对于大文件,我们可以先只打开,按需读取
// reader := csv.NewReader(bufio.NewReaderSize(file, mgr.config.BufferSize))
// reader.Comma = handle.Delimiter
// reader.LazyQuotes = true
//
// // 创建写入缓冲区
// writeBuffer := bufio.NewWriterSize(file, mgr.config.BufferSize)
//
// handle.File = file
// handle.CSVReader = reader
// handle.CSVWriter = csv.NewWriter(writeBuffer)
// handle.CSVWriter.Comma = handle.Delimiter
// handle.writeBuffer = writeBuffer
// handle.IsOpen = true
// handle.AccessTime = time.Now()
//
// return nil
//}
//
//// 读取CSV表头
//func (mgr *CSVManager) readHeader(handle *CSVHandle) error {
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return fmt.Errorf("文件未打开")
// }
//
// // 确保文件指针在开头
// if _, err := handle.File.Seek(0, 0); err != nil {
// return err
// }
//
// // 重置CSV阅读器
// handle.CSVReader = csv.NewReader(bufio.NewReader(handle.File))
// handle.CSVReader.Comma = handle.Delimiter
//
// // 读取表头
// header, err := handle.CSVReader.Read()
// if err != nil {
// return err
// }
//
// handle.Header = header
// return nil
//}
//
//// startAutoClose 启动自动关闭计时器
//func (h *CSVHandle) startAutoClose(timeout time.Duration, mgr *CSVManager) {
// h.autoCloseTimer = time.AfterFunc(timeout, func() {
// h.mu.Lock()
// defer h.mu.Unlock()
//
// // 检查是否长时间未访问
// if h.IsOpen && time.Since(h.AccessTime) > timeout {
// // 检查引用计数
// if h.getRefCount() == 0 {
// h.close()
// // 从管理器移除
// mgr.handles.Delete(h.ID)
// }
// }
// })
//}
//
//// ========================== 写入功能 ==========================
//
//// WriteHeader 写入CSV文件表头
//func (mgr *CSVManager) WriteHeader(handleID int64, header []string) error {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return fmt.Errorf("文件未打开")
// }
//
// // 检查文件是否已经有内容
// if handle.TotalRows > 0 {
// return fmt.Errorf("文件已有数据,无法修改表头")
// }
//
// // 移动到文件开头
// if _, err := handle.File.Seek(0, 0); err != nil {
// return fmt.Errorf("移动文件指针失败: %w", err)
// }
//
// // 清空文件内容
// if err := handle.File.Truncate(0); err != nil {
// return fmt.Errorf("清空文件失败: %w", err)
// }
//
// // 写入表头
// if err := handle.CSVWriter.Write(header); err != nil {
// return fmt.Errorf("写入表头失败: %w", err)
// }
//
// handle.CSVWriter.Flush()
// if handle.writeBuffer != nil {
// handle.writeBuffer.Flush()
// }
//
// // 更新句柄状态
// handle.Header = header
// handle.HasHeader = true
//
// return nil
//}
//
//// WriteRow 写入单行数据到CSV文件
//func (mgr *CSVManager) WriteRow(handleID int64, row []string) error {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return fmt.Errorf("文件未打开")
// }
//
// // 检查是否有表头且表头长度是否匹配
// if handle.HasHeader && len(handle.Header) > 0 && len(row) != len(handle.Header) {
// return fmt.Errorf("行数据列数(%d)与表头列数(%d)不匹配", len(row), len(handle.Header))
// }
//
// // 移动到文件末尾
// if _, err := handle.File.Seek(0, 2); err != nil {
// return fmt.Errorf("移动到文件末尾失败: %w", err)
// }
//
// // 写入行数据
// if err := handle.CSVWriter.Write(row); err != nil {
// return fmt.Errorf("写入行失败: %w", err)
// }
//
// // 更新行数统计
// handle.TotalRows++
// handle.cachedRowCount = handle.TotalRows
// handle.rowCountCached = true
//
// return nil
//}
//
//// WriteRows 批量写入多行数据到CSV文件
//func (mgr *CSVManager) WriteRows(handleID int64, rows [][]string) (int64, error) {
// // 首先记录日志
// if err := mgr.logWriteRows(handleID, rows); err != nil {
// // 日志记录失败不影响主流程,但可以打印警告
// fmt.Printf("警告:记录日志失败: %v\n", err)
// }
//
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return -1, fmt.Errorf("WriteRows 获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return -1, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return -1, fmt.Errorf("文件未打开")
// }
//
// // 检查是否有表头
// if handle.HasHeader && len(handle.Header) > 0 {
// // 验证每行的列数
// for i, row := range rows {
// if len(row) != len(handle.Header) {
// return -1, fmt.Errorf("第%d行列数(%d)与表头列数(%d)不匹配", i+1, len(row), len(handle.Header))
// }
// }
// }
//
// // 移动到文件末尾
// if _, err := handle.File.Seek(0, 2); err != nil {
// return -1, fmt.Errorf("移动到文件末尾失败: %w", err)
// }
//
// // 批量写入行数据
// for _, row := range rows {
// if err := handle.CSVWriter.Write(row); err != nil {
// return -1, fmt.Errorf("写入行失败: %w", err)
// }
// }
//
// // 更新行数统计
// handle.TotalRows += int64(len(rows))
// handle.cachedRowCount = handle.TotalRows
// handle.rowCountCached = true
//
// return handle.TotalRows, nil
//}
//
//// WriteRowsNum 批量写入多行数据到CSV文件
//// 返回每行数据存储的行号数组从1开始
//func (mgr *CSVManager) WriteRowsNum(handleID int64, rows [][]string) ([]int64, error) {
//
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return nil, fmt.Errorf("WriteRows 获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return nil, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return nil, fmt.Errorf("文件未打开")
// }
//
// // 检查是否有表头
// if handle.HasHeader && len(handle.Header) > 0 {
// // 验证每行的列数
// for i, row := range rows {
// if len(row) != len(handle.Header) {
// return nil, fmt.Errorf("第%d行列数(%d)与表头列数(%d)不匹配", i+1, len(row), len(handle.Header))
// }
// }
// }
//
// // 移动到文件末尾
// if _, err := handle.File.Seek(0, 2); err != nil {
// return nil, fmt.Errorf("移动到文件末尾失败: %w", err)
// }
//
// // 计算起始行号 - 关键修正部分
// var startRow int64
//
// // 先获取当前文件的实际行数(包括表头)
// currentLineCount := handle.TotalRows
// if handle.HasHeader && currentLineCount == 0 {
// // 如果有表头但还没有数据行表头算第1行数据从第2行开始
// startRow = 2
// } else if handle.HasHeader {
// // 有表头且已有数据行表头是第1行TotalRows不包括表头
// // 所以下一行应该是 TotalRows + 2
// startRow = currentLineCount + 2
// } else {
// // 没有表头,直接累加
// startRow = currentLineCount + 1
// }
//
// // 创建行号数组
// rowNumbers := make([]int64, len(rows))
// for i := 0; i < len(rows); i++ {
// rowNumbers[i] = startRow + int64(i)
// }
//
// // 批量写入行数据
// for _, row := range rows {
// if err := handle.CSVWriter.Write(row); err != nil {
// return nil, fmt.Errorf("写入行失败: %w", err)
// }
// }
//
// // 更新行数统计
// handle.TotalRows += int64(len(rows))
// handle.cachedRowCount = handle.TotalRows
// handle.rowCountCached = true
//
// return rowNumbers, nil
//}
//
//// logWriteRows 记录行数据到日志文件
//func (mgr *CSVManager) logWriteRows(handleID int64, rows [][]string) error {
// // 获取句柄信息(但不锁定,因为我们只是读取元数据)
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return fmt.Errorf("获取句柄信息失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 创建日志目录
// logDir := filepath.Join(filepath.Dir("csv"), "logs")
// if err := os.MkdirAll(logDir, 0755); err != nil {
// return fmt.Errorf("创建日志目录失败: %w", err)
// }
//
// // 生成日志文件名基于CSV文件名和时间
// baseName := filepath.Base("csv")
// logFileName := fmt.Sprintf("%s_%s_write.log",
// baseName[:len(baseName)-len(filepath.Ext(baseName))],
// time.Now().Format("20060102"))
// logFilePath := filepath.Join(logDir, logFileName)
//
// // 打开或创建日志文件
// logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
// if err != nil {
// return fmt.Errorf("打开日志文件失败: %w", err)
// }
// defer logFile.Close()
//
// // 创建日志条目
// logEntry := map[string]interface{}{
// "timestamp": time.Now().Format("2006-01-02 15:04:05.000"),
// "handle_id": handleID,
// "filename": "csv",
// "operation": "write_rows",
// "row_count": len(rows),
// "total_rows": handle.TotalRows + int64(len(rows)), // 预计的总行数
// "data": rows,
// }
//
// // 序列化为JSON
// logData, err := json.Marshal(logEntry)
// if err != nil {
// return fmt.Errorf("序列化日志数据失败: %w", err)
// }
//
// // 写入日志每行一个JSON对象
// if _, err := logFile.Write(append(logData, '\n')); err != nil {
// return fmt.Errorf("写入日志文件失败: %w", err)
// }
//
// return nil
//}
//
//// logWriteRows 记录行数据到日志文件
//func (mgr *CSVManager) logWriteRowsNum(handleID int64, rows [][]string, rowsNum []int64) error {
// // 获取句柄信息(但不锁定,因为我们只是读取元数据)
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return fmt.Errorf("获取句柄信息失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 创建日志目录
// logDir := filepath.Join(filepath.Dir("csv"), "logs")
// if err := os.MkdirAll(logDir, 0755); err != nil {
// return fmt.Errorf("创建日志目录失败: %w", err)
// }
//
// // 生成日志文件名基于CSV文件名和时间
// baseName := filepath.Base("csv")
// logFileName := fmt.Sprintf("%s_%s_write.log",
// baseName[:len(baseName)-len(filepath.Ext(baseName))],
// time.Now().Format("20060102"))
// logFilePath := filepath.Join(logDir, logFileName)
//
// // 打开或创建日志文件
// logFile, err := os.OpenFile(logFilePath, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
// if err != nil {
// return fmt.Errorf("打开日志文件失败: %w", err)
// }
// defer logFile.Close()
//
// // 创建日志条目
// logEntry := map[string]interface{}{
// "timestamp": time.Now().Format("2006-01-02 15:04:05.000"),
// "handle_id": handleID,
// "filename": "csv",
// "operation": "write_rows",
// "row_count": len(rows),
// "total_rows": handle.TotalRows + int64(len(rows)), // 预计的总行数
// "data": rows,
// "rowsNum": rowsNum,
// }
//
// // 序列化为JSON
// logData, err := json.Marshal(logEntry)
// if err != nil {
// return fmt.Errorf("序列化日志数据失败: %w", err)
// }
//
// // 写入日志每行一个JSON对象
// if _, err := logFile.Write(append(logData, '\n')); err != nil {
// return fmt.Errorf("写入日志文件失败: %w", err)
// }
//
// return nil
//}
//
//// Flush 将缓冲区数据写入文件
//func (mgr *CSVManager) Flush(handleID int64) error {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return fmt.Errorf("Flush 获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return fmt.Errorf("文件未打开")
// }
//
// if handle.CSVWriter == nil {
// return fmt.Errorf("CSV写入器未初始化")
// }
//
// // 刷新缓冲区
// handle.CSVWriter.Flush()
// if handle.writeBuffer != nil {
// if err := handle.writeBuffer.Flush(); err != nil {
// return fmt.Errorf("刷新写入缓冲区失败: %w", err)
// }
// }
//
// return nil
//}
//
//// AppendRow 追加行数据自动Flush
//func (mgr *CSVManager) AppendRow(handleID int64, row []string) error {
// if err := mgr.WriteRow(handleID, row); err != nil {
// return err
// }
// return mgr.Flush(handleID)
//}
//
//// AppendRows 批量追加行数据自动Flush
//func (mgr *CSVManager) AppendRows(handleID int64, rows [][]string) (int64, error) {
// totalRows, err := mgr.WriteRows(handleID, rows)
// if err != nil {
// return -1, err
// }
// err = mgr.Flush(handleID)
// if err != nil {
// return -1, err
// }
// return totalRows, nil
//}
//
//// AppendRows 批量追加行数据自动Flush
//func (mgr *CSVManager) AppendRowsNum(handleID int64, rows [][]string) ([]int64, error) {
// rowsNum, err := mgr.WriteRowsNum(handleID, rows)
// if err != nil {
// return nil, err
// }
// err = mgr.Flush(handleID)
// if err != nil {
// return nil, err
// }
// // 首先记录日志
// if err := mgr.logWriteRowsNum(handleID, rows, rowsNum); err != nil {
// // 日志记录失败不影响主流程,但可以打印警告
// fmt.Printf("警告:记录日志失败: %v\n", err)
// }
// return rowsNum, nil
//}
//
//// GetHeader 获取CSV文件表头
//func (mgr *CSVManager) GetHeader(handleID int64) ([]string, error) {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return nil, fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return nil, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.RLock()
// defer handle.mu.RUnlock()
//
// if !handle.HasHeader {
// return nil, fmt.Errorf("文件没有表头")
// }
//
// return handle.Header, nil
//}
//
//// ========================== 获取总行数 ==========================
//
//// GetTotalRows 快速获取总行数(如果已计算过则直接返回,否则重新计算)
//func (mgr *CSVManager) GetTotalRows(handleID int64) (int64, error) {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return 0, fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return 0, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.RLock()
// // 如果已缓存,直接返回缓存值
// if handle.rowCountCached {
// count := handle.cachedRowCount
// handle.mu.RUnlock()
// return count, nil
// }
// handle.mu.RUnlock()
//
// // 否则计算总行数
// return handle.calculateTotalRows()
//}
//
//// ========================== 修改指定行功能 ==========================
//
//// UpdateRow 修改指定行的数据
//func (mgr *CSVManager) UpdateRow(handleID int64, rowNumber int64, newData []string) error {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// //handle.mu.Lock()
// //defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return fmt.Errorf("文件未打开")
// }
//
// // 检查行号是否有效
// if rowNumber < 0 {
// return fmt.Errorf("行号不能为负数: %d", rowNumber)
// }
//
// // 获取总行数
// totalRows, err := handle.calculateTotalRows()
// if err != nil {
// return fmt.Errorf("获取总行数失败: %w", err)
// }
//
// if rowNumber >= totalRows {
// return fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, totalRows)
// }
//
// // 检查新数据列数是否匹配
// if handle.HasHeader && len(handle.Header) > 0 {
// if len(newData) != len(handle.Header) {
// return fmt.Errorf("新数据列数(%d)与表头列数(%d)不匹配", len(newData), len(handle.Header))
// }
// }
//
// // 读取整个文件到内存
// allRows, err := mgr.readAllRows(handle)
// if err != nil {
// return fmt.Errorf("读取文件失败: %w", err)
// }
//
// // 修改指定行的数据
// if handle.HasHeader && len(allRows) > 0 {
// // 如果有表头,第一行是表头,实际数据从第二行开始
// if rowNumber+1 >= int64(len(allRows)) {
// return fmt.Errorf("行号超出范围: %d, 总数据行数: %d", rowNumber, len(allRows)-1)
// }
// allRows[rowNumber+1] = newData
// } else {
// if rowNumber >= int64(len(allRows)) {
// return fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, len(allRows))
// }
// allRows[rowNumber] = newData
// }
//
// // 写回文件
// if err := mgr.writeAllRows(handle, allRows); err != nil {
// return fmt.Errorf("写回文件失败: %w", err)
// }
//
// // 更新缓存的行数
// handle.rowCountCached = false
//
// // 重新计算总行数
// if _, err := handle.calculateTotalRows(); err != nil {
// return fmt.Errorf("重新计算总行数失败: %w", err)
// }
//
// return nil
//}
//
//// UpdateRows 批量修改多行数据
//func (mgr *CSVManager) UpdateRows(handleID int64, updates map[int64][]string) (int, error) {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return 0, fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return 0, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// handle.mu.Lock()
// defer handle.mu.Unlock()
//
// if !handle.IsOpen {
// return 0, fmt.Errorf("文件未打开")
// }
//
// // 检查行号是否有效
// for rowNumber := range updates {
// if rowNumber < 0 {
// return 0, fmt.Errorf("行号不能为负数: %d", rowNumber)
// }
// }
//
// // 获取总行数
// totalRows, err := handle.calculateTotalRows()
// if err != nil {
// return 0, fmt.Errorf("获取总行数失败: %w", err)
// }
//
// // 检查新数据列数是否匹配
// if handle.HasHeader && len(handle.Header) > 0 {
// for rowNumber, newData := range updates {
// if rowNumber >= totalRows {
// return 0, fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, totalRows)
// }
// if len(newData) != len(handle.Header) {
// return 0, fmt.Errorf("行%d的新数据列数(%d)与表头列数(%d)不匹配", rowNumber, len(newData), len(handle.Header))
// }
// }
// }
//
// // 读取整个文件到内存
// allRows, err := mgr.readAllRows(handle)
// if err != nil {
// return 0, fmt.Errorf("读取文件失败: %w", err)
// }
//
// // 修改指定行的数据
// updatedCount := 0
// for rowNumber, newData := range updates {
// if handle.HasHeader && len(allRows) > 0 {
// // 如果有表头,第一行是表头,实际数据从第二行开始
// if rowNumber+1 >= int64(len(allRows)) {
// return 0, fmt.Errorf("行号超出范围: %d, 总数据行数: %d", rowNumber, len(allRows)-1)
// }
// allRows[rowNumber+1] = newData
// updatedCount++
// } else {
// if rowNumber >= int64(len(allRows)) {
// return 0, fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, len(allRows))
// }
// allRows[rowNumber] = newData
// updatedCount++
// }
// }
//
// // 写回文件
// if err := mgr.writeAllRows(handle, allRows); err != nil {
// return 0, fmt.Errorf("写回文件失败: %w", err)
// }
//
// // 更新缓存的行数
// handle.rowCountCached = false
//
// // 重新计算总行数
// if _, err := handle.calculateTotalRows(); err != nil {
// return 0, fmt.Errorf("重新计算总行数失败: %w", err)
// }
//
// return updatedCount, nil
//}
//
//// readAllRows 读取文件中的所有行
//func (mgr *CSVManager) readAllRows(handle *CSVHandle) ([][]string, error) {
// // 保存当前文件位置
// currentPos, err := handle.File.Seek(0, 1) // 当前位置
// if err != nil {
// return nil, fmt.Errorf("获取当前文件位置失败: %w", err)
// }
//
// // 重置到文件开始
// if _, err := handle.File.Seek(0, 0); err != nil {
// return nil, fmt.Errorf("重置文件指针失败: %w", err)
// }
//
// // 重置CSV阅读器
// reader := csv.NewReader(bufio.NewReader(handle.File))
// reader.Comma = handle.Delimiter
// reader.LazyQuotes = true
// reader.TrimLeadingSpace = true
//
// // 读取所有行
// var allRows [][]string
// for {
// row, err := reader.Read()
// if err != nil {
// if err == io.EOF {
// break
// }
// // 恢复文件位置
// handle.File.Seek(currentPos, 0)
// return nil, fmt.Errorf("读取行失败: %w", err)
// }
// allRows = append(allRows, row)
// }
//
// // 恢复文件位置
// if _, err := handle.File.Seek(currentPos, 0); err != nil {
// return nil, fmt.Errorf("恢复文件位置失败: %w", err)
// }
//
// return allRows, nil
//}
//
//// writeAllRows 将所有行写回文件
//func (mgr *CSVManager) writeAllRows(handle *CSVHandle, rows [][]string) error {
// // 清空文件
// if err := handle.File.Truncate(0); err != nil {
// return fmt.Errorf("清空文件失败: %w", err)
// }
//
// // 移动到文件开头
// if _, err := handle.File.Seek(0, 0); err != nil {
// return fmt.Errorf("移动文件指针失败: %w", err)
// }
//
// // 写入所有行
// for _, row := range rows {
// if err := handle.CSVWriter.Write(row); err != nil {
// return fmt.Errorf("写入行失败: %w", err)
// }
// }
//
// // 刷新缓冲区
// handle.CSVWriter.Flush()
// if handle.writeBuffer != nil {
// if err := handle.writeBuffer.Flush(); err != nil {
// return fmt.Errorf("刷新写入缓冲区失败: %w", err)
// }
// }
//
// return nil
//}
//
//// GetRow 获取指定行的数据
//func (mgr *CSVManager) GetRow(handleID int64, rowNumber int64) ([]string, error) {
// // 获取句柄
// handle, err := mgr.getHandle(handleID)
// if err != nil {
// return nil, fmt.Errorf("获取句柄失败: %w", err)
// }
// defer mgr.releaseHandle(handleID)
//
// // 检查句柄状态
// if !handle.beginOperation() {
// return nil, fmt.Errorf("句柄已关闭或正在关闭: %d", handleID)
// }
// defer handle.endOperation()
//
// //handle.mu.RLock()
// //defer handle.mu.RUnlock()
//
// if !handle.IsOpen {
// return nil, fmt.Errorf("文件未打开")
// }
//
// // 检查行号是否有效
// if rowNumber < 0 {
// return nil, fmt.Errorf("行号不能为负数: %d", rowNumber)
// }
//
// // 获取总行数
// totalRows, err := handle.calculateTotalRows()
// if err != nil {
// return nil, fmt.Errorf("获取总行数失败: %w", err)
// }
//
// if rowNumber >= totalRows {
// return nil, fmt.Errorf("行号超出范围: %d, 总行数: %d", rowNumber, totalRows)
// }
//
// //// 保存当前文件位置
// //currentPos, err := handle.File.Seek(0, 1) // 当前位置
// //if err != nil {
// // return nil, fmt.Errorf("获取当前文件位置失败: %w", err)
// //}
//
// //// 重置到文件开始
// //if _, err := handle.File.Seek(0, 0); err != nil {
// // return nil, fmt.Errorf("重置文件指针失败: %w", err)
// //}
//
// // 创建文件副本用于读取避免影响其他goroutine
// file, err := os.Open(handle.Filename)
// if err != nil {
// return nil, fmt.Errorf("打开文件失败: %w", err)
// }
// defer file.Close()
//
// // 创建CSV阅读器
// reader := csv.NewReader(file)
// reader.Comma = handle.Delimiter
// reader.LazyQuotes = true
// reader.TrimLeadingSpace = true
//
// //// 重置CSV阅读器
// //reader := csv.NewReader(bufio.NewReader(handle.File))
// //reader.Comma = handle.Delimiter
// //reader.LazyQuotes = true
// //reader.TrimLeadingSpace = true
//
// // 定位到指定行
// var currentRow int64 = 0
// var targetRow []string
// for {
// row, err := reader.Read()
// if err != nil {
// if err == io.EOF {
// break
// }
// // 恢复文件位置
// //handle.File.Seek(currentPos, 0)
// return nil, fmt.Errorf("读取行失败: %w", err)
// }
//
// // 判断是否到达目标行
// if handle.HasHeader {
// // 如果有表头,跳过第一行
// if currentRow == rowNumber+1 {
// targetRow = row
// break
// }
// } else {
// // 如果没有表头,直接匹配
// if currentRow == rowNumber {
// targetRow = row
// break
// }
// }
// currentRow++
// }
//
// //// 恢复文件位置
// //if _, err := handle.File.Seek(currentPos, 0); err != nil {
// // return nil, fmt.Errorf("恢复文件位置失败: %w", err)
// //}
//
// if targetRow == nil {
// return nil, fmt.Errorf("未找到行号 %d", rowNumber)
// }
//
// return targetRow, nil
//}
//
//// ========================== 文件合并功能 ==========================
//
//// MergeCSVFiles 合并两个CSV文件
//func (mgr *CSVManager) MergeCSVFiles(srcHandleID, dstHandleID int64, options MergeOptions) (int64, error) {
// // 获取源文件句柄
// srcHandle, err := mgr.getHandle(srcHandleID)
// if err != nil {
// return 0, fmt.Errorf("获取源文件句柄失败: %w", err)
// }
// defer mgr.releaseHandle(srcHandleID)
//
// // 获取目标文件句柄
// dstHandle, err := mgr.getHandle(dstHandleID)
// if err != nil {
// return 0, fmt.Errorf("获取目标文件句柄失败: %w", err)
// }
// defer mgr.releaseHandle(dstHandleID)
//
// // 检查句柄状态
// if !srcHandle.beginOperation() {
// return 0, fmt.Errorf("源文件句柄已关闭或正在关闭: %d", srcHandleID)
// }
// defer srcHandle.endOperation()
//
// if !dstHandle.beginOperation() {
// return 0, fmt.Errorf("目标文件句柄已关闭或正在关闭: %d", dstHandleID)
// }
// defer dstHandle.endOperation()
//
// srcHandle.mu.RLock()
// dstHandle.mu.Lock()
// defer func() {
// srcHandle.mu.RUnlock()
// dstHandle.mu.Unlock()
// }()
//
// if !srcHandle.IsOpen {
// return 0, fmt.Errorf("源文件未打开")
// }
// if !dstHandle.IsOpen {
// return 0, fmt.Errorf("目标文件未打开")
// }
//
// // 1. 检查表头兼容性
// if err := mgr.checkHeaderCompatibility(srcHandle, dstHandle, options); err != nil {
// return 0, fmt.Errorf("表头不兼容: %w", err)
// }
//
// // 2. 读取源文件的所有行
// srcRows, err := mgr.readAllRows(srcHandle)
// if err != nil {
// return 0, fmt.Errorf("读取源文件失败: %w", err)
// }
//
// // 3. 合并数据
// mergedRows, err := mgr.mergeDataRows(srcRows, srcHandle, dstHandle, options)
// if err != nil {
// return 0, fmt.Errorf("合并数据失败: %w", err)
// }
//
// // 4. 写入目标文件
// if options.AppendMode {
// // 追加模式:写入到文件末尾
// if err := mgr.appendRowsToFile(dstHandle, mergedRows); err != nil {
// return 0, fmt.Errorf("追加数据失败: %w", err)
// }
// } else {
// // 覆盖模式:先读取目标文件的所有行,然后合并后写回
// dstRows, err := mgr.readAllRows(dstHandle)
// if err != nil {
// return 0, fmt.Errorf("读取目标文件失败: %w", err)
// }
//
// // 如果是第一次合并且目标文件为空,可能需要保留表头
// allRows := mgr.mergeAllRows(dstRows, mergedRows, dstHandle, srcHandle, options)
// if err := mgr.writeAllRows(dstHandle, allRows); err != nil {
// return 0, fmt.Errorf("写入合并数据失败: %w", err)
// }
// }
//
// // 5. 更新目标文件的行数缓存
// dstHandle.rowCountCached = false
// newTotalRows, err := dstHandle.calculateTotalRows()
// if err != nil {
// return 0, fmt.Errorf("重新计算行数失败: %w", err)
// }
//
// return newTotalRows, nil
//}
//
//// 检查表头兼容性
//func (mgr *CSVManager) checkHeaderCompatibility(srcHandle, dstHandle *CSVHandle, options MergeOptions) error {
// // 如果两个文件都有表头
// if srcHandle.HasHeader && dstHandle.HasHeader {
// srcHeader := srcHandle.Header
// dstHeader := dstHandle.Header
//
// // 如果指定了列映射,跳过表头检查
// if len(options.ColumnMapping) > 0 {
// return nil
// }
//
// // 检查列数是否相同
// if len(srcHeader) != len(dstHeader) && !options.SkipHeader {
// return fmt.Errorf("源文件列数(%d)与目标文件列数(%d)不一致", len(srcHeader), len(dstHeader))
// }
//
// // 检查列名是否相同(可选)
// for i := 0; i < min(len(srcHeader), len(dstHeader)); i++ {
// if srcHeader[i] != dstHeader[i] {
// fmt.Printf("警告: 第%d列列名不同: 源='%s', 目标='%s'\n", i+1, srcHeader[i], dstHeader[i])
// }
// }
// } else if srcHandle.HasHeader != dstHandle.HasHeader {
// // 一个有表头,一个没有表头
// if !options.SkipHeader {
// return fmt.Errorf("源文件与目标文件的表头设置不一致")
// }
// }
//
// return nil
//}
//
//// 合并数据行
//func (mgr *CSVManager) mergeDataRows(srcRows [][]string, srcHandle, dstHandle *CSVHandle, options MergeOptions) ([][]string, error) {
// var result [][]string
//
// startIndex := 0
// // 如果源文件有表头且需要跳过表头
// if srcHandle.HasHeader && options.SkipHeader {
// if len(srcRows) > 0 {
// startIndex = 1
// }
// }
//
// // 处理列映射
// for i := startIndex; i < len(srcRows); i++ {
// srcRow := srcRows[i]
// transformedRow, err := mgr.transformRow(srcRow, srcHandle, dstHandle, options)
// if err != nil {
// return nil, fmt.Errorf("转换第%d行数据失败: %w", i+1, err)
// }
//
// // 跳过空行
// if transformedRow == nil {
// continue
// }
//
// // 检查重复行
// if options.SkipDuplicates && mgr.isDuplicateRow(result, transformedRow) {
// continue
// }
//
// result = append(result, transformedRow)
// }
//
// return result, nil
//}
//
//// 转换行数据
//func (mgr *CSVManager) transformRow(srcRow []string, srcHandle, dstHandle *CSVHandle, options MergeOptions) ([]string, error) {
// // 如果有自定义转换函数,优先使用
// if options.TransformFunc != nil {
// return options.TransformFunc(srcRow)
// }
//
// // 如果有列映射,按照映射转换
// if len(options.ColumnMapping) > 0 {
// // 确定目标行的最大列数
// maxCol := 0
// for _, dstIndex := range options.ColumnMapping {
// if dstIndex > maxCol {
// maxCol = dstIndex
// }
// }
//
// // 创建目标行
// dstRow := make([]string, maxCol+1)
// for srcIndex, dstIndex := range options.ColumnMapping {
// if srcIndex < len(srcRow) && dstIndex >= 0 && dstIndex < len(dstRow) {
// dstRow[dstIndex] = srcRow[srcIndex]
// }
// }
// return dstRow, nil
// }
//
// // 如果没有列映射,直接复制
// dstRow := make([]string, len(srcRow))
// copy(dstRow, srcRow)
//
// // 如果目标文件有表头且列数不同,进行调整
// if dstHandle.HasHeader && len(dstRow) != len(dstHandle.Header) {
// if len(dstRow) < len(dstHandle.Header) {
// // 源列数少,用空字符串填充
// for i := len(dstRow); i < len(dstHandle.Header); i++ {
// dstRow = append(dstRow, "")
// }
// } else {
// // 源列数多,截断
// dstRow = dstRow[:len(dstHandle.Header)]
// }
// }
//
// return dstRow, nil
//}
//
//// 检查是否为重复行
//func (mgr *CSVManager) isDuplicateRow(rows [][]string, newRow []string) bool {
// for _, row := range rows {
// if mgr.compareRows(row, newRow) {
// return true
// }
// }
// return false
//}
//
//// 比较两行是否相同
//func (mgr *CSVManager) compareRows(row1, row2 []string) bool {
// if len(row1) != len(row2) {
// return false
// }
// for i := range row1 {
// if row1[i] != row2[i] {
// return false
// }
// }
// return true
//}
//
//// 追加行到文件末尾
//func (mgr *CSVManager) appendRowsToFile(handle *CSVHandle, rows [][]string) error {
// // 移动到文件末尾
// if _, err := handle.File.Seek(0, 2); err != nil {
// return fmt.Errorf("移动到文件末尾失败: %w", err)
// }
//
// // 批量写入行数据
// for _, row := range rows {
// if err := handle.CSVWriter.Write(row); err != nil {
// return fmt.Errorf("写入行失败: %w", err)
// }
// }
//
// // 刷新缓冲区
// handle.CSVWriter.Flush()
// if handle.writeBuffer != nil {
// if err := handle.writeBuffer.Flush(); err != nil {
// return fmt.Errorf("刷新写入缓冲区失败: %w", err)
// }
// }
//
// // 更新行数统计
// handle.TotalRows += int64(len(rows))
// handle.cachedRowCount = handle.TotalRows
// handle.rowCountCached = true
//
// return nil
//}
//
//// 合并所有行(用于覆盖模式)
//func (mgr *CSVManager) mergeAllRows(dstRows, srcRows [][]string, dstHandle, srcHandle *CSVHandle, options MergeOptions) [][]string {
// var result [][]string
//
// // 处理表头
// if dstHandle.HasHeader && len(dstRows) > 0 {
// // 保留目标文件的表头
// result = append(result, dstRows[0])
// dstRows = dstRows[1:]
// }
//
// // 根据冲突解决策略合并数据
// switch options.ConflictResolution {
// case Overwrite:
// // 覆盖模式:源数据完全替换目标数据
// result = append(result, srcRows...)
// case Skip:
// // 跳过模式:只保留目标数据中不冲突的行
// result = append(result, dstRows...)
// for _, srcRow := range srcRows {
// if !mgr.isDuplicateRow(result, srcRow) {
// result = append(result, srcRow)
// }
// }
// case KeepBoth:
// // 保留两者:先写目标数据,再写源数据
// result = append(result, dstRows...)
// result = append(result, srcRows...)
// case UseSource:
// // 使用源数据:源数据优先级更高
// result = append(result, srcRows...)
// case UseTarget:
// // 使用目标数据:目标数据优先级更高
// result = append(result, dstRows...)
// default:
// // 默认:追加模式
// result = append(result, dstRows...)
// result = append(result, srcRows...)
// }
//
// return result
//}
//
//// MergeCSVFilesSimple 简单合并CSV文件默认选项
//func (mgr *CSVManager) MergeCSVFilesSimple(srcHandleID, dstHandleID int64, appendMode bool) (int64, error) {
// options := MergeOptions{
// AppendMode: appendMode,
// SkipHeader: true, // 跳过源文件表头
// SkipDuplicates: false, // 不跳过重复行
// ConflictResolution: KeepBoth, // 保留两者
// }
// return mgr.MergeCSVFiles(srcHandleID, dstHandleID, options)
//}
//
//// ========================== C 导出函数 ==========================
//
//// OpenCSVFile 打开CSV文件
////
////export OpenCSVFile
//func OpenCSVFile(filename *C.char, delimiter C.char, hasHeader C.int) *C.char {
// filenameStr := C.GoString(filename)
// delimiterStr := rune(delimiter)
// handleID, err := GetManager().OpenCSVFile(filenameStr, delimiterStr, hasHeader != 0)
// response := struct {
// HandleID int64 `json:"handleID"`
// }{
// HandleID: handleID,
// }
// var csvResponse CSVResponse
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Data: response,
// }
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// WriteHeader 写入表头
////
////export WriteHeader
//func WriteHeader(handleID C.int, header *C.char) *C.char {
// var csvResponse CSVResponse
// goHandleID := int64(handleID)
// goHeader := C.GoString(header)
// var data []string
// err := json.Unmarshal([]byte(goHeader), &data)
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: fmt.Sprintf("header JSON解析失败: %v", err),
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
// }
// errs := GetManager().WriteHeader(goHandleID, data)
// if errs != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: errs.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// }
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// WriteRows 写入/覆盖行数据
////
////export WriteRows
//func WriteRows(handleID C.int, rowsData *C.char) *C.char {
// var csvResponse CSVResponse
// goHandleID := int64(handleID)
// goRowsData := C.GoString(rowsData)
// // json解析
// var data [][]string
// err := json.Unmarshal([]byte(goRowsData), &data)
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: fmt.Sprintf("rowsData JSON解析失败: %v", err),
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
// }
// // 写入数据
// rows, err := GetManager().AppendRows(goHandleID, data)
// response := struct {
// TotalRows int64 `json:"totalRows"`
// }{
// TotalRows: rows,
// }
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Data: response,
// }
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// WriteRowsNum 写入/覆盖行数据
////
////export WriteRowsNum
//func WriteRowsNum(handleID C.int, rowsData *C.char) *C.char {
// var csvResponse CSVResponse
// goHandleID := int64(handleID)
// goRowsData := C.GoString(rowsData)
// // json解析
// var data [][]string
// err := json.Unmarshal([]byte(goRowsData), &data)
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: fmt.Sprintf("rowsData JSON解析失败: %v", err),
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
// }
// // 写入数据
// rowsNum, err := GetManager().AppendRowsNum(goHandleID, data)
// response := struct {
// RowsNum []int64 `json:"rowsNum"`
// }{
// RowsNum: rowsNum,
// }
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Data: response,
// }
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// UpdateRow C导出函数 - 修改指定行数据
////
////export UpdateRow
//func UpdateRow(handleID C.int, rowNumber C.int, rowData *C.char) *C.char {
// var csvResponse CSVResponse
// goHandleID := int64(handleID)
// goRowNumber := int64(rowNumber)
// goRowData := C.GoString(rowData)
//
// // JSON解析行数据
// var data []string
// err := json.Unmarshal([]byte(goRowData), &data)
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: fmt.Sprintf("rowsData JSON解析失败: %v", err),
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
// }
// // 修改行数据
// err = GetManager().UpdateRow(goHandleID, goRowNumber, data)
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Message: fmt.Sprintf("成功修改第%d行数据", goRowNumber),
// }
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// GetRow C导出函数 - 获取指定行数据
////
////export GetRow
//func GetRow(handleID C.int, rowNumber C.int) *C.char {
// goHandleID := int64(handleID)
// goRowNumber := int64(rowNumber)
//
// // 获取行数据
// rowData, err := GetManager().GetRow(goHandleID, goRowNumber)
// var csvResponse CSVResponse
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Data: rowData,
// }
// }
//
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// MergeCSVFilesSimple 合并两个csv文件
////
////export MergeCSVFilesSimple
//func MergeCSVFilesSimple(srcHandleID, dstHandleID, appendMode C.int) *C.char {
// var csvResponse CSVResponse
// goSrcHandleID := int64(srcHandleID)
// goDstHandleID := int64(dstHandleID)
// goAppendMode := int(appendMode)
// rows, err := GetManager().MergeCSVFilesSimple(goSrcHandleID, goDstHandleID, goAppendMode == 0)
// response := struct {
// TotalRows int64 `json:"totalRows"`
// }{
// TotalRows: rows,
// }
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Data: response,
// }
// }
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// CloseHandles 关闭指定句柄(使用优雅关闭)
////
////export CloseHandles
//func CloseHandles(handleID C.int) *C.char {
// goHandleID := int64(handleID)
//
// // 使用优雅关闭
// err := GetManager().CloseHandle(goHandleID)
//
// var csvResponse CSVResponse
// if err != nil {
// csvResponse = CSVResponse{
// Success: false,
// Message: err.Error(),
// }
// } else {
// csvResponse = CSVResponse{
// Success: true,
// Message: fmt.Sprintf("成功关闭句柄 %d", goHandleID),
// }
// }
//
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// CloseAllHandles 关闭所有句柄
////
////export CloseAllHandles
//func CloseAllHandles() *C.char {
// GetManager().closeAllHandles()
//
// csvResponse := CSVResponse{
// Success: true,
// Message: "成功关闭所有句柄",
// }
//
// csvResponseStr, _ := json.Marshal(csvResponse)
// return C.CString(string(csvResponseStr))
//}
//
//// FreeCString 释放C字符串内存
////
////export FreeCString
//func FreeCString(str *C.char) {
// C.free(unsafe.Pointer(str))
//}
//
//// 主函数
////func main() {
////}