package job import ( "context" "encoding/json" adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin" "github.com/go-nunu/nunu-layout-advanced/internal/service/admin" "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" ) // 使用公共的 TaskHandler 定义 // WafLogJob 定义了处理白名单相关任务的接口 type WafLogJob interface { // DomainConsumer 启动消费者,处理域名白名单任务 AddWafLogConsumer(ctx context.Context) } // NewWafLogJob 创建一个新的 WhitelistJob func NewWafLogJob(job *Job, wafLogService admin.WafLogService, ) WafLogJob { return &wafLogJob{ Job: job, wafLogService: wafLogService, } } type wafLogJob struct { *Job wafLogService admin.WafLogService } // DomainConsumer 启动域名白名单消费者 func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) { j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage) } // consume 调用公共的 Consume 方法 func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) { j.Job.Consume(ctx, taskName, consumerName, handler) } // handleDomainMessage 是域名白名单任务的具体处理器 func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error { var payload adminApi.WafLog if err := json.Unmarshal(d.Body, &payload); err != nil { logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body)) return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误 } logger.Info("收到添加日志任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.String("routing_key", d.RoutingKey), ) var processingErr error processingErr = j.wafLogService.AddWafLog(ctx, payload) if processingErr != nil { logger.Error("处理域名白名单任务失败", zap.Error(processingErr),zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid),zap.Any("req", payload)) } else { logger.Info("已成功处理域名白名单任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", payload)) } return processingErr }