whitelist.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package job
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
  8. "github.com/google/uuid"
  9. "github.com/rabbitmq/amqp091-go"
  10. "go.uber.org/zap"
  11. "strconv"
  12. "strings"
  13. "sync"
  14. )
  15. // taskHandler 定义了处理单个消息的函数签名
  16. // 它负责业务逻辑的执行,并返回一个 error 来告知调用者处理是否成功。
  17. type taskHandler func(ctx context.Context, logger *zap.Logger, delivery amqp091.Delivery) error
  18. // WhitelistJob 定义了处理白名单相关任务的接口
  19. type WhitelistJob interface {
  20. // DomainConsumer 启动消费者,处理域名白名单任务
  21. DomainConsumer(ctx context.Context)
  22. // IpConsumer 启动消费者,处理 IP 白名单任务
  23. IpConsumer(ctx context.Context)
  24. }
  25. // NewWhitelistJob 创建一个新的 WhitelistJob
  26. func NewWhitelistJob(job *Job,
  27. aoDunService service.AoDunService,
  28. wafForMatter waf.WafFormatterService,
  29. ) WhitelistJob {
  30. return &whitelistJob{
  31. Job: job,
  32. aoDunService: aoDunService,
  33. wafForMatter: wafForMatter,
  34. }
  35. }
  36. type whitelistJob struct {
  37. *Job
  38. aoDunService service.AoDunService
  39. wafForMatter waf.WafFormatterService
  40. }
  41. // DomainConsumer 启动域名白名单消费者
  42. func (j *whitelistJob) DomainConsumer(ctx context.Context) {
  43. j.consume(ctx, "domain_whitelist", "domain_whitelist_consumer", j.handleDomainMessage)
  44. }
  45. // IpConsumer 启动IP白名单消费者
  46. func (j *whitelistJob) IpConsumer(ctx context.Context) {
  47. j.consume(ctx, "ip_white", "ip_white_consumer", j.handleIpMessage)
  48. }
  49. // consume 是一个通用的 RabbitMQ 消费者方法,封装了重复的逻辑
  50. func (j *whitelistJob) consume(ctx context.Context, taskName, consumerName string, handler taskHandler) {
  51. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  52. if !ok {
  53. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  54. return
  55. }
  56. j.logger.Info("正在启动消费者...",
  57. zap.String("task", taskName),
  58. zap.String("queue", taskCfg.Queue),
  59. zap.String("consumer", consumerName),
  60. )
  61. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  62. if err != nil {
  63. j.logger.Error("启动消费者失败", zap.String("task", taskName), zap.Error(err))
  64. return
  65. }
  66. for {
  67. select {
  68. case <-ctx.Done():
  69. j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
  70. return
  71. case d, ok := <-msgs:
  72. if !ok {
  73. j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
  74. return
  75. }
  76. // 尝试从消息头获取 trace_id,如果不存在则生成一个新的
  77. traceID, ok := d.Headers["trace_id"].(string)
  78. if !ok || traceID == "" {
  79. traceID = uuid.New().String()
  80. }
  81. // 创建一个带有 trace_id 的 logger,用于本次任务的所有日志记录
  82. scopedLogger := j.logger.With(zap.String("trace_id", traceID))
  83. // 创建一个带有 trace_id 的 context,用于传递给下游服务
  84. ctxWithTrace := context.WithValue(ctx, "trace_id", traceID)
  85. // 调用具体的业务处理器
  86. processingErr := handler(ctxWithTrace, scopedLogger, d)
  87. // 根据处理结果统一进行 Ack/Nack
  88. if processingErr != nil {
  89. // 业务失败,拒绝消息并不重新入队
  90. if err := d.Nack(false, false); err != nil {
  91. scopedLogger.Error("消息 Nack 失败", zap.Error(err), zap.String("task", taskName))
  92. }
  93. } else {
  94. // 业务处理成功,手动发送确认
  95. if err := d.Ack(false); err != nil {
  96. scopedLogger.Error("消息 Ack 失败", zap.Error(err), zap.String("task", taskName))
  97. }
  98. }
  99. }
  100. }
  101. }
  102. // handleDomainMessage 是域名白名单任务的具体处理器
  103. func (j *whitelistJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  104. type domainTaskPayload struct {
  105. Domain string `json:"domain"`
  106. Ip string `json:"ip"`
  107. Action string `json:"action"` // "add" or "del"
  108. }
  109. var payload domainTaskPayload
  110. if err := json.Unmarshal(d.Body, &payload); err != nil {
  111. logger.Error("解析域名白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
  112. return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
  113. }
  114. logger.Info("收到域名白名单任务",
  115. zap.String("action", payload.Action),
  116. zap.String("domain", payload.Domain),
  117. zap.String("ip", payload.Ip),
  118. zap.String("routing_key", d.RoutingKey),
  119. )
  120. var processingErr error
  121. switch payload.Action {
  122. case "add", "del":
  123. processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action)
  124. default:
  125. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  126. logger.Warn("在域名白名单任务中收到未知操作", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  127. }
  128. if processingErr != nil {
  129. logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain))
  130. } else {
  131. logger.Info("已成功处理域名白名单任务", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
  132. }
  133. return processingErr
  134. }
  135. // handleIpMessage 是 IP 白名单任务的具体处理器
  136. func (j *whitelistJob) handleIpMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
  137. type ipTaskPayload struct {
  138. Ips []string `json:"ips"`
  139. Action string `json:"action"`
  140. Color string `json:"color"`
  141. ReturnSourceIp string `json:"return_source_ip"`
  142. }
  143. var payload ipTaskPayload
  144. if err := json.Unmarshal(d.Body, &payload); err != nil {
  145. logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body), zap.String("routing_key", d.RoutingKey))
  146. return nil // 消息格式错误,不应重试
  147. }
  148. logger.Info("收到IP白名单任务",
  149. zap.String("action", payload.Action),
  150. zap.Any("ips", payload.Ips),
  151. zap.String("color", payload.Color),
  152. zap.String("routing_key", d.RoutingKey),
  153. )
  154. var processingErr error
  155. switch payload.Action {
  156. case "add":
  157. ips, err := j.wafForMatter.AppendWafIp(ctx, payload.Ips, payload.ReturnSourceIp)
  158. if err != nil {
  159. // 如果附加IP失败,记录错误并终止
  160. processingErr = fmt.Errorf("为WAF准备IP列表失败: %w", err)
  161. } else {
  162. var wg sync.WaitGroup
  163. errChan := make(chan error, 2)
  164. wg.Add(2)
  165. go func() {
  166. defer wg.Done()
  167. if err := j.aoDunService.AddWhiteStaticList(ctx, false, ips, payload.Color); err != nil {
  168. errChan <- err
  169. }
  170. }()
  171. go func() {
  172. defer wg.Done()
  173. if err := j.aoDunService.AddWhiteStaticList(ctx, true, ips,payload.Color); err != nil {
  174. errChan <- err
  175. }
  176. }()
  177. wg.Wait()
  178. close(errChan)
  179. var errs []string
  180. for err := range errChan {
  181. errs = append(errs, err.Error())
  182. }
  183. if len(errs) > 0 {
  184. processingErr = fmt.Errorf("添加IP到白名单时发生错误: %s", strings.Join(errs, "; "))
  185. }
  186. }
  187. case "del":
  188. var wg sync.WaitGroup
  189. errChan := make(chan error, len(payload.Ips)*2)
  190. deleteFromWall := func(isSmall bool, ip string) {
  191. defer wg.Done()
  192. id, err := j.aoDunService.GetWhiteStaticList(ctx, isSmall, ip, payload.ReturnSourceIp,payload.Color)
  193. if err != nil {
  194. errChan <- fmt.Errorf("获取IP '%s' (isSmall: %t) ID失败: %w , color: %s", ip, isSmall, err, payload.Color)
  195. return
  196. }
  197. if err := j.aoDunService.DelWhiteStaticList(ctx, isSmall, strconv.Itoa(id), payload.Color); err != nil {
  198. errChan <- fmt.Errorf("删除IP '%s' (isSmall: %t, id: %d) 失败: %w , color: %s", ip, isSmall, id, err , payload.Color)
  199. }
  200. }
  201. for _, ip := range payload.Ips {
  202. wg.Add(2)
  203. go deleteFromWall(false, ip)
  204. go deleteFromWall(true, ip)
  205. }
  206. wg.Wait()
  207. close(errChan)
  208. var errs []string
  209. for err := range errChan {
  210. logger.Error("删除IP白名单过程中发生错误", zap.Error(err), zap.String("color", payload.Color))
  211. errs = append(errs, err.Error())
  212. }
  213. if len(errs) > 0 {
  214. processingErr = fmt.Errorf("删除IP任务中发生错误: %s", strings.Join(errs, "; ") + ", color: " + payload.Color)
  215. }
  216. default:
  217. processingErr = fmt.Errorf("unknown action: %s", payload.Action)
  218. logger.Warn("在IP白名单任务中收到未知操作", zap.String("action", payload.Action), zap.Any("ips", payload.Ips), zap.String("color", payload.Color))
  219. }
  220. if processingErr != nil {
  221. logger.Error("处理IP白名单任务失败", zap.Error(processingErr), zap.Any("ips", payload.Ips), zap.String("color", payload.Color))
  222. } else {
  223. logger.Info("已成功处理IP白名单任务", zap.String("action", payload.Action), zap.Any("ips", payload.Ips), zap.String("color", payload.Color))
  224. }
  225. return processingErr
  226. }