123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package repository
- import (
- "context"
- "fmt"
- "time"
- "github.com/go-nunu/nunu-layout-advanced/internal/model"
- "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
- )
- type LogRepository interface {
- GetLog(ctx context.Context, id int64) (*model.Log, error)
- GetLogsByTimeRange(ctx context.Context, start, end *time.Time) ([]*model.Log, error)
- AddLog(ctx context.Context, log *model.Log) error
- EditLog(ctx context.Context, log *model.Log) error
- }
- func NewLogRepository(
- repository *Repository,
- ) LogRepository {
- return &logRepository{
- Repository: repository,
- }
- }
- type logRepository struct {
- *Repository
- }
- func (r *logRepository) GetLog(ctx context.Context, id int64) (*model.Log, error) {
- var log model.Log
-
- // 获取分表管理器
- shardingMgr := r.getShardingManager()
-
- // 获取可能的表名(查询最近3个月)
- existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", nil, nil)
-
- // 在各个分表中查找
- for _, tableName := range existingTables {
- err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&log).Error
- if err == nil {
- log.SetTableName(tableName)
- return &log, nil
- }
- }
-
- return nil, fmt.Errorf("未找到ID为 %d 的日志记录", id)
- }
- func (r *logRepository) GetLogsByTimeRange(ctx context.Context, start, end *time.Time) ([]*model.Log, error) {
- var logs []*model.Log
-
- // 获取分表管理器
- shardingMgr := r.getShardingManager()
-
- // 检查存在的表
- existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", start, end)
-
- if len(existingTables) == 0 {
- return logs, nil // 没有分表,返回空结果
- }
-
- // 联合查询所有分表
- for _, tableName := range existingTables {
- var tableLogs []*model.Log
- query := r.DBWithName(ctx, "admin").Table(tableName)
-
- // 添加时间范围过滤
- if start != nil {
- query = query.Where("created_at >= ?", *start)
- }
- if end != nil {
- query = query.Where("created_at <= ?", *end)
- }
-
- err := query.Find(&tableLogs).Error
- if err != nil {
- return nil, err
- }
-
- // 设置表名
- for _, log := range tableLogs {
- log.SetTableName(tableName)
- }
-
- logs = append(logs, tableLogs...)
- }
-
- return logs, nil
- }
- func (r *logRepository) AddLog(ctx context.Context, log *model.Log) error {
- // 设置创建时间
- if log.CreatedAt.IsZero() {
- log.CreatedAt = time.Now()
- }
-
- // 获取分表管理器
- shardingMgr := r.getShardingManagerWithThreshold()
-
- // 获取最优的写入表(考虑数据量阈值)
- tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("log"))
- if err != nil {
- return fmt.Errorf("获取写入表失败: %v", err)
- }
-
- log.SetTableName(tableName)
-
- // 确保表存在
- err = shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.Log{})
- if err != nil {
- return err
- }
-
- // 写入数据
- return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
- }
- func (r *logRepository) EditLog(ctx context.Context, log *model.Log) error {
- // 如果已经指定了表名,直接更新该表
- if log.TableName() != "log" {
- return r.DBWithName(ctx, "admin").Table(log.TableName()).Updates(log).Error
- }
-
- // 获取分表管理器
- shardingMgr := r.getShardingManager()
-
- // 确定表名
- tableName := shardingMgr.GetWriteTableName(log)
- log.SetTableName(tableName)
-
- // 确保表存在
- err := shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.Log{})
- if err != nil {
- return err
- }
-
- return r.DBWithName(ctx, "admin").Table(tableName).Updates(log).Error
- }
- // getShardingManager 获取分表管理器
- func (r *logRepository) getShardingManager() *sharding.ShardingManager {
- // 使用月度分表策略
- strategy := sharding.NewMonthlyShardingStrategy()
- return sharding.NewShardingManager(strategy, r.Logger)
- }
- // getShardingManagerWithThreshold 获取带阈值配置的分表管理器
- func (r *logRepository) getShardingManagerWithThreshold() *sharding.ShardingManager {
- strategy := sharding.NewMonthlyShardingStrategy()
-
- // 阈值配置(这里可以从配置文件读取,暂时硬编码)
- thresholdConfig := &sharding.ThresholdConfig{
- Enabled: true,
- MaxRows: 3000000, // log表默认300万条
- CheckInterval: time.Hour,
- }
-
- return sharding.NewShardingManagerWithThreshold(strategy, r.Logger, thresholdConfig)
- }
- // getMaxRowsForTable 获取指定表的最大行数配置
- func (r *logRepository) getMaxRowsForTable(tableName string) int64 {
- switch tableName {
- case "log":
- return 3000000 // 300万条
- case "waf_log":
- return 5000000 // 500万条
- default:
- return 3000000 // 默认300万条
- }
- }
|