Browse Source

feat(service): 实现 IP 白名单功能并优化域名白名单逻辑

- 新增 IP 白名单处理逻辑,支持 IP 地址的添加和删除
- 优化域名白名单处理,支持更灵活的路由键
- 重构消息发布逻辑,提高可复用性和可维护性
- 更新配置文件,改为使用 Topic 交换机和通配符路由键
-修复部分代码错误,提高系统稳定性
fusu 1 month ago
parent
commit
741aed015c

+ 1 - 1
cmd/server/wire/wire_gen.go

@@ -62,7 +62,7 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	globalLimitRepository := repository.NewGlobalLimitRepository(repositoryRepository)
 	tcpforwardingRepository := repository.NewTcpforwardingRepository(repositoryRepository)
 	udpForWardingRepository := repository.NewUdpForWardingRepository(repositoryRepository)
-	wafFormatterService := service.NewWafFormatterService(serviceService, globalLimitRepository, hostRepository, requiredService, parserService, tcpforwardingRepository, udpForWardingRepository, webForwardingRepository, hostService)
+	wafFormatterService := service.NewWafFormatterService(serviceService, globalLimitRepository, hostRepository, requiredService, parserService, tcpforwardingRepository, udpForWardingRepository, webForwardingRepository, rabbitMQ, hostService)
 	aoDunService := service.NewAoDunService(serviceService, viperViper)
 	gateWayGroupIpRepository := repository.NewGateWayGroupIpRepository(repositoryRepository)
 	gatewayGroupRepository := repository.NewGatewayGroupRepository(repositoryRepository)

+ 10 - 9
config/local.yml

@@ -108,19 +108,20 @@ rabbitmq:
   tasks:
     # IP白名单更新任务
     ip_white:
-      exchange: "tasks_direct_exchange_test" # 使用一个统一的direct交换机
-      queue: "ip_white_queue_test"
-      routing_key: "task.ip_white.update"
+      exchange: "tasks_direct_exchange_test" # 改为使用 Topic 交换机,与域名任务保持一致
+      exchange_type: "topic"              # 显式指定交换机类型
+      queue: "ip_white_queue"
+      routing_key: "task.ip_white.*"      # 使用通配符路由键,匹配 task.ip_white.add, task.ip_white.del 等
       consumer_count: 2
       prefetch_count: 1
 
     # 用户注册后续任务(例如:发送欢迎邮件)
-    user_register:
-      exchange: "tasks_direct_exchange_test"
-      queue: "user_register_queue_test"
-      routing_key: "task.user.register.notify"
-      consumer_count: 3
-      prefetch_count: 1
+#    user_register:
+#      exchange: "tasks_direct_exchange_test"
+#      queue: "user_register_queue_test"
+#      routing_key: "task.user.register.notify"
+#      consumer_count: 3
+#      prefetch_count: 1
 
     # 域名白名单任务
     domain_whitelist:

+ 3 - 2
config/prod.yml

@@ -110,9 +110,10 @@ rabbitmq:
   tasks:
     # IP白名单更新任务
     ip_white:
-      exchange: "tasks_direct_exchange" # 使用一个统一的direct交换机
+      exchange: "tasks_direct_exchange_test" # 改为使用 Topic 交换机,与域名任务保持一致
+      exchange_type: "topic"              # 显式指定交换机类型
       queue: "ip_white_queue"
-      routing_key: "task.ip_white.update"
+      routing_key: "task.ip_white.*"      # 使用通配符路由键,匹配 task.ip_white.add, task.ip_white.del 等
       consumer_count: 2
       prefetch_count: 1
 

+ 89 - 97
internal/job/whitelist.go

@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
 	"go.uber.org/zap"
 )
