package job import ( "context" "encoding/json" adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin" "github.com/go-nunu/nunu-layout-advanced/internal/service/admin" "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" "sync" "time" ) // 使用公共的 TaskHandler 定义 // WafLogJob 定义了处理WAF日志相关任务的接口 type WafLogJob interface { // AddWafLogConsumer 启动消费者,处理WAF日志任务 AddWafLogConsumer(ctx context.Context) } // NewWafLogJob 创建一个新的 WafLogJob func NewWafLogJob(job *Job, wafLogService admin.WafLogService, ) WafLogJob { return &wafLogJob{ Job: job, wafLogService: wafLogService, buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区 bufferMutex: &sync.Mutex{}, } } type wafLogJob struct { *Job wafLogService admin.WafLogService buffer []*adminApi.WafLog // 消息缓冲区 bufferMutex *sync.Mutex // 缓冲区锁 lastFlushTime time.Time // 上次刷新时间 } // AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器 func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) { // 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息) ticker := time.NewTicker(1 * time.Second) go func() { for { select { case <-ctx.Done(): ticker.Stop() // 优雅停机:确保在关闭前处理缓冲区中所有剩余的消息 j.checkAndFlushBuffer(ctx, true) return case <-ticker.C: // 定时检查是否需要刷新 j.checkAndFlushBuffer(ctx, false) } } }() // 启动消费者 j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage) } // consume 调用公共的 Consume 方法 func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) { j.Job.Consume(ctx, taskName, consumerName, handler) } // handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中 func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error { var payload adminApi.WafLog if err := json.Unmarshal(d.Body, &payload); err != nil { logger.Error("解析添加日志消息失败, 消息将被丢弃", zap.Error(err), zap.ByteString("body", d.Body)) // 返回 nil 以 ack 此消息,防止格式错误的消息反复投递 return nil } logger.Info("收到添加日志任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.String("routing_key", d.RoutingKey), ) var batchToProcess []*adminApi.WafLog // --- 锁开始 --- j.bufferMutex.Lock() if len(j.buffer) == 0 { j.lastFlushTime = time.Now() // 如果是第一条消息,重置计时器 } j.buffer = append(j.buffer, &payload) // 如果缓冲区达到数量阈值,准备刷新 if len(j.buffer) >= 100 { // 从缓冲区取出数据准备处理 batchToProcess = j.flushBufferUnlocked(logger) } j.bufferMutex.Unlock() // --- 锁结束 --- // 在锁外执行耗时的批量处理操作 if batchToProcess != nil { j.processBatch(ctx, logger, batchToProcess) } // 始终返回 nil,因为批量处理的成功与否不应该影响单条消息的确认 return nil } // checkAndFlushBuffer 检查并根据条件(时间、数量、强制)刷新缓冲区 // forceFlush 参数用于程序退出前的强制刷新 func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context, forceFlush bool) { var batchToProcess []*adminApi.WafLog // --- 锁开始 --- j.bufferMutex.Lock() // 如果缓冲区为空,则不需要做任何事 if len(j.buffer) == 0 { j.bufferMutex.Unlock() return } // 检查是否满足刷新条件:强制刷新 或 超过时间阈值 或 超过数量阈值 if forceFlush || time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 { // 注意: 此处可以传递一个真实logger,以便观察由定时器触发的刷新操作 logger := zap.NewNop() // 或者使用 j.Job.logger batchToProcess = j.flushBufferUnlocked(logger) } j.bufferMutex.Unlock() // --- 锁结束 --- // 在锁外执行耗时的批量处理操作 if batchToProcess != nil { // 注意: 此处可以传递一个真实logger logger := zap.NewNop() // 或者使用 j.Job.logger j.processBatch(ctx, logger, batchToProcess) } } // flushBufferUnlocked 从缓冲区复制数据并清空缓冲区。 // **重要**: 此方法不包含锁,必须在调用方加锁保护。 func (j *wafLogJob) flushBufferUnlocked(logger *zap.Logger) []*adminApi.WafLog { messageCount := len(j.buffer) logger.Info("准备批量处理WAF日志", zap.Int("日志数量", messageCount)) // 复制一份数据用于处理 batch := make([]*adminApi.WafLog, messageCount) copy(batch, j.buffer) // 清空缓冲区 (通过切片重置) 并更新刷新时间 j.buffer = j.buffer[:0] j.lastFlushTime = time.Now() return batch } // processBatch 执行实际的批量写入操作 func (j *wafLogJob) processBatch(ctx context.Context, logger *zap.Logger, batch []*adminApi.WafLog) { if len(batch) == 0 { return } logger.Info("开始批量处理WAF日志", zap.Int("数量", len(batch))) // 实际执行批量处理 err := j.wafLogService.BatchAddWafLog(ctx, batch) if err != nil { logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", len(batch))) // 此处可以根据业务需求增加失败重试或告警逻辑 } else { logger.Info("成功批量处理WAF日志", zap.Int("数量", len(batch))) } }