123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- package job
- import (
- "context"
- "encoding/json"
- adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
- "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
- "github.com/rabbitmq/amqp091-go"
- "go.uber.org/zap"
- "sync"
- "time"
- )
- // 使用公共的 TaskHandler 定义
- // WafLogJob 定义了处理WAF日志相关任务的接口
- type WafLogJob interface {
- // AddWafLogConsumer 启动消费者,处理WAF日志任务
- AddWafLogConsumer(ctx context.Context)
- }
- // NewWafLogJob 创建一个新的 WafLogJob
- func NewWafLogJob(job *Job,
- wafLogService admin.WafLogService,
- ) WafLogJob {
- return &wafLogJob{
- Job: job,
- wafLogService: wafLogService,
- buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
- bufferMutex: &sync.Mutex{},
- }
- }
- type wafLogJob struct {
- *Job
- wafLogService admin.WafLogService
- buffer []*adminApi.WafLog // 消息缓冲区
- bufferMutex *sync.Mutex // 缓冲区锁
- lastFlushTime time.Time // 上次刷新时间
- }
- // AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器
- func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
- // 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息)
- ticker := time.NewTicker(1 * time.Second)
- go func() {
- for {
- select {
- case <-ctx.Done():
- ticker.Stop()
- // 优雅停机:确保在关闭前处理缓冲区中所有剩余的消息
- j.checkAndFlushBuffer(ctx, true)
- return
- case <-ticker.C:
- // 定时检查是否需要刷新
- j.checkAndFlushBuffer(ctx, false)
- }
- }
- }()
- // 启动消费者
- j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
- }
- // consume 调用公共的 Consume 方法
- func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
- j.Job.Consume(ctx, taskName, consumerName, handler)
- }
- // handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中
- func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
- var payload adminApi.WafLog
- if err := json.Unmarshal(d.Body, &payload); err != nil {
- logger.Error("解析添加日志消息失败, 消息将被丢弃", zap.Error(err), zap.ByteString("body", d.Body))
- // 返回 nil 以 ack 此消息,防止格式错误的消息反复投递
- return nil
- }
- logger.Info("收到添加日志任务",
- zap.Int("hostId", payload.HostId),
- zap.Int("uid", payload.Uid),
- zap.String("routing_key", d.RoutingKey),
- )
- var batchToProcess []*adminApi.WafLog
- // --- 锁开始 ---
- j.bufferMutex.Lock()
- if len(j.buffer) == 0 {
- j.lastFlushTime = time.Now() // 如果是第一条消息,重置计时器
- }
- j.buffer = append(j.buffer, &payload)
- // 如果缓冲区达到数量阈值,准备刷新
- if len(j.buffer) >= 100 {
- // 从缓冲区取出数据准备处理
- batchToProcess = j.flushBufferUnlocked(logger)
- }
- j.bufferMutex.Unlock()
- // --- 锁结束 ---
- // 在锁外执行耗时的批量处理操作
- if batchToProcess != nil {
- j.processBatch(ctx, logger, batchToProcess)
- }
- // 始终返回 nil,因为批量处理的成功与否不应该影响单条消息的确认
- return nil
- }
- // checkAndFlushBuffer 检查并根据条件(时间、数量、强制)刷新缓冲区
- // forceFlush 参数用于程序退出前的强制刷新
- func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context, forceFlush bool) {
- var batchToProcess []*adminApi.WafLog
- // --- 锁开始 ---
- j.bufferMutex.Lock()
- // 如果缓冲区为空,则不需要做任何事
- if len(j.buffer) == 0 {
- j.bufferMutex.Unlock()
- return
- }
- // 检查是否满足刷新条件:强制刷新 或 超过时间阈值 或 超过数量阈值
- if forceFlush || time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
- // 注意: 此处可以传递一个真实logger,以便观察由定时器触发的刷新操作
- logger := zap.NewNop() // 或者使用 j.Job.logger
- batchToProcess = j.flushBufferUnlocked(logger)
- }
- j.bufferMutex.Unlock()
- // --- 锁结束 ---
- // 在锁外执行耗时的批量处理操作
- if batchToProcess != nil {
- // 注意: 此处可以传递一个真实logger
- logger := zap.NewNop() // 或者使用 j.Job.logger
- j.processBatch(ctx, logger, batchToProcess)
- }
- }
- // flushBufferUnlocked 从缓冲区复制数据并清空缓冲区。
- // **重要**: 此方法不包含锁,必须在调用方加锁保护。
- func (j *wafLogJob) flushBufferUnlocked(logger *zap.Logger) []*adminApi.WafLog {
- messageCount := len(j.buffer)
- logger.Info("准备批量处理WAF日志", zap.Int("日志数量", messageCount))
- // 复制一份数据用于处理
- batch := make([]*adminApi.WafLog, messageCount)
- copy(batch, j.buffer)
- // 清空缓冲区 (通过切片重置) 并更新刷新时间
- j.buffer = j.buffer[:0]
- j.lastFlushTime = time.Now()
- return batch
- }
- // processBatch 执行实际的批量写入操作
- func (j *wafLogJob) processBatch(ctx context.Context, logger *zap.Logger, batch []*adminApi.WafLog) {
- if len(batch) == 0 {
- return
- }
- logger.Info("开始批量处理WAF日志", zap.Int("数量", len(batch)))
- // 实际执行批量处理
- err := j.wafLogService.BatchAddWafLog(ctx, batch)
- if err != nil {
- logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", len(batch)))
- // 此处可以根据业务需求增加失败重试或告警逻辑
- } else {
- logger.Info("成功批量处理WAF日志", zap.Int("数量", len(batch)))
- }
- }
|