sharding.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
  9. "github.com/spf13/viper"
  10. "gorm.io/gorm"
  11. )
  12. type ShardingService interface {
  13. // InitializeSharding 初始化分表
  14. InitializeSharding(ctx context.Context) error
  15. // CleanOldTables 清理过期的分表
  16. CleanOldTables(ctx context.Context) error
  17. // GetShardingManager 获取分表管理器
  18. GetShardingManager() *sharding.ShardingManager
  19. }
  20. type shardingService struct {
  21. *Service
  22. db *gorm.DB
  23. logger *log.Logger
  24. config *viper.Viper
  25. shardingMgr *sharding.ShardingManager
  26. }
  27. func NewShardingService(
  28. service *Service,
  29. db *gorm.DB,
  30. logger *log.Logger,
  31. config *viper.Viper,
  32. ) ShardingService {
  33. // 根据配置创建分表策略
  34. strategy := createShardingStrategy(config)
  35. shardingMgr := sharding.NewShardingManager(strategy, logger)
  36. return &shardingService{
  37. Service: service,
  38. db: db,
  39. logger: logger,
  40. config: config,
  41. shardingMgr: shardingMgr,
  42. }
  43. }
  44. func (s *shardingService) GetShardingManager() *sharding.ShardingManager {
  45. return s.shardingMgr
  46. }
  47. func (s *shardingService) InitializeSharding(ctx context.Context) error {
  48. if !s.config.GetBool("data.sharding.enabled") {
  49. s.logger.Info("分表功能未启用")
  50. return nil
  51. }
  52. s.logger.Info("开始初始化分表...")
  53. // 获取需要分表的表配置
  54. tables := s.config.Get("data.sharding.tables")
  55. if tables == nil {
  56. s.logger.Warn("未配置需要分表的表")
  57. return nil
  58. }
  59. tableConfigs, ok := tables.([]interface{})
  60. if !ok {
  61. return fmt.Errorf("分表配置格式错误")
  62. }
  63. for _, tableConfig := range tableConfigs {
  64. tableMap, ok := tableConfig.(map[string]interface{})
  65. if !ok {
  66. continue
  67. }
  68. tableName, exists := tableMap["name"].(string)
  69. if !exists {
  70. continue
  71. }
  72. enabled, exists := tableMap["enabled"].(bool)
  73. if !exists || !enabled {
  74. continue
  75. }
  76. if err := s.initializeTableSharding(ctx, tableName); err != nil {
  77. s.logger.Error(fmt.Sprintf("初始化表 %s 分表失败: %v", tableName, err))
  78. return err
  79. }
  80. }
  81. s.logger.Info("分表初始化完成")
  82. return nil
  83. }
  84. func (s *shardingService) initializeTableSharding(ctx context.Context, baseTableName string) error {
  85. s.logger.Info(fmt.Sprintf("初始化表 %s 的分表...", baseTableName))
  86. // 创建当前月的分表
  87. currentTableName := s.shardingMgr.GetCurrentTableName(baseTableName)
  88. var tableModel interface{}
  89. switch baseTableName {
  90. case "log":
  91. tableModel = &model.Log{}
  92. case "waf_log":
  93. tableModel = &model.WafLog{}
  94. default:
  95. return fmt.Errorf("不支持的表: %s", baseTableName)
  96. }
  97. // 确保当前表存在
  98. err := s.shardingMgr.EnsureTableExists(ctx, s.db, currentTableName, tableModel)
  99. if err != nil {
  100. return fmt.Errorf("创建表 %s 失败: %v", currentTableName, err)
  101. }
  102. s.logger.Info(fmt.Sprintf("表 %s 分表初始化完成", baseTableName))
  103. return nil
  104. }
  105. func (s *shardingService) CleanOldTables(ctx context.Context) error {
  106. if !s.config.GetBool("data.sharding.enabled") {
  107. return nil
  108. }
  109. keepMonths := s.config.GetInt("data.sharding.keep_months")
  110. if keepMonths <= 0 {
  111. keepMonths = 12 // 默认保留12个月
  112. }
  113. s.logger.Info(fmt.Sprintf("开始清理超过 %d 个月的旧分表...", keepMonths))
  114. // 计算清理时间点
  115. cutoffTime := time.Now().AddDate(0, -keepMonths, 0)
  116. // 获取需要分表的表配置
  117. tables := s.config.Get("data.sharding.tables")
  118. if tables == nil {
  119. return nil
  120. }
  121. tableConfigs, ok := tables.([]interface{})
  122. if !ok {
  123. return fmt.Errorf("分表配置格式错误")
  124. }
  125. for _, tableConfig := range tableConfigs {
  126. tableMap, ok := tableConfig.(map[string]interface{})
  127. if !ok {
  128. continue
  129. }
  130. tableName, exists := tableMap["name"].(string)
  131. if !exists {
  132. continue
  133. }
  134. enabled, exists := tableMap["enabled"].(bool)
  135. if !exists || !enabled {
  136. continue
  137. }
  138. if err := s.cleanOldTablesForBase(ctx, tableName, cutoffTime); err != nil {
  139. s.logger.Error(fmt.Sprintf("清理表 %s 的旧分表失败: %v", tableName, err))
  140. }
  141. }
  142. s.logger.Info("旧分表清理完成")
  143. return nil
  144. }
  145. func (s *shardingService) cleanOldTablesForBase(ctx context.Context, baseTableName string, cutoffTime time.Time) error {
  146. // 获取所有可能的表名(从cutoffTime到现在)
  147. now := time.Now()
  148. tableNames := s.shardingMgr.GetQueryTableNames(baseTableName, &cutoffTime, &now)
  149. for _, tableName := range tableNames {
  150. // 解析表名中的时间
  151. if !s.shouldDropTable(tableName, cutoffTime) {
  152. continue
  153. }
  154. // 检查表是否存在
  155. if !s.db.Migrator().HasTable(tableName) {
  156. continue
  157. }
  158. s.logger.Info(fmt.Sprintf("删除过期分表: %s", tableName))
  159. // 删除表
  160. err := s.db.Migrator().DropTable(tableName)
  161. if err != nil {
  162. s.logger.Error(fmt.Sprintf("删除表 %s 失败: %v", tableName, err))
  163. }
  164. }
  165. return nil
  166. }
  167. func (s *shardingService) shouldDropTable(tableName string, cutoffTime time.Time) bool {
  168. // 这里可以根据表名解析时间,判断是否应该删除
  169. // 简单实现:如果表名包含的时间早于cutoffTime,则删除
  170. // 实际项目中可以更精确地解析表名中的时间戳
  171. return true // 暂时简化实现
  172. }
  173. // createShardingStrategy 根据配置创建分表策略
  174. func createShardingStrategy(config *viper.Viper) sharding.ShardingStrategy {
  175. strategy := config.GetString("data.sharding.strategy")
  176. switch strategy {
  177. case "monthly":
  178. fallthrough
  179. default:
  180. return sharding.NewMonthlyShardingStrategy()
  181. }
  182. }