whitelist.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  7. "go.uber.org/zap"
  8. )
  // WhitelistJob is the interface for background jobs that process
  // whitelist-related tasks.
  type WhitelistJob interface {
  	// DomainConsumer starts the consumer that processes domain whitelist
  	// tasks. The call takes the caller's context as its only argument and
  	// returns nothing.
  	DomainConsumer(ctx context.Context)
  }
  14. // NewWhitelistJob 创建一个新的 WhitelistJob
  15. func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob {
  16. return &whitelistJob{
  17. Job: job,
  18. aoDunService: aoDunService,
  19. }
  20. }
  21. type whitelistJob struct {
  22. *Job
  23. aoDunService service.AoDunService
  24. }
  25. // DomainConsumer 是处理域名白名单任务的消费者
  26. func (j *whitelistJob) DomainConsumer(ctx context.Context) {
  27. taskName := "domain_whitelist"
  28. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  29. if !ok {
  30. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  31. return
  32. }
  33. consumerName := "domain_whitelist_consumer"
  34. j.logger.Info("正在启动域名白名单消费者...",
  35. zap.String("task", taskName),
  36. zap.String("queue", taskCfg.Queue),
  37. zap.String("consumer", consumerName),
  38. )
  39. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  40. if err != nil {
  41. j.logger.Error("启动域名白名单消费者失败", zap.Error(err))
  42. return
  43. }
  44. // Define the message payload structure, now including an action field
  45. type domainTaskPayload struct {
  46. Domain string `json:"domain"`
  47. Action string `json:"action"` // "add" or "del"
  48. }
  49. for {
  50. select {
  51. case <-ctx.Done():
  52. j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName))
  53. return
  54. case d, ok := <-msgs:
  55. if !ok {
  56. j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName))
  57. return
  58. }
  59. // 解析消息
  60. var payload domainTaskPayload
  61. if err := json.Unmarshal(d.Body, &payload); err != nil {
  62. j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  63. // 消息格式错误,直接拒绝且不重新入队
  64. _ = d.Nack(false, false)
  65. continue
  66. }
  67. j.logger.Info("收到域名白名单任务",
  68. zap.String("domain", payload.Domain),
  69. zap.String("routing_key", d.RoutingKey),
  70. )
  71. // Call business logic based on the action
  72. switch payload.Action {
  73. case "add":
  74. if err := j.aoDunService.AddDomainWhiteList(ctx, []string{payload.Domain}); err != nil {
  75. j.logger.Error("Failed to handle 'add' domain whitelist task", zap.Error(err), zap.String("domain", payload.Domain))
  76. _ = d.Reject(false) // Business failure, reject message
  77. continue
  78. }
  79. j.logger.Info("Successfully processed 'add' domain whitelist task", zap.String("domain", payload.Domain))
  80. case "del":
  81. if err := j.aoDunService.DeleteDomainWhiteList(ctx, []string{payload.Domain}); err != nil {
  82. j.logger.Error("Failed to handle 'delete' domain whitelist task", zap.Error(err), zap.String("domain", payload.Domain))
  83. _ = d.Reject(false) // Business failure, reject message
  84. continue
  85. }
  86. j.logger.Info("Successfully processed 'delete' domain whitelist task", zap.String("domain", payload.Domain))
  87. default:
  88. j.logger.Warn("Received unknown action in domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  89. _ = d.Reject(false) // Reject message with unknown action
  90. continue
  91. }
  92. // 业务处理完成后,手动发送确认
  93. if err := d.Ack(false); err != nil {
  94. j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
  95. }
  96. }
  97. }
  98. }