|
@@ -11,7 +11,6 @@ import (
|
|
|
adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
|
|
|
"github.com/go-nunu/nunu-layout-advanced/internal/model"
|
|
|
"github.com/go-nunu/nunu-layout-advanced/internal/repository"
|
|
|
- "github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
|
|
|
"gorm.io/gorm"
|
|
|
)
|
|
|
|
|
@@ -82,12 +81,9 @@ func (r *wafLogRepository) buildExportQuery(ctx context.Context, req adminApi.Ex
|
|
|
|
|
|
func (r *wafLogRepository) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
|
|
|
var res model.WafLog
|
|
|
-
|
|
|
- // 获取分表管理器
|
|
|
- shardingMgr := r.getShardingManager()
|
|
|
-
|
|
|
+
|
|
|
// 获取存在的分表
|
|
|
- existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
|
|
|
+ existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
|
|
|
|
|
|
// 在各个分表中查找
|
|
|
for _, tableName := range existingTables {
|
|
@@ -102,8 +98,7 @@ func (r *wafLogRepository) GetWafLog(ctx context.Context, id int64) (*model.WafL
|
|
|
}
|
|
|
|
|
|
func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
|
|
|
- // 获取分表管理器
|
|
|
- shardingMgr := r.getShardingManager()
|
|
|
+
|
|
|
|
|
|
// 解析时间范围(如果有的话)
|
|
|
var startTime, endTime *time.Time
|
|
@@ -111,7 +106,7 @@ func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.Searc
|
|
|
// 暂时查询最近3个月的数据
|
|
|
|
|
|
// 获取需要查询的表
|
|
|
- existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", startTime, endTime)
|
|
|
+ existingTables := r.Manager.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", startTime, endTime)
|
|
|
|
|
|
if len(existingTables) == 0 {
|
|
|
// 没有分表,返回空结果
|
|
@@ -275,12 +270,10 @@ func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) err
|
|
|
if log.CreatedAt.IsZero() {
|
|
|
log.CreatedAt = time.Now()
|
|
|
}
|
|
|
-
|
|
|
- // 获取分表管理器
|
|
|
- shardingMgr := r.getShardingManagerWithThreshold()
|
|
|
+
|
|
|
|
|
|
// 获取最优的写入表(考虑数据量阈值)
|
|
|
- tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("waf_log"))
|
|
|
+ tableName, err := r.Manager.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("waf_log"))
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("获取写入表失败: %v", err)
|
|
|
}
|
|
@@ -288,7 +281,7 @@ func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) err
|
|
|
log.SetTableName(tableName)
|
|
|
|
|
|
// 确保表存在
|
|
|
- err = shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
|
|
|
+ err = r.Manager.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -301,9 +294,7 @@ func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.Waf
|
|
|
if len(logs) == 0 {
|
|
|
return nil
|
|
|
}
|
|
|
-
|
|
|
- // 获取带阈值的分表管理器
|
|
|
- shardingMgr := r.getShardingManagerWithThreshold()
|
|
|
+
|
|
|
maxRows := r.getMaxRowsForTable("waf_log")
|
|
|
|
|
|
// 按表名分组
|
|
@@ -316,7 +307,7 @@ func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.Waf
|
|
|
}
|
|
|
|
|
|
// 获取最优的写入表(考虑数据量阈值)
|
|
|
- tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, maxRows)
|
|
|
+ tableName, err := r.Manager.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, maxRows)
|
|
|
if err != nil {
|
|
|
return fmt.Errorf("获取写入表失败: %v", err)
|
|
|
}
|
|
@@ -330,7 +321,7 @@ func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.Waf
|
|
|
// 为每个表批量插入
|
|
|
for tableName, tableLogs := range tableGroups {
|
|
|
// 确保表存在
|
|
|
- err := shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
|
|
|
+ err := r.Manager.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
@@ -399,26 +390,6 @@ func (r *wafLogRepository) GetWafLogExportCount(ctx context.Context, req adminAp
|
|
|
return int(count), nil
|
|
|
}
|
|
|
|
|
|
-// getShardingManager 获取分表管理器
|
|
|
-func (r *wafLogRepository) getShardingManager() *sharding.ShardingManager {
|
|
|
- // 使用月度分表策略
|
|
|
- strategy := sharding.NewMonthlyShardingStrategy()
|
|
|
- return sharding.NewShardingManager(strategy, r.Logger)
|
|
|
-}
|
|
|
-
|
|
|
-// getShardingManagerWithThreshold 获取带阈值配置的分表管理器
|
|
|
-func (r *wafLogRepository) getShardingManagerWithThreshold() *sharding.ShardingManager {
|
|
|
- strategy := sharding.NewMonthlyShardingStrategy()
|
|
|
-
|
|
|
- // 阈值配置(这里可以从配置文件读取,暂时硬编码)
|
|
|
- thresholdConfig := &sharding.ThresholdConfig{
|
|
|
- Enabled: true,
|
|
|
- MaxRows: 5000000, // waf_log表默认500万条
|
|
|
- CheckInterval: time.Hour,
|
|
|
- }
|
|
|
-
|
|
|
- return sharding.NewShardingManagerWithThreshold(strategy, r.Logger, thresholdConfig)
|
|
|
-}
|
|
|
|
|
|
// getMaxRowsForTable 获取指定表的最大行数配置
|
|
|
func (r *wafLogRepository) getMaxRowsForTable(tableName string) int64 {
|