daShangDao_psiServer/database/tenant.go
2026-07-01 18:07:51 +08:00

497 lines
18 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package database
import (
"fmt"
"log"
"psi/config"
"psi/models"
"strings"
"sync"
"time"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
// TenantManager 租户数据库管理器
type TenantManager struct {
connections map[int64]*gorm.DB // about_id -> DB连接
mu sync.RWMutex
}
var tenantManager *TenantManager
func init() {
tenantManager = &TenantManager{
connections: make(map[int64]*gorm.DB),
}
}
// GetTenantDB 获取租户数据库连接(如果不存在则创建)
func GetTenantDB(aboutID int64) (*gorm.DB, error) {
if aboutID == 0 {
return DB, nil // 主库
}
tenantManager.mu.RLock()
conn, exists := tenantManager.connections[aboutID]
tenantManager.mu.RUnlock()
if exists {
// 增量迁移:确保现有连接也有新表
migrateIncrementalTenantTables(conn)
return conn, nil
}
// 需要创建新连接
tenantManager.mu.Lock()
defer tenantManager.mu.Unlock()
// 双重检查
if conn, exists = tenantManager.connections[aboutID]; exists {
migrateIncrementalTenantTables(conn)
return conn, nil
}
// 创建租户数据库连接
dbName := fmt.Sprintf("psi_%d", aboutID)
conn, err := createTenantDB(dbName)
if err != nil {
return nil, err
}
tenantManager.connections[aboutID] = conn
log.Printf("租户数据库连接创建成功: %s", dbName)
return conn, nil
}
// createTenantDB 创建租户数据库(如果不存在则新建库和表)
func createTenantDB(dbName string) (*gorm.DB, error) {
cfg := config.AppConfig
// 先用主库连接检查/创建租户数据库
mainDSN := cfg.GetDSN()
mainDB, err := gorm.Open(mysql.Open(mainDSN), &gorm.Config{})
if err != nil {
return nil, fmt.Errorf("连接主库失败: %w", err)
}
// 检查数据库是否存在
var exists string
err = mainDB.Raw(fmt.Sprintf("SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = '%s'", dbName)).Scan(&exists).Error
needCreate := err != nil || exists == ""
// 创建数据库(如果不存在)
if needCreate {
createSQL := fmt.Sprintf("CREATE DATABASE `%s` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci", dbName)
if err = mainDB.Exec(createSQL).Error; err != nil {
// 如果数据库已存在Error 1007忽略该错误
if strings.Contains(err.Error(), "1007") {
log.Printf("租户数据库已存在: %s", dbName)
} else {
sqlMainDB, _ := mainDB.DB()
sqlMainDB.Close()
return nil, fmt.Errorf("创建租户数据库失败: %w", err)
}
} else {
log.Printf("租户数据库创建成功: %s", dbName)
}
}
sqlMainDB, _ := mainDB.DB()
sqlMainDB.Close()
// 连接租户数据库
tenantDSN := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local&timeout=10s&readTimeout=30s&writeTimeout=30s&interpolateParams=true&multiStatements=true&allowNativePasswords=true&checkConnLiveness=true",
cfg.Database.User, cfg.Database.Password, cfg.Database.Host, cfg.Database.Port, dbName)
tenantDB, err := gorm.Open(mysql.Open(tenantDSN), &gorm.Config{})
if err != nil {
return nil, fmt.Errorf("连接租户数据库失败: %w", err)
}
// 配置连接池
sqlDB, err := tenantDB.DB()
if err != nil {
return nil, fmt.Errorf("获取租户数据库连接池失败: %w", err)
}
sqlDB.SetMaxIdleConns(20) // 最大空闲连接数
sqlDB.SetMaxOpenConns(100) // 最大打开连接数
sqlDB.SetConnMaxLifetime(10 * time.Minute) // 单个连接最大存活10分钟
sqlDB.SetConnMaxIdleTime(2 * time.Minute) // 空闲连接2分钟后关闭
migrateTenantTables(tenantDB)
return tenantDB, nil
}
// migrateTenantTables 迁移租户业务表
func migrateTenantTables(db *gorm.DB) {
// 迁移所有业务表(不包括 Employee 表Employee 在主库)
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='客户表'").AutoMigrate(&models.Customer{})
if err != nil {
log.Printf("Customer表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存汇总表'").AutoMigrate(&models.Inventory{})
if err != nil {
log.Printf("Inventory表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存明细表(库位级)'").AutoMigrate(&models.InventoryDetail{})
if err != nil {
log.Printf("InventoryDetail表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库存流水表'").AutoMigrate(&models.InventoryLog{})
if err != nil {
log.Printf("InventoryLog表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='库位表'").AutoMigrate(&models.Location{})
if err != nil {
log.Printf("Location表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品分类表'").AutoMigrate(&models.ProductCategory{})
if err != nil {
log.Printf("ProductCategory表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品表'").AutoMigrate(&models.Product{})
if err != nil {
log.Printf("Product表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品单位换算表'").AutoMigrate(&models.ProductUnitConversion{})
if err != nil {
log.Printf("ProductUnitConversion表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='产品序列号表'").AutoMigrate(&models.ProductSerial{})
if err != nil {
log.Printf("ProductSerial表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='采购订单主表'").AutoMigrate(&models.PurchaseOrder{})
if err != nil {
log.Printf("PurchaseOrder表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='采购订单明细表'").AutoMigrate(&models.PurchaseOrderItem{})
if err != nil {
log.Printf("PurchaseOrderItem表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='入库单主表'").AutoMigrate(&models.ReceivingOrder{})
if err != nil {
log.Printf("ReceivingOrder表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='入库单明细表'").AutoMigrate(&models.ReceivingOrderItem{})
if err != nil {
log.Printf("ReceivingOrderItem表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='供应商表'").AutoMigrate(&models.Supplier{})
if err != nil {
log.Printf("Supplier表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='销售订单主表'").AutoMigrate(&models.SalesOrder{})
if err != nil {
log.Printf("SalesOrder表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='销售订单明细表'").AutoMigrate(&models.SalesOrderItem{})
if err != nil {
log.Printf("SalesOrderItem表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仓库表'").AutoMigrate(&models.Warehouse{})
if err != nil {
log.Printf("Warehouse表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='波次单主表'").AutoMigrate(&models.WaveHeader{})
if err != nil {
log.Printf("WaveHeader表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='波次任务表'").AutoMigrate(&models.WaveTask{})
if err != nil {
log.Printf("WaveTask表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='波次任务明细表'").AutoMigrate(&models.WaveTaskDetail{})
if err != nil {
log.Printf("WaveTaskDetail表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='打印任务表'").AutoMigrate(&models.PrintTask{})
if err != nil {
log.Printf("PrintTask表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='打印日志表'").AutoMigrate(&models.PrintLog{})
if err != nil {
log.Printf("PrintLog表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='小车表'").AutoMigrate(&models.Car{})
if err != nil {
log.Printf("Car表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='小车店铺关联表'").AutoMigrate(&models.CarShop{})
if err != nil {
log.Printf("CarShop表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='店铺表'").AutoMigrate(&models.Shop{})
if err != nil {
log.Printf("Shop表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='盘库单主表'").AutoMigrate(&models.StockCheck{})
if err != nil {
log.Printf("StockCheck表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='盘库单明细表'").AutoMigrate(&models.StockCheckItem{})
if err != nil {
log.Printf("StockCheckItem表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='外部任务表'").AutoMigrate(&models.OutTask{})
if err != nil {
log.Printf("OutTask表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='外部任务异常日志表'").AutoMigrate(&models.OutTaskLog{})
if err != nil {
log.Printf("OutTaskLog表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='出库单主表'").AutoMigrate(&models.OutboundOrder{})
if err != nil {
log.Printf("OutboundOrder表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='出库单明细表'").AutoMigrate(&models.OutboundOrderItem{})
if err != nil {
log.Printf("OutboundOrderItem表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='发货单主表'").AutoMigrate(&models.ShippingOrder{})
if err != nil {
log.Printf("ShippingOrder表迁移失败: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='发货单明细表'").AutoMigrate(&models.ShippingOrderItem{})
if err != nil {
log.Printf("ShippingOrderItem表迁移失败: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='运费模板表'").AutoMigrate(&models.Logistics{})
if err != nil {
log.Printf("Logistics表迁移失败: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='统计表'").AutoMigrate(&models.Statist{})
if err != nil {
log.Printf("Statist表迁移失败: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账配置表'").AutoMigrate(&models.SplitAccountConfig{})
if err != nil {
log.Printf("SplitAccountConfig表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账扣钱日志表'").AutoMigrate(&models.SplitAccountDeductionLog{})
if err != nil {
log.Printf("SplitAccountDeductionLog表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='出库单库位变更记录表'").AutoMigrate(&models.OutboundOrderLocationLog{})
if err != nil {
log.Printf("OutboundOrderLocationLog表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='配置表'").AutoMigrate(&models.Config{})
if err != nil {
log.Printf("Config表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{})
if err != nil {
log.Printf("WangdianSyncTask表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仪表盘每日统计表'").AutoMigrate(&models.DashboardDailyStat{})
if err != nil {
log.Printf("DashboardDailyStat表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户每日统计表'").AutoMigrate(&models.UserDailyStat{})
if err != nil {
log.Printf("UserDailyStat表迁移警告: %v", err)
}
err = db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品销毁日志表'").AutoMigrate(&models.ProductDestroyLog{})
if err != nil {
log.Printf("ProductDestroyLog表迁移警告: %v", err)
}
log.Println("租户业务表迁移完成")
// 重置旺店通运行中的任务(服务重启后清理)
resetWangdianRunningTasks(db)
//tableOptions := "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4"
//
//modelsToMigrate := []interface{}{
// //&models.BookInfo{},
// &models.Customer{},
// &models.Inventory{},
// &models.InventoryDetail{},
// &models.InventoryLog{},
// &models.Location{},
// &models.ProductCategory{},
// &models.Product{},
// &models.ProductUnitConversion{},
// &models.ProductSerial{},
// &models.PurchaseOrder{},
// &models.PurchaseOrderItem{},
// &models.ReceivingOrder{},
// &models.ReceivingOrderItem{},
// &models.Supplier{},
// &models.SalesOrder{},
// &models.SalesOrderItem{},
// &models.Warehouse{},
// &models.WaveHeader{},
// &models.WaveTask{},
// &models.WaveTaskDetail{},
// &models.PrintTask{},
// &models.PrintLog{},
// &models.Car{},
// &models.CarShop{},
// &models.Shop{},
// &models.StockCheck{},
// &models.StockCheckItem{},
// &models.OutTask{},
// &models.OutTaskLog{},
// &models.OutboundOrder{},
// &models.OutboundOrderItem{},
//}
//
//for _, model := range modelsToMigrate {
// err := db.Set("gorm:table_options", tableOptions).AutoMigrate(model)
// if err != nil {
// log.Printf("租户表迁移警告: %v", err)
// }
//}
}
// CloseTenantConnections 关闭所有租户连接(用于优雅关闭)
func CloseTenantConnections() {
tenantManager.mu.Lock()
defer tenantManager.mu.Unlock()
for aboutID, conn := range tenantManager.connections {
sqlDB, err := conn.DB()
if err == nil {
sqlDB.Close()
}
delete(tenantManager.connections, aboutID)
}
log.Println("所有租户数据库连接已关闭")
}
// migrateIncrementalTenantTables 增量迁移:补充已有租户数据库可能存在的新表
func migrateIncrementalTenantTables(db *gorm.DB) {
// 检查 split_account_config 表是否存在,不存在则创建
if !db.Migrator().HasTable(&models.SplitAccountConfig{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账配置表'").AutoMigrate(&models.SplitAccountConfig{})
if err != nil {
log.Printf("SplitAccountConfig表增量迁移警告: %v", err)
} else {
log.Println("SplitAccountConfig表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.SplitAccountDeductionLog{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='分账扣钱日志表'").AutoMigrate(&models.SplitAccountDeductionLog{})
if err != nil {
log.Printf("SplitAccountDeductionLog表增量迁移警告: %v", err)
} else {
log.Println("SplitAccountDeductionLog表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.Config{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='配置表'").AutoMigrate(&models.Config{})
if err != nil {
log.Printf("Config表增量迁移警告: %v", err)
} else {
log.Println("Config表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.DashboardDailyStat{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='仪表盘每日统计表'").AutoMigrate(&models.DashboardDailyStat{})
if err != nil {
log.Printf("DashboardDailyStat表增量迁移警告: %v", err)
} else {
log.Println("DashboardDailyStat表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.UserDailyStat{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='用户每日统计表'").AutoMigrate(&models.UserDailyStat{})
if err != nil {
log.Printf("UserDailyStat表增量迁移警告: %v", err)
} else {
log.Println("UserDailyStat表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='旺店通同步任务表'").AutoMigrate(&models.WangdianSyncTask{})
if err != nil {
log.Printf("WangdianSyncTask表增量迁移警告: %v", err)
} else {
log.Println("WangdianSyncTask表增量迁移成功")
}
}
if !db.Migrator().HasTable(&models.ProductDestroyLog{}) {
err := db.Set("gorm:table_options", "ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品销毁日志表'").AutoMigrate(&models.ProductDestroyLog{})
if err != nil {
log.Printf("ProductDestroyLog表增量迁移警告: %v", err)
} else {
log.Println("ProductDestroyLog表增量迁移成功")
}
}
// 重置旺店通运行中的任务(服务重启后清理)
resetWangdianRunningTasks(db)
}
// resetWangdianRunningTasks 重置旺店通运行中的任务为失败状态(用于服务重启后清理)
func resetWangdianRunningTasks(db *gorm.DB) {
if !db.Migrator().HasTable(&models.WangdianSyncTask{}) {
return
}
now := time.Now().Unix()
result := db.Model(&models.WangdianSyncTask{}).
Where("status = ?", "running").
Updates(map[string]interface{}{
"status": "failed",
"error_msg": "服务重启,任务已重置",
"finished_at": now,
"updated_at": now,
})
if result.Error == nil && result.RowsAffected > 0 {
log.Printf("[数据库] 服务重启, 已重置 %d 个旺店通运行中的任务为失败状态", result.RowsAffected)
}
}