浏览代码

feat(waf): 实现WAF日志批量插入功能

- 新增WafLogJob的批处理逻辑,使用缓冲区暂存日志消息
- 实现定时检查缓冲区并批量插入数据库的功能- 在repository和service层添加批量插入Waf日志的方法
- 优化日志处理性能,减少频繁插入数据库的开销
fusu 1 周之前
父节点
当前提交
9dac3740cf
共有 3 个文件被更改,包括 155 次插入14 次删除
  1. 95 14
      internal/job/wafLog.go
  2. 8 0
      internal/repository/admin/waflog.go
  3. 52 0
      internal/service/admin/waflog.go

+ 95 - 14
internal/job/wafLog.go

@@ -7,34 +7,60 @@ import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
 	"github.com/rabbitmq/amqp091-go"
 	"github.com/rabbitmq/amqp091-go"
 	"go.uber.org/zap"
 	"go.uber.org/zap"
+	"sync"
+	"time"
 )
 )
 
 
 // 使用公共的 TaskHandler 定义
 // 使用公共的 TaskHandler 定义
 
 
-// WafLogJob 定义了处理白名单相关任务的接口
+// WafLogJob 定义了处理WAF日志相关任务的接口
 type WafLogJob interface {
 type WafLogJob interface {
-	// DomainConsumer 启动消费者,处理域名白名单任务
+	// AddWafLogConsumer 启动消费者,处理WAF日志任务
 	AddWafLogConsumer(ctx context.Context)
 	AddWafLogConsumer(ctx context.Context)
-
 }
 }
 
 
-// NewWafLogJob 创建一个新的 WhitelistJob
+// NewWafLogJob 创建一个新的 WafLogJob
 func NewWafLogJob(job *Job,
 func NewWafLogJob(job *Job,
 	wafLogService admin.WafLogService,
 	wafLogService admin.WafLogService,
 ) WafLogJob {
 ) WafLogJob {
 	return &wafLogJob{
 	return &wafLogJob{
 		Job:          job,
 		Job:          job,
 		wafLogService: wafLogService,
 		wafLogService: wafLogService,
+		buffer:       make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
+		bufferMutex:  &sync.Mutex{},
+		lastFlushTime: time.Now(),
 	}
 	}
 }
 }
 
 
 type wafLogJob struct {
 type wafLogJob struct {
 	*Job
 	*Job
 	wafLogService admin.WafLogService
 	wafLogService admin.WafLogService
+	buffer       []*adminApi.WafLog  // 消息缓冲区
+	bufferMutex  *sync.Mutex        // 缓冲区锁
+	lastFlushTime time.Time         // 上次刷新时间
 }
 }
 
 
-// DomainConsumer 启动域名白名单消费者
+
+
+// AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器
 func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
 func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
+	// 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息)
+	ticker := time.NewTicker(1 * time.Second)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				ticker.Stop()
+				// 确保关闭前处理剩余的消息
+				j.checkAndFlushBuffer(ctx)
+				return
+			case <-ticker.C:
+				j.checkAndFlushBuffer(ctx)
+			}
+		}
+	}()
+	
+	// 启动消费者
 	j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
 	j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
 }
 }
 
 
@@ -43,9 +69,8 @@ func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string,
 	j.Job.Consume(ctx, taskName, consumerName, handler)
 	j.Job.Consume(ctx, taskName, consumerName, handler)
 }
 }
 
 
-// handleDomainMessage 是域名白名单任务的具体处理器
+// handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中
 func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
 func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
-
 	var payload adminApi.WafLog
 	var payload adminApi.WafLog
 	if err := json.Unmarshal(d.Body, &payload); err != nil {
 	if err := json.Unmarshal(d.Body, &payload); err != nil {
 		logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
 		logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
@@ -58,14 +83,70 @@ func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger,
 		zap.String("routing_key", d.RoutingKey),
 		zap.String("routing_key", d.RoutingKey),
 	)
 	)
 
 
-	var processingErr error
-	processingErr = j.wafLogService.AddWafLog(ctx, payload)
+	// 将消息添加到缓冲区
+	j.bufferMutex.Lock()
+	defer j.bufferMutex.Unlock()
+	
+	// 如果是第一条消息,初始化刷新时间
+	if len(j.buffer) == 0 {
+		j.lastFlushTime = time.Now()
+	}
+	
+	// 添加到缓冲区
+	j.buffer = append(j.buffer, &payload)
+	
+	// 如果缓冲区达到100条,立即刷新
+	if len(j.buffer) >= 100 {
+		return j.flushBuffer(ctx, logger)
+	}
+	
+	return nil
+}
 
 
-	if processingErr != nil {
-		logger.Error("处理域名白名单任务失败", zap.Error(processingErr),zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid),zap.Any("req", payload))
-	} else {
-		logger.Info("已成功处理域名白名单任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", payload))
+// checkAndFlushBuffer 检查缓冲区是否需要刷新
+func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context) {
+	j.bufferMutex.Lock()
+	defer j.bufferMutex.Unlock()
+	
+	// 如果缓冲区为空,则不需要刷新
+	if len(j.buffer) == 0 {
+		return
+	}
+	
+	// 如果距离上次刷新时间超过5秒,或者缓冲区中有超过20条消息,则刷新缓冲区
+	if time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
+		// 创建一个新的logger代替j.logger
+		logger := zap.NewNop()
+		j.flushBuffer(ctx, logger)
 	}
 	}
