123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222 |
- package service
- import (
- "context"
- "fmt"
- "time"
- "github.com/go-nunu/nunu-layout-advanced/internal/model"
- "github.com/go-nunu/nunu-layout-advanced/pkg/log"
- "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
- "github.com/spf13/viper"
- "gorm.io/gorm"
- )
- type ShardingService interface {
- // InitializeSharding 初始化分表
- InitializeSharding(ctx context.Context) error
- // CleanOldTables 清理过期的分表
- CleanOldTables(ctx context.Context) error
- // GetShardingManager 获取分表管理器
- GetShardingManager() *sharding.ShardingManager
- }
- type shardingService struct {
- *Service
- db *gorm.DB
- logger *log.Logger
- config *viper.Viper
- shardingMgr *sharding.ShardingManager
- }
- func NewShardingService(
- service *Service,
- db *gorm.DB,
- logger *log.Logger,
- config *viper.Viper,
- ) ShardingService {
- // 根据配置创建分表策略
- strategy := createShardingStrategy(config)
- shardingMgr := sharding.NewShardingManager(strategy, logger)
- return &shardingService{
- Service: service,
- db: db,
- logger: logger,
- config: config,
- shardingMgr: shardingMgr,
- }
- }
- func (s *shardingService) GetShardingManager() *sharding.ShardingManager {
- return s.shardingMgr
- }
- func (s *shardingService) InitializeSharding(ctx context.Context) error {
- if !s.config.GetBool("data.sharding.enabled") {
- s.logger.Info("分表功能未启用")
- return nil
- }
- s.logger.Info("开始初始化分表...")
- // 获取需要分表的表配置
- tables := s.config.Get("data.sharding.tables")
- if tables == nil {
- s.logger.Warn("未配置需要分表的表")
- return nil
- }
- tableConfigs, ok := tables.([]interface{})
- if !ok {
- return fmt.Errorf("分表配置格式错误")
- }
- for _, tableConfig := range tableConfigs {
- tableMap, ok := tableConfig.(map[string]interface{})
- if !ok {
- continue
- }
- tableName, exists := tableMap["name"].(string)
- if !exists {
- continue
- }
- enabled, exists := tableMap["enabled"].(bool)
- if !exists || !enabled {
- continue
- }
- if err := s.initializeTableSharding(ctx, tableName); err != nil {
- s.logger.Error(fmt.Sprintf("初始化表 %s 分表失败: %v", tableName, err))
- return err
- }
- }
- s.logger.Info("分表初始化完成")
- return nil
- }
- func (s *shardingService) initializeTableSharding(ctx context.Context, baseTableName string) error {
- s.logger.Info(fmt.Sprintf("初始化表 %s 的分表...", baseTableName))
- // 创建当前月的分表
- currentTableName := s.shardingMgr.GetCurrentTableName(baseTableName)
-
- var tableModel interface{}
- switch baseTableName {
- case "log":
- tableModel = &model.Log{}
- case "waf_log":
- tableModel = &model.WafLog{}
- default:
- return fmt.Errorf("不支持的表: %s", baseTableName)
- }
- // 确保当前表存在
- err := s.shardingMgr.EnsureTableExists(ctx, s.db, currentTableName, tableModel)
- if err != nil {
- return fmt.Errorf("创建表 %s 失败: %v", currentTableName, err)
- }
- s.logger.Info(fmt.Sprintf("表 %s 分表初始化完成", baseTableName))
- return nil
- }
- func (s *shardingService) CleanOldTables(ctx context.Context) error {
- if !s.config.GetBool("data.sharding.enabled") {
- return nil
- }
- keepMonths := s.config.GetInt("data.sharding.keep_months")
- if keepMonths <= 0 {
- keepMonths = 12 // 默认保留12个月
- }
- s.logger.Info(fmt.Sprintf("开始清理超过 %d 个月的旧分表...", keepMonths))
- // 计算清理时间点
- cutoffTime := time.Now().AddDate(0, -keepMonths, 0)
- // 获取需要分表的表配置
- tables := s.config.Get("data.sharding.tables")
- if tables == nil {
- return nil
- }
- tableConfigs, ok := tables.([]interface{})
- if !ok {
- return fmt.Errorf("分表配置格式错误")
- }
- for _, tableConfig := range tableConfigs {
- tableMap, ok := tableConfig.(map[string]interface{})
- if !ok {
- continue
- }
- tableName, exists := tableMap["name"].(string)
- if !exists {
- continue
- }
- enabled, exists := tableMap["enabled"].(bool)
- if !exists || !enabled {
- continue
- }
- if err := s.cleanOldTablesForBase(ctx, tableName, cutoffTime); err != nil {
- s.logger.Error(fmt.Sprintf("清理表 %s 的旧分表失败: %v", tableName, err))
- }
- }
- s.logger.Info("旧分表清理完成")
- return nil
- }
- func (s *shardingService) cleanOldTablesForBase(ctx context.Context, baseTableName string, cutoffTime time.Time) error {
- // 获取所有可能的表名(从cutoffTime到现在)
- now := time.Now()
- tableNames := s.shardingMgr.GetQueryTableNames(baseTableName, &cutoffTime, &now)
- for _, tableName := range tableNames {
- // 解析表名中的时间
- if !s.shouldDropTable(tableName, cutoffTime) {
- continue
- }
- // 检查表是否存在
- if !s.db.Migrator().HasTable(tableName) {
- continue
- }
- s.logger.Info(fmt.Sprintf("删除过期分表: %s", tableName))
-
- // 删除表
- err := s.db.Migrator().DropTable(tableName)
- if err != nil {
- s.logger.Error(fmt.Sprintf("删除表 %s 失败: %v", tableName, err))
- }
- }
- return nil
- }
- func (s *shardingService) shouldDropTable(tableName string, cutoffTime time.Time) bool {
- // 这里可以根据表名解析时间,判断是否应该删除
- // 简单实现:如果表名包含的时间早于cutoffTime,则删除
- // 实际项目中可以更精确地解析表名中的时间戳
- return true // 暂时简化实现
- }
- // createShardingStrategy 根据配置创建分表策略
- func createShardingStrategy(config *viper.Viper) sharding.ShardingStrategy {
- strategy := config.GetString("data.sharding.strategy")
- switch strategy {
- case "monthly":
- fallthrough
- default:
- return sharding.NewMonthlyShardingStrategy()
- }
- }
|