@@ -12,6 +13,9 @@ import (
 type WhitelistJob interface {
 	// DomainConsumer 启动消费者,处理域名白名单任务
 	DomainConsumer(ctx context.Context)
+
+	// IpConsumer 启动消费者,处理 IP 白名单任务
+	IpConsumer(ctx context.Context)
 }
 
 // NewWhitelistJob 创建一个新的 WhitelistJob
@@ -88,11 +92,11 @@ func (j *whitelistJob) DomainConsumer(ctx context.Context) {
 				processingErr = j.aoDunService.DomainWhiteList(ctx, payload.Domain, payload.Ip, payload.Action)
 			default:
 				processingErr = fmt.Errorf("unknown action: %s", payload.Action)
-				j.logger.Warn("Received unknown action in domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
+				j.logger.Warn("在 域名 白名单任务中收到未知操作", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
 			}
 
 			if processingErr == nil {
-				j.logger.Info("Successfully processed domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
+				j.logger.Info("已成功处理域名白名单任务", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
 			}
 
 			// 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
@@ -114,98 +118,86 @@ func (j *whitelistJob) DomainConsumer(ctx context.Context) {
 }
 
 
-//func (j *whitelistJob) IpConsumer(ctx context.Context) {
-//	taskName := "IP_whitelist"
-//	taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
-//	if !ok {
-//		j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
-//		return
-//	}
-//
-//	consumerName := "IP_whitelist_consumer"
-//	j.logger.Info("正在启动域名白名单消费者...",
-//		zap.String("task", taskName),
-//		zap.String("queue", taskCfg.Queue),
-//		zap.String("consumer", consumerName),
-//	)
-//
-//	msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
-//	if err != nil {
-//		j.logger.Error("启动IP白名单消费者失败", zap.Error(err))
-//		return
-//	}
-//
-//	// Define the message payload structure, now including an action field
-//	type ipTaskPayload struct {
-//		IP string `json:"IP"`
-//		Action string `json:"action"` // "add" or "del"
-//	}
-//
-//	for {
-//		select {
-//		case <-ctx.Done():
-//			j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName))
-//			return
-//		case d, ok := <-msgs:
-//			if !ok {
-//				j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName))
-//				return
-//			}
-//
-//			// 解析消息
-//			var payload ipTaskPayload
-//			if err := json.Unmarshal(d.Body, &payload); err != nil {
-//				j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
-//				// 消息格式错误,直接拒绝且不重新入队
-//				_ = d.Nack(false, false)
-//				continue
-//			}
-//
-//			j.logger.Info("收到IP白名单任务",
-//				zap.String("domain", payload.IP),
-//				zap.String("routing_key", d.RoutingKey),
-//			)
-//
-//			// Call business logic based on the action
-//			var processingErr error
-//
-//			// Call business logic based on the action
-//			switch payload.Action {
-//			case "add":
-//				processingErr = j.aoDunService.AddWhiteStaticList(ctx, []string{payload.IP})
-//				if processingErr == nil {
-//					j.logger.Info("Successfully processed 'add' IP whitelist task", zap.String("IP", payload.IP))
-//				}
-//			case "del":
-//				processingErr = j.aoDunService.DomainWhiteList(ctx, []string{payload.IP})
-//				if processingErr == nil {
-//					j.logger.Info("Successfully processed 'delete' IP whitelist task", zap.String("IP", payload.IP))
-//				}
-//			case "get":
-//				ids,processingErr = j.aoDunService.d(ctx)
-//				if processingErr == nil {
-//					j.logger.Info("Successfully processed 'get' IP whitelist task", zap.String("IP", payload.IP))
-//				}
-//
-//			default:
-//				j.logger.Warn("Received unknown action in IP whitelist task", zap.String("action", payload.Action), zap.String("IP", payload.IP))
-//				processingErr = fmt.Errorf("unknown action: %s", payload.Action)
-//			}
-//
-//			// 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
-//			if processingErr != nil {
-//				j.logger.Error("处理IP白名单任务失败", zap.Error(processingErr), zap.String("IP", payload.IP))
-//				// 业务失败,拒绝消息并不重新入队
-//				if err := d.Nack(false, false); err != nil {
-//					j.logger.Error("消息 Nack 失败", zap.Error(err))
-//				}
-//			} else {
-//				// 业务处理成功,手动发送确认
-//				if err := d.Ack(false); err != nil {
-//					j.logger.Error("IP白名单任务消息确认失败", zap.Error(err))
-//				}
-//			}
-//
-//		}
-//	}
-//}
+func (j *whitelistJob) IpConsumer(ctx context.Context) {
+	taskName := "ip_white"
+	taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
+	if !ok {
+		j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
+		return
+	}
+
+	consumerName := "ip_white_consumer"
+	j.logger.Info("正在启动IP白名单消费者...",
+		zap.String("task", taskName),
+		zap.String("queue", taskCfg.Queue),
+		zap.String("consumer", consumerName),
+	)
+
+	msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
+	if err != nil {
+		j.logger.Error("启动IP白名单消费者失败", zap.Error(err))
+		return
+	}
+
+	// Define the message payload structure, now including an action field
+	type ipTaskPayload struct {
+		Ips     []v1.IpInfo `json:"ips"`
+		Action string `json:"action"`
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			j.logger.Info("IP白名单消费者正在关闭...", zap.String("task", taskName))
+			return
+		case d, ok := <-msgs:
+			if !ok {
+				j.logger.Warn("消息通道已关闭,IP白名单消费者退出。", zap.String("task", taskName))
+				return
+			}
+
+			// 解析消息
+			var payload ipTaskPayload
+			if err := json.Unmarshal(d.Body, &payload); err != nil {
+				j.logger.Error("解析IP白名单消息失败", zap.Error(err), zap.ByteString("body", d.Body))
+				// 消息格式错误,直接拒绝且不重新入队
+				_ = d.Nack(false, false)
+				continue
+			}
+
+			j.logger.Info("收到IP白名单任务",
+				zap.Any("IP", payload.Ips),
+				zap.String("routing_key", d.RoutingKey),
+			)
+
+			// Call business logic based on the action
+			var processingErr error
+			switch payload.Action {
+			case "add":
+				processingErr = j.aoDunService.AddWhiteStaticList(ctx, payload.Ips)
+			default:
+				processingErr = fmt.Errorf("unknown action: %s", payload.Action)
+				j.logger.Warn("在 IP 白名单任务中收到未知操作", zap.String("action", payload.Action), zap.Any("IP", payload.Ips))
+			}
+
+			if processingErr == nil {
+				j.logger.Info("已成功处理IP白名单任务", zap.String("action", payload.Action), zap.Any("IP", payload.Ips))
+			}
+
+			// 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
+			if processingErr != nil {
+				j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.Any("domain", payload.Ips))
+				// 业务失败,拒绝消息并不重新入队
+				if err := d.Nack(false, false); err != nil {
+					j.logger.Error("消息 Nack 失败", zap.Error(err))
+				}
+			} else {
+				// 业务处理成功,手动发送确认
+				if err := d.Ack(false); err != nil {
+					j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
+				}
+			}
+
+		}
+	}
+}

+ 11 - 1
internal/server/job.go

@@ -43,12 +43,22 @@ func (j *JobServer) Start(ctx context.Context) error {
 	go func() {
 		defer func() {
 			if r := recover(); r != nil {
-				j.log.Error("whitelistJob consumer panic", zap.Any("error", r))
+				j.log.Error("whitelistJob domain consumer panic", zap.Any("error", r))
 			}
 			j.wg.Done()
 		}()
 		j.whitelistJob.DomainConsumer(ctx)
 	}()
+	j.wg.Add(1)
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				j.log.Error("whitelistJob ip consumer panic", zap.Any("error", r))
+			}
+			j.wg.Done()
+		}()
+		j.whitelistJob.IpConsumer(ctx)
+	}()
 
 	j.wg.Wait()
 

+ 11 - 2
internal/service/aodun.go

@@ -11,6 +11,7 @@ import (
 	"io"
 	"net/http"
 	"net/url"
+	"strings"
 	"time"
 )
 
@@ -30,6 +31,8 @@ func NewAoDunService(
 		clientID:          conf.GetString("aodun.clientID"),
 		username:          conf.GetString("aodun.username"),
 		password:          conf.GetString("aodun.password"),
+		IPusername:        conf.GetString("aodunIp.username"),
+		IPpassword:        conf.GetString("aodunIp.password"),
 		domainUserName:    conf.GetString("domainWhite.username"),
 		domainPassword:    conf.GetString("domainWhite.password"),
 
@@ -42,6 +45,8 @@ type aoDunService struct {
 	clientID string
 	username string
 	password string
+	IPusername string
+	IPpassword string
 	domainUserName string
 	domainPassword string
 }
@@ -142,8 +147,8 @@ func (s *aoDunService) GetToken(ctx context.Context)  (string,string,error) {
 	formData := map[string]interface{}{
 		"ClientID":  s.clientID,
 		"GrantType": "password",
-		"Username":  s.username,
-		"Password":  s.password,
+		"Username":  s.IPusername,
+		"Password":  s.IPpassword,
 	}
 
 	resBody, err := s.sendFormData(ctx,"/oauth/token","","",formData)
@@ -194,6 +199,10 @@ func (s *aoDunService) AddWhiteStaticList(ctx context.Context,req []v1.IpInfo) e
 		return  fmt.Errorf("反序列化响应 JSON 失败 ( 内容: %s): %w", string(resBody), err)
 	}
 	if res.Code != 0 {
+		if strings.Contains(res.Msg,"操作部分成功,重复IP如下") {
+			s.logger.Info(res.Msg)
+			return nil
+		}
 		return  fmt.Errorf("API 错误: code %d, msg '%s'",
 			res.Code, res.Msg)
 	}

+ 140 - 0
internal/service/wafformatter.go

@@ -2,11 +2,16 @@ package service
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
+	amqp "github.com/rabbitmq/amqp091-go"
 	"github.com/spf13/cast"
+	"go.uber.org/zap"
 	"golang.org/x/net/publicsuffix"
+	"net"
 	"slices"
 	"strconv"
 )
@@ -17,6 +22,10 @@ type WafFormatterService interface {
 	validateWafPortCount(ctx context.Context, hostId int) error
 	validateWafDomainCount(ctx context.Context, req v1.GlobalRequire) error
 	ConvertToWildcardDomain(ctx context.Context,domain string) (string, error)
+	AppendWafIp(ctx context.Context, req []string) ([]v1.IpInfo, error)
+	AppendWafIpByRemovePort(ctx context.Context, req []string) ([]v1.IpInfo, error)
+	PublishIpWhitelistTask(ips []v1.IpInfo, action string)
+	PublishDomainWhitelistTask(domain, ip, action string)
 }
 func NewWafFormatterService(
     service *Service,
@@ -27,6 +36,7 @@ func NewWafFormatterService(
 	tcpforwardingRep repository.TcpforwardingRepository,
 	udpForWardingRep repository.UdpForWardingRepository,
 	webForwardingRep repository.WebForwardingRepository,
+	mq *rabbitmq.RabbitMQ,
 	host HostService,
 ) WafFormatterService {
 	return &wafFormatterService{
@@ -39,6 +49,7 @@ func NewWafFormatterService(
 		udpForWardingRep: udpForWardingRep,
 		webForwardingRep: webForwardingRep,
 		host : host,
+		mq:    mq,
 	}
 }
 
@@ -52,8 +63,10 @@ type wafFormatterService struct {
 	udpForWardingRep repository.UdpForWardingRepository
 	webForwardingRep repository.WebForwardingRepository
 	host HostService
+	mq *rabbitmq.RabbitMQ
 }
 
+
 func (s *wafFormatterService) require(ctx context.Context,req v1.GlobalRequire,category string) (v1.GlobalRequire, error) {
 	RuleIds, err := s.globalRep.GetGlobalLimitByHostId(ctx, int64(req.HostId))
 	if err != nil {
@@ -162,4 +175,131 @@ func (s *wafFormatterService) ConvertToWildcardDomain(ctx context.Context, domai
 	// 4. 如果原始域名和可注册域名相同(例如,输入就是 "google.com"),
 	//    则说明没有子域名可替换,直接返回原始域名。
 	return domain, nil
+}
+
+func (s *wafFormatterService) AppendWafIp(ctx context.Context, req []string) ([]v1.IpInfo, error) {
+	var ips []v1.IpInfo
+	for _, v := range req {
+		ips = append(ips, v1.IpInfo{
+			FType:      "0",
+			FStartIp:   v,
+			FEndIp:     v,
+			FRemark:    "宁波高防IP过白",
+			FServerIp:  "",
+		})
+	}
+	return ips, nil
+}
+
+func (s *wafFormatterService) AppendWafIpByRemovePort(ctx context.Context, req []string) ([]v1.IpInfo, error) {
+	var ips []v1.IpInfo
+	for _, v := range req {
+		ip, _, err := net.SplitHostPort(v)
+		if err != nil {
+			return nil, err
+		}
+		ips = append(ips, v1.IpInfo{
+			FType:      "0",
+			FStartIp:   ip,
+			FEndIp:     ip,
+			FRemark:    "宁波高防IP过白",
+			FServerIp:  "",
+		})
+	}
+	return ips, nil
+
+}
+
+
+// publishDomainWhitelistTask is a helper function to publish domain whitelist tasks to RabbitMQ.
+// It can handle different actions like "add" or "del".
+func (s *wafFormatterService) PublishDomainWhitelistTask(domain, ip, action string) {
+	// Define message payload, including the action
+	type domainTaskPayload struct {
+		Domain string `json:"domain"`
+		Ip     string `json:"ip"`
+		Action string `json:"action"`
+	}
+	payload := domainTaskPayload{
+		Domain: domain,
+		Ip:     ip,
+		Action: action,
+	}
+
+	// Serialize the message
+	msgBody, err := json.Marshal(payload)
+	if err != nil {
+		s.logger.Error("Failed to serialize domain whitelist task message", zap.Error(err), zap.String("domain", domain), zap.String("ip", ip), zap.String("action", action))
+		return
+	}
+
+	// Get task configuration
+	taskCfg, ok := s.mq.GetTaskConfig("domain_whitelist")
+	if !ok {
+		s.logger.Error("Failed to get 'domain_whitelist' task configuration")
+		return
+	}
+
+	// Construct the routing key dynamically based on the action
+	routingKey := fmt.Sprintf("whitelist.domain.%s", action)
+
+	// Construct the amqp.Publishing message
+	publishingMsg := amqp.Publishing{
+		ContentType:  "application/json",
+		Body:         msgBody,
+		DeliveryMode: amqp.Persistent, // Persistent message
+	}
+
+	// Publish the message
+	err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
+	if err != nil {
+		s.logger.Error("发布 域名 白名单任务到 MQ 失败", zap.Error(err), zap.String("domain", domain), zap.String("action", action))
+	} else {
+		s.logger.Info("成功将 域名 白名单任务发布到 MQ", zap.String("domain", domain), zap.String("action", action))
+	}
+}
+
+
+func (s *wafFormatterService) PublishIpWhitelistTask(ips []v1.IpInfo, action string) {
+	// Define message payload, including the action
+	type ipTaskPayload struct {
+		Ips     []v1.IpInfo `json:"ips"`
+		Action string `json:"action"`
+	}
+	payload := ipTaskPayload{
+		Ips:     ips,
+		Action: action,
+	}
+
+	// Serialize the message
+	msgBody, err := json.Marshal(payload)
+	if err != nil {
+		s.logger.Error("序列化 IP 白名单任务消息失败", zap.Error(err), zap.Any("IPs", ips), zap.String("action", action))
+		return
+	}
+
+	// Get task configuration
+	taskCfg, ok := s.mq.GetTaskConfig("ip_white")
+	if !ok {
+		s.logger.Error("无法获取“ip_white”任务配置")
+		return
+	}
+
+	// Construct the routing key dynamically based on the action
+	routingKey := fmt.Sprintf("task.ip_white.%s", action)
+
+	// Construct the amqp.Publishing message
+	publishingMsg := amqp.Publishing{
+		ContentType:  "application/json",
+		Body:         msgBody,
+		DeliveryMode: amqp.Persistent, // Persistent message
+	}
+
+	// Publish the message
+	err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
+	if err != nil {
+		s.logger.Error("发布 IP 白名单任务到 MQ 失败", zap.Error(err), zap.String("action", action))
+	} else {
+		s.logger.Info("成功将 IP 白名单任务发布到 MQ", zap.String("action", action))
+	}
 }

+ 31 - 63
internal/service/webforwarding.go

@@ -8,9 +8,8 @@ import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/model"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
-	amqp "github.com/rabbitmq/amqp091-go"
-	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
+	"net"
 	"sort"
 	"strconv"
 	"strings"
@@ -336,7 +335,31 @@ func (s *webForwardingService) AddWebForwarding(ctx context.Context, req *v1.Web
 			return err
 		}
 
-		go s.publishDomainWhitelistTask(doMain,ip, "add")
+		go s.wafformatter.PublishDomainWhitelistTask(doMain,ip, "add")
+	}
+	// IP过白
+	var ips []v1.IpInfo
+	if req.WebForwardingData.BackendList != nil {
+		for _, v := range req.WebForwardingData.BackendList {
+			ip, _, err := net.SplitHostPort(v.Addr)
+			if err != nil {
+				return err
+			}
+			ips = append(ips, v1.IpInfo{
+				FType:      "0",
+				FStartIp:   ip,
+				FEndIp:     ip,
+				FRemark:    "宁波高防IP过白",
+				FServerIp:  "",
+			})
+		}
+		allowIps, err := s.wafformatter.AppendWafIp(ctx, req.WebForwardingData.AllowIpList)
+		if err != nil {
+			return err
+		}
+		ips = append(ips, allowIps...)
+		go s.wafformatter.PublishIpWhitelistTask(ips, "add")
+
 	}
 
 
@@ -347,17 +370,8 @@ func (s *webForwardingService) AddWebForwarding(ctx context.Context, req *v1.Web
 		return err
 	}
 	webRuleModel := s.buildWebRuleModel(&req.WebForwardingData, require, id)
-	//var ips []v1.IpInfo
-	//for _, v := range req.WebForwardingData.AllowIpList {
-	//	ips = append(ips, v1.IpInfo{
-	//		FType:      "allow",
-	//		FStartIp:   v,
-	//		FEndIp:     v,
-	//		FRemark:    "宁波高防IP过白",
-	//		FServerIp:  "",
-	//	})
-	//}
-	//err = s.aoDun.AddDomainWhiteList(ctx, req.WebForwardingData.Domain)
+
+
 	if _, err = s.webForwardingRepository.AddWebForwardingIps(ctx, *webRuleModel); err != nil {
 		return err
 	}
@@ -393,8 +407,8 @@ func (s *webForwardingService) EditWebForwarding(ctx context.Context, req *v1.We
 		if err != nil {
 			return err
 		}
-		go s.publishDomainWhitelistTask(webData.Domain, Ip, "del")
-		go s.publishDomainWhitelistTask(doMain, Ip, "add")
+		go s.wafformatter.PublishDomainWhitelistTask(webData.Domain, Ip, "del")
+		go s.wafformatter.PublishDomainWhitelistTask(doMain, Ip, "add")
 	}
 
 
@@ -429,7 +443,7 @@ func (s *webForwardingService) DeleteWebForwarding(ctx context.Context, Ids []in
 			if err != nil {
 				return err
 			}
-			go s.publishDomainWhitelistTask(doMain,ip, "del")
+			go s.wafformatter.PublishDomainWhitelistTask(doMain,ip, "del")
 		}
 		_, err = s.crawler.DeleteRule(ctx, wafWebId, "admin/delete/waf_web?page=1&__pageSize=10&__sort=waf_web_id&__sort_type=desc")
 		if err != nil {
@@ -582,50 +596,4 @@ func (s *webForwardingService) GetWebForwardingWafWebAllIps(ctx context.Context,
 	return finalResults, nil
 }
 
-// publishDomainWhitelistTask is a helper function to publish domain whitelist tasks to RabbitMQ.
-// It can handle different actions like "add" or "del".
-func (s *webForwardingService) publishDomainWhitelistTask(domain, ip, action string) {
-	// Define message payload, including the action
-	type domainTaskPayload struct {
-		Domain string `json:"domain"`
-		Ip     string `json:"ip"`
-		Action string `json:"action"`
-	}
-	payload := domainTaskPayload{
-		Domain: domain,
-		Ip:     ip,
-		Action: action,
-	}
-
-	// Serialize the message
-	msgBody, err := json.Marshal(payload)
-	if err != nil {
-		s.logger.Error("Failed to serialize domain whitelist task message", zap.Error(err), zap.String("domain", domain), zap.String("ip", ip), zap.String("action", action))
-		return
-	}
-
-	// Get task configuration
-	taskCfg, ok := s.mq.GetTaskConfig("domain_whitelist")
-	if !ok {
-		s.logger.Error("Failed to get 'domain_whitelist' task configuration")
-		return
-	}
-
-	// Construct the routing key dynamically based on the action
-	routingKey := fmt.Sprintf("whitelist.domain.%s", action)
-
-	// Construct the amqp.Publishing message
-	publishingMsg := amqp.Publishing{
-		ContentType:  "application/json",
-		Body:         msgBody,
-		DeliveryMode: amqp.Persistent, // Persistent message
-	}
 
-	// Publish the message
-	err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
-	if err != nil {
-		s.logger.Error("Failed to publish domain whitelist task to MQ", zap.Error(err), zap.String("domain", domain), zap.String("action", action))
-	} else {
-		s.logger.Info("Successfully published domain whitelist task to MQ", zap.String("domain", domain), zap.String("action", action))
-	}
-}