whitelist.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  8. "go.uber.org/zap"
  9. )
  10. // WhitelistJob 定义了处理白名单相关任务的接口
  11. type WhitelistJob interface {
  12. // DomainConsumer 启动消费者,处理域名白名单任务
  13. DomainConsumer(ctx context.Context)
  14. // IpConsumer 启动消费者,处理 IP 白名单任务
  15. IpConsumer(ctx context.Context)
  16. }
  17. // NewWhitelistJob 创建一个新的 WhitelistJob
  18. func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob {
  19. return &whitelistJob{
  20. Job: job,
  21. aoDunService: aoDunService,
  22. }
  23. }
  24. type whitelistJob struct {
  25. *Job
  26. aoDunService service.AoDunService
  27. }
  28. // DomainConsumer 是处理域名白名单任务的消费者
  29. func (j *whitelistJob) DomainConsumer(ctx context.Context) {
  30. taskName := "domain_whitelist"
  31. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  32. if !ok {
  33. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  34. return
  35. }
  36. consumerName := "domain_whitelist_consumer"
  37. j.logger.Info("正在启动域名白名单消费者...",
  38. zap.String("task", taskName),
  39. zap.String("queue", taskCfg.Queue),
  40. zap.String("consumer", consumerName),
  41. )
  42. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  43. if err != nil {
  44. j.logger.Error("启动域名白名单消费者失败", zap.Error(err))
  45. return
  46. }
  47. // Define the message payload structure, now including an action field
  48. type domainTaskPayload struct {
  49. Domain string `json:"domain"`
  50. Ip string `json:"ip"`
  51. Action string `json:"action"` // "add" or "del"
  52. }
  53. for {
  54. select {
  55. case <-ctx.Done():
  56. j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName))
  57. return
  58. case d, ok := <-msgs:
  59. if !ok {
  60. j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName))
  61. return
  62. }
  63. // 解析消息
  64. var payload domainTaskPayload
  65. if err := json.Unmarshal(d.Body, &payload); err != nil {
  66. j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  67. // 消息格式错误,直接拒绝且不重新入队
  68. _ = d.Nack(false, false)
  69. continue
  70. }
  71. j.logger.Info("收到域名白名单任务",
  72. zap.String("domain", payload.Domain),
  73. zap.String("routing_key", d.RoutingKey),
  74. )
  75. // Call business logic based on the action
  76. var processingErr error
  77. switch payload.Action {
  78. case "add", "del":
  79. processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action)
  80. default:
  81. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  82. j.logger.Warn("在 域名 白名单任务中收到未知操作", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  83. }
  84. if processingErr == nil {
  85. j.logger.Info("已成功处理域名白名单任务", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  86. }
  87. // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
  88. if processingErr != nil {
  89. j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain))
  90. // 业务失败,拒绝消息并不重新入队
  91. if err := d.Nack(false, false); err != nil {
  92. j.logger.Error("消息 Nack 失败", zap.Error(err))
  93. }
  94. } else {
  95. // 业务处理成功,手动发送确认
  96. if err := d.Ack(false); err != nil {
  97. j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
  98. }
  99. }
  100. }
  101. }
  102. }
  103. func (j *whitelistJob) IpConsumer(ctx context.Context) {
  104. taskName := "ip_white"
  105. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  106. if !ok {
  107. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  108. return
  109. }
  110. consumerName := "ip_white_consumer"
  111. j.logger.Info("正在启动IP白名单消费者...",
  112. zap.String("task", taskName),
  113. zap.String("queue", taskCfg.Queue),
  114. zap.String("consumer", consumerName),
  115. )
  116. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  117. if err != nil {
  118. j.logger.Error("启动IP白名单消费者失败", zap.Error(err))
  119. return
  120. }
  121. // Define the message payload structure, now including an action field
  122. type ipTaskPayload struct {
  123. Ips []v1.IpInfo `json:"ips"`
  124. Action string `json:"action"`
  125. }
  126. for {
  127. select {
  128. case <-ctx.Done():
  129. j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName))
  130. return
  131. case d, ok := <-msgs:
  132. if !ok {
  133. j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName))
  134. return
  135. }
  136. // 解析消息
  137. var payload ipTaskPayload
  138. if err := json.Unmarshal(d.Body, &payload); err != nil {
  139. j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  140. // 消息格式错误,直接拒绝且不重新入队
  141. _ = d.Nack(false, false)
  142. continue
  143. }
  144. j.logger.Info("收到IP白名单任务",
  145. zap.Any("IP", payload.Ips),
  146. zap.String("routing_key", d.RoutingKey),
  147. )
  148. // Call business logic based on the action
  149. var processingErr error
  150. switch payload.Action {
  151. case "add":
  152. processingErr = j.aoDunService.AddWhiteStaticList(ctx, payload.Ips)
  153. default:
  154. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  155. j.logger.Warn("在 IP 白名单任务中收到未知操作", zap.String("action", payload.Action), zap.Any("IP", payload.Ips))
  156. }
  157. if processingErr == nil {
  158. j.logger.Info("已成功处理IP白名单任务", zap.String("action", payload.Action), zap.Any("IP", payload.Ips))
  159. }
  160. // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
  161. if processingErr != nil {
  162. j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.Any("domain", payload.Ips))
  163. // 业务失败,拒绝消息并不重新入队
  164. if err := d.Nack(false, false); err != nil {
  165. j.logger.Error("消息 Nack 失败", zap.Error(err))
  166. }
  167. } else {
  168. // 业务处理成功,手动发送确认
  169. if err := d.Ack(false); err != nil {
  170. j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
  171. }
  172. }
  173. }
  174. }
  175. }