|
@@ -24,24 +24,21 @@ func NewWafLogJob(job *Job,
|
|
|
wafLogService admin.WafLogService,
|
|
|
) WafLogJob {
|
|
|
return &wafLogJob{
|
|
|
- Job: job,
|
|
|
+ Job: job,
|
|
|
wafLogService: wafLogService,
|
|
|
- buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
|
|
|
- bufferMutex: &sync.Mutex{},
|
|
|
- lastFlushTime: time.Now(),
|
|
|
+ buffer: make([]*adminApi.WafLog, 0, 100), // 预分配100条容量的缓冲区
|
|
|
+ bufferMutex: &sync.Mutex{},
|
|
|
}
|
|
|
}
|
|
|
|
|
|
type wafLogJob struct {
|
|
|
*Job
|
|
|
wafLogService admin.WafLogService
|
|
|
- buffer []*adminApi.WafLog // 消息缓冲区
|
|
|
- bufferMutex *sync.Mutex // 缓冲区锁
|
|
|
- lastFlushTime time.Time // 上次刷新时间
|
|
|
+ buffer []*adminApi.WafLog // 消息缓冲区
|
|
|
+ bufferMutex *sync.Mutex // 缓冲区锁
|
|
|
+ lastFlushTime time.Time // 上次刷新时间
|
|
|
}
|
|
|
|
|
|
-
|
|
|
-
|
|
|
// AddWafLogConsumer 启动WAF日志消费者并启动定时批处理器
|
|
|
func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
|
|
|
// 启动一个定时器,定期检查是否需要刷新缓冲区(处理积压的消息)
|
|
@@ -51,15 +48,16 @@ func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
ticker.Stop()
|
|
|
- // 确保关闭前处理剩余的消息
|
|
|
- j.checkAndFlushBuffer(ctx)
|
|
|
+ // 优雅停机:确保在关闭前处理缓冲区中所有剩余的消息
|
|
|
+ j.checkAndFlushBuffer(ctx, true)
|
|
|
return
|
|
|
case <-ticker.C:
|
|
|
- j.checkAndFlushBuffer(ctx)
|
|
|
+ // 定时检查是否需要刷新
|
|
|
+ j.checkAndFlushBuffer(ctx, false)
|
|
|
}
|
|
|
}
|
|
|
}()
|
|
|
-
|
|
|
+
|
|
|
// 启动消费者
|
|
|
j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
|
|
|
}
|
|
@@ -73,8 +71,9 @@ func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string,
|
|
|
func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
|
|
|
var payload adminApi.WafLog
|
|
|
if err := json.Unmarshal(d.Body, &payload); err != nil {
|
|
|
- logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
|
|
|
- return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
|
|
|
+ logger.Error("解析添加日志消息失败, 消息将被丢弃", zap.Error(err), zap.ByteString("body", d.Body))
|
|
|
+ // 返回 nil 以 ack 此消息,防止格式错误的消息反复投递
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
logger.Info("收到添加日志任务",
|
|
@@ -83,70 +82,93 @@ func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger,
|
|
|
zap.String("routing_key", d.RoutingKey),
|
|
|
)
|
|
|
|
|
|
- // 将消息添加到缓冲区
|
|
|
+ var batchToProcess []*adminApi.WafLog
|
|
|
+
|
|
|
+ // --- 锁开始 ---
|
|
|
j.bufferMutex.Lock()
|
|
|
- defer j.bufferMutex.Unlock()
|
|
|
-
|
|
|
- // 如果是第一条消息,初始化刷新时间
|
|
|
if len(j.buffer) == 0 {
|
|
|
- j.lastFlushTime = time.Now()
|
|
|
+ j.lastFlushTime = time.Now() // 如果是第一条消息,重置计时器
|
|
|
}
|
|
|
-
|
|
|
- // 添加到缓冲区
|
|
|
j.buffer = append(j.buffer, &payload)
|
|
|
-
|
|
|
- // 如果缓冲区达到100条,立即刷新
|
|
|
+
|
|
|
+ // 如果缓冲区达到数量阈值,准备刷新
|
|
|
if len(j.buffer) >= 100 {
|
|
|
- return j.flushBuffer(ctx, logger)
|
|
|
+ // 从缓冲区取出数据准备处理
|
|
|
+ batchToProcess = j.flushBufferUnlocked(logger)
|
|
|
}
|
|
|
-
|
|
|
+ j.bufferMutex.Unlock()
|
|
|
+ // --- 锁结束 ---
|
|
|
+
|
|
|
+ // 在锁外执行耗时的批量处理操作
|
|
|
+ if batchToProcess != nil {
|
|
|
+ j.processBatch(ctx, logger, batchToProcess)
|
|
|
+ }
|
|
|
+
|
|
|
+ // 始终返回 nil,因为批量处理的成功与否不应该影响单条消息的确认
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// checkAndFlushBuffer 检查缓冲区是否需要刷新
|
|
|
-func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context) {
|
|
|
+// checkAndFlushBuffer 检查并根据条件(时间、数量、强制)刷新缓冲区
|
|
|
+// forceFlush 参数用于程序退出前的强制刷新
|
|
|
+func (j *wafLogJob) checkAndFlushBuffer(ctx context.Context, forceFlush bool) {
|
|
|
+ var batchToProcess []*adminApi.WafLog
|
|
|
+
|
|
|
+ // --- 锁开始 ---
|
|
|
j.bufferMutex.Lock()
|
|
|
- defer j.bufferMutex.Unlock()
|
|
|
-
|
|
|
- // 如果缓冲区为空,则不需要刷新
|
|
|
+ // 如果缓冲区为空,则不需要做任何事
|
|
|
if len(j.buffer) == 0 {
|
|
|
+ j.bufferMutex.Unlock()
|
|
|
return
|
|
|
}
|
|
|
-
|
|
|
- // 如果距离上次刷新时间超过5秒,或者缓冲区中有超过20条消息,则刷新缓冲区
|
|
|
- if time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
|
|
|
- // 创建一个新的logger代替j.logger
|
|
|
- logger := zap.NewNop()
|
|
|
- j.flushBuffer(ctx, logger)
|
|
|
+
|
|
|
+ // 检查是否满足刷新条件:强制刷新 或 超过时间阈值 或 超过数量阈值
|
|
|
+ if forceFlush || time.Since(j.lastFlushTime) > 5*time.Second || len(j.buffer) >= 20 {
|
|
|
+ // 注意: 此处可以传递一个真实logger,以便观察由定时器触发的刷新操作
|
|
|
+ logger := zap.NewNop() // 或者使用 j.Job.logger
|
|
|
+ batchToProcess = j.flushBufferUnlocked(logger)
|
|
|
}
|
|
|
-}
|
|
|
+ j.bufferMutex.Unlock()
|
|
|
+ // --- 锁结束 ---
|
|
|
|
|
|
-// flushBuffer 刷新缓冲区中的消息
|
|
|
-func (j *wafLogJob) flushBuffer(ctx context.Context, logger *zap.Logger) error {
|
|
|
- if len(j.buffer) == 0 {
|
|
|
- return nil
|
|
|
+ // 在锁外执行耗时的批量处理操作
|
|
|
+ if batchToProcess != nil {
|
|
|
+ // 注意: 此处可以传递一个真实logger
|
|
|
+ logger := zap.NewNop() // 或者使用 j.Job.logger
|
|
|
+ j.processBatch(ctx, logger, batchToProcess)
|
|
|
}
|
|
|
-
|
|
|
- // 复制当前缓冲区数据,然后清空缓冲区
|
|
|
+}
|
|
|
+
|
|
|
+// flushBufferUnlocked 从缓冲区复制数据并清空缓冲区。
|
|
|
+// **重要**: 此方法不包含锁,必须在调用方加锁保护。
|
|
|
+func (j *wafLogJob) flushBufferUnlocked(logger *zap.Logger) []*adminApi.WafLog {
|
|
|
messageCount := len(j.buffer)
|
|
|
- logger.Info("开始批量处理WAF日志", zap.Int("日志数量", messageCount))
|
|
|
-
|
|
|
- // 复制一份数据进行处理
|
|
|
- batch := make([]*adminApi.WafLog, len(j.buffer))
|
|
|
+ logger.Info("准备批量处理WAF日志", zap.Int("日志数量", messageCount))
|
|
|
+
|
|
|
+ // 复制一份数据用于处理
|
|
|
+ batch := make([]*adminApi.WafLog, messageCount)
|
|
|
copy(batch, j.buffer)
|
|
|
-
|
|
|
- // 清空缓冲区并更新刷新时间
|
|
|
+
|
|
|
+ // 清空缓冲区 (通过切片重置) 并更新刷新时间
|
|
|
j.buffer = j.buffer[:0]
|
|
|
j.lastFlushTime = time.Now()
|
|
|
-
|
|
|
- // 批量处理消息
|
|
|
+
|
|
|
+ return batch
|
|
|
+}
|
|
|
+
|
|
|
+// processBatch 执行实际的批量写入操作
|
|
|
+func (j *wafLogJob) processBatch(ctx context.Context, logger *zap.Logger, batch []*adminApi.WafLog) {
|
|
|
+ if len(batch) == 0 {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
+ logger.Info("开始批量处理WAF日志", zap.Int("数量", len(batch)))
|
|
|
+
|
|
|
+ // 实际执行批量处理
|
|
|
err := j.wafLogService.BatchAddWafLog(ctx, batch)
|
|
|
-
|
|
|
if err != nil {
|
|
|
- logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", messageCount))
|
|
|
+ logger.Error("批量处理WAF日志失败", zap.Error(err), zap.Int("数量", len(batch)))
|
|
|
+ // 此处可以根据业务需求增加失败重试或告警逻辑
|
|
|
} else {
|
|
|
- logger.Info("成功批量处理WAF日志", zap.Int("数量", messageCount))
|
|
|
+ logger.Info("成功批量处理WAF日志", zap.Int("数量", len(batch)))
|
|
|
}
|
|
|
-
|
|
|
- return err
|
|
|
-}
|
|
|
+}
|