package job import ( "context" "encoding/json" adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin" "github.com/go-nunu/nunu-layout-advanced/internal/service/admin" "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" "sync" "time" ) // 使用公共的 TaskHandler 定义 // WafLogJob 定义了处理WAF日志相关任务的接口 type WafLogJob interface { // AddWafLogConsumer 启动消费者,处理WAF日志任务 AddWafLogConsumer(ctx context.Context) } // NewWafLogJob 创建一个新的 WafLogJob func NewWafLogJob(job *Job, wafLogService admin.WafLogService, ) WafLogJob { return &wafLogJob{ Job: job, wafLogService: wafLogService, buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区 bufferMutex: &sync.Mutex{}, lastFlushTime: time.Now(), } } type wafLogJob struct { *Job wafLogService admin.WafLogService buffer []*adminApi.WafLog // 消息缓冲区 bufferMutex *sync.Mutex // 缓冲区锁 lastFlushTime time.Time // 上次刷新时间 } // AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器 func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) { // 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息) ticker := time.NewTicker(1 * time.Second) go func() { for { select { case <-ctx.Done(): ticker.Stop() // 确保关闭前处理剩余的消息 j.checkAndFlushBuffer(ctx) return case <-ticker.C: j.checkAndFlushBuffer(ctx) } } }() // 启动消费者 j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage) } // consume 调用公共的 Consume 方法 func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) { j.Job.Consume(ctx, taskName, consumerName, handler) } // handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中 func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error { var payload adminApi.WafLog if err := json.Unmarshal(d.Body, &payload); err != nil { logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body)) return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误 } logger.Info("收到添加日志任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.String("routing_key", d.RoutingKey), ) // 将消息添加到缓冲区 j.bufferMutex.Lock() defer j.bufferMutex.Unlock() // 如果是第一条消息,初始化刷新时间 if len(j.buffer) == 0 { j.lastFlushTime = time.Now() } // 添加到缓冲区 j.buffer = append(j.buffer, &payload) // 如果缓冲区达到100条,立即刷新 if len(j.buffer) >= 100 { return j.flushBuffer(ctx, logger) } return nil } // checkAndFlushBuffer 检查缓冲区是否需要刷新 func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context) { j.bufferMutex.Lock() defer j.bufferMutex.Unlock() // 如果缓冲区为空,则不需要刷新 if len(j.buffer) == 0 { return } // 如果距离上次刷新时间超过5秒,或者缓冲区中有超过20条消息,则刷新缓冲区 if time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 { // 创建一个新的logger代替j.logger logger := zap.NewNop() j.flushBuffer(ctx, logger) } } // flushBuffer 刷新缓冲区中的消息 func (j *wafLogJob) flushBuffer(ctx context.Context, logger *zap.Logger) error { if len(j.buffer) == 0 { return nil } // 复制当前缓冲区数据,然后清空缓冲区 messageCount := len(j.buffer) logger.Info("开始批量处理WAF日志", zap.Int("日志数量", messageCount)) // 复制一份数据进行处理 batch := make([]*adminApi.WafLog, len(j.buffer)) copy(batch, j.buffer) // 清空缓冲区并更新刷新时间 j.buffer = j.buffer[:0] j.lastFlushTime = time.Now() // 批量处理消息 err := j.wafLogService.BatchAddWafLog(ctx, batch) if err != nil { logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", messageCount)) } else { logger.Info("成功批量处理WAF日志", zap.Int("数量", messageCount)) } return err }