sharding.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package service
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
  9. "github.com/spf13/viper"
  10. "gorm.io/gorm"
  11. )
  12. type ShardingService interface {
  13. // InitializeSharding 初始化分表
  14. InitializeSharding(ctx context.Context) error
  15. // CleanOldTables 清理过期的分表
  16. CleanOldTables(ctx context.Context) error
  17. // GetShardingManager 获取分表管理器
  18. GetShardingManager() *sharding.ShardingManager
  19. }
  20. type shardingService struct {
  21. *Service
  22. db *gorm.DB
  23. logger *log.Logger
  24. config *viper.Viper
  25. shardingMgr *sharding.ShardingManager
  26. }
  27. func NewShardingService(
  28. service *Service,
  29. db *gorm.DB,
  30. logger *log.Logger,
  31. config *viper.Viper,
  32. ) ShardingService {
  33. // 根据配置创建分表策略
  34. strategy := createShardingStrategy(config)
  35. // 从配置文件读取阈值配置
  36. var thresholdConfig sharding.ThresholdConfig
  37. if err := config.UnmarshalKey("data.sharding.threshold", &thresholdConfig); err != nil {
  38. logger.Error("分表阈值配置读取失败: " + err.Error())
  39. panic("分表配置不存在,请在配置文件中添加 data.sharding.threshold 配置")
  40. }
  41. shardingMgr := sharding.NewShardingManager(strategy, logger, &thresholdConfig)
  42. return &shardingService{
  43. Service: service,
  44. db: db,
  45. logger: logger,
  46. config: config,
  47. shardingMgr: shardingMgr,
  48. }
  49. }
  50. func (s *shardingService) GetShardingManager() *sharding.ShardingManager {
  51. return s.shardingMgr
  52. }
  53. func (s *shardingService) InitializeSharding(ctx context.Context) error {
  54. if !s.config.GetBool("data.sharding.enabled") {
  55. s.logger.Info("分表功能未启用")
  56. return nil
  57. }
  58. s.logger.Info("开始初始化分表...")
  59. // 获取需要分表的表配置
  60. tables := s.config.Get("data.sharding.tables")
  61. if tables == nil {
  62. s.logger.Warn("未配置需要分表的表")
  63. return nil
  64. }
  65. tableConfigs, ok := tables.([]interface{})
  66. if !ok {
  67. return fmt.Errorf("分表配置格式错误")
  68. }
  69. for _, tableConfig := range tableConfigs {
  70. tableMap, ok := tableConfig.(map[string]interface{})
  71. if !ok {
  72. continue
  73. }
  74. tableName, exists := tableMap["name"].(string)
  75. if !exists {
  76. continue
  77. }
  78. enabled, exists := tableMap["enabled"].(bool)
  79. if !exists || !enabled {
  80. continue
  81. }
  82. if err := s.initializeTableSharding(ctx, tableName); err != nil {
  83. s.logger.Error(fmt.Sprintf("初始化表 %s 分表失败: %v", tableName, err))
  84. return err
  85. }
  86. }
  87. s.logger.Info("分表初始化完成")
  88. return nil
  89. }
  90. func (s *shardingService) initializeTableSharding(ctx context.Context, baseTableName string) error {
  91. s.logger.Info(fmt.Sprintf("初始化表 %s 的分表...", baseTableName))
  92. // 创建当前月的分表
  93. currentTableName := s.shardingMgr.GetCurrentTableName(baseTableName)
  94. var tableModel interface{}
  95. switch baseTableName {
  96. case "log":
  97. tableModel = &model.Log{}
  98. case "waf_log":
  99. tableModel = &model.WafLog{}
  100. default:
  101. return fmt.Errorf("不支持的表: %s", baseTableName)
  102. }
  103. // 确保当前表存在
  104. err := s.shardingMgr.EnsureTableExists(ctx, s.db, currentTableName, tableModel)
  105. if err != nil {
  106. return fmt.Errorf("创建表 %s 失败: %v", currentTableName, err)
  107. }
  108. s.logger.Info(fmt.Sprintf("表 %s 分表初始化完成", baseTableName))
  109. return nil
  110. }
  111. func (s *shardingService) CleanOldTables(ctx context.Context) error {
  112. if !s.config.GetBool("data.sharding.enabled") {
  113. return nil
  114. }
  115. keepMonths := s.config.GetInt("data.sharding.keep_months")
  116. if keepMonths <= 0 {
  117. keepMonths = 12 // 默认保留12个月
  118. }
  119. s.logger.Info(fmt.Sprintf("开始清理超过 %d 个月的旧分表...", keepMonths))
  120. // 计算清理时间点
  121. cutoffTime := time.Now().AddDate(0, -keepMonths, 0)
  122. // 获取需要分表的表配置
  123. tables := s.config.Get("data.sharding.tables")
  124. if tables == nil {
  125. return nil
  126. }
  127. tableConfigs, ok := tables.([]interface{})
  128. if !ok {
  129. return fmt.Errorf("分表配置格式错误")
  130. }
  131. for _, tableConfig := range tableConfigs {
  132. tableMap, ok := tableConfig.(map[string]interface{})
  133. if !ok {
  134. continue
  135. }
  136. tableName, exists := tableMap["name"].(string)
  137. if !exists {
  138. continue
  139. }
  140. enabled, exists := tableMap["enabled"].(bool)
  141. if !exists || !enabled {
  142. continue
  143. }
  144. if err := s.cleanOldTablesForBase(ctx, tableName, cutoffTime); err != nil {
  145. s.logger.Error(fmt.Sprintf("清理表 %s 的旧分表失败: %v", tableName, err))
  146. }
  147. }
  148. s.logger.Info("旧分表清理完成")
  149. return nil
  150. }
  151. func (s *shardingService) cleanOldTablesForBase(ctx context.Context, baseTableName string, cutoffTime time.Time) error {
  152. // 获取所有可能的表名(从cutoffTime到现在)
  153. now := time.Now()
  154. tableNames := s.shardingMgr.GetQueryTableNames(baseTableName, &cutoffTime, &now)
  155. for _, tableName := range tableNames {
  156. // 解析表名中的时间
  157. if !s.shouldDropTable(tableName, cutoffTime) {
  158. continue
  159. }
  160. // 检查表是否存在
  161. if !s.db.Migrator().HasTable(tableName) {
  162. continue
  163. }
  164. s.logger.Info(fmt.Sprintf("删除过期分表: %s", tableName))
  165. // 删除表
  166. err := s.db.Migrator().DropTable(tableName)
  167. if err != nil {
  168. s.logger.Error(fmt.Sprintf("删除表 %s 失败: %v", tableName, err))
  169. }
  170. }
  171. return nil
  172. }
  173. func (s *shardingService) shouldDropTable(tableName string, cutoffTime time.Time) bool {
  174. // 这里可以根据表名解析时间,判断是否应该删除
  175. // 简单实现:如果表名包含的时间早于cutoffTime,则删除
  176. // 实际项目中可以更精确地解析表名中的时间戳
  177. return true // 暂时简化实现
  178. }
  179. // createShardingStrategy 根据配置创建分表策略
  180. func createShardingStrategy(config *viper.Viper) sharding.ShardingStrategy {
  181. strategy := config.GetString("data.sharding.strategy")
  182. switch strategy {
  183. case "monthly":
  184. fallthrough
  185. default:
  186. return sharding.NewMonthlyShardingStrategy()
  187. }
  188. }