package job

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/go-nunu/nunu-layout-advanced/internal/service"
	"go.uber.org/zap"
)

// WhitelistJob defines the background jobs that handle whitelist tasks.
type WhitelistJob interface {
	// DomainConsumer starts the consumer that processes domain whitelist
	// tasks. It blocks until ctx is cancelled or the message channel closes.
	DomainConsumer(ctx context.Context)
}

// NewWhitelistJob creates a WhitelistJob backed by the shared Job
// dependencies and the given AoDunService.
func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob {
	return &whitelistJob{
		Job:          job,
		aoDunService: aoDunService,
	}
}

// whitelistJob is the default WhitelistJob implementation. It embeds *Job
// (used here for the logger and the RabbitMQ client) and forwards whitelist
// changes to aoDunService.
type whitelistJob struct {
	*Job
	aoDunService service.AoDunService
}

// DomainConsumer consumes the queue configured for the "domain_whitelist"
// task and applies every message to the domain whitelist.
//
// Each message body is a JSON object of the form
// {"domain": "...", "action": "add" | "del"}. The outcome per message is:
//
//   - body is not valid JSON: negatively acknowledged without requeue;
//   - unknown action: rejected without requeue;
//   - service call fails: rejected without requeue, unless ctx has already
//     been cancelled, in which case the message is requeued so that a
//     shutdown does not drop an in-flight task;
//   - service call succeeds: acknowledged.
//
// The method returns when the task configuration is missing, when the
// consumer cannot be started, when ctx is cancelled, or when the delivery
// channel is closed. Failures are reported through the logger only.
func (j *whitelistJob) DomainConsumer(ctx context.Context) {
	taskName := "domain_whitelist"
	taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
	if !ok {
		j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
		return
	}

	consumerName := "domain_whitelist_consumer"
	j.logger.Info("正在启动域名白名单消费者...",
		zap.String("task", taskName),
		zap.String("queue", taskCfg.Queue),
		zap.String("consumer", consumerName),
	)

	msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
	if err != nil {
		j.logger.Error("启动域名白名单消费者失败", zap.Error(err))
		return
	}

	// domainTaskPayload is the JSON body carried by each task message.
	type domainTaskPayload struct {
		Domain string `json:"domain"`
		Action string `json:"action"` // "add" or "del"
	}

	for {
		select {
		case <-ctx.Done():
			j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName))
			return

		case d, ok := <-msgs:
			if !ok {
				j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName))
				return
			}

			// Decode the message body.
			var payload domainTaskPayload
			if err := json.Unmarshal(d.Body, &payload); err != nil {
				j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
				// A malformed message can never succeed, so drop it
				// instead of requeueing it.
				if nackErr := d.Nack(false, false); nackErr != nil {
					j.logger.Error("Failed to nack malformed domain whitelist message", zap.Error(nackErr))
				}
				continue
			}

			j.logger.Info("收到域名白名单任务",
				zap.String("domain", payload.Domain),
				zap.String("action", payload.Action),
				zap.String("routing_key", d.RoutingKey),
			)

			// Dispatch on the requested action. label is the word used in
			// the log messages ("del" is reported as "delete").
			var (
				label   string
				taskErr error
			)
			switch payload.Action {
			case "add":
				label = "add"
				taskErr = j.aoDunService.AddDomainWhiteList(ctx, []string{payload.Domain})
			case "del":
				label = "delete"
				taskErr = j.aoDunService.DeleteDomainWhiteList(ctx, []string{payload.Domain})
			default:
				j.logger.Warn("Received unknown action in domain whitelist task",
					zap.String("action", payload.Action),
					zap.String("domain", payload.Domain),
				)
				// An unknown action cannot be processed; reject without requeue.
				if rejectErr := d.Reject(false); rejectErr != nil {
					j.logger.Error("Failed to reject domain whitelist message with unknown action",
						zap.Error(rejectErr),
						zap.String("domain", payload.Domain),
					)
				}
				continue
			}

			if taskErr != nil {
				// A failure seen after ctx was cancelled is most likely a
				// side effect of shutdown rather than a problem with the
				// task, so hand the message back to the broker. Any other
				// failure is treated as a business failure and dropped.
				requeue := ctx.Err() != nil
				j.logger.Error(fmt.Sprintf("Failed to handle '%s' domain whitelist task", label),
					zap.Error(taskErr),
					zap.String("domain", payload.Domain),
				)
				if rejectErr := d.Reject(requeue); rejectErr != nil {
					j.logger.Error("Failed to reject domain whitelist message",
						zap.Error(rejectErr),
						zap.String("domain", payload.Domain),
						zap.Bool("requeue", requeue),
					)
				}
				continue
			}
			j.logger.Info(fmt.Sprintf("Successfully processed '%s' domain whitelist task", label),
				zap.String("domain", payload.Domain),
			)

			// Acknowledge manually once the business logic has succeeded.
			if err := d.Ack(false); err != nil {
				j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
			}
		}
	}
}