wafLog.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
  7. "github.com/rabbitmq/amqp091-go"
  8. "go.uber.org/zap"
  9. "sync"
  10. "time"
  11. )
  12. // 使用公共的 TaskHandler 定义
  13. // WafLogJob 定义了处理WAF日志相关任务的接口
  14. type WafLogJob interface {
  15. // AddWafLogConsumer 启动消费者,处理WAF日志任务
  16. AddWafLogConsumer(ctx context.Context)
  17. }
  18. // NewWafLogJob 创建一个新的 WafLogJob
  19. func NewWafLogJob(job *Job,
  20. wafLogService admin.WafLogService,
  21. ) WafLogJob {
  22. return &wafLogJob{
  23. Job: job,
  24. wafLogService: wafLogService,
  25. buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
  26. bufferMutex: &sync.Mutex{},
  27. }
  28. }
  29. type wafLogJob struct {
  30. *Job
  31. wafLogService admin.WafLogService
  32. buffer []*adminApi.WafLog // 消息缓冲区
  33. bufferMutex *sync.Mutex // 缓冲区锁
  34. lastFlushTime time.Time // 上次刷新时间
  35. }
  36. // AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器
  37. func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
  38. // 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息)
  39. ticker := time.NewTicker(1 * time.Second)
  40. go func() {
  41. for {
  42. select {
  43. case <-ctx.Done():
  44. ticker.Stop()
  45. // 优雅停机:确保在关闭前处理缓冲区中所有剩余的消息
  46. j.checkAndFlushBuffer(ctx, true)
  47. return
  48. case <-ticker.C:
  49. // 定时检查是否需要刷新
  50. j.checkAndFlushBuffer(ctx, false)
  51. }
  52. }
  53. }()
  54. // 启动消费者
  55. j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
  56. }
  57. // consume 调用公共的 Consume 方法
  58. func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
  59. j.Job.Consume(ctx, taskName, consumerName, handler)
  60. }
  61. // handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中
  62. func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  63. var payload adminApi.WafLog
  64. if err := json.Unmarshal(d.Body, &payload); err != nil {
  65. logger.Error("解析添加日志消息失败, 消息将被丢弃", zap.Error(err), zap.ByteString("body", d.Body))
  66. // 返回 nil 以 ack 此消息,防止格式错误的消息反复投递
  67. return nil
  68. }
  69. logger.Info("收到添加日志任务",
  70. zap.Int("hostId", payload.HostId),
  71. zap.Int("uid", payload.Uid),
  72. zap.String("routing_key", d.RoutingKey),
  73. )
  74. var batchToProcess []*adminApi.WafLog
  75. // --- 锁开始 ---
  76. j.bufferMutex.Lock()
  77. if len(j.buffer) == 0 {
  78. j.lastFlushTime = time.Now() // 如果是第一条消息,重置计时器
  79. }
  80. j.buffer = append(j.buffer, &payload)
  81. // 如果缓冲区达到数量阈值,准备刷新
  82. if len(j.buffer) >= 100 {
  83. // 从缓冲区取出数据准备处理
  84. batchToProcess = j.flushBufferUnlocked(logger)
  85. }
  86. j.bufferMutex.Unlock()
  87. // --- 锁结束 ---
  88. // 在锁外执行耗时的批量处理操作
  89. if batchToProcess != nil {
  90. j.processBatch(ctx, logger, batchToProcess)
  91. }
  92. // 始终返回 nil,因为批量处理的成功与否不应该影响单条消息的确认
  93. return nil
  94. }
  95. // checkAndFlushBuffer 检查并根据条件(时间、数量、强制)刷新缓冲区
  96. // forceFlush 参数用于程序退出前的强制刷新
  97. func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context, forceFlush bool) {
  98. var batchToProcess []*adminApi.WafLog
  99. // --- 锁开始 ---
  100. j.bufferMutex.Lock()
  101. // 如果缓冲区为空,则不需要做任何事
  102. if len(j.buffer) == 0 {
  103. j.bufferMutex.Unlock()
  104. return
  105. }
  106. // 检查是否满足刷新条件:强制刷新 或 超过时间阈值 或 超过数量阈值
  107. if forceFlush || time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
  108. // 注意: 此处可以传递一个真实logger,以便观察由定时器触发的刷新操作
  109. logger := zap.NewNop() // 或者使用 j.Job.logger
  110. batchToProcess = j.flushBufferUnlocked(logger)
  111. }
  112. j.bufferMutex.Unlock()
  113. // --- 锁结束 ---
  114. // 在锁外执行耗时的批量处理操作
  115. if batchToProcess != nil {
  116. // 注意: 此处可以传递一个真实logger
  117. logger := zap.NewNop() // 或者使用 j.Job.logger
  118. j.processBatch(ctx, logger, batchToProcess)
  119. }
  120. }
  121. // flushBufferUnlocked 从缓冲区复制数据并清空缓冲区。
  122. // **重要**: 此方法不包含锁,必须在调用方加锁保护。
  123. func (j *wafLogJob) flushBufferUnlocked(logger *zap.Logger) []*adminApi.WafLog {
  124. messageCount := len(j.buffer)
  125. logger.Info("准备批量处理WAF日志", zap.Int("日志数量", messageCount))
  126. // 复制一份数据用于处理
  127. batch := make([]*adminApi.WafLog, messageCount)
  128. copy(batch, j.buffer)
  129. // 清空缓冲区 (通过切片重置) 并更新刷新时间
  130. j.buffer = j.buffer[:0]
  131. j.lastFlushTime = time.Now()
  132. return batch
  133. }
  134. // processBatch 执行实际的批量写入操作
  135. func (j *wafLogJob) processBatch(ctx context.Context, logger *zap.Logger, batch []*adminApi.WafLog) {
  136. if len(batch) == 0 {
  137. return
  138. }
  139. logger.Info("开始批量处理WAF日志", zap.Int("数量", len(batch)))
  140. // 实际执行批量处理
  141. err := j.wafLogService.BatchAddWafLog(ctx, batch)
  142. if err != nil {
  143. logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", len(batch)))
  144. // 此处可以根据业务需求增加失败重试或告警逻辑
  145. } else {
  146. logger.Info("成功批量处理WAF日志", zap.Int("数量", len(batch)))
  147. }
  148. }