package service

import (
	"context"
	"fmt"
	"time"

	"github.com/go-nunu/nunu-layout-advanced/internal/model"
	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
	"github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
	"github.com/spf13/viper"
	"gorm.io/gorm"
)

// ShardingService manages the lifecycle of sharded tables.
type ShardingService interface {
	// InitializeSharding creates the current shard for every enabled table.
	InitializeSharding(ctx context.Context) error
	// CleanOldTables drops shards that are older than the retention window.
	CleanOldTables(ctx context.Context) error
	// GetShardingManager returns the underlying sharding manager.
	GetShardingManager() *sharding.ShardingManager
}

type shardingService struct {
	*Service
	db          *gorm.DB
	logger      *log.Logger
	config      *viper.Viper
	shardingMgr *sharding.ShardingManager
}

// NewShardingService builds a ShardingService whose sharding strategy is
// selected by the "data.sharding.strategy" configuration key.
func NewShardingService(
	service *Service,
	db *gorm.DB,
	logger *log.Logger,
	config *viper.Viper,
) ShardingService {
	strategy := createShardingStrategy(config)
	shardingMgr := sharding.NewShardingManager(strategy, logger)

	return &shardingService{
		Service:     service,
		db:          db,
		logger:      logger,
		config:      config,
		shardingMgr: shardingMgr,
	}
}

// GetShardingManager returns the sharding manager created by
// NewShardingService.
func (s *shardingService) GetShardingManager() *sharding.ShardingManager {
	return s.shardingMgr
}

// enabledShardingTables reads "data.sharding.tables" and returns the "name"
// of every entry whose "enabled" flag is true, in configuration order.
// Entries that are not maps, or that lack a string "name" or a bool
// "enabled", are skipped silently.
//
// configured is false when the key is absent. An error is returned when the
// key is present but is not a list.
func (s *shardingService) enabledShardingTables() (names []string, configured bool, err error) {
	raw := s.config.Get("data.sharding.tables")
	if raw == nil {
		return nil, false, nil
	}

	entries, ok := raw.([]interface{})
	if !ok {
		return nil, true, fmt.Errorf("分表配置格式错误")
	}

	names = make([]string, 0, len(entries))
	for _, entry := range entries {
		fields, ok := entry.(map[string]interface{})
		if !ok {
			continue
		}
		name, ok := fields["name"].(string)
		if !ok {
			continue
		}
		if enabled, ok := fields["enabled"].(bool); !ok || !enabled {
			continue
		}
		names = append(names, name)
	}
	return names, true, nil
}

// InitializeSharding ensures the current shard exists for every enabled
// table. It is a no-op when "data.sharding.enabled" is false or when no
// tables are configured, and it stops at the first table that fails.
func (s *shardingService) InitializeSharding(ctx context.Context) error {
	if !s.config.GetBool("data.sharding.enabled") {
		s.logger.Info("分表功能未启用")
		return nil
	}

	s.logger.Info("开始初始化分表...")

	tableNames, configured, err := s.enabledShardingTables()
	if err != nil {
		return err
	}
	if !configured {
		s.logger.Warn("未配置需要分表的表")
		return nil
	}

	for _, tableName := range tableNames {
		if err := s.initializeTableSharding(ctx, tableName); err != nil {
			s.logger.Error(fmt.Sprintf("初始化表 %s 分表失败: %v", tableName, err))
			return err
		}
	}

	s.logger.Info("分表初始化完成")
	return nil
}

// initializeTableSharding makes sure the shard that the manager reports as
// current for baseTableName exists. Only "log" and "waf_log" are supported;
// any other name is rejected with an error.
func (s *shardingService) initializeTableSharding(ctx context.Context, baseTableName string) error {
	s.logger.Info(fmt.Sprintf("初始化表 %s 的分表...", baseTableName))

	currentTableName := s.shardingMgr.GetCurrentTableName(baseTableName)

	// The model supplies the schema used when the shard has to be created.
	var tableModel interface{}
	switch baseTableName {
	case "log":
		tableModel = &model.Log{}
	case "waf_log":
		tableModel = &model.WafLog{}
	default:
		return fmt.Errorf("不支持的表: %s", baseTableName)
	}

	if err := s.shardingMgr.EnsureTableExists(ctx, s.db, currentTableName, tableModel); err != nil {
		// %w keeps the underlying error available to errors.Is / errors.As.
		return fmt.Errorf("创建表 %s 失败: %w", currentTableName, err)
	}

	s.logger.Info(fmt.Sprintf("表 %s 分表初始化完成", baseTableName))
	return nil
}

