manager.go 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package sharding
  2. import (
  3. "context"
  4. "fmt"
  5. "regexp"
  6. "strconv"
  7. "strings"
  8. "time"
  9. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  10. "gorm.io/gorm"
  11. )
  12. // TableModel 支持分表的模型接口
  13. type TableModel interface {
  14. GetBaseTableName() string
  15. GetCreatedAt() time.Time
  16. }
  17. // ThresholdConfig 阈值配置
  18. type ThresholdConfig struct {
  19. Enabled bool `mapstructure:"enabled"`
  20. MaxRows int64 `mapstructure:"max_rows"`
  21. CheckInterval time.Duration `mapstructure:"check_interval"`
  22. Tables []TableConfig `mapstructure:"tables"`
  23. }
  24. // TableConfig 单表配置
  25. type TableConfig struct {
  26. Name string `mapstructure:"name"`
  27. Enabled bool `mapstructure:"enabled"`
  28. MaxRows int64 `mapstructure:"max_rows"`
  29. }
  30. // ShardingManager 分表管理器
  31. type ShardingManager struct {
  32. strategy ShardingStrategy
  33. logger *log.Logger
  34. thresholdConfig *ThresholdConfig
  35. }
  36. // NewShardingManager 从配置创建ShardingManager
  37. func NewShardingManager(strategy ShardingStrategy, logger *log.Logger, config *ThresholdConfig) *ShardingManager {
  38. return &ShardingManager{
  39. strategy: strategy,
  40. logger: logger,
  41. thresholdConfig: config,
  42. }
  43. }
  44. // GetWriteTableName 获取写入表名(基于记录的创建时间)
  45. func (sm *ShardingManager) GetWriteTableName(model TableModel) string {
  46. baseTableName := model.GetBaseTableName()
  47. createdAt := model.GetCreatedAt()
  48. if createdAt.IsZero() {
  49. createdAt = time.Now()
  50. }
  51. return sm.strategy.GetTableName(baseTableName, createdAt)
  52. }
  53. // GetCurrentTableName 获取当前表名(用于写入新记录)
  54. func (sm *ShardingManager) GetCurrentTableName(baseTableName string) string {
  55. return sm.strategy.GetCurrentTableName(baseTableName)
  56. }
  57. // GetQueryTableNames 获取查询需要的所有表名
  58. func (sm *ShardingManager) GetQueryTableNames(baseTableName string, start, end *time.Time) []string {
  59. if start == nil || end == nil {
  60. // 如果没有指定时间范围,默认查询最近3个月的表
  61. now := time.Now()
  62. defaultStart := now.AddDate(0, -2, 0) // 前2个月
  63. defaultEnd := now
  64. return sm.strategy.GetTableNamesByRange(baseTableName, defaultStart, defaultEnd)
  65. }
  66. return sm.strategy.GetTableNamesByRange(baseTableName, *start, *end)
  67. }
  68. // EnsureTableExists 确保表存在,不存在则创建
  69. func (sm *ShardingManager) EnsureTableExists(ctx context.Context, db *gorm.DB, tableName string, model interface{}) error {
  70. // 检查表是否存在
  71. if db.Migrator().HasTable(tableName) {
  72. return nil
  73. }
  74. sm.logger.Info(fmt.Sprintf("创建分表: %s", tableName))
  75. // 使用指定的表名创建表
  76. return db.Table(tableName).AutoMigrate(model)
  77. }
  78. // BuildUnionQuery 构建联合查询(用于跨表查询)
  79. func (sm *ShardingManager) BuildUnionQuery(ctx context.Context, db *gorm.DB, tableNames []string, baseQuery func(*gorm.DB) *gorm.DB) *gorm.DB {
  80. if len(tableNames) == 0 {
  81. return db
  82. }
  83. // 过滤存在的表
  84. var existingTables []string
  85. for _, tableName := range tableNames {
  86. if db.Migrator().HasTable(tableName) {
  87. existingTables = append(existingTables, tableName)
  88. }
  89. }
  90. if len(existingTables) == 0 {
  91. return db
  92. }
  93. // 如果只有一个表,直接查询该表
  94. if len(existingTables) == 1 {
  95. return baseQuery(db.Table(existingTables[0]))
  96. }
  97. // 多表联合查询
  98. var subQueries []string
  99. for _, tableName := range existingTables {
  100. subQueries = append(subQueries, fmt.Sprintf("SELECT * FROM %s", tableName))
  101. }
  102. unionSQL := strings.Join(subQueries, " UNION ALL ")
  103. return baseQuery(db.Table(fmt.Sprintf("(%s) as sharded_table", unionSQL)))
  104. }
  105. // GetTableNamesWithExistenceCheck 获取存在的表名列表(只返回分表,不包含原表)
  106. func (sm *ShardingManager) GetTableNamesWithExistenceCheck(db *gorm.DB, baseTableName string, start, end *time.Time) []string {
  107. allTableNames := sm.GetQueryTableNames(baseTableName, start, end)
  108. var existingTables []string
  109. for _, tableName := range allTableNames {
  110. if db.Migrator().HasTable(tableName) {
  111. existingTables = append(existingTables, tableName)
  112. }
  113. }
  114. // 还要检查动态分表(带序号的表)
  115. dynamicTables := sm.findDynamicTables(db, allTableNames)
  116. existingTables = append(existingTables, dynamicTables...)
  117. return existingTables
  118. }
  119. // findDynamicTables 查找动态分表(带序号的表)
  120. func (sm *ShardingManager) findDynamicTables(db *gorm.DB, baseTableNames []string) []string {
  121. var dynamicTables []string
  122. for _, baseTableName := range baseTableNames {
  123. // 查找类似 log_202408_01, log_202408_02 这样的表
  124. pattern := fmt.Sprintf("%s_\\d+", baseTableName)
  125. if tables := sm.findTablesByPattern(db, pattern); len(tables) > 0 {
  126. dynamicTables = append(dynamicTables, tables...)
  127. }
  128. }
  129. return dynamicTables
  130. }
  131. // findTablesByPattern 根据模式查找表
  132. func (sm *ShardingManager) findTablesByPattern(db *gorm.DB, pattern string) []string {
  133. var tables []string
  134. // 获取所有表名
  135. rows, err := db.Raw("SHOW TABLES").Rows()
  136. if err != nil {
  137. sm.logger.Error("获取表列表失败: " + err.Error())
  138. return tables
  139. }
  140. defer rows.Close()
  141. regex, err := regexp.Compile(pattern)
  142. if err != nil {
  143. sm.logger.Error("编译正则表达式失败: " + err.Error())
  144. return tables
  145. }
  146. for rows.Next() {
  147. var tableName string
  148. if err := rows.Scan(&tableName); err != nil {
  149. continue
  150. }
  151. if regex.MatchString(tableName) {
  152. tables = append(tables, tableName)
  153. }
  154. }
  155. return tables
  156. }
  157. // GetOptimalWriteTable 获取最优的写入表(根据model自动获取阈值)
  158. func (sm *ShardingManager) GetOptimalWriteTable(ctx context.Context, db *gorm.DB, model TableModel) (string, error) {
  159. baseTableName := model.GetBaseTableName()
  160. createdAt := model.GetCreatedAt()
  161. if createdAt.IsZero() {
  162. createdAt = time.Now()
  163. }
  164. // 先获取基础表名
  165. baseShardTableName := sm.strategy.GetTableName(baseTableName, createdAt)
  166. // 如果没有启用阈值检查,直接返回基础表名
  167. if sm.thresholdConfig == nil || !sm.thresholdConfig.Enabled {
  168. return baseShardTableName, nil
  169. }
  170. // 根据表名自动获取阈值
  171. maxRows := sm.GetMaxRowsForTable(baseTableName)
  172. // 如果返回-1,表示该表禁用了阈值检查,直接返回基础表名
  173. if maxRows == -1 {
  174. return baseShardTableName, nil
  175. }
  176. // 检查当前表是否已达到阈值
  177. currentTable := baseShardTableName
  178. for {
  179. if !db.Migrator().HasTable(currentTable) {
  180. // 表不存在,可以使用
  181. return currentTable, nil
  182. }
  183. // 检查表的数据量
  184. var count int64
  185. err := db.Table(currentTable).Count(&count).Error
  186. if err != nil {
  187. sm.logger.Error(fmt.Sprintf("检查表 %s 数据量失败: %v", currentTable, err))
  188. return currentTable, nil // 出错时返回当前表
  189. }
  190. if count < maxRows {
  191. // 当前表还有空间
  192. return currentTable, nil
  193. }
  194. // 当前表已满,尝试下一个序号的表
  195. currentTable = sm.getNextSequenceTable(currentTable)
  196. sm.logger.Info(fmt.Sprintf("表 %s 已达到阈值 %d,尝试使用 %s", baseShardTableName, maxRows, currentTable))
  197. }
  198. }
  199. // getNextSequenceTable 获取下一个序号的表名
  200. func (sm *ShardingManager) getNextSequenceTable(currentTableName string) string {
  201. // 检查是否已经有序号
  202. re := regexp.MustCompile(`^(.+)_(\d+)$`)
  203. matches := re.FindStringSubmatch(currentTableName)
  204. if len(matches) == 3 {
  205. // 已有序号,递增
  206. baseName := matches[1]
  207. seq, _ := strconv.Atoi(matches[2])
  208. return fmt.Sprintf("%s_%02d", baseName, seq+1)
  209. } else {
  210. // 没有序号,添加序号01
  211. return fmt.Sprintf("%s_01", currentTableName)
  212. }
  213. }
  214. // CheckAndCreateNewTable 检查是否需要创建新表(基于时间周期)
  215. func (sm *ShardingManager) CheckAndCreateNewTable(ctx context.Context, db *gorm.DB, baseTableName string, modelExample interface{}) error {
  216. currentTime := time.Now()
  217. expectedTableName := sm.strategy.GetTableName(baseTableName, currentTime)
  218. // 检查当前期间的表是否存在
  219. if !db.Migrator().HasTable(expectedTableName) {
  220. sm.logger.Info(fmt.Sprintf("创建新周期分表: %s", expectedTableName))
  221. return sm.EnsureTableExists(ctx, db, expectedTableName, modelExample)
  222. }
  223. return nil
  224. }
  225. // GetMaxRowsForTable 获取指定表的最大行数配置
  226. func (sm *ShardingManager) GetMaxRowsForTable(tableName string) int64 {
  227. // 检查表级配置
  228. if sm.thresholdConfig != nil && sm.thresholdConfig.Tables != nil {
  229. for _, tableConfig := range sm.thresholdConfig.Tables {
  230. if tableConfig.Name == tableName {
  231. if !tableConfig.Enabled {
  232. // 表级别禁用分表,返回-1表示无限制
  233. return -1
  234. }
  235. return tableConfig.MaxRows
  236. }
  237. }
  238. }
  239. // 检查全局配置是否启用
  240. if sm.thresholdConfig != nil && !sm.thresholdConfig.Enabled {
  241. // 全局禁用阈值检查,返回-1表示无限制
  242. return -1
  243. }
  244. // 使用全局默认配置
  245. if sm.thresholdConfig != nil && sm.thresholdConfig.MaxRows > 0 {
  246. return sm.thresholdConfig.MaxRows
  247. }
  248. // 配置缺失,返回错误而不是默认值
  249. panic(fmt.Sprintf("表 '%s' 的阈值配置缺失,请在配置文件中添加相应配置", tableName))
  250. }