package job import ( "context" "encoding/json" "fmt" "github.com/go-nunu/nunu-layout-advanced/internal/service" "go.uber.org/zap" ) // WhitelistJob 定义了处理白名单相关任务的接口 type WhitelistJob interface { // DomainConsumer 启动消费者,处理域名白名单任务 DomainConsumer(ctx context.Context) } // NewWhitelistJob 创建一个新的 WhitelistJob func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob { return &whitelistJob{ Job: job, aoDunService: aoDunService, } } type whitelistJob struct { *Job aoDunService service.AoDunService } // DomainConsumer 是处理域名白名单任务的消费者 func (j *whitelistJob) DomainConsumer(ctx context.Context) { taskName := "domain_whitelist" taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName) if !ok { j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName)) return } consumerName := "domain_whitelist_consumer" j.logger.Info("正在启动域名白名单消费者...", zap.String("task", taskName), zap.String("queue", taskCfg.Queue), zap.String("consumer", consumerName), ) msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount) if err != nil { j.logger.Error("启动域名白名单消费者失败", zap.Error(err)) return } // Define the message payload structure, now including an action field type domainTaskPayload struct { Domain string `json:"domain"` Ip string `json:"ip"` Action string `json:"action"` // "add" or "del" } for { select { case <-ctx.Done(): j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName)) return case d, ok := <-msgs: if !ok { j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName)) return } // 解析消息 var payload domainTaskPayload if err := json.Unmarshal(d.Body, &payload); err != nil { j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body)) // 消息格式错误,直接拒绝且不重新入队 _ = d.Nack(false, false) continue } j.logger.Info("收到域名白名单任务", zap.String("domain", payload.Domain), zap.String("routing_key", d.RoutingKey), ) // Call business logic based on the action var processingErr error switch payload.Action { case "add", "del": processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action) default: processingErr = fmt.Errorf("unknown action: %s", payload.Action) j.logger.Warn("Received unknown action in domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain)) } if processingErr == nil { j.logger.Info("Successfully processed domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain)) } // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack if processingErr != nil { j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain)) // 业务失败,拒绝消息并不重新入队 if err := d.Nack(false, false); err != nil { j.logger.Error("消息 Nack 失败", zap.Error(err)) } } else { // 业务处理成功,手动发送确认 if err := d.Ack(false); err != nil { j.logger.Error("域名白名单任务消息确认失败", zap.Error(err)) } } } } } //func (j *whitelistJob) IpConsumer(ctx context.Context) { // taskName := "IP_whitelist" // taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName) // if !ok { // j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName)) // return // } // // consumerName := "IP_whitelist_consumer" // j.logger.Info("正在启动域名白名单消费者...", // zap.String("task", taskName), // zap.String("queue", taskCfg.Queue), // zap.String("consumer", consumerName), // ) // // msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount) // if err != nil { // j.logger.Error("启动IP白名单消费者失败", zap.Error(err)) // return // } // // // Define the message payload structure, now including an action field // type ipTaskPayload struct { // IP string `json:"IP"` // Action string `json:"action"` // "add" or "del" // } // // for { // select { // case <-ctx.Done(): // j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName)) // return // case d, ok := <-msgs: // if !ok { // j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName)) // return // } // // // 解析消息 // var payload ipTaskPayload // if err := json.Unmarshal(d.Body, &payload); err != nil { // j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body)) // // 消息格式错误,直接拒绝且不重新入队 // _ = d.Nack(false, false) // continue // } // // j.logger.Info("收到IP白名单任务", // zap.String("domain", payload.IP), // zap.String("routing_key", d.RoutingKey), // ) // // // Call business logic based on the action // var processingErr error // // // Call business logic based on the action // switch payload.Action { // case "add": // processingErr = j.aoDunService.AddWhiteStaticList(ctx, []string{payload.IP}) // if processingErr == nil { // j.logger.Info("Successfully processed 'add' IP whitelist task", zap.String("IP", payload.IP)) // } // case "del": // processingErr = j.aoDunService.DomainWhiteList(ctx, []string{payload.IP}) // if processingErr == nil { // j.logger.Info("Successfully processed 'delete' IP whitelist task", zap.String("IP", payload.IP)) // } // case "get": // ids,processingErr = j.aoDunService.d(ctx) // if processingErr == nil { // j.logger.Info("Successfully processed 'get' IP whitelist task", zap.String("IP", payload.IP)) // } // // default: // j.logger.Warn("Received unknown action in IP whitelist task", zap.String("action", payload.Action), zap.String("IP", payload.IP)) // processingErr = fmt.Errorf("unknown action: %s", payload.Action) // } // // // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack // if processingErr != nil { // j.logger.Error("处理IP白名单任务失败", zap.Error(processingErr), zap.String("IP", payload.IP)) // // 业务失败,拒绝消息并不重新入队 // if err := d.Nack(false, false); err != nil { // j.logger.Error("消息 Nack 失败", zap.Error(err)) // } // } else { // // 业务处理成功,手动发送确认 // if err := d.Ack(false); err != nil { // j.logger.Error("IP白名单任务消息确认失败", zap.Error(err)) // } // } // // } // } //}