123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152 |
- package job
- import (
- "context"
- "encoding/json"
- adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
- "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
- "github.com/rabbitmq/amqp091-go"
- "go.uber.org/zap"
- "sync"
- "time"
- )
- // 使用公共的 TaskHandler 定义
- // WafLogJob 定义了处理WAF日志相关任务的接口
- type WafLogJob interface {
- // AddWafLogConsumer 启动消费者,处理WAF日志任务
- AddWafLogConsumer(ctx context.Context)
- }
- // NewWafLogJob 创建一个新的 WafLogJob
- func NewWafLogJob(job *Job,
- wafLogService admin.WafLogService,
- ) WafLogJob {
- return &wafLogJob{
- Job: job,
- wafLogService: wafLogService,
- buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
- bufferMutex: &sync.Mutex{},
- lastFlushTime: time.Now(),
- }
- }
- type wafLogJob struct {
- *Job
- wafLogService admin.WafLogService
- buffer []*adminApi.WafLog // 消息缓冲区
- bufferMutex *sync.Mutex // 缓冲区锁
- lastFlushTime time.Time // 上次刷新时间
- }
- // AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器
- func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
- // 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息)
- ticker := time.NewTicker(1 * time.Second)
- go func() {
- for {
- select {
- case <-ctx.Done():
- ticker.Stop()
- // 确保关闭前处理剩余的消息
- j.checkAndFlushBuffer(ctx)
- return
- case <-ticker.C:
- j.checkAndFlushBuffer(ctx)
- }
- }
- }()
-
- // 启动消费者
- j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
- }
- // consume 调用公共的 Consume 方法
- func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
- j.Job.Consume(ctx, taskName, consumerName, handler)
- }
- // handleDomainMessage 处理单条WAF日志消息,放入批处理缓冲区中
- func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
- var payload adminApi.WafLog
- if err := json.Unmarshal(d.Body, &payload); err != nil {
- logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
- return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
- }
- logger.Info("收到添加日志任务",
- zap.Int("hostId", payload.HostId),
- zap.Int("uid", payload.Uid),
- zap.String("routing_key", d.RoutingKey),
- )
- // 将消息添加到缓冲区
- j.bufferMutex.Lock()
- defer j.bufferMutex.Unlock()
-
- // 如果是第一条消息,初始化刷新时间
- if len(j.buffer) == 0 {
- j.lastFlushTime = time.Now()
- }
-
- // 添加到缓冲区
- j.buffer = append(j.buffer, &payload)
-
- // 如果缓冲区达到100条,立即刷新
- if len(j.buffer) >= 100 {
- return j.flushBuffer(ctx, logger)
- }
-
- return nil
- }
- // checkAndFlushBuffer 检查缓冲区是否需要刷新
- func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context) {
- j.bufferMutex.Lock()
- defer j.bufferMutex.Unlock()
-
- // 如果缓冲区为空,则不需要刷新
- if len(j.buffer) == 0 {
- return
- }
-
- // 如果距离上次刷新时间超过5秒,或者缓冲区中有超过20条消息,则刷新缓冲区
- if time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
- // 创建一个新的logger代替j.logger
- logger := zap.NewNop()
- j.flushBuffer(ctx, logger)
- }
- }
- // flushBuffer 刷新缓冲区中的消息
- func (j *wafLogJob) flushBuffer(ctx context.Context, logger *zap.Logger) error {
- if len(j.buffer) == 0 {
- return nil
- }
-
- // 复制当前缓冲区数据,然后清空缓冲区
- messageCount := len(j.buffer)
- logger.Info("开始批量处理WAF日志", zap.Int("日志数量", messageCount))
-
- // 复制一份数据进行处理
- batch := make([]*adminApi.WafLog, len(j.buffer))
- copy(batch, j.buffer)
-
- // 清空缓冲区并更新刷新时间
- j.buffer = j.buffer[:0]
- j.lastFlushTime = time.Now()
-
- // 批量处理消息
- err := j.wafLogService.BatchAddWafLog(ctx, batch)
-
- if err != nil {
- logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", messageCount))
- } else {
- logger.Info("成功批量处理WAF日志", zap.Int("数量", messageCount))
- }
-
- return err
- }
|