package job import ( "context" "encoding/json" "fmt" v1 "github.com/go-nunu/nunu-layout-advanced/api/v1" "github.com/go-nunu/nunu-layout-advanced/internal/service" "go.uber.org/zap" ) // WhitelistJob 定义了处理白名单相关任务的接口 type WhitelistJob interface { // DomainConsumer 启动消费者,处理域名白名单任务 DomainConsumer(ctx context.Context) // IpConsumer 启动消费者,处理 IP 白名单任务 IpConsumer(ctx context.Context) } // NewWhitelistJob 创建一个新的 WhitelistJob func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob { return &whitelistJob{ Job: job, aoDunService: aoDunService, } } type whitelistJob struct { *Job aoDunService service.AoDunService } // DomainConsumer 是处理域名白名单任务的消费者 func (j *whitelistJob) DomainConsumer(ctx context.Context) { taskName := "domain_whitelist" taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName) if !ok { j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName)) return } consumerName := "domain_whitelist_consumer" j.logger.Info("正在启动域名白名单消费者...", zap.String("task", taskName), zap.String("queue", taskCfg.Queue), zap.String("consumer", consumerName), ) msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount) if err != nil { j.logger.Error("启动域名白名单消费者失败", zap.Error(err)) return } // Define the message payload structure, now including an action field type domainTaskPayload struct { Domain string `json:"domain"` Ip string `json:"ip"` Action string `json:"action"` // "add" or "del" } for { select { case <-ctx.Done(): j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName)) return case d, ok := <-msgs: if !ok { j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName)) return } // 解析消息 var payload domainTaskPayload if err := json.Unmarshal(d.Body, &payload); err != nil { j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body)) // 消息格式错误,直接拒绝且不重新入队 _ = d.Nack(false, false) continue } j.logger.Info("收到域名白名单任务", zap.String("domain", payload.Domain), zap.String("routing_key", d.RoutingKey), ) // Call business logic based on the action var processingErr error switch payload.Action { case "add", "del": processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action) default: processingErr = fmt.Errorf("unknown action: %s", payload.Action) j.logger.Warn("在 域名 白名单任务中收到未知操作", zap.String("action", payload.Action), zap.String("domain", payload.Domain)) } if processingErr == nil { j.logger.Info("已成功处理域名白名单任务", zap.String("action", payload.Action), zap.String("domain", payload.Domain)) } // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack if processingErr != nil { j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain)) // 业务失败,拒绝消息并不重新入队 if err := d.Nack(false, false); err != nil { j.logger.Error("消息 Nack 失败", zap.Error(err)) } } else { // 业务处理成功,手动发送确认 if err := d.Ack(false); err != nil { j.logger.Error("域名白名单任务消息确认失败", zap.Error(err)) } } } } } func (j *whitelistJob) IpConsumer(ctx context.Context) { taskName := "ip_white" taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName) if !ok { j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName)) return } consumerName := "ip_white_consumer" j.logger.Info("正在启动IP白名单消费者...", zap.String("task", taskName), zap.String("queue", taskCfg.Queue), zap.String("consumer", consumerName), ) msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount) if err != nil { j.logger.Error("启动IP白名单消费者失败", zap.Error(err)) return } // Define the message payload structure, now including an action field type ipTaskPayload struct { Ips []v1.IpInfo `json:"ips"` Action string `json:"action"` } for { select { case <-ctx.Done(): j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName)) return case d, ok := <-msgs: if !ok { j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName)) return } // 解析消息 var payload ipTaskPayload if err := json.Unmarshal(d.Body, &payload); err != nil { j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body)) // 消息格式错误,直接拒绝且不重新入队 _ = d.Nack(false, false) continue } j.logger.Info("收到IP白名单任务", zap.Any("IP", payload.Ips), zap.String("routing_key", d.RoutingKey), ) // Call business logic based on the action var processingErr error switch payload.Action { case "add": processingErr = j.aoDunService.AddWhiteStaticList(ctx, payload.Ips) default: processingErr = fmt.Errorf("unknown action: %s", payload.Action) j.logger.Warn("在 IP 白名单任务中收到未知操作", zap.String("action", payload.Action), zap.Any("IP", payload.Ips)) } if processingErr == nil { j.logger.Info("已成功处理IP白名单任务", zap.String("action", payload.Action), zap.Any("IP", payload.Ips)) } // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack if processingErr != nil { j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.Any("domain", payload.Ips)) // 业务失败,拒绝消息并不重新入队 if err := d.Nack(false, false); err != nil { j.logger.Error("消息 Nack 失败", zap.Error(err)) } } else { // 业务处理成功,手动发送确认 if err := d.Ack(false); err != nil { j.logger.Error("域名白名单任务消息确认失败", zap.Error(err)) } } } } }