|
- package job
- import (
- "context"
- "encoding/json"
- "fmt"
- "github.com/go-nunu/nunu-layout-advanced/internal/service"
- "go.uber.org/zap"
- )
- // WhitelistJob 定义了处理白名单相关任务的接口
- type WhitelistJob interface {
- // DomainConsumer 启动消费者,处理域名白名单任务
- DomainConsumer(ctx context.Context)
- }
- // NewWhitelistJob 创建一个新的 WhitelistJob
- func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob {
- return &whitelistJob{
- Job: job,
- aoDunService: aoDunService,
- }
- }
- type whitelistJob struct {
- *Job
- aoDunService service.AoDunService
- }
- // DomainConsumer 是处理域名白名单任务的消费者
- func (j *whitelistJob) DomainConsumer(ctx context.Context) {
- taskName := "domain_whitelist"
- taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
- if !ok {
- j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
- return
- }
- consumerName := "domain_whitelist_consumer"
- j.logger.Info("正在启动域名白名单消费者...",
- zap.String("task", taskName),
- zap.String("queue", taskCfg.Queue),
- zap.String("consumer", consumerName),
- )
- msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
- if err != nil {
- j.logger.Error("启动域名白名单消费者失败", zap.Error(err))
- return
- }
- // Define the message payload structure, now including an action field
- type domainTaskPayload struct {
- Domain string `json:"domain"`
- Ip string `json:"ip"`
- Action string `json:"action"` // "add" or "del"
- }
- for {
- select {
- case <-ctx.Done():
- j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName))
- return
- case d, ok := <-msgs:
- if !ok {
- j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName))
- return
- }
- // 解析消息
- var payload domainTaskPayload
- if err := json.Unmarshal(d.Body, &payload); err != nil {
- j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
- // 消息格式错误,直接拒绝且不重新入队
- _ = d.Nack(false, false)
- continue
- }
- j.logger.Info("收到域名白名单任务",
- zap.String("domain", payload.Domain),
- zap.String("routing_key", d.RoutingKey),
- )
- // Call business logic based on the action
- var processingErr error
- switch payload.Action {
- case "add", "del":
- processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action)
- default:
- processingErr = fmt.Errorf("unknown action: %s", payload.Action)
- j.logger.Warn("Received unknown action in domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
- }
- if processingErr == nil {
- j.logger.Info("Successfully processed domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
- }
- // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
- if processingErr != nil {
- j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain))
- // 业务失败,拒绝消息并不重新入队
- if err := d.Nack(false, false); err != nil {
- j.logger.Error("消息 Nack 失败", zap.Error(err))
- }
- } else {
- // 业务处理成功,手动发送确认
- if err := d.Ack(false); err != nil {
- j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
- }
- }
- }
- }
- }
- //func (j *whitelistJob) IpConsumer(ctx context.Context) {
- // taskName := "IP_whitelist"
- // taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
- // if !ok {
- // j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
- // return
- // }
- //
- // consumerName := "IP_whitelist_consumer"
- // j.logger.Info("正在启动域名白名单消费者...",
- // zap.String("task", taskName),
- // zap.String("queue", taskCfg.Queue),
- // zap.String("consumer", consumerName),
- // )
- //
- // msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
- // if err != nil {
- // j.logger.Error("启动IP白名单消费者失败", zap.Error(err))
- // return
- // }
- //
- // // Define the message payload structure, now including an action field
- // type ipTaskPayload struct {
- // IP string `json:"IP"`
- // Action string `json:"action"` // "add" or "del"
- // }
- //
- // for {
- // select {
- // case <-ctx.Done():
- // j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName))
- // return
- // case d, ok := <-msgs:
- // if !ok {
- // j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName))
- // return
- // }
- //
- // // 解析消息
- // var payload ipTaskPayload
- // if err := json.Unmarshal(d.Body, &payload); err != nil {
- // j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
- // // 消息格式错误,直接拒绝且不重新入队
- // _ = d.Nack(false, false)
- // continue
- // }
- //
- // j.logger.Info("收到IP白名单任务",
- // zap.String("domain", payload.IP),
- // zap.String("routing_key", d.RoutingKey),
- // )
- //
- // // Call business logic based on the action
- // var processingErr error
- //
- // // Call business logic based on the action
- // switch payload.Action {
- // case "add":
- // processingErr = j.aoDunService.AddWhiteStaticList(ctx, []string{payload.IP})
- // if processingErr == nil {
- // j.logger.Info("Successfully processed 'add' IP whitelist task", zap.String("IP", payload.IP))
- // }
- // case "del":
- // processingErr = j.aoDunService.DomainWhiteList(ctx, []string{payload.IP})
- // if processingErr == nil {
- // j.logger.Info("Successfully processed 'delete' IP whitelist task", zap.String("IP", payload.IP))
- // }
- // case "get":
- // ids,processingErr = j.aoDunService.d(ctx)
- // if processingErr == nil {
- // j.logger.Info("Successfully processed 'get' IP whitelist task", zap.String("IP", payload.IP))
- // }
- //
- // default:
- // j.logger.Warn("Received unknown action in IP whitelist task", zap.String("action", payload.Action), zap.String("IP", payload.IP))
- // processingErr = fmt.Errorf("unknown action: %s", payload.Action)
- // }
- //
- // // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
- // if processingErr != nil {
- // j.logger.Error("处理IP白名单任务失败", zap.Error(processingErr), zap.String("IP", payload.IP))
- // // 业务失败,拒绝消息并不重新入队
- // if err := d.Nack(false, false); err != nil {
- // j.logger.Error("消息 Nack 失败", zap.Error(err))
- // }
- // } else {
- // // 业务处理成功,手动发送确认
- // if err := d.Ack(false); err != nil {
- // j.logger.Error("IP白名单任务消息确认失败", zap.Error(err))
- // }
- // }
- //
- // }
- // }
- //}
|