wafLog.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
  7. "github.com/rabbitmq/amqp091-go"
  8. "go.uber.org/zap"
  9. )
  10. // 使用公共的 TaskHandler 定义
  11. // WafLogJob 定义了处理白名单相关任务的接口
  12. type WafLogJob interface {
  13. // DomainConsumer 启动消费者,处理域名白名单任务
  14. AddWafLogConsumer(ctx context.Context)
  15. }
  16. // NewWafLogJob 创建一个新的 WhitelistJob
  17. func NewWafLogJob(job *Job,
  18. wafLogService admin.WafLogService,
  19. ) WafLogJob {
  20. return &wafLogJob{
  21. Job: job,
  22. wafLogService: wafLogService,
  23. }
  24. }
  25. type wafLogJob struct {
  26. *Job
  27. wafLogService admin.WafLogService
  28. }
  29. // DomainConsumer 启动域名白名单消费者
  30. func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
  31. j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
  32. }
  33. // consume 调用公共的 Consume 方法
  34. func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
  35. j.Job.Consume(ctx, taskName, consumerName, handler)
  36. }
  37. // handleDomainMessage 是域名白名单任务的具体处理器
  38. func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  39. var payload adminApi.WafLog
  40. if err := json.Unmarshal(d.Body, &payload); err != nil {
  41. logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  42. return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
  43. }
  44. logger.Info("收到添加日志任务",
  45. zap.Int("hostId", payload.HostId),
  46. zap.Int("uid", payload.Uid),
  47. zap.String("routing_key", d.RoutingKey),
  48. )
  49. var processingErr error
  50. processingErr = j.wafLogService.AddWafLog(ctx, payload)
  51. if processingErr != nil {
  52. logger.Error("处理域名白名单任务失败", zap.Error(processingErr),zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid),zap.Any("req", payload))
  53. } else {
  54. logger.Info("已成功处理域名白名单任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", payload))
  55. }
  56. return processingErr
  57. }