log.go 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package repository
  2. import (
  3. "context"
  4. "fmt"
  5. "time"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/model"
  7. "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
  8. )
  9. type LogRepository interface {
  10. GetLog(ctx context.Context, id int64) (*model.Log, error)
  11. GetLogsByTimeRange(ctx context.Context, start, end *time.Time) ([]*model.Log, error)
  12. AddLog(ctx context.Context, log *model.Log) error
  13. EditLog(ctx context.Context, log *model.Log) error
  14. }
  15. func NewLogRepository(
  16. repository *Repository,
  17. ) LogRepository {
  18. return &logRepository{
  19. Repository: repository,
  20. }
  21. }
  22. type logRepository struct {
  23. *Repository
  24. }
  25. func (r *logRepository) GetLog(ctx context.Context, id int64) (*model.Log, error) {
  26. var log model.Log
  27. // 获取分表管理器
  28. shardingMgr := r.getShardingManager()
  29. // 获取可能的表名(查询最近3个月)
  30. existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", nil, nil)
  31. // 在各个分表中查找
  32. for _, tableName := range existingTables {
  33. err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&log).Error
  34. if err == nil {
  35. log.SetTableName(tableName)
  36. return &log, nil
  37. }
  38. }
  39. return nil, fmt.Errorf("未找到ID为 %d 的日志记录", id)
  40. }
  41. func (r *logRepository) GetLogsByTimeRange(ctx context.Context, start, end *time.Time) ([]*model.Log, error) {
  42. var logs []*model.Log
  43. // 获取分表管理器
  44. shardingMgr := r.getShardingManager()
  45. // 检查存在的表
  46. existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", start, end)
  47. if len(existingTables) == 0 {
  48. return logs, nil // 没有分表,返回空结果
  49. }
  50. // 联合查询所有分表
  51. for _, tableName := range existingTables {
  52. var tableLogs []*model.Log
  53. query := r.DBWithName(ctx, "admin").Table(tableName)
  54. // 添加时间范围过滤
  55. if start != nil {
  56. query = query.Where("created_at >= ?", *start)
  57. }
  58. if end != nil {
  59. query = query.Where("created_at <= ?", *end)
  60. }
  61. err := query.Find(&tableLogs).Error
  62. if err != nil {
  63. return nil, err
  64. }
  65. // 设置表名
  66. for _, log := range tableLogs {
  67. log.SetTableName(tableName)
  68. }
  69. logs = append(logs, tableLogs...)
  70. }
  71. return logs, nil
  72. }
  73. func (r *logRepository) AddLog(ctx context.Context, log *model.Log) error {
  74. // 设置创建时间
  75. if log.CreatedAt.IsZero() {
  76. log.CreatedAt = time.Now()
  77. }
  78. // 获取分表管理器
  79. shardingMgr := r.getShardingManagerWithThreshold()
  80. // 获取最优的写入表(考虑数据量阈值)
  81. tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("log"))
  82. if err != nil {
  83. return fmt.Errorf("获取写入表失败: %v", err)
  84. }
  85. log.SetTableName(tableName)
  86. // 确保表存在
  87. err = shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.Log{})
  88. if err != nil {
  89. return err
  90. }
  91. // 写入数据
  92. return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
  93. }
  94. func (r *logRepository) EditLog(ctx context.Context, log *model.Log) error {
  95. // 如果已经指定了表名,直接更新该表
  96. if log.TableName() != "log" {
  97. return r.DBWithName(ctx, "admin").Table(log.TableName()).Updates(log).Error
  98. }
  99. // 获取分表管理器
  100. shardingMgr := r.getShardingManager()
  101. // 确定表名
  102. tableName := shardingMgr.GetWriteTableName(log)
  103. log.SetTableName(tableName)
  104. // 确保表存在
  105. err := shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.Log{})
  106. if err != nil {
  107. return err
  108. }
  109. return r.DBWithName(ctx, "admin").Table(tableName).Updates(log).Error
  110. }
  111. // getShardingManager 获取分表管理器
  112. func (r *logRepository) getShardingManager() *sharding.ShardingManager {
  113. // 使用月度分表策略
  114. strategy := sharding.NewMonthlyShardingStrategy()
  115. return sharding.NewShardingManager(strategy, r.Logger)
  116. }
  117. // getShardingManagerWithThreshold 获取带阈值配置的分表管理器
  118. func (r *logRepository) getShardingManagerWithThreshold() *sharding.ShardingManager {
  119. strategy := sharding.NewMonthlyShardingStrategy()
  120. // 阈值配置(这里可以从配置文件读取,暂时硬编码)
  121. thresholdConfig := &sharding.ThresholdConfig{
  122. Enabled: true,
  123. MaxRows: 3000000, // log表默认300万条
  124. CheckInterval: time.Hour,
  125. }
  126. return sharding.NewShardingManagerWithThreshold(strategy, r.Logger, thresholdConfig)
  127. }
  128. // getMaxRowsForTable 获取指定表的最大行数配置
  129. func (r *logRepository) getMaxRowsForTable(tableName string) int64 {
  130. switch tableName {
  131. case "log":
  132. return 3000000 // 300万条
  133. case "waf_log":
  134. return 5000000 // 500万条
  135. default:
  136. return 3000000 // 默认300万条
  137. }
  138. }