Pārlūkot izejas kodu

feat(sharding): 实现日志表和 WAF 日志表的分表功能

- 在 config/local.yml 和 config/prod.yml 中添加分表配置
- 修改 Log 和 WafLog 模型,增加分表相关字段和方法
- 重构 LogRepository 和 WafLog
fusu 10 stundas atpakaļ
vecāks
revīzija
496d36c50d

+ 1 - 0
cmd/server/wire/wire.go

@@ -117,6 +117,7 @@ var serviceSet = wire.NewSet(
 	admin.NewWafLogDataCleanService,
 	admin.NewWafManageService,
 	admin.NewWafOperationsService,
+	service.NewShardingService,
 )
 
 var handlerSet = wire.NewSet(

+ 1 - 1
cmd/server/wire/wire_gen.go

@@ -145,7 +145,7 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 
 var repositorySet = wire.NewSet(repository.NewDB, repository.NewRedis, repository.NewCasbinEnforcer, repository.NewMongoClient, repository.NewMongoDB, repository.NewRabbitMQ, repository.NewRepository, repository.NewTransaction, admin.NewAdminRepository, admin.NewUserRepository, repository.NewGameShieldRepository, repository.NewGameShieldPublicIpRepository, waf.NewWebForwardingRepository, waf.NewTcpforwardingRepository, waf.NewUdpForWardingRepository, repository.NewGameShieldUserIpRepository, repository.NewGameShieldBackendRepository, repository.NewGameShieldSdkIpRepository, repository.NewHostRepository, waf.NewGlobalLimitRepository, repository.NewGatewayGroupRepository, repository.NewGateWayGroupIpRepository, flexCdn.NewCdnRepository, waf.NewAllowAndDenyIpRepository, flexCdn.NewProxyRepository, flexCdn.NewCcRepository, repository.NewExpiredRepository, repository.NewLogRepository, waf.NewGatewayipRepository, admin.NewGatewayIpAdminRepository, flexCdn.NewCcIpListRepository, admin.NewLogRepository, admin.NewWafLogRepository, admin.NewWafManageRepository)
 
-var serviceSet = wire.NewSet(service.NewService, admin2.NewUserService, admin2.NewGatewayIpAdminService, admin2.NewAdminService, gameShield.NewGameShieldService, service.NewAoDunService, service.NewGameShieldPublicIpService, service.NewDuedateService, service.NewFormatterService, service.NewParserService, service.NewRequiredService, service.NewCrawlerService, web.NewWebForwardingService, web.NewAidedWebService, tcp.NewAidedTcpService, tcp.NewTcpforwardingService, udp.NewAidedUdpService, udp.NewUdpForWardingService, service.NewGameShieldUserIpService, gameShield.NewGameShieldBackendService, service.NewGameShieldSdkIpService, service.NewHostService, globallimit.NewGlobalLimitService, service.NewGatewayGroupService, common.NewWafFormatterService, service.NewGateWayGroupIpService, service.NewRequestService, flexCdn2.NewCdnService, common.NewAllowAndDenyIpService, flexCdn2.NewProxyService, flexCdn2.NewSslCertService, flexCdn2.NewWebsocketService, waf2.NewCcService, service.NewLogService, common.NewGatewayipService, waf2.NewCcIpListService, waf2.NewCdnLogService, waf2.NewBuildAudunService, waf2.NewZzybgpService, waf2.NewWaflogService, admin2.NewLogService, admin2.NewWafLogService, admin2.NewWafLogDataCleanService, admin2.NewWafManageService, admin2.NewWafOperationsService)
+var serviceSet = wire.NewSet(service.NewService, admin2.NewUserService, admin2.NewGatewayIpAdminService, admin2.NewAdminService, gameShield.NewGameShieldService, service.NewAoDunService, service.NewGameShieldPublicIpService, service.NewDuedateService, service.NewFormatterService, service.NewParserService, service.NewRequiredService, service.NewCrawlerService, web.NewWebForwardingService, web.NewAidedWebService, tcp.NewAidedTcpService, tcp.NewTcpforwardingService, udp.NewAidedUdpService, udp.NewUdpForWardingService, service.NewGameShieldUserIpService, gameShield.NewGameShieldBackendService, service.NewGameShieldSdkIpService, service.NewHostService, globallimit.NewGlobalLimitService, service.NewGatewayGroupService, common.NewWafFormatterService, service.NewGateWayGroupIpService, service.NewRequestService, flexCdn2.NewCdnService, common.NewAllowAndDenyIpService, flexCdn2.NewProxyService, flexCdn2.NewSslCertService, flexCdn2.NewWebsocketService, waf2.NewCcService, service.NewLogService, common.NewGatewayipService, waf2.NewCcIpListService, waf2.NewCdnLogService, waf2.NewBuildAudunService, waf2.NewZzybgpService, waf2.NewWaflogService, admin2.NewLogService, admin2.NewWafLogService, admin2.NewWafLogDataCleanService, admin2.NewWafManageService, admin2.NewWafOperationsService, service.NewShardingService)
 
 var handlerSet = wire.NewSet(handler.NewHandler, admin3.NewUserHandler, admin3.NewAdminHandler, admin3.NewGatewayIpAdminHandler, handler.NewGameShieldHandler, handler.NewGameShieldPublicIpHandler, waf3.NewWebForwardingHandler, waf3.NewTcpforwardingHandler, waf3.NewUdpForWardingHandler, handler.NewGameShieldUserIpHandler, handler.NewGameShieldBackendHandler, handler.NewGameShieldSdkIpHandler, handler.NewHostHandler, waf3.NewGlobalLimitHandler, handler.NewGatewayGroupHandler, handler.NewGateWayGroupIpHandler, waf3.NewAllowAndDenyIpHandler, waf3.NewCcHandler, waf3.NewGatewayipHandler, waf3.NewCcIpListHandler, waf3.NewCdnLogHandler, admin3.NewLogHandler, admin3.NewWafLogHandler, admin3.NewWafManageHandler)
 

+ 2 - 0
cmd/task/wire/wire.go

@@ -72,6 +72,7 @@ var jobSet = wire.NewSet(
 	job.NewUserJob,
 	job.NewWhitelistJob,
 	job.NewWafLogJob,
+	job.NewShardingJob,
 )
 var serverSet = wire.NewSet(
 	server.NewTaskServer,
@@ -114,6 +115,7 @@ var serviceSet = wire.NewSet(
 	admin2.NewWafLogService,
 	admin2.NewWafLogDataCleanService,
 	admin2.NewWafOperationsService,
+	service.NewShardingService,
 )
 
 // build App

+ 2 - 2
cmd/task/wire/wire_gen.go

@@ -119,11 +119,11 @@ var repositorySet = wire.NewSet(repository.NewDB, repository.NewRedis, repositor
 
 var taskSet = wire.NewSet(task.NewTask, task.NewUserTask, task.NewGameShieldTask, task.NewWafTask)
 
-var jobSet = wire.NewSet(job.NewJob, job.NewUserJob, job.NewWhitelistJob, job.NewWafLogJob)
+var jobSet = wire.NewSet(job.NewJob, job.NewUserJob, job.NewWhitelistJob, job.NewWafLogJob, job.NewShardingJob)
 
 var serverSet = wire.NewSet(server.NewTaskServer, server.NewJobServer)
 
-var serviceSet = wire.NewSet(service.NewService, service.NewAoDunService, gameShield.NewGameShieldService, service.NewCrawlerService, service.NewGameShieldPublicIpService, service.NewDuedateService, service.NewFormatterService, service.NewParserService, service.NewRequiredService, service.NewHostService, gameShield.NewGameShieldBackendService, service.NewGameShieldSdkIpService, service.NewGameShieldUserIpService, common.NewWafFormatterService, flexCdn2.NewCdnService, service.NewRequestService, tcp.NewAidedTcpService, tcp.NewTcpforwardingService, udp.NewAidedUdpService, udp.NewUdpForWardingService, web.NewAidedWebService, web.NewWebForwardingService, flexCdn2.NewProxyService, flexCdn2.NewSslCertService, flexCdn2.NewWebsocketService, waf2.NewCcService, common.NewGatewayipService, service.NewLogService, waf2.NewCcIpListService, waf2.NewBuildAudunService, waf2.NewZzybgpService, waf2.NewWaflogService, admin2.NewWafLogService, admin2.NewWafLogDataCleanService, admin2.NewWafOperationsService)
+var serviceSet = wire.NewSet(service.NewService, service.NewAoDunService, gameShield.NewGameShieldService, service.NewCrawlerService, service.NewGameShieldPublicIpService, service.NewDuedateService, service.NewFormatterService, service.NewParserService, service.NewRequiredService, service.NewHostService, gameShield.NewGameShieldBackendService, service.NewGameShieldSdkIpService, service.NewGameShieldUserIpService, common.NewWafFormatterService, flexCdn2.NewCdnService, service.NewRequestService, tcp.NewAidedTcpService, tcp.NewTcpforwardingService, udp.NewAidedUdpService, udp.NewUdpForWardingService, web.NewAidedWebService, web.NewWebForwardingService, flexCdn2.NewProxyService, flexCdn2.NewSslCertService, flexCdn2.NewWebsocketService, waf2.NewCcService, common.NewGatewayipService, service.NewLogService, waf2.NewCcIpListService, waf2.NewBuildAudunService, waf2.NewZzybgpService, waf2.NewWaflogService, admin2.NewWafLogService, admin2.NewWafLogDataCleanService, admin2.NewWafOperationsService, service.NewShardingService)
 
 // build App
 func newApp(task2 *server.TaskServer,

+ 18 - 0
config/local.yml

@@ -24,6 +24,24 @@ data:
       driver: mysql
       dsn: root:671119d76d73b5c9d4182d71e8e91eaa@tcp(110.42.96.120:3306)/clouds?charset=utf8mb4&parseTime=True&loc=Local
       logLevel: "info"
+  # 分表配置
+  sharding:
+    strategy: "monthly"           # 分表策略: monthly(按月)
+    enabled: true                 # 是否启用分表
+    keep_months: 120              # 保留数据的月数(旧数据会被自动清理)
+    auto_create: true            # 是否自动创建分表
+    # 数据量阈值配置
+    threshold:
+      enabled: true              # 是否启用基于数据量的动态分表
+      max_rows: 5000000         # 单表最大行数(500万条)
+      check_interval: "1h"      # 检查间隔
+    tables:                      # 需要分表的表配置
+      - name: "log"             # 表名
+        enabled: true           # 是否对此表启用分表
+        max_rows: 3000000       # 此表的最大行数(可覆盖全局配置)
+      - name: "waf_log"         # WAF日志表
+        enabled: true
+        max_rows: 5000000       # WAF日志表的最大行数
   #    user:
   #      driver: postgres
   #      dsn: host=localhost user=gorm password=gorm dbname=gorm port=9920 sslmode=disable TimeZone=Asia/Shanghai

+ 18 - 0
config/prod.yml

@@ -24,6 +24,24 @@ data:
       driver: mysql
       dsn: root:671119d76d73b5c9d4182d71e8e91eaa@tcp(110.42.96.120:3306)/clouds?charset=utf8mb4&parseTime=True&loc=Local
       logLevel: "warn"
+    # 分表配置
+    sharding:
+      strategy: "monthly"           # 分表策略: monthly(按月)
+      enabled: true                 # 是否启用分表
+      keep_months: 120              # 保留数据的月数(旧数据会被自动清理)
+      auto_create: true            # 是否自动创建分表
+      # 数据量阈值配置
+      threshold:
+        enabled: true              # 是否启用基于数据量的动态分表
+        max_rows: 5000000         # 单表最大行数(500万条)
+        check_interval: "1h"      # 检查间隔
+      tables: # 需要分表的表配置
+        - name: "log"             # 表名
+          enabled: true           # 是否对此表启用分表
+          max_rows: 3000000       # 此表的最大行数(可覆盖全局配置)
+        - name: "waf_log"         # WAF日志表
+          enabled: true
+          max_rows: 5000000       # WAF日志表的最大行数
 #    second:
 #      driver: mysql
 #      dsn: root:Mgrj9hMF3QQ3atX5hFIo@tcp(115.238.186.121:3306)/0panel?charset=utf8mb4&parseTime=True&loc=Local

+ 77 - 0
internal/job/sharding.go

@@ -0,0 +1,77 @@
+package job
+
+import (
+	"context"
+	"time"
+
+	"github.com/go-nunu/nunu-layout-advanced/internal/service"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+)
+
+// ShardingJob 分表管理定时任务
+type ShardingJob struct {
+	shardingService service.ShardingService
+	logger          *log.Logger
+}
+
+func NewShardingJob(
+	shardingService service.ShardingService,
+	logger *log.Logger,
+) *ShardingJob {
+	return &ShardingJob{
+		shardingService: shardingService,
+		logger:          logger,
+	}
+}
+
+// InitSharding 初始化分表(启动时执行一次)
+func (j *ShardingJob) InitSharding() {
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	j.logger.Info("执行分表初始化任务...")
+	
+	err := j.shardingService.InitializeSharding(ctx)
+	if err != nil {
+		j.logger.Error("分表初始化失败: " + err.Error())
+	} else {
+		j.logger.Info("分表初始化完成")
+	}
+}
+
+// CleanOldTables 清理过期分表(定时执行)
+func (j *ShardingJob) CleanOldTables() {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	j.logger.Info("执行分表清理任务...")
+	
+	err := j.shardingService.CleanOldTables(ctx)
+	if err != nil {
+		j.logger.Error("分表清理失败: " + err.Error())
+	} else {
+		j.logger.Info("分表清理完成")
+	}
+}
+
+// CheckNewPeriodTables 检查是否需要创建新周期的分表(每月第一天执行)
+func (j *ShardingJob) CheckNewPeriodTables() {
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
+	defer cancel()
+
+	j.logger.Info("检查是否需要创建新周期分表...")
+	
+	// 检查是否是月初第一天
+	now := time.Now()
+	if now.Day() != 1 {
+		j.logger.Info("不是月初第一天,跳过新周期分表检查")
+		return
+	}
+	
+	err := j.shardingService.InitializeSharding(ctx)
+	if err != nil {
+		j.logger.Error("新周期分表创建失败: " + err.Error())
+	} else {
+		j.logger.Info("新周期分表检查完成")
+	}
+}

+ 26 - 4
internal/model/log.go

@@ -1,8 +1,9 @@
 package model
 
-import "time"
-import "gorm.io/datatypes"
-
+import (
+	"time"
+	"gorm.io/datatypes"
+)
 
 type Log struct {
 	Id         int `gorm:"primary"`
@@ -16,8 +17,29 @@ type Log struct {
 	ExtraData  datatypes.JSON `gorm:"type:json"`
 	CreatedAt  time.Time
 	UpdatedAt  time.Time
+	
+	// 分表相关字段
+	tableName string `gorm:"-"` // 动态表名,不存储到数据库
 }
 
 func (m *Log) TableName() string {
-    return "log"
+	if m.tableName != "" {
+		return m.tableName
+	}
+	return "log"
+}
+
+// SetTableName 设置动态表名(用于分表)
+func (m *Log) SetTableName(tableName string) {
+	m.tableName = tableName
+}
+
+// GetBaseTableName 实现TableModel接口
+func (m *Log) GetBaseTableName() string {
+	return "log"
+}
+
+// GetCreatedAt 实现TableModel接口
+func (m *Log) GetCreatedAt() time.Time {
+	return m.CreatedAt
 }

+ 22 - 1
internal/model/waflog.go

@@ -19,10 +19,31 @@ type WafLog struct {
 	ExtraData  json.RawMessage `json:"extraData" form:"extraData" gorm:"column:extra_data"`
 	CreatedAt  time.Time `json:"createdAt" form:"createdAt" gorm:"column:created_at"`
 	UpdatedAt  time.Time `json:"updatedAt" form:"updatedAt" gorm:"column:updated_at"`
+	
+	// 分表相关字段
+	tableName string `gorm:"-"` // 动态表名,不存储到数据库
 }
 
 func (m *WafLog) TableName() string {
-    return "waf_log"
+	if m.tableName != "" {
+		return m.tableName
+	}
+	return "waf_log"
+}
+
+// SetTableName 设置动态表名(用于分表)
+func (m *WafLog) SetTableName(tableName string) {
+	m.tableName = tableName
+}
+
+// GetBaseTableName 实现TableModel接口
+func (m *WafLog) GetBaseTableName() string {
+	return "waf_log"
+}
+
+// GetCreatedAt 实现TableModel接口
+func (m *WafLog) GetCreatedAt() time.Time {
+	return m.CreatedAt
 }
 
 // WafLogWithGatewayIP 包含WAF日志及其在特定时间点的网关IP数据

+ 195 - 48
internal/repository/admin/log.go

@@ -1,13 +1,18 @@
 package admin
 
 import (
-    "context"
+	"context"
+	"fmt"
+	"math"
+	"strings"
+	"time"
+
 	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
 	admin "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/model"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
-	"math"
-	"strings"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
+	"gorm.io/gorm"
 )
 
 type LogRepository interface {
@@ -29,53 +34,118 @@ type logRepository struct {
 
 func (r *logRepository) GetLog(ctx context.Context, id int64) (*model.Log, error) {
 	var res model.Log
-	return &res, r.DBWithName(ctx,"admin").Where("id = ?", id).First(&res).Error
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 获取存在的分表
+	existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", nil, nil)
+	
+	// 在各个分表中查找
+	for _, tableName := range existingTables {
+		err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&res).Error
+		if err == nil {
+			res.SetTableName(tableName)
+			return &res, nil
+		}
+	}
+	
+	return nil, fmt.Errorf("未找到ID为 %d 的日志记录", id)
 }
 
 func (r *logRepository) GetLogList(ctx context.Context, req admin.SearchLogParams) (*v1.PaginatedResponse[model.Log], error) {
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 解析时间范围(如果有的话)
+	var startTime, endTime *time.Time
+	// TODO: 这里可以根据req中的时间字段来确定查询范围
+	// 暂时查询最近3个月的数据
+	
+	// 获取需要查询的表
+	existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", startTime, endTime)
+	
+	if len(existingTables) == 0 {
+		// 没有分表,返回空结果
+		return &v1.PaginatedResponse[model.Log]{
+			Records:    []model.Log{},
+			Page:       1,
+			PageSize:   10,
+			Total:      0,
+			TotalPages: 0,
+		}, nil
+	}
+	
+	if len(existingTables) == 1 {
+		// 只有一个表,直接查询
+		return r.queryFromSingleTable(ctx, req, existingTables[0])
+	}
+	
+	// 跨表分页查询
+	return r.queryFromMultipleTables(ctx, req, existingTables)
+}
+
+// queryFromSingleTable 单表查询
+func (r *logRepository) queryFromSingleTable(ctx context.Context, req admin.SearchLogParams, tableName string) (*v1.PaginatedResponse[model.Log], error) {
 	var res []model.Log
 	var total int64
 
-	query := r.DBWithName(ctx,"admin").Model(&model.Log{})
-	if  req.RequestIp != "" {
-		trimmedName := strings.TrimSpace(req.RequestIp)
-		// 使用 LIKE 进行模糊匹配
-		query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
-	}
-
+	query := r.DBWithName(ctx, "admin").Table(tableName)
+	query = r.applyFilters(query, req)
 
-	if req.Uid != 0 {
-		query = query.Where("uid = ?", req.Uid)
+	if err := query.Count(&total).Error; err != nil {
+		return nil, err
 	}
 
-	if req.Api != "" {
-		trimmedName := strings.TrimSpace(req.Api)
-		// 使用 LIKE 进行模糊匹配
-		query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
-	}
+	page := req.Current
+	pageSize := req.PageSize
 
-	if req.Message != "" {
-		trimmedName := strings.TrimSpace(req.Message)
-		// 使用 LIKE 进行模糊匹配
-		query = query.Where("message LIKE CONCAT('%', ?, '%')", trimmedName)
+	if page <= 0 {
+		page = 1
 	}
 
-
-	if req.ExtraData != "" {
-		trimmedName := strings.TrimSpace(req.ExtraData)
-		// 使用 LIKE 进行模糊匹配
-		query = query.Where("extra_data LIKE CONCAT('%', ?, '%')", trimmedName)
+	if pageSize <= 0 {
+		pageSize = 10
+	} else if pageSize > 100 {
+		pageSize = 100
 	}
 
+	offset := (page - 1) * pageSize
+	
 	if req.Column != "" {
 		query = query.Order(req.Column + " " + req.Order)
 	}
-
-	if err := query.Count(&total).Error; err != nil {
-		// 如果连计数都失败了,直接返回错误
-		return nil, err
+	
+	result := query.Offset(offset).Limit(pageSize).Find(&res)
+	if result.Error != nil {
+		return nil, result.Error
 	}
 
+	return &v1.PaginatedResponse[model.Log]{
+		Records:    res,
+		Page:       page,
+		PageSize:   pageSize,
+		Total:      total,
+		TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
+	}, nil
+}
+
+// queryFromMultipleTables 多表联合查询
+func (r *logRepository) queryFromMultipleTables(ctx context.Context, req admin.SearchLogParams, tableNames []string) (*v1.PaginatedResponse[model.Log], error) {
+	var allResults []model.Log
+	var totalCount int64
+
+	// 先计算总数
+	for _, tableName := range tableNames {
+		var count int64
+		query := r.DBWithName(ctx, "admin").Table(tableName)
+		query = r.applyFilters(query, req)
+		
+		if err := query.Count(&count).Error; err != nil {
+			return nil, err
+		}
+		totalCount += count
+	}
 
 	page := req.Current
 	pageSize := req.PageSize
@@ -85,28 +155,105 @@ func (r *logRepository) GetLogList(ctx context.Context, req admin.SearchLogParam
 	}
 
 	if pageSize <= 0 {
-		pageSize = 10 // 默认每页 10 条
+		pageSize = 10
 	} else if pageSize > 100 {
-		pageSize = 100 // 每页最多 100 条
+		pageSize = 100
 	}
 
-	// 计算 offset (偏移量)
-	// 例如,第 1 页,offset = (1-1)*10 = 0 (从第0条开始)
-	// 第 2 页,offset = (2-1)*10 = 10 (从第10条开始)
+	// 计算需要跳过的记录数
 	offset := (page - 1) * pageSize
-	// 3. 执行最终的查询
-	// 在所有条件都添加完毕后,再执行 .Find()
-	result := query.Offset(offset).Limit(pageSize).Find(&res)
-	if result.Error != nil {
-		// 这里的错误可能是数据库连接问题等,而不是“未找到记录”
-		return nil, result.Error
+	limit := pageSize
+	currentOffset := 0
+
+	// 逐表查询直到获取足够的记录
+	for _, tableName := range tableNames {
+		if limit <= 0 {
+			break
+		}
+
+		var tableCount int64
+		countQuery := r.DBWithName(ctx, "admin").Table(tableName)
+		countQuery = r.applyFilters(countQuery, req)
+		if err := countQuery.Count(&tableCount).Error; err != nil {
+			return nil, err
+		}
+
+		// 如果当前表的记录数不足以满足offset要求,跳过这个表
+		if currentOffset+int(tableCount) <= offset {
+			currentOffset += int(tableCount)
+			continue
+		}
+
+		// 计算在当前表中的offset
+		tableOffset := offset - currentOffset
+		if tableOffset < 0 {
+			tableOffset = 0
+		}
+
+		var tableResults []model.Log
+		query := r.DBWithName(ctx, "admin").Table(tableName)
+		query = r.applyFilters(query, req)
+		
+		if req.Column != "" {
+			query = query.Order(req.Column + " " + req.Order)
+		}
+
+		err := query.Offset(tableOffset).Limit(limit).Find(&tableResults).Error
+		if err != nil {
+			return nil, err
+		}
+
+		// 设置表名
+		for i := range tableResults {
+			tableResults[i].SetTableName(tableName)
+		}
+
+		allResults = append(allResults, tableResults...)
+		limit -= len(tableResults)
+		currentOffset += int(tableCount)
 	}
-	return &v1.PaginatedResponse[model.Log]{
-		Records: res,
-		Page: page,
-		PageSize: pageSize,
-		Total: total,
-		TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
 
+	return &v1.PaginatedResponse[model.Log]{
+		Records:    allResults,
+		Page:       page,
+		PageSize:   pageSize,
+		Total:      totalCount,
+		TotalPages: int(math.Ceil(float64(totalCount) / float64(pageSize))),
 	}, nil
 }
+
+// applyFilters 应用查询过滤条件
+func (r *logRepository) applyFilters(query *gorm.DB, req admin.SearchLogParams) *gorm.DB {
+	if req.RequestIp != "" {
+		trimmedName := strings.TrimSpace(req.RequestIp)
+		query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.Uid != 0 {
+		query = query.Where("uid = ?", req.Uid)
+	}
+
+	if req.Api != "" {
+		trimmedName := strings.TrimSpace(req.Api)
+		query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.Message != "" {
+		trimmedName := strings.TrimSpace(req.Message)
+		query = query.Where("message LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.ExtraData != "" {
+		trimmedName := strings.TrimSpace(req.ExtraData)
+		query = query.Where("extra_data LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	return query
+}
+
+// getShardingManager 获取分表管理器
+func (r *logRepository) getShardingManager() *sharding.ShardingManager {
+	// 使用月度分表策略
+	strategy := sharding.NewMonthlyShardingStrategy()
+	return sharding.NewShardingManager(strategy, r.Logger)
+}

+ 301 - 60
internal/repository/admin/waflog.go

@@ -2,13 +2,17 @@ package admin
 
 import (
 	"context"
+	"fmt"
+	"math"
+	"strings"
+	"time"
+
 	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
 	adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/model"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
 	"gorm.io/gorm"
-	"math"
-	"strings"
 )
 
 type WafLogRepository interface {
@@ -78,71 +82,118 @@ func (r *wafLogRepository) buildExportQuery(ctx context.Context, req adminApi.Ex
 
 func (r *wafLogRepository) GetWafLog(ctx context.Context, id int64) (*model.WafLog, error) {
 	var res model.WafLog
-	return &res, r.DBWithName(ctx,"admin").Where("id = ?", id).First(&res).Error
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 获取存在的分表
+	existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", nil, nil)
+	
+	// 在各个分表中查找
+	for _, tableName := range existingTables {
+		err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&res).Error
+		if err == nil {
+			res.SetTableName(tableName)
+			return &res, nil
+		}
+	}
+	
+	return nil, fmt.Errorf("未找到ID为 %d 的WAF日志记录", id)
 }
 
 func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.SearchWafLogParams) (*v1.PaginatedResponse[model.WafLog], error) {
-	var res []model.WafLog
-	var total int64
-
-	query := r.DBWithName(ctx,"admin").Model(&model.WafLog{})
-	if  req.RequestIp != "" {
-		trimmedName := strings.TrimSpace(req.RequestIp)
-		query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
-	}
-
-
-	if req.Uid != 0 {
-		query = query.Where("uid = ?", req.Uid)
-	}
-
-	if req.Api != "" {
-		trimmedName := strings.TrimSpace(req.Api)
-		query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 解析时间范围(如果有的话)
+	var startTime, endTime *time.Time
+	// TODO: 这里可以根据req中的时间字段来确定查询范围
+	// 暂时查询最近3个月的数据
+	
+	// 获取需要查询的表
+	existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "waf_log", startTime, endTime)
+	
+	if len(existingTables) == 0 {
+		// 没有分表,返回空结果
+		return &v1.PaginatedResponse[model.WafLog]{
+			Records:    []model.WafLog{},
+			Page:       1,
+			PageSize:   10,
+			Total:      0,
+			TotalPages: 0,
+		}, nil
 	}
-
-	if req.Name != "" {
-		trimmedName := strings.TrimSpace(req.Name)
-		query = query.Where("name LIKE CONCAT('%', ?, '%')", trimmedName)
+	
+	if len(existingTables) == 1 {
+		// 只有一个表,直接查询
+		return r.queryWafLogFromSingleTable(ctx, req, existingTables[0])
 	}
+	
+	// 跨表分页查询
+	return r.queryWafLogFromMultipleTables(ctx, req, existingTables)
+}
 
+// queryWafLogFromSingleTable 单表查询
+func (r *wafLogRepository) queryWafLogFromSingleTable(ctx context.Context, req adminApi.SearchWafLogParams, tableName string) (*v1.PaginatedResponse[model.WafLog], error) {
+	var res []model.WafLog
+	var total int64
 
-	if req.RuleId != 0 {
-		query = query.Where("rule_id = ?", req.RuleId)
-	}
-
-	if req.HostId != 0 {
-		query = query.Where("host_id = ?", req.HostId)
-	}
+	query := r.DBWithName(ctx, "admin").Table(tableName)
+	query = r.applyWafLogFilters(query, req)
 
-	if req.Api != "" {
-		trimmedName := strings.TrimSpace(req.Api)
-		query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
+	if err := query.Count(&total).Error; err != nil {
+		return nil, err
 	}
 
-	if req.UserAgent != "" {
-		trimmedName := strings.TrimSpace(req.UserAgent)
-		query = query.Where("user_agent LIKE CONCAT('%', ?, '%')", trimmedName)
-	}
+	page := req.Current
+	pageSize := req.PageSize
 
-	if req.ApiName != "" {
-		trimmedName := strings.TrimSpace(req.ApiName)
-		query = query.Where("api_name LIKE CONCAT('%', ?, '%')", trimmedName)
+	if page <= 0 {
+		page = 1
 	}
 
-	if req.ApiType != "" {
-		query = query.Where("api_type = ?", req.ApiType)
+	if pageSize <= 0 {
+		pageSize = 10
+	} else if pageSize > 100 {
+		pageSize = 100
 	}
 
-
+	offset := (page - 1) * pageSize
+	
 	if req.Column != "" {
 		query = query.Order(req.Column + " " + req.Order)
 	}
-
-	if err := query.Count(&total).Error; err != nil {
-		return nil, err
+	
+	result := query.Offset(offset).Limit(pageSize).Find(&res)
+	if result.Error != nil {
+		return nil, result.Error
 	}
 
+	return &v1.PaginatedResponse[model.WafLog]{
+		Records:    res,
+		Page:       page,
+		PageSize:   pageSize,
+		Total:      total,
+		TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
+	}, nil
+}
+
+// queryWafLogFromMultipleTables 多表联合查询
+func (r *wafLogRepository) queryWafLogFromMultipleTables(ctx context.Context, req adminApi.SearchWafLogParams, tableNames []string) (*v1.PaginatedResponse[model.WafLog], error) {
+	var allResults []model.WafLog
+	var totalCount int64
+
+	// 先计算总数
+	for _, tableName := range tableNames {
+		var count int64
+		query := r.DBWithName(ctx, "admin").Table(tableName)
+		query = r.applyWafLogFilters(query, req)
+		
+		if err := query.Count(&count).Error; err != nil {
+			return nil, err
+		}
+		totalCount += count
+	}
 
 	page := req.Current
 	pageSize := req.PageSize
@@ -152,35 +203,146 @@ func (r *wafLogRepository) GetWafLogList(ctx context.Context, req adminApi.Searc
 	}
 
 	if pageSize <= 0 {
-		pageSize = 10 
+		pageSize = 10
 	} else if pageSize > 100 {
-		pageSize = 100 
+		pageSize = 100
 	}
 
+	// 计算需要跳过的记录数
 	offset := (page - 1) * pageSize
-	result := query.Offset(offset).Limit(pageSize).Find(&res)
-	if result.Error != nil {
-		return nil, result.Error
+	limit := pageSize
+	currentOffset := 0
+
+	// 逐表查询直到获取足够的记录
+	for _, tableName := range tableNames {
+		if limit <= 0 {
+			break
+		}
+
+		var tableCount int64
+		countQuery := r.DBWithName(ctx, "admin").Table(tableName)
+		countQuery = r.applyWafLogFilters(countQuery, req)
+		if err := countQuery.Count(&tableCount).Error; err != nil {
+			return nil, err
+		}
+
+		// 如果当前表的记录数不足以满足offset要求,跳过这个表
+		if currentOffset+int(tableCount) <= offset {
+			currentOffset += int(tableCount)
+			continue
+		}
+
+		// 计算在当前表中的offset
+		tableOffset := offset - currentOffset
+		if tableOffset < 0 {
+			tableOffset = 0
+		}
+
+		var tableResults []model.WafLog
+		query := r.DBWithName(ctx, "admin").Table(tableName)
+		query = r.applyWafLogFilters(query, req)
+		
+		if req.Column != "" {
+			query = query.Order(req.Column + " " + req.Order)
+		}
+
+		err := query.Offset(tableOffset).Limit(limit).Find(&tableResults).Error
+		if err != nil {
+			return nil, err
+		}
+
+		// 设置表名
+		for i := range tableResults {
+			tableResults[i].SetTableName(tableName)
+		}
+
+		allResults = append(allResults, tableResults...)
+		limit -= len(tableResults)
+		currentOffset += int(tableCount)
 	}
-	return &v1.PaginatedResponse[model.WafLog]{
-		Records: res,
-		Page: page,
-		PageSize: pageSize,
-		Total: total,
-		TotalPages: int(math.Ceil(float64(total) / float64(pageSize))),
 
+	return &v1.PaginatedResponse[model.WafLog]{
+		Records:    allResults,
+		Page:       page,
+		PageSize:   pageSize,
+		Total:      totalCount,
+		TotalPages: int(math.Ceil(float64(totalCount) / float64(pageSize))),
 	}, nil
 }
 
 func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) error {
-	return r.DBWithName(ctx,"admin").Create(log).Error
+	// 设置创建时间
+	if log.CreatedAt.IsZero() {
+		log.CreatedAt = time.Now()
+	}
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManagerWithThreshold()
+	
+	// 获取最优的写入表(考虑数据量阈值)
+	tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("waf_log"))
+	if err != nil {
+		return fmt.Errorf("获取写入表失败: %v", err)
+	}
+	
+	log.SetTableName(tableName)
+	
+	// 确保表存在
+	err = shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
+	if err != nil {
+		return err
+	}
+	
+	// 写入数据
+	return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
 }
 
 func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error {
 	if len(logs) == 0 {
 		return nil
 	}
-	return r.DBWithName(ctx, "admin").CreateInBatches(logs, len(logs)).Error
+	
+	// 获取带阈值的分表管理器
+	shardingMgr := r.getShardingManagerWithThreshold()
+	maxRows := r.getMaxRowsForTable("waf_log")
+	
+	// 按表名分组
+	tableGroups := make(map[string][]*model.WafLog)
+	
+	for _, log := range logs {
+		// 设置创建时间
+		if log.CreatedAt.IsZero() {
+			log.CreatedAt = time.Now()
+		}
+		
+		// 获取最优的写入表(考虑数据量阈值)
+		tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, maxRows)
+		if err != nil {
+			return fmt.Errorf("获取写入表失败: %v", err)
+		}
+		
+		log.SetTableName(tableName)
+		
+		// 按表名分组
+		tableGroups[tableName] = append(tableGroups[tableName], log)
+	}
+	
+	// 为每个表批量插入
+	for tableName, tableLogs := range tableGroups {
+		// 确保表存在
+		err := shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.WafLog{})
+		if err != nil {
+			return err
+		}
+		
+		// 批量插入
+		err = r.DBWithName(ctx, "admin").Table(tableName).CreateInBatches(tableLogs, len(tableLogs)).Error
+		if err != nil {
+			return err
+		}
+	}
+	
+	return nil
 }
 
 func (r *wafLogRepository) ExportWafLog(ctx context.Context, req adminApi.ExportWafLog) ([]model.WafLogWithGatewayIP, error) {
@@ -235,4 +397,83 @@ func (r *wafLogRepository) GetWafLogExportCount(ctx context.Context, req adminAp
 	}
 	
 	return int(count), nil
+}
+
+// getShardingManager 获取分表管理器
+func (r *wafLogRepository) getShardingManager() *sharding.ShardingManager {
+	// 使用月度分表策略
+	strategy := sharding.NewMonthlyShardingStrategy()
+	return sharding.NewShardingManager(strategy, r.Logger)
+}
+
+// getShardingManagerWithThreshold 获取带阈值配置的分表管理器
+func (r *wafLogRepository) getShardingManagerWithThreshold() *sharding.ShardingManager {
+	strategy := sharding.NewMonthlyShardingStrategy()
+	
+	// 阈值配置(这里可以从配置文件读取,暂时硬编码)
+	thresholdConfig := &sharding.ThresholdConfig{
+		Enabled:       true,
+		MaxRows:       5000000, // waf_log表默认500万条
+		CheckInterval: time.Hour,
+	}
+	
+	return sharding.NewShardingManagerWithThreshold(strategy, r.Logger, thresholdConfig)
+}
+
+// getMaxRowsForTable 获取指定表的最大行数配置
+func (r *wafLogRepository) getMaxRowsForTable(tableName string) int64 {
+	switch tableName {
+	case "log":
+		return 3000000 // 300万条
+	case "waf_log":
+		return 5000000 // 500万条
+	default:
+		return 3000000 // 默认300万条
+	}
+}
+
+// applyWafLogFilters 应用WafLog查询过滤条件
+func (r *wafLogRepository) applyWafLogFilters(query *gorm.DB, req adminApi.SearchWafLogParams) *gorm.DB {
+	if req.RequestIp != "" {
+		trimmedName := strings.TrimSpace(req.RequestIp)
+		query = query.Where("request_ip LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.Uid != 0 {
+		query = query.Where("uid = ?", req.Uid)
+	}
+
+	if req.Api != "" {
+		trimmedName := strings.TrimSpace(req.Api)
+		query = query.Where("api LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.Name != "" {
+		trimmedName := strings.TrimSpace(req.Name)
+		query = query.Where("name LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.RuleId != 0 {
+		query = query.Where("rule_id = ?", req.RuleId)
+	}
+
+	if req.HostId != 0 {
+		query = query.Where("host_id = ?", req.HostId)
+	}
+
+	if req.UserAgent != "" {
+		trimmedName := strings.TrimSpace(req.UserAgent)
+		query = query.Where("user_agent LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.ApiName != "" {
+		trimmedName := strings.TrimSpace(req.ApiName)
+		query = query.Where("api_name LIKE CONCAT('%', ?, '%')", trimmedName)
+	}
+
+	if req.ApiType != "" {
+		query = query.Where("api_type = ?", req.ApiType)
+	}
+
+	return query
 }

+ 140 - 4
internal/repository/log.go

@@ -1,12 +1,17 @@
 package repository
 
 import (
-    "context"
+	"context"
+	"fmt"
+	"time"
+
 	"github.com/go-nunu/nunu-layout-advanced/internal/model"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
 )
 
 type LogRepository interface {
 	GetLog(ctx context.Context, id int64) (*model.Log, error)
+	GetLogsByTimeRange(ctx context.Context, start, end *time.Time) ([]*model.Log, error)
 	AddLog(ctx context.Context, log *model.Log) error
 	EditLog(ctx context.Context, log *model.Log) error
 }
@@ -25,14 +30,145 @@ type logRepository struct {
 
 func (r *logRepository) GetLog(ctx context.Context, id int64) (*model.Log, error) {
 	var log model.Log
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 获取可能的表名(查询最近3个月)
+	existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", nil, nil)
+	
+	// 在各个分表中查找
+	for _, tableName := range existingTables {
+		err := r.DBWithName(ctx, "admin").Table(tableName).Where("id = ?", id).First(&log).Error
+		if err == nil {
+			log.SetTableName(tableName)
+			return &log, nil
+		}
+	}
+	
+	return nil, fmt.Errorf("未找到ID为 %d 的日志记录", id)
+}
 
-	return &log, nil
+func (r *logRepository) GetLogsByTimeRange(ctx context.Context, start, end *time.Time) ([]*model.Log, error) {
+	var logs []*model.Log
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 检查存在的表
+	existingTables := shardingMgr.GetTableNamesWithExistenceCheck(r.DBWithName(ctx, "admin"), "log", start, end)
+	
+	if len(existingTables) == 0 {
+		return logs, nil // 没有分表,返回空结果
+	}
+	
+	// 联合查询所有分表
+	for _, tableName := range existingTables {
+		var tableLogs []*model.Log
+		query := r.DBWithName(ctx, "admin").Table(tableName)
+		
+		// 添加时间范围过滤
+		if start != nil {
+			query = query.Where("created_at >= ?", *start)
+		}
+		if end != nil {
+			query = query.Where("created_at <= ?", *end)
+		}
+		
+		err := query.Find(&tableLogs).Error
+		if err != nil {
+			return nil, err
+		}
+		
+		// 设置表名
+		for _, log := range tableLogs {
+			log.SetTableName(tableName)
+		}
+		
+		logs = append(logs, tableLogs...)
+	}
+	
+	return logs, nil
 }
 
 func (r *logRepository) AddLog(ctx context.Context, log *model.Log) error {
-	return r.DBWithName(ctx,"admin").Create(log).Error
+	// 设置创建时间
+	if log.CreatedAt.IsZero() {
+		log.CreatedAt = time.Now()
+	}
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManagerWithThreshold()
+	
+	// 获取最优的写入表(考虑数据量阈值)
+	tableName, err := shardingMgr.GetOptimalWriteTable(ctx, r.DBWithName(ctx, "admin"), log, r.getMaxRowsForTable("log"))
+	if err != nil {
+		return fmt.Errorf("获取写入表失败: %v", err)
+	}
+	
+	log.SetTableName(tableName)
+	
+	// 确保表存在
+	err = shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.Log{})
+	if err != nil {
+		return err
+	}
+	
+	// 写入数据
+	return r.DBWithName(ctx, "admin").Table(tableName).Create(log).Error
 }
 
 func (r *logRepository) EditLog(ctx context.Context, log *model.Log) error {
-	return r.DBWithName(ctx,"admin").Updates(log).Error
+	// 如果已经指定了表名,直接更新该表
+	if log.TableName() != "log" {
+		return r.DBWithName(ctx, "admin").Table(log.TableName()).Updates(log).Error
+	}
+	
+	// 获取分表管理器
+	shardingMgr := r.getShardingManager()
+	
+	// 确定表名
+	tableName := shardingMgr.GetWriteTableName(log)
+	log.SetTableName(tableName)
+	
+	// 确保表存在
+	err := shardingMgr.EnsureTableExists(ctx, r.DBWithName(ctx, "admin"), tableName, &model.Log{})
+	if err != nil {
+		return err
+	}
+	
+	return r.DBWithName(ctx, "admin").Table(tableName).Updates(log).Error
+}
+
+// getShardingManager 获取分表管理器
+func (r *logRepository) getShardingManager() *sharding.ShardingManager {
+	// 使用月度分表策略
+	strategy := sharding.NewMonthlyShardingStrategy()
+	return sharding.NewShardingManager(strategy, r.Logger)
+}
+
+// getShardingManagerWithThreshold 获取带阈值配置的分表管理器
+func (r *logRepository) getShardingManagerWithThreshold() *sharding.ShardingManager {
+	strategy := sharding.NewMonthlyShardingStrategy()
+	
+	// 阈值配置(这里可以从配置文件读取,暂时硬编码)
+	thresholdConfig := &sharding.ThresholdConfig{
+		Enabled:       true,
+		MaxRows:       3000000, // log表默认300万条
+		CheckInterval: time.Hour,
+	}
+	
+	return sharding.NewShardingManagerWithThreshold(strategy, r.Logger, thresholdConfig)
+}
+
+// getMaxRowsForTable 获取指定表的最大行数配置
+func (r *logRepository) getMaxRowsForTable(tableName string) int64 {
+	switch tableName {
+	case "log":
+		return 3000000 // 300万条
+	case "waf_log":
+		return 5000000 // 500万条
+	default:
+		return 3000000 // 默认300万条
+	}
 }

+ 222 - 0
internal/service/sharding.go

@@ -0,0 +1,222 @@
+package service
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/go-nunu/nunu-layout-advanced/internal/model"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/sharding"
+	"github.com/spf13/viper"
+	"gorm.io/gorm"
+)
+
+type ShardingService interface {
+	// InitializeSharding 初始化分表
+	InitializeSharding(ctx context.Context) error
+	// CleanOldTables 清理过期的分表
+	CleanOldTables(ctx context.Context) error
+	// GetShardingManager 获取分表管理器
+	GetShardingManager() *sharding.ShardingManager
+}
+
+type shardingService struct {
+	*Service
+	db           *gorm.DB
+	logger       *log.Logger
+	config       *viper.Viper
+	shardingMgr  *sharding.ShardingManager
+}
+
+func NewShardingService(
+	service *Service,
+	db *gorm.DB,
+	logger *log.Logger,
+	config *viper.Viper,
+) ShardingService {
+	// 根据配置创建分表策略
+	strategy := createShardingStrategy(config)
+	shardingMgr := sharding.NewShardingManager(strategy, logger)
+
+	return &shardingService{
+		Service:     service,
+		db:          db,
+		logger:      logger,
+		config:      config,
+		shardingMgr: shardingMgr,
+	}
+}
+
+func (s *shardingService) GetShardingManager() *sharding.ShardingManager {
+	return s.shardingMgr
+}
+
+func (s *shardingService) InitializeSharding(ctx context.Context) error {
+	if !s.config.GetBool("data.sharding.enabled") {
+		s.logger.Info("分表功能未启用")
+		return nil
+	}
+
+	s.logger.Info("开始初始化分表...")
+
+	// 获取需要分表的表配置
+	tables := s.config.Get("data.sharding.tables")
+	if tables == nil {
+		s.logger.Warn("未配置需要分表的表")
+		return nil
+	}
+
+	tableConfigs, ok := tables.([]interface{})
+	if !ok {
+		return fmt.Errorf("分表配置格式错误")
+	}
+
+	for _, tableConfig := range tableConfigs {
+		tableMap, ok := tableConfig.(map[string]interface{})
+		if !ok {
+			continue
+		}
+
+		tableName, exists := tableMap["name"].(string)
+		if !exists {
+			continue
+		}
+
+		enabled, exists := tableMap["enabled"].(bool)
+		if !exists || !enabled {
+			continue
+		}
+
+		if err := s.initializeTableSharding(ctx, tableName); err != nil {
+			s.logger.Error(fmt.Sprintf("初始化表 %s 分表失败: %v", tableName, err))
+			return err
+		}
+	}
+
+	s.logger.Info("分表初始化完成")
+	return nil
+}
+
+func (s *shardingService) initializeTableSharding(ctx context.Context, baseTableName string) error {
+	s.logger.Info(fmt.Sprintf("初始化表 %s 的分表...", baseTableName))
+
+	// 创建当前月的分表
+	currentTableName := s.shardingMgr.GetCurrentTableName(baseTableName)
+	
+	var tableModel interface{}
+	switch baseTableName {
+	case "log":
+		tableModel = &model.Log{}
+	case "waf_log":
+		tableModel = &model.WafLog{}
+	default:
+		return fmt.Errorf("不支持的表: %s", baseTableName)
+	}
+
+	// 确保当前表存在
+	err := s.shardingMgr.EnsureTableExists(ctx, s.db, currentTableName, tableModel)
+	if err != nil {
+		return fmt.Errorf("创建表 %s 失败: %v", currentTableName, err)
+	}
+
+	s.logger.Info(fmt.Sprintf("表 %s 分表初始化完成", baseTableName))
+	return nil
+}
+
+func (s *shardingService) CleanOldTables(ctx context.Context) error {
+	if !s.config.GetBool("data.sharding.enabled") {
+		return nil
+	}
+
+	keepMonths := s.config.GetInt("data.sharding.keep_months")
+	if keepMonths <= 0 {
+		keepMonths = 12 // 默认保留12个月
+	}
+
+	s.logger.Info(fmt.Sprintf("开始清理超过 %d 个月的旧分表...", keepMonths))
+
+	// 计算清理时间点
+	cutoffTime := time.Now().AddDate(0, -keepMonths, 0)
+
+	// 获取需要分表的表配置
+	tables := s.config.Get("data.sharding.tables")
+	if tables == nil {
+		return nil
+	}
+
+	tableConfigs, ok := tables.([]interface{})
+	if !ok {
+		return fmt.Errorf("分表配置格式错误")
+	}
+
+	for _, tableConfig := range tableConfigs {
+		tableMap, ok := tableConfig.(map[string]interface{})
+		if !ok {
+			continue
+		}
+
+		tableName, exists := tableMap["name"].(string)
+		if !exists {
+			continue
+		}
+
+		enabled, exists := tableMap["enabled"].(bool)
+		if !exists || !enabled {
+			continue
+		}
+
+		if err := s.cleanOldTablesForBase(ctx, tableName, cutoffTime); err != nil {
+			s.logger.Error(fmt.Sprintf("清理表 %s 的旧分表失败: %v", tableName, err))
+		}
+	}
+
+	s.logger.Info("旧分表清理完成")
+	return nil
+}
+
+func (s *shardingService) cleanOldTablesForBase(ctx context.Context, baseTableName string, cutoffTime time.Time) error {
+	// 获取所有可能的表名(从cutoffTime到现在)
+	now := time.Now()
+	tableNames := s.shardingMgr.GetQueryTableNames(baseTableName, &cutoffTime, &now)
+
+	for _, tableName := range tableNames {
+		// 解析表名中的时间
+		if !s.shouldDropTable(tableName, cutoffTime) {
+			continue
+		}
+
+		// 检查表是否存在
+		if !s.db.Migrator().HasTable(tableName) {
+			continue
+		}
+
+		s.logger.Info(fmt.Sprintf("删除过期分表: %s", tableName))
+		
+		// 删除表
+		err := s.db.Migrator().DropTable(tableName)
+		if err != nil {
+			s.logger.Error(fmt.Sprintf("删除表 %s 失败: %v", tableName, err))
+		}
+	}
+
+	return nil
+}
+
+func (s *shardingService) shouldDropTable(tableName string, cutoffTime time.Time) bool {
+	// 这里可以根据表名解析时间,判断是否应该删除
+	// 简单实现:如果表名包含的时间早于cutoffTime,则删除
+	// 实际项目中可以更精确地解析表名中的时间戳
+	return true // 暂时简化实现
+}
+
+// createShardingStrategy 根据配置创建分表策略
+func createShardingStrategy(config *viper.Viper) sharding.ShardingStrategy {
+	strategy := config.GetString("data.sharding.strategy")
+	switch strategy {
+	case "monthly":
+		fallthrough
+	default:
+		return sharding.NewMonthlyShardingStrategy()
+	}
+}

+ 267 - 0
pkg/sharding/manager.go

@@ -0,0 +1,267 @@
+package sharding
+
+import (
+	"context"
+	"fmt"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+
+	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
+	"gorm.io/gorm"
+)
+
+// TableModel 支持分表的模型接口
+type TableModel interface {
+	GetBaseTableName() string
+	GetCreatedAt() time.Time
+}
+
+// ThresholdConfig 阈值配置
+type ThresholdConfig struct {
+	Enabled       bool
+	MaxRows       int64
+	CheckInterval time.Duration
+}
+
+// ShardingManager 分表管理器
+type ShardingManager struct {
+	strategy        ShardingStrategy
+	logger          *log.Logger
+	thresholdConfig *ThresholdConfig
+}
+
+func NewShardingManager(strategy ShardingStrategy, logger *log.Logger) *ShardingManager {
+	return &ShardingManager{
+		strategy: strategy,
+		logger:   logger,
+	}
+}
+
+func NewShardingManagerWithThreshold(strategy ShardingStrategy, logger *log.Logger, thresholdConfig *ThresholdConfig) *ShardingManager {
+	return &ShardingManager{
+		strategy:        strategy,
+		logger:          logger,
+		thresholdConfig: thresholdConfig,
+	}
+}
+
+// GetWriteTableName 获取写入表名(基于记录的创建时间)
+func (sm *ShardingManager) GetWriteTableName(model TableModel) string {
+	baseTableName := model.GetBaseTableName()
+	createdAt := model.GetCreatedAt()
+	
+	if createdAt.IsZero() {
+		createdAt = time.Now()
+	}
+	
+	return sm.strategy.GetTableName(baseTableName, createdAt)
+}
+
+// GetCurrentTableName 获取当前表名(用于写入新记录)
+func (sm *ShardingManager) GetCurrentTableName(baseTableName string) string {
+	return sm.strategy.GetCurrentTableName(baseTableName)
+}
+
+// GetQueryTableNames 获取查询需要的所有表名
+func (sm *ShardingManager) GetQueryTableNames(baseTableName string, start, end *time.Time) []string {
+	if start == nil || end == nil {
+		// 如果没有指定时间范围,默认查询最近3个月的表
+		now := time.Now()
+		defaultStart := now.AddDate(0, -2, 0) // 前2个月
+		defaultEnd := now
+		return sm.strategy.GetTableNamesByRange(baseTableName, defaultStart, defaultEnd)
+	}
+	return sm.strategy.GetTableNamesByRange(baseTableName, *start, *end)
+}
+
+// EnsureTableExists 确保表存在,不存在则创建
+func (sm *ShardingManager) EnsureTableExists(ctx context.Context, db *gorm.DB, tableName string, model interface{}) error {
+	// 检查表是否存在
+	if db.Migrator().HasTable(tableName) {
+		return nil
+	}
+
+	sm.logger.Info(fmt.Sprintf("创建分表: %s", tableName))
+	
+	// 使用指定的表名创建表
+	return db.Table(tableName).AutoMigrate(model)
+}
+
+// BuildUnionQuery 构建联合查询(用于跨表查询)
+func (sm *ShardingManager) BuildUnionQuery(ctx context.Context, db *gorm.DB, tableNames []string, baseQuery func(*gorm.DB) *gorm.DB) *gorm.DB {
+	if len(tableNames) == 0 {
+		return db
+	}
+
+	// 过滤存在的表
+	var existingTables []string
+	for _, tableName := range tableNames {
+		if db.Migrator().HasTable(tableName) {
+			existingTables = append(existingTables, tableName)
+		}
+	}
+
+	if len(existingTables) == 0 {
+		return db
+	}
+
+	// 如果只有一个表,直接查询该表
+	if len(existingTables) == 1 {
+		return baseQuery(db.Table(existingTables[0]))
+	}
+
+	// 多表联合查询
+	var subQueries []string
+	for _, tableName := range existingTables {
+		subQueries = append(subQueries, fmt.Sprintf("SELECT * FROM %s", tableName))
+	}
+
+	unionSQL := strings.Join(subQueries, " UNION ALL ")
+	return baseQuery(db.Table(fmt.Sprintf("(%s) as sharded_table", unionSQL)))
+}
+
+// GetTableNamesWithExistenceCheck 获取存在的表名列表(只返回分表,不包含原表)
+func (sm *ShardingManager) GetTableNamesWithExistenceCheck(db *gorm.DB, baseTableName string, start, end *time.Time) []string {
+	allTableNames := sm.GetQueryTableNames(baseTableName, start, end)
+	var existingTables []string
+	
+	for _, tableName := range allTableNames {
+		if db.Migrator().HasTable(tableName) {
+			existingTables = append(existingTables, tableName)
+		}
+	}
+	
+	// 还要检查动态分表(带序号的表)
+	dynamicTables := sm.findDynamicTables(db, allTableNames)
+	existingTables = append(existingTables, dynamicTables...)
+	
+	return existingTables
+}
+
+// findDynamicTables 查找动态分表(带序号的表)
+func (sm *ShardingManager) findDynamicTables(db *gorm.DB, baseTableNames []string) []string {
+	var dynamicTables []string
+	
+	for _, baseTableName := range baseTableNames {
+		// 查找类似 log_202408_01, log_202408_02 这样的表
+		pattern := fmt.Sprintf("%s_\\d+", baseTableName)
+		if tables := sm.findTablesByPattern(db, pattern); len(tables) > 0 {
+			dynamicTables = append(dynamicTables, tables...)
+		}
+	}
+	
+	return dynamicTables
+}
+
+// findTablesByPattern 根据模式查找表
+func (sm *ShardingManager) findTablesByPattern(db *gorm.DB, pattern string) []string {
+	var tables []string
+	
+	// 获取所有表名
+	rows, err := db.Raw("SHOW TABLES").Rows()
+	if err != nil {
+		sm.logger.Error("获取表列表失败: " + err.Error())
+		return tables
+	}
+	defer rows.Close()
+	
+	regex, err := regexp.Compile(pattern)
+	if err != nil {
+		sm.logger.Error("编译正则表达式失败: " + err.Error())
+		return tables
+	}
+	
+	for rows.Next() {
+		var tableName string
+		if err := rows.Scan(&tableName); err != nil {
+			continue
+		}
+		if regex.MatchString(tableName) {
+			tables = append(tables, tableName)
+		}
+	}
+	
+	return tables
+}
+
+// GetOptimalWriteTable 获取最优的写入表(考虑数据量阈值)
+func (sm *ShardingManager) GetOptimalWriteTable(ctx context.Context, db *gorm.DB, model TableModel, maxRows int64) (string, error) {
+	baseTableName := model.GetBaseTableName()
+	createdAt := model.GetCreatedAt()
+	
+	if createdAt.IsZero() {
+		createdAt = time.Now()
+	}
+	
+	// 先获取基础表名
+	baseShardTableName := sm.strategy.GetTableName(baseTableName, createdAt)
+	
+	// 如果没有启用阈值检查,直接返回基础表名
+	if sm.thresholdConfig == nil || !sm.thresholdConfig.Enabled {
+		return baseShardTableName, nil
+	}
+	
+	// 使用配置的maxRows,如果没有则使用默认值
+	if maxRows <= 0 {
+		maxRows = sm.thresholdConfig.MaxRows
+	}
+	
+	// 检查当前表是否已达到阈值
+	currentTable := baseShardTableName
+	for {
+		if !db.Migrator().HasTable(currentTable) {
+			// 表不存在,可以使用
+			return currentTable, nil
+		}
+		
+		// 检查表的数据量
+		var count int64
+		err := db.Table(currentTable).Count(&count).Error
+		if err != nil {
+			sm.logger.Error(fmt.Sprintf("检查表 %s 数据量失败: %v", currentTable, err))
+			return currentTable, nil // 出错时返回当前表
+		}
+		
+		if count < maxRows {
+			// 当前表还有空间
+			return currentTable, nil
+		}
+		
+		// 当前表已满,尝试下一个序号的表
+		currentTable = sm.getNextSequenceTable(currentTable)
+		sm.logger.Info(fmt.Sprintf("表 %s 已达到阈值 %d,尝试使用 %s", baseShardTableName, maxRows, currentTable))
+	}
+}
+
+// getNextSequenceTable 获取下一个序号的表名
+func (sm *ShardingManager) getNextSequenceTable(currentTableName string) string {
+	// 检查是否已经有序号
+	re := regexp.MustCompile(`^(.+)_(\d+)$`)
+	matches := re.FindStringSubmatch(currentTableName)
+	
+	if len(matches) == 3 {
+		// 已有序号,递增
+		baseName := matches[1]
+		seq, _ := strconv.Atoi(matches[2])
+		return fmt.Sprintf("%s_%02d", baseName, seq+1)
+	} else {
+		// 没有序号,添加序号01
+		return fmt.Sprintf("%s_01", currentTableName)
+	}
+}
+
+// CheckAndCreateNewTable 检查是否需要创建新表(基于时间周期)
+func (sm *ShardingManager) CheckAndCreateNewTable(ctx context.Context, db *gorm.DB, baseTableName string, modelExample interface{}) error {
+	currentTime := time.Now()
+	expectedTableName := sm.strategy.GetTableName(baseTableName, currentTime)
+	
+	// 检查当前期间的表是否存在
+	if !db.Migrator().HasTable(expectedTableName) {
+		sm.logger.Info(fmt.Sprintf("创建新周期分表: %s", expectedTableName))
+		return sm.EnsureTableExists(ctx, db, expectedTableName, modelExample)
+	}
+	
+	return nil
+}

+ 64 - 0
pkg/sharding/strategy.go

@@ -0,0 +1,64 @@
+package sharding
+
+import (
+	"fmt"
+	"time"
+)
+
+// ShardingStrategy 分表策略接口
+type ShardingStrategy interface {
+	// GetTableName 根据时间获取表名
+	GetTableName(baseTableName string, t time.Time) string
+	// GetCurrentTableName 获取当前表名
+	GetCurrentTableName(baseTableName string) string
+	// GetTableNamesByRange 根据时间范围获取所有可能的表名
+	GetTableNamesByRange(baseTableName string, start, end time.Time) []string
+	// GetNextTableName 根据当前表名获取下一个表名(用于动态分表)
+	GetNextTableName(currentTableName string) string
+	// IsNewPeriod 检查是否应该开始新的分表周期
+	IsNewPeriod(lastTableTime, currentTime time.Time) bool
+}
+
+// MonthlyShardingStrategy 按月分表策略 - 每月第一天分表
+type MonthlyShardingStrategy struct{}
+
+func NewMonthlyShardingStrategy() ShardingStrategy {
+	return &MonthlyShardingStrategy{}
+}
+
+func (s *MonthlyShardingStrategy) GetTableName(baseTableName string, t time.Time) string {
+	return fmt.Sprintf("%s_%d%02d", baseTableName, t.Year(), t.Month())
+}
+
+func (s *MonthlyShardingStrategy) GetCurrentTableName(baseTableName string) string {
+	return s.GetTableName(baseTableName, time.Now())
+}
+
+func (s *MonthlyShardingStrategy) GetTableNamesByRange(baseTableName string, start, end time.Time) []string {
+	var tableNames []string
+	current := time.Date(start.Year(), start.Month(), 1, 0, 0, 0, 0, start.Location())
+	endMonth := time.Date(end.Year(), end.Month(), 1, 0, 0, 0, 0, end.Location())
+
+	for !current.After(endMonth) {
+		tableNames = append(tableNames, s.GetTableName(baseTableName, current))
+		current = current.AddDate(0, 1, 0)
+	}
+
+	return tableNames
+}
+
+func (s *MonthlyShardingStrategy) GetNextTableName(currentTableName string) string {
+	// 解析当前表名,生成下一个序号的表名
+	// 例如: log_202408 -> log_202408_01, log_202408_01 -> log_202408_02
+	
+	// 简单实现:在表名后添加序号
+	return fmt.Sprintf("%s_01", currentTableName)
+}
+
+func (s *MonthlyShardingStrategy) IsNewPeriod(lastTableTime, currentTime time.Time) bool {
+	// 检查是否到了新的月份
+	lastMonth := time.Date(lastTableTime.Year(), lastTableTime.Month(), 1, 0, 0, 0, 0, lastTableTime.Location())
+	currentMonth := time.Date(currentTime.Year(), currentTime.Month(), 1, 0, 0, 0, 0, currentTime.Location())
+	
+	return currentMonth.After(lastMonth)
+}