+}
 
 
-	return processingErr
+// flushBuffer 刷新缓冲区中的消息
+func (j *wafLogJob) flushBuffer(ctx context.Context, logger *zap.Logger) error {
+	if len(j.buffer) == 0 {
+		return nil
+	}
+	
+	// 复制当前缓冲区数据,然后清空缓冲区
+	messageCount := len(j.buffer)
+	logger.Info("开始批量处理WAF日志", zap.Int("日志数量", messageCount))
+	
+	// 复制一份数据进行处理
+	batch := make([]*adminApi.WafLog, len(j.buffer))
+	copy(batch, j.buffer)
+	
+	// 清空缓冲区并更新刷新时间
+	j.buffer = j.buffer[:0]
+	j.lastFlushTime = time.Now()
+	
+	// 批量处理消息
+	err := j.wafLogService.BatchAddWafLog(ctx, batch)
+	
+	if err != nil {
+		logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", messageCount))
+	} else {
+		logger.Info("成功批量处理WAF日志", zap.Int("数量", messageCount))
+	}
+	
+	return err
 }
 }

+ 8 - 0
internal/repository/admin/waflog.go

@@ -10,6 +10,7 @@ type WafLogRepository interface {
 	GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
 	GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
 	GetWafLogList(ctx context.Context) ([]model.WafLog, error)
 	GetWafLogList(ctx context.Context) ([]model.WafLog, error)
 	AddWafLog(ctx context.Context, log *model.WafLog) error
 	AddWafLog(ctx context.Context, log *model.WafLog) error
+	BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error
 }
 }
 
 
 func NewWafLogRepository(
 func NewWafLogRepository(
@@ -37,3 +38,10 @@ func (r *wafLogRepository) GetWafLogList(ctx context.Context) ([]model.WafLog, e
 func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) error {
 func (r *wafLogRepository) AddWafLog(ctx context.Context, log *model.WafLog) error {
 	return r.DBWithName(ctx,"admin").Create(log).Error
 	return r.DBWithName(ctx,"admin").Create(log).Error
 }
 }
+
+func (r *wafLogRepository) BatchAddWafLog(ctx context.Context, logs []*model.WafLog) error {
+	if len(logs) == 0 {
+		return nil
+	}
+	return r.DBWithName(ctx, "admin").CreateInBatches(logs, len(logs)).Error
+}

+ 52 - 0
internal/service/admin/waflog.go

@@ -19,6 +19,7 @@ type WafLogService interface {
 	GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
 	GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
 	GetWafLogList(ctx context.Context) ([]model.WafLog, error)
 	GetWafLogList(ctx context.Context) ([]model.WafLog, error)
 	AddWafLog(ctx context.Context, req adminApi.WafLog) error
 	AddWafLog(ctx context.Context, req adminApi.WafLog) error
+	BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error
 	PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
 	PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
 }
 }
 func NewWafLogService(
 func NewWafLogService(
@@ -150,6 +151,57 @@ func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) erro
 	})
 	})
 }
 }
 
 
+func (s *wafLogService) BatchAddWafLog(ctx context.Context, reqs []*adminApi.WafLog) error {
+	if len(reqs) == 0 {
+		return nil
+	}
+	
+	wafLogs := make([]*model.WafLog, 0, len(reqs))
+	
+	for _, req := range reqs {
+		if req.Api != "" {
+			api := strings.TrimPrefix(req.Api, "/v1")
+			if _, ok := ApiDescriptionMap[api]; ok {
+				req.ApiName = ApiDescriptionMap[api]
+			}
+			
+			apiType, ok := s.getFirstPathSegment(req.Api)
+			if ok {
+				req.ApiType = apiType[len(apiType)-1]
+			}
+		}
+		
+		userInfo, err := s.globalLimitRepository.GetUserInfo(ctx, int64(req.Uid))
+		if err != nil {
+			s.Logger.Error("获取用户信息失败", zap.Error(err), zap.Int("uid", req.Uid))
+			continue
+		}
+		
+		req.Name = userInfo.Username
+		extraData, err := json.Marshal(req.ExtraData)
+		if err != nil {
+			s.Logger.Error("序列化额外数据失败", zap.Error(err))
+			continue
+		}
+		
+		wafLogs = append(wafLogs, &model.WafLog{
+			Uid:        req.Uid,
+			Name:       req.Name,
+			RequestIp:  req.RequestIp,
+			RuleId:     req.RuleId,
+			HostId:     req.HostId,
+			UserAgent:  req.UserAgent,
+			Api:        req.Api,
+			ApiType:    req.ApiType,
+			ApiName:    req.ApiName,
+			ExtraData:  extraData,
+		})
+	}
+	
+	// 调用repository层的批量插入方法
+	return s.wafLogRepository.BatchAddWafLog(ctx, wafLogs)
+}
+
 func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
 func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
 
 
 	payload := &req
 	payload := &req