whitelist.go 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
  8. "github.com/rabbitmq/amqp091-go"
  9. "go.uber.org/zap"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. )
  // NOTE: the TaskHandler type used by the handlers in this file is the shared
  // definition declared elsewhere in this package; it is not redefined here.

  // WhitelistJob is the set of queue consumers that process whitelist tasks.
  type WhitelistJob interface {
  	// DomainConsumer starts the consumer that handles domain whitelist tasks.
  	DomainConsumer(ctx context.Context)
  	// IpConsumer starts the consumer that handles IP whitelist tasks.
  	IpConsumer(ctx context.Context)
  }
  22. // NewWhitelistJob 创建一个新的 WhitelistJob
  23. func NewWhitelistJob(job *Job,
  24. aoDunService service.AoDunService,
  25. wafForMatter waf.WafFormatterService,
  26. ) WhitelistJob {
  27. return &whitelistJob{
  28. Job: job,
  29. aoDunService: aoDunService,
  30. wafForMatter: wafForMatter,
  31. }
  32. }
  33. type whitelistJob struct {
  34. *Job
  35. aoDunService service.AoDunService
  36. wafForMatter waf.WafFormatterService
  37. }
  38. // DomainConsumer 启动域名白名单消费者
  39. func (j *whitelistJob) DomainConsumer(ctx context.Context) {
  40. j.consume(ctx, "domain_whitelist", "domain_whitelist_consumer", j.handleDomainMessage)
  41. }
  42. // IpConsumer 启动IP白名单消费者
  43. func (j *whitelistJob) IpConsumer(ctx context.Context) {
  44. j.consume(ctx, "ip_white", "ip_white_consumer", j.handleIpMessage)
  45. }
  46. // consume 调用公共的 Consume 方法
  47. func (j *whitelistJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
  48. j.Job.Consume(ctx, taskName, consumerName, handler)
  49. }
  50. // handleDomainMessage 是域名白名单任务的具体处理器
  51. func (j *whitelistJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  52. type domainTaskPayload struct {
  53. Domain string `json:"domain"`
  54. Ip string `json:"ip"`
  55. Action string `json:"action"` // "add" or "del"
  56. }
  57. var payload domainTaskPayload
  58. if err := json.Unmarshal(d.Body, &payload); err != nil {
  59. logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  60. return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
  61. }
  62. logger.Info("收到域名白名单任务",
  63. zap.String("action", payload.Action),
  64. zap.String("domain", payload.Domain),
  65. zap.String("ip", payload.Ip),
  66. zap.String("routing_key", d.RoutingKey),
  67. )
  68. var processingErr error
  69. switch payload.Action {
  70. case "add", "del":
  71. processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action)
  72. default:
  73. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  74. logger.Warn("在域名白名单任务中收到未知操作", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  75. }
  76. if processingErr != nil {
  77. logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain))
  78. } else {
  79. logger.Info("已成功处理域名白名单任务", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  80. }
  81. return processingErr
  82. }
  83. // handleIpMessage 是 IP 白名单任务的具体处理器
  84. func (j *whitelistJob) handleIpMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  85. type ipTaskPayload struct {
  86. Ips []string `json:"ips"`
  87. Action string `json:"action"`
  88. Color string `json:"color"`
  89. ReturnSourceIp string `json:"return_source_ip"`
  90. }
  91. var payload ipTaskPayload
  92. if err := json.Unmarshal(d.Body, &payload); err != nil {
  93. logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body), zap.String("routing_key", d.RoutingKey))
  94. return nil // 消息格式错误,不应重试
  95. }
  96. logger.Info("收到IP白名单任务",
  97. zap.String("action", payload.Action),
  98. zap.Any("ips", payload.Ips),
  99. zap.String("color", payload.Color),
  100. zap.String("routing_key", d.RoutingKey),
  101. )
  102. var processingErr error
  103. switch payload.Action {
  104. case "add":
  105. ips, err := j.wafForMatter.AppendWafIp(ctx, payload.Ips, payload.ReturnSourceIp)
  106. if err != nil {
  107. // 如果附加IP失败,记录错误并终止
  108. processingErr = fmt.Errorf("为WAF准备IP列表失败: %w", err)
  109. } else {
  110. var wg sync.WaitGroup
  111. errChan := make(chan error, 2)
  112. wg.Add(2)
  113. go func() {
  114. defer wg.Done()
  115. if err := j.aoDunService.AddWhiteStaticList(ctx, false, ips, payload.Color); err != nil {
  116. errChan <- err
  117. }
  118. }()
  119. go func() {
  120. defer wg.Done()
  121. if err := j.aoDunService.AddWhiteStaticList(ctx, true, ips,payload.Color); err != nil {
  122. errChan <- err
  123. }
  124. }()
  125. wg.Wait()
  126. close(errChan)
  127. var errs []string
  128. for err := range errChan {
  129. errs = append(errs, err.Error())
  130. }
  131. if len(errs) > 0 {
  132. processingErr = fmt.Errorf("添加IP到白名单时发生错误: %s", strings.Join(errs, "; "))
  133. }
  134. }
  135. case "del":
  136. var wg sync.WaitGroup
  137. errChan := make(chan error, len(payload.Ips)*2)
  138. deleteFromWall := func(isSmall bool, ip string) {
  139. defer wg.Done()
  140. id, err := j.aoDunService.GetWhiteStaticList(ctx, isSmall, ip, payload.ReturnSourceIp,payload.Color)
  141. if err != nil {
  142. errChan <- fmt.Errorf("获取IP '%s' (isSmall: %t) ID失败: %w , color: %s", ip, isSmall, err, payload.Color)
  143. return
  144. }
  145. if err := j.aoDunService.DelWhiteStaticList(ctx, isSmall, strconv.Itoa(id), payload.Color); err != nil {
  146. errChan <- fmt.Errorf("删除IP '%s' (isSmall: %t, id: %d) 失败: %w , color: %s", ip, isSmall, id, err , payload.Color)
  147. }
  148. }
  149. for _, ip := range payload.Ips {
  150. wg.Add(2)
  151. go deleteFromWall(false, ip)
  152. go deleteFromWall(true, ip)
  153. }
  154. wg.Wait()
  155. close(errChan)
  156. var errs []string
  157. for err := range errChan {
  158. logger.Error("删除IP白名单过程中发生错误", zap.Error(err), zap.String("color", payload.Color))
  159. errs = append(errs, err.Error())
  160. }
  161. if len(errs) > 0 {
  162. processingErr = fmt.Errorf("删除IP任务中发生错误: %s", strings.Join(errs, "; ") + ", color: " + payload.Color)
  163. }
  164. default:
  165. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  166. logger.Warn("在IP白名单任务中收到未知操作", zap.String("action", payload.Action), zap.Any("ips", payload.Ips), zap.String("color", payload.Color))
  167. }
  168. if processingErr != nil {
  169. logger.Error("处理IP白名单任务失败", zap.Error(processingErr), zap.Any("ips", payload.Ips), zap.String("color", payload.Color))
  170. } else {
  171. logger.Info("已成功处理IP白名单任务", zap.String("action", payload.Action), zap.Any("ips", payload.Ips), zap.String("color", payload.Color))
  172. }
  173. return processingErr
  174. }