whitelist.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  7. "go.uber.org/zap"
  8. )
// WhitelistJob defines the jobs that process whitelist-related tasks.
type WhitelistJob interface {
	// DomainConsumer starts the consumer that processes domain whitelist
	// tasks. It takes the context that governs the consumer's lifetime.
	DomainConsumer(ctx context.Context)
}
  14. // NewWhitelistJob 创建一个新的 WhitelistJob
  15. func NewWhitelistJob(job *Job, aoDunService service.AoDunService) WhitelistJob {
  16. return &whitelistJob{
  17. Job: job,
  18. aoDunService: aoDunService,
  19. }
  20. }
// whitelistJob is the WhitelistJob implementation returned by NewWhitelistJob.
type whitelistJob struct {
	// Embedded shared job state; the Rabbitmq and logger members used by
	// DomainConsumer are reached through it.
	*Job
	// aoDunService performs the DomainWhiteList call for each consumed task.
	aoDunService service.AoDunService
}
  25. // DomainConsumer 是处理域名白名单任务的消费者
  26. func (j *whitelistJob) DomainConsumer(ctx context.Context) {
  27. taskName := "domain_whitelist"
  28. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  29. if !ok {
  30. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  31. return
  32. }
  33. consumerName := "domain_whitelist_consumer"
  34. j.logger.Info("正在启动域名白名单消费者...",
  35. zap.String("task", taskName),
  36. zap.String("queue", taskCfg.Queue),
  37. zap.String("consumer", consumerName),
  38. )
  39. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  40. if err != nil {
  41. j.logger.Error("启动域名白名单消费者失败", zap.Error(err))
  42. return
  43. }
  44. // Define the message payload structure, now including an action field
  45. type domainTaskPayload struct {
  46. Domain string `json:"domain"`
  47. Ip string `json:"ip"`
  48. Action string `json:"action"` // "add" or "del"
  49. }
  50. for {
  51. select {
  52. case <-ctx.Done():
  53. j.logger.Info("域名白名单消费者正在关闭...", zap.String("task", taskName))
  54. return
  55. case d, ok := <-msgs:
  56. if !ok {
  57. j.logger.Warn("消息通道已关闭,域名白名单消费者退出。", zap.String("task", taskName))
  58. return
  59. }
  60. // 解析消息
  61. var payload domainTaskPayload
  62. if err := json.Unmarshal(d.Body, &payload); err != nil {
  63. j.logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  64. // 消息格式错误,直接拒绝且不重新入队
  65. _ = d.Nack(false, false)
  66. continue
  67. }
  68. j.logger.Info("收到域名白名单任务",
  69. zap.String("domain", payload.Domain),
  70. zap.String("routing_key", d.RoutingKey),
  71. )
  72. // Call business logic based on the action
  73. var processingErr error
  74. switch payload.Action {
  75. case "add", "del":
  76. processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action)
  77. default:
  78. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  79. j.logger.Warn("Received unknown action in domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  80. }
  81. if processingErr == nil {
  82. j.logger.Info("Successfully processed domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  83. }
  84. // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
  85. if processingErr != nil {
  86. j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain))
  87. // 业务失败,拒绝消息并不重新入队
  88. if err := d.Nack(false, false); err != nil {
  89. j.logger.Error("消息 Nack 失败", zap.Error(err))
  90. }
  91. } else {
  92. // 业务处理成功,手动发送确认
  93. if err := d.Ack(false); err != nil {
  94. j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
  95. }
  96. }
  97. }
  98. }
  99. }
  100. //func (j *whitelistJob) IpConsumer(ctx context.Context) {
  101. // taskName := "IP_whitelist"
  102. // taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  103. // if !ok {
  104. // j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  105. // return
  106. // }
  107. //
  108. // consumerName := "IP_whitelist_consumer"
  109. // j.logger.Info("正在启动域名白名单消费者...",
  110. // zap.String("task", taskName),
  111. // zap.String("queue", taskCfg.Queue),
  112. // zap.String("consumer", consumerName),
  113. // )
  114. //
  115. // msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  116. // if err != nil {
  117. // j.logger.Error("启动IP白名单消费者失败", zap.Error(err))
  118. // return
  119. // }
  120. //
  121. // // Define the message payload structure, now including an action field
  122. // type ipTaskPayload struct {
  123. // IP string `json:"IP"`
  124. // Action string `json:"action"` // "add" or "del"
  125. // }
  126. //
  127. // for {
  128. // select {
  129. // case <-ctx.Done():
  130. // j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName))
  131. // return
  132. // case d, ok := <-msgs:
  133. // if !ok {
  134. // j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName))
  135. // return
  136. // }
  137. //
  138. // // 解析消息
  139. // var payload ipTaskPayload
  140. // if err := json.Unmarshal(d.Body, &payload); err != nil {
  141. // j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  142. // // 消息格式错误,直接拒绝且不重新入队
  143. // _ = d.Nack(false, false)
  144. // continue
  145. // }
  146. //
  147. // j.logger.Info("收到IP白名单任务",
  148. // zap.String("domain", payload.IP),
  149. // zap.String("routing_key", d.RoutingKey),
  150. // )
  151. //
  152. // // Call business logic based on the action
  153. // var processingErr error
  154. //
  155. // // Call business logic based on the action
  156. // switch payload.Action {
  157. // case "add":
  158. // processingErr = j.aoDunService.AddWhiteStaticList(ctx, []string{payload.IP})
  159. // if processingErr == nil {
  160. // j.logger.Info("Successfully processed 'add' IP whitelist task", zap.String("IP", payload.IP))
  161. // }
  162. // case "del":
  163. // processingErr = j.aoDunService.DomainWhiteList(ctx, []string{payload.IP})
  164. // if processingErr == nil {
  165. // j.logger.Info("Successfully processed 'delete' IP whitelist task", zap.String("IP", payload.IP))
  166. // }
  167. // case "get":
  168. // ids,processingErr = j.aoDunService.d(ctx)
  169. // if processingErr == nil {
  170. // j.logger.Info("Successfully processed 'get' IP whitelist task", zap.String("IP", payload.IP))
  171. // }
  172. //
  173. // default:
  174. // j.logger.Warn("Received unknown action in IP whitelist task", zap.String("action", payload.Action), zap.String("IP", payload.IP))
  175. // processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  176. // }
  177. //
  178. // // 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
  179. // if processingErr != nil {
  180. // j.logger.Error("处理IP白名单任务失败", zap.Error(processingErr), zap.String("IP", payload.IP))
  181. // // 业务失败,拒绝消息并不重新入队
  182. // if err := d.Nack(false, false); err != nil {
  183. // j.logger.Error("消息 Nack 失败", zap.Error(err))
  184. // }
  185. // } else {
  186. // // 业务处理成功,手动发送确认
  187. // if err := d.Ack(false); err != nil {
  188. // j.logger.Error("IP白名单任务消息确认失败", zap.Error(err))
  189. // }
  190. // }
  191. //
  192. // }
  193. // }
  194. //}