// CleanOldTables drops expired shards of every enabled table. The retention
// window is "data.sharding.keep_months" months and defaults to 12 when that
// value is unset or not positive. A failure for one base table is logged and
// does not stop the cleanup of the remaining tables.
func (s *shardingService) CleanOldTables(ctx context.Context) error {
	if !s.config.GetBool("data.sharding.enabled") {
		return nil
	}

	keepMonths := s.config.GetInt("data.sharding.keep_months")
	if keepMonths <= 0 {
		keepMonths = 12 // default retention: 12 months
	}

	s.logger.Info(fmt.Sprintf("开始清理超过 %d 个月的旧分表...", keepMonths))

	// Shards whose whole month lies at or before this instant are expired.
	cutoffTime := time.Now().AddDate(0, -keepMonths, 0)

	tableNames, configured, err := s.enabledShardingTables()
	if err != nil {
		return err
	}
	if !configured {
		return nil
	}

	for _, tableName := range tableNames {
		if err := s.cleanOldTablesForBase(ctx, tableName, cutoffTime); err != nil {
			s.logger.Error(fmt.Sprintf("清理表 %s 的旧分表失败: %v", tableName, err))
		}
	}

	s.logger.Info("旧分表清理完成")
	return nil
}

// cleanOldTablesForBase drops every existing shard of baseTableName that
// shouldDropTable reports as expired.
//
// Candidates come from the database's own table list rather than from the
// range cutoffTime..now: that range is exactly the retention window, so
// enumerating it would select the shards that must be kept.
//
// A failed drop is logged and the loop continues. The returned error reports
// how many drops failed, or wraps the failure to list the tables.
func (s *shardingService) cleanOldTablesForBase(ctx context.Context, baseTableName string, cutoffTime time.Time) error {
	db := s.db.WithContext(ctx)

	tableNames, err := db.Migrator().GetTables()
	if err != nil {
		return fmt.Errorf("获取表列表失败: %w", err)
	}

	failed := 0
	for _, tableName := range tableNames {
		// Only consider tables named exactly "<baseTableName>_<month>".
		base, _, ok := splitShardTableName(tableName, cutoffTime.Location())
		if !ok || base != baseTableName {
			continue
		}
		if !s.shouldDropTable(tableName, cutoffTime) {
			continue
		}

		s.logger.Info(fmt.Sprintf("删除过期分表: %s", tableName))

		if err := db.Migrator().DropTable(tableName); err != nil {
			s.logger.Error(fmt.Sprintf("删除表 %s 失败: %v", tableName, err))
			failed++
		}
	}

	if failed > 0 {
		return fmt.Errorf("%d 张过期分表删除失败", failed)
	}
	return nil
}

// shouldDropTable reports whether the shard named tableName is entirely older
// than cutoffTime. A shard is treated as covering its whole month, so it is
// expired only once the following month starts at or before cutoffTime.
// Names without a recognisable month suffix are never dropped.
func (s *shardingService) shouldDropTable(tableName string, cutoffTime time.Time) bool {
	_, month, ok := splitShardTableName(tableName, cutoffTime.Location())
	if !ok {
		return false
	}
	return !month.AddDate(0, 1, 0).After(cutoffTime)
}

// splitShardTableName splits "<base>_<month>" into the base table name and
// the first instant of that month in loc. ok is false when tableName does not
// end in a valid month suffix.
//
// NOTE(review): the suffix format produced by the monthly sharding strategy is
// not visible from this file. "YYYYMM" and "YYYY_MM" are assumed here; confirm
// against pkg/sharding and adjust the layouts if it differs. An unrecognised
// format only makes cleanup skip the table, it never causes a drop.
func splitShardTableName(tableName string, loc *time.Location) (base string, month time.Time, ok bool) {
	for _, layout := range [...]string{"200601", "2006_01"} {
		// The suffix must be preceded by '_' and leave a non-empty base name.
		sep := len(tableName) - len(layout) - 1
		if sep <= 0 || tableName[sep] != '_' {
			continue
		}
		parsed, err := time.ParseInLocation(layout, tableName[sep+1:], loc)
		if err != nil {
			continue
		}
		return tableName[:sep], parsed, true
	}
	return "", time.Time{}, false
}

// createShardingStrategy selects the sharding strategy named by
// "data.sharding.strategy". Monthly sharding is the only strategy available,
// so it is also used for unknown or empty values.
func createShardingStrategy(config *viper.Viper) sharding.ShardingStrategy {
	switch config.GetString("data.sharding.strategy") {
	case "monthly":
		return sharding.NewMonthlyShardingStrategy()
	default:
		return sharding.NewMonthlyShardingStrategy()
	}
}