wafLog.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
  7. "github.com/rabbitmq/amqp091-go"
  8. "go.uber.org/zap"
  9. "sync"
  10. "time"
  11. )
  12. // 使用公共的 TaskHandler 定义
  13. // WafLogJob 定义了处理WAF日志相关任务的接口
  14. type WafLogJob interface {
  15. // AddWafLogConsumer 启动消费者,处理WAF日志任务
  16. AddWafLogConsumer(ctx context.Context)
  17. }
  18. // NewWafLogJob 创建一个新的 WafLogJob
  19. func NewWafLogJob(job *Job,
  20. wafLogService admin.WafLogService,
  21. ) WafLogJob {
  22. return &wafLogJob{
  23. Job: job,
  24. wafLogService: wafLogService,
  25. buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
  26. bufferMutex: &sync.Mutex{},
  27. lastFlushTime: time.Now(),
  28. }
  29. }
  30. type wafLogJob struct {
  31. *Job
  32. wafLogService admin.WafLogService
  33. buffer []*adminApi.WafLog // 消息缓冲区
  34. bufferMutex *sync.Mutex // 缓冲区锁
  35. lastFlushTime time.Time // 上次刷新时间
  36. }
  37. // AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器
  38. func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
  39. // 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息)
  40. ticker := time.NewTicker(1 * time.Second)
  41. go func() {
  42. for {
  43. select {
  44. case <-ctx.Done():
  45. ticker.Stop()
  46. // 确保关闭前处理剩余的消息
  47. j.checkAndFlushBuffer(ctx)
  48. return
  49. case <-ticker.C:
  50. j.checkAndFlushBuffer(ctx)
  51. }
  52. }
  53. }()
  54. // 启动消费者
  55. j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
  56. }
  57. // consume 调用公共的 Consume 方法
  58. func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
  59. j.Job.Consume(ctx, taskName, consumerName, handler)
  60. }
  61. // handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中
  62. func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  63. var payload adminApi.WafLog
  64. if err := json.Unmarshal(d.Body, &payload); err != nil {
  65. logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  66. return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
  67. }
  68. logger.Info("收到添加日志任务",
  69. zap.Int("hostId", payload.HostId),
  70. zap.Int("uid", payload.Uid),
  71. zap.String("routing_key", d.RoutingKey),
  72. )
  73. // 将消息添加到缓冲区
  74. j.bufferMutex.Lock()
  75. defer j.bufferMutex.Unlock()
  76. // 如果是第一条消息,初始化刷新时间
  77. if len(j.buffer) == 0 {
  78. j.lastFlushTime = time.Now()
  79. }
  80. // 添加到缓冲区
  81. j.buffer = append(j.buffer, &payload)
  82. // 如果缓冲区达到100条,立即刷新
  83. if len(j.buffer) >= 100 {
  84. return j.flushBuffer(ctx, logger)
  85. }
  86. return nil
  87. }
  88. // checkAndFlushBuffer 检查缓冲区是否需要刷新
  89. func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context) {
  90. j.bufferMutex.Lock()
  91. defer j.bufferMutex.Unlock()
  92. // 如果缓冲区为空,则不需要刷新
  93. if len(j.buffer) == 0 {
  94. return
  95. }
  96. // 如果距离上次刷新时间超过5秒,或者缓冲区中有超过20条消息,则刷新缓冲区
  97. if time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
  98. // 创建一个新的logger代替j.logger
  99. logger := zap.NewNop()
  100. j.flushBuffer(ctx, logger)
  101. }
  102. }
  103. // flushBuffer 刷新缓冲区中的消息
  104. func (j *wafLogJob) flushBuffer(ctx context.Context, logger *zap.Logger) error {
  105. if len(j.buffer) == 0 {
  106. return nil
  107. }
  108. // 复制当前缓冲区数据,然后清空缓冲区
  109. messageCount := len(j.buffer)
  110. logger.Info("开始批量处理WAF日志", zap.Int("日志数量", messageCount))
  111. // 复制一份数据进行处理
  112. batch := make([]*adminApi.WafLog, len(j.buffer))
  113. copy(batch, j.buffer)
  114. // 清空缓冲区并更新刷新时间
  115. j.buffer = j.buffer[:0]
  116. j.lastFlushTime = time.Now()
  117. // 批量处理消息
  118. err := j.wafLogService.BatchAddWafLog(ctx, batch)
  119. if err != nil {
  120. logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", messageCount))
  121. } else {
  122. logger.Info("成功批量处理WAF日志", zap.Int("数量", messageCount))
  123. }
  124. return err
  125. }