package database import ( "fmt" "log" "psi/config" "psi/models" "strings" "sync" "time" "gorm.io/driver/mysql" "gorm.io/gorm" ) // TenantManager 租户数据库管理器 type TenantManager struct { connections map[int64]*gorm.DB // about_id -> DB连接 mu sync.RWMutex } var tenantManager *TenantManager func init() { tenantManager = &TenantManager{ connections: make(map[int64]*gorm.DB), } } // GetTenantDB 获取租户数据库连接(如果不存在则创建) func GetTenantDB(aboutID int64) (*gorm.DB, error) { if aboutID == 0 { return DB, nil // 主库 } tenantManager.mu.RLock() conn, exists := tenantManager.connections[aboutID] tenantManager.mu.RUnlock() if exists { // 增量迁移:确保现有连接也有新表 migrateIncrementalTenantTables(conn) return conn, nil } // 需要创建新连接 tenantManager.mu.Lock() defer tenantManager.mu.Unlock() // 双重检查 if conn, exists = tenantManager.connections[aboutID]; exists { migrateIncrementalTenantTables(conn) return conn, nil } // 创建租户数据库连接 dbName := fmt.Sprintf("psi_%d", aboutID) conn, err := createTenantDB(dbName) if err != nil { return nil, err } tenantManager.connections[aboutID] = conn log.Printf("租户数据库连接创建成功: %s", dbName) return conn, nil } // createTenantDB 创建租户数据库(如果不存在则新建库和表) func createTenantDB(dbName string) (*gorm.DB, error) { cfg := config.AppConfig // 先用主库连接检查/创建租户数据库 mainDSN := cfg.GetDSN() mainDB, err := gorm.Open(mysql.Open(mainDSN), &gorm.Config{}) if err != nil { return nil, fmt.Errorf("连接主库失败: %w", err) } // 检查数据库是否存在 var exists string err = mainDB.Raw(fmt.Sprintf("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '%s'", dbName)).Scan(&exists).Error needCreate := err != nil || exists == "" // 创建数据库(如果不存在) if needCreate { createSQL := fmt.Sprintf("CREATE DATABASE `%s` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci", dbName) if err = mainDB.Exec(createSQL).Error; err != nil { // 如果数据库已存在(Error 1007),忽略该错误 if strings.Contains(err.Error(), "1007") { log.Printf("租户数据库已存在: %s", dbName) } else { sqlMainDB, _ := mainDB.DB() sqlMainDB.Close() return nil, fmt.Errorf("创建租户数据库失败: %w", err) } } else { log.Printf("租户数据库创建成功: %s", dbName) } } sqlMainDB, _ := mainDB.DB() sqlMainDB.Close() // 连接租户数据库 tenantDSN := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=10s&readTimeout=30s&writeTimeout=30s&interpolateParams=true&multiStatements=true&allowNativePasswords=true&checkConnLiveness=true", cfg.Database.User, cfg.Database.Password, cfg.Database.Host, cfg.Database.Port, dbName) tenantDB, err := gorm.Open(mysql.Open(tenantDSN), &gorm.Config{}) if err != nil { return nil, fmt.Errorf("连接租户数据库失败: %w", err) } // 配置连接池 sqlDB, err := tenantDB.DB() if err != nil { return nil, fmt.Errorf("获取租户数据库连接池失败: %w", err) } sqlDB.SetMaxIdleConns(20) // 最大空闲连接数 sqlDB.SetMaxOpenConns(100) // 最大打开连接数 sqlDB.SetConnMaxLifetime(10 * time.Minute) // 单个连接最大存活10分钟 sqlDB.SetConnMaxIdleTime(2 * time.Minute) // 空闲连接2分钟后关闭 migrateTenantTables(tenantDB) return tenantDB, nil } // migrateTenantTables 迁移租户业务表 func migrateTenantTables(db *gorm.DB) { // 迁移所有业务表(不包括 Employee 表,Employee 在主库) err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='客户表'").AutoMigrate(&models.Customer{}) if err != nil { log.Printf("Customer表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存汇总表'").AutoMigrate(&models.Inventory{}) if err != nil { log.Printf("Inventory表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存明细表(库位级)'").AutoMigrate(&models.InventoryDetail{}) if err != nil { log.Printf("InventoryDetail表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存流水表'").AutoMigrate(&models.InventoryLog{}) if err != nil { log.Printf("InventoryLog表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库位表'").AutoMigrate(&models.Location{}) if err != nil { log.Printf("Location表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品分类表'").AutoMigrate(&models.ProductCategory{}) if err != nil { log.Printf("ProductCategory表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表'").AutoMigrate(&models.Product{}) if err != nil { log.Printf("Product表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品单位换算表'").AutoMigrate(&models.ProductUnitConversion{}) if err != nil { log.Printf("ProductUnitConversion表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='产品序列号表'").AutoMigrate(&models.ProductSerial{}) if err != nil { log.Printf("ProductSerial表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='采购订单主表'").AutoMigrate(&models.PurchaseOrder{}) if err != nil { log.Printf("PurchaseOrder表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='采购订单明细表'").AutoMigrate(&models.PurchaseOrderItem{}) if err != nil { log.Printf("PurchaseOrderItem表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='入库单主表'").AutoMigrate(&models.ReceivingOrder{}) if err != nil { log.Printf("ReceivingOrder表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='入库单明细表'").AutoMigrate(&models.ReceivingOrderItem{}) if err != nil { log.Printf("ReceivingOrderItem表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商表'").AutoMigrate(&models.Supplier{}) if err != nil { log.Printf("Supplier表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='销售订单主表'").AutoMigrate(&models.SalesOrder{}) if err != nil { log.Printf("SalesOrder表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='销售订单明细表'").AutoMigrate(&models.SalesOrderItem{}) if err != nil { log.Printf("SalesOrderItem表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仓库表'").AutoMigrate(&models.Warehouse{}) if err != nil { log.Printf("Warehouse表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='波次单主表'").AutoMigrate(&models.WaveHeader{}) if err != nil { log.Printf("WaveHeader表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='波次任务表'").AutoMigrate(&models.WaveTask{}) if err != nil { log.Printf("WaveTask表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='波次任务明细表'").AutoMigrate(&models.WaveTaskDetail{}) if err != nil { log.Printf("WaveTaskDetail表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='打印任务表'").AutoMigrate(&models.PrintTask{}) if err != nil { log.Printf("PrintTask表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='打印日志表'").AutoMigrate(&models.PrintLog{}) if err != nil { log.Printf("PrintLog表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='小车表'").AutoMigrate(&models.Car{}) if err != nil { log.Printf("Car表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='小车店铺关联表'").AutoMigrate(&models.CarShop{}) if err != nil { log.Printf("CarShop表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='店铺表'").AutoMigrate(&models.Shop{}) if err != nil { log.Printf("Shop表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='盘库单主表'").AutoMigrate(&models.StockCheck{}) if err != nil { log.Printf("StockCheck表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='盘库单明细表'").AutoMigrate(&models.StockCheckItem{}) if err != nil { log.Printf("StockCheckItem表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='外部任务表'").AutoMigrate(&models.OutTask{}) if err != nil { log.Printf("OutTask表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='外部任务异常日志表'").AutoMigrate(&models.OutTaskLog{}) if err != nil { log.Printf("OutTaskLog表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='出库单主表'").AutoMigrate(&models.OutboundOrder{}) if err != nil { log.Printf("OutboundOrder表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='出库单明细表'").AutoMigrate(&models.OutboundOrderItem{}) if err != nil { log.Printf("OutboundOrderItem表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='发货单主表'").AutoMigrate(&models.ShippingOrder{}) if err != nil { log.Printf("ShippingOrder表迁移失败: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='发货单明细表'").AutoMigrate(&models.ShippingOrderItem{}) if err != nil { log.Printf("ShippingOrderItem表迁移失败: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='运费模板表'").AutoMigrate(&models.Logistics{}) if err != nil { log.Printf("Logistics表迁移失败: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='统计表'").AutoMigrate(&models.Statist{}) if err != nil { log.Printf("Statist表迁移失败: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账配置表'").AutoMigrate(&models.SplitAccountConfig{}) if err != nil { log.Printf("SplitAccountConfig表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账扣钱日志表'").AutoMigrate(&models.SplitAccountDeductionLog{}) if err != nil { log.Printf("SplitAccountDeductionLog表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='出库单库位变更记录表'").AutoMigrate(&models.OutboundOrderLocationLog{}) if err != nil { log.Printf("OutboundOrderLocationLog表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='配置表'").AutoMigrate(&models.Config{}) if err != nil { log.Printf("Config表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{}) if err != nil { log.Printf("WangdianSyncTask表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仪表盘每日统计表'").AutoMigrate(&models.DashboardDailyStat{}) if err != nil { log.Printf("DashboardDailyStat表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户每日统计表'").AutoMigrate(&models.UserDailyStat{}) if err != nil { log.Printf("UserDailyStat表迁移警告: %v", err) } err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品销毁日志表'").AutoMigrate(&models.ProductDestroyLog{}) if err != nil { log.Printf("ProductDestroyLog表迁移警告: %v", err) } log.Println("租户业务表迁移完成") // 重置旺店通运行中的任务(服务重启后清理) resetWangdianRunningTasks(db) //tableOptions := "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4" // //modelsToMigrate := []interface{}{ // //&models.BookInfo{}, // &models.Customer{}, // &models.Inventory{}, // &models.InventoryDetail{}, // &models.InventoryLog{}, // &models.Location{}, // &models.ProductCategory{}, // &models.Product{}, // &models.ProductUnitConversion{}, // &models.ProductSerial{}, // &models.PurchaseOrder{}, // &models.PurchaseOrderItem{}, // &models.ReceivingOrder{}, // &models.ReceivingOrderItem{}, // &models.Supplier{}, // &models.SalesOrder{}, // &models.SalesOrderItem{}, // &models.Warehouse{}, // &models.WaveHeader{}, // &models.WaveTask{}, // &models.WaveTaskDetail{}, // &models.PrintTask{}, // &models.PrintLog{}, // &models.Car{}, // &models.CarShop{}, // &models.Shop{}, // &models.StockCheck{}, // &models.StockCheckItem{}, // &models.OutTask{}, // &models.OutTaskLog{}, // &models.OutboundOrder{}, // &models.OutboundOrderItem{}, //} // //for _, model := range modelsToMigrate { // err := db.Set("gorm:table_options", tableOptions).AutoMigrate(model) // if err != nil { // log.Printf("租户表迁移警告: %v", err) // } //} } // CloseTenantConnections 关闭所有租户连接(用于优雅关闭) func CloseTenantConnections() { tenantManager.mu.Lock() defer tenantManager.mu.Unlock() for aboutID, conn := range tenantManager.connections { sqlDB, err := conn.DB() if err == nil { sqlDB.Close() } delete(tenantManager.connections, aboutID) } log.Println("所有租户数据库连接已关闭") } // migrateIncrementalTenantTables 增量迁移:补充已有租户数据库可能存在的新表 func migrateIncrementalTenantTables(db *gorm.DB) { // 检查 split_account_config 表是否存在,不存在则创建 if !db.Migrator().HasTable(&models.SplitAccountConfig{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账配置表'").AutoMigrate(&models.SplitAccountConfig{}) if err != nil { log.Printf("SplitAccountConfig表增量迁移警告: %v", err) } else { log.Println("SplitAccountConfig表增量迁移成功") } } if !db.Migrator().HasTable(&models.SplitAccountDeductionLog{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账扣钱日志表'").AutoMigrate(&models.SplitAccountDeductionLog{}) if err != nil { log.Printf("SplitAccountDeductionLog表增量迁移警告: %v", err) } else { log.Println("SplitAccountDeductionLog表增量迁移成功") } } if !db.Migrator().HasTable(&models.Config{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='配置表'").AutoMigrate(&models.Config{}) if err != nil { log.Printf("Config表增量迁移警告: %v", err) } else { log.Println("Config表增量迁移成功") } } if !db.Migrator().HasTable(&models.DashboardDailyStat{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仪表盘每日统计表'").AutoMigrate(&models.DashboardDailyStat{}) if err != nil { log.Printf("DashboardDailyStat表增量迁移警告: %v", err) } else { log.Println("DashboardDailyStat表增量迁移成功") } } if !db.Migrator().HasTable(&models.UserDailyStat{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户每日统计表'").AutoMigrate(&models.UserDailyStat{}) if err != nil { log.Printf("UserDailyStat表增量迁移警告: %v", err) } else { log.Println("UserDailyStat表增量迁移成功") } } if !db.Migrator().HasTable(&models.WangdianSyncTask{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{}) if err != nil { log.Printf("WangdianSyncTask表增量迁移警告: %v", err) } else { log.Println("WangdianSyncTask表增量迁移成功") } } if !db.Migrator().HasTable(&models.ProductDestroyLog{}) { err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品销毁日志表'").AutoMigrate(&models.ProductDestroyLog{}) if err != nil { log.Printf("ProductDestroyLog表增量迁移警告: %v", err) } else { log.Println("ProductDestroyLog表增量迁移成功") } } // 重置旺店通运行中的任务(服务重启后清理) resetWangdianRunningTasks(db) } // resetWangdianRunningTasks 重置旺店通运行中的任务为失败状态(用于服务重启后清理) func resetWangdianRunningTasks(db *gorm.DB) { if !db.Migrator().HasTable(&models.WangdianSyncTask{}) { return } now := time.Now().Unix() result := db.Model(&models.WangdianSyncTask{}). Where("status = ?", "running"). Updates(map[string]interface{}{ "status": "failed", "error_msg": "服务重启,任务已重置", "finished_at": now, "updated_at": now, }) if result.Error == nil && result.RowsAffected > 0 { log.Printf("[数据库] 服务重启, 已重置 %d 个旺店通运行中的任务为失败状态", result.RowsAffected) } }