Ver Fonte

refactor(task): 重构任务服务并优化域名白名单逻辑

- 重命名 DeleteDomainWhiteList 方法为 DelDomainWhiteList
- 更新 RabbitMQ 配置,增加多个任务队列
- 优化域名白名单任务处理逻辑,支持异步操作
- 调整任务服务启动方式,使用非阻塞模式
- 重构 wire 文件,整合任务服务和作业服务
fusu há 1 mês atrás
pai
commit
d99b91e957

+ 5 - 11
cmd/server/wire/wire.go

@@ -5,7 +5,6 @@ package wire
 
 import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/handler"
-	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/internal/middleware"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
@@ -94,11 +93,7 @@ var handlerSet = wire.NewSet(
 	handler.NewGateWayGroupIpHandler,
 )
 
-var jobSet = wire.NewSet(
-	job.NewJob,
-	job.NewUserJob,
-	job.NewWhitelistJob,
-)
+
 
 // 限流器依赖集
 var limiterSet = wire.NewSet(
@@ -108,17 +103,16 @@ var limiterSet = wire.NewSet(
 
 var serverSet = wire.NewSet(
 	server.NewHTTPServer,
-	server.NewJobServer,
+	
 )
 
+// build App
 // build App
 func newApp(
 	httpServer *http.Server,
-	jobServer *server.JobServer,
-	// task *server.Task,
 ) *app.App {
 	return app.NewApp(
-		app.WithServer(httpServer, jobServer),
+		app.WithServer(httpServer),
 		app.WithName("demo-server"),
 	)
 }
@@ -128,7 +122,7 @@ func NewWire(*viper.Viper, *log.Logger) (*app.App, func(), error) {
 		repositorySet,
 		serviceSet,
 		handlerSet,
-		jobSet,
+
 		serverSet,
 		limiterSet,
 		sid.NewSid,

+ 4 - 12
cmd/server/wire/wire_gen.go

@@ -8,7 +8,6 @@ package wire
 
 import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/handler"
-	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/internal/middleware"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
@@ -85,11 +84,7 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	globalLimitService := service.NewGlobalLimitService(serviceService, globalLimitRepository, duedateService, crawlerService, viperViper, requiredService, parserService, hostService, tcpLimitService, udpLimitService, webLimitService, gatewayGroupService, hostRepository, gatewayGroupRepository)
 	globalLimitHandler := handler.NewGlobalLimitHandler(handlerHandler, globalLimitService)
 	httpServer := server.NewHTTPServer(logger, viperViper, jwtJWT, limiterLimiter, handlerFunc, userHandler, gameShieldHandler, gameShieldBackendHandler, webForwardingHandler, webLimitHandler, tcpforwardingHandler, udpForWardingHandler, tcpLimitHandler, udpLimitHandler, globalLimitHandler)
-	jobJob := job.NewJob(transaction, logger, sidSid, rabbitMQ)
-	userJob := job.NewUserJob(jobJob, userRepository)
-	whitelistJob := job.NewWhitelistJob(jobJob, aoDunService)
-	jobServer := server.NewJobServer(logger, userJob, whitelistJob)
-	appApp := newApp(httpServer, jobServer)
+	appApp := newApp(httpServer)
 	return appApp, func() {
 		cleanup()
 	}, nil
@@ -103,18 +98,15 @@ var serviceSet = wire.NewSet(service.NewService, service.NewAoDunService, servic
 
 var handlerSet = wire.NewSet(handler.NewHandler, handler.NewUserHandler, handler.NewGameShieldHandler, handler.NewGameShieldPublicIpHandler, handler.NewWebForwardingHandler, handler.NewTcpforwardingHandler, handler.NewUdpForWardingHandler, handler.NewGameShieldUserIpHandler, handler.NewWebLimitHandler, handler.NewTcpLimitHandler, handler.NewUdpLimitHandler, handler.NewGameShieldBackendHandler, handler.NewGameShieldSdkIpHandler, handler.NewHostHandler, handler.NewGlobalLimitHandler, handler.NewGatewayGroupHandler, handler.NewGateWayGroupIpHandler)
 
-var jobSet = wire.NewSet(job.NewJob, job.NewUserJob, job.NewWhitelistJob)
-
 // 限流器依赖集
 var limiterSet = wire.NewSet(limiter.NewLimiter, middleware.NewRateLimitMiddleware)
 
-var serverSet = wire.NewSet(server.NewHTTPServer, server.NewJobServer)
+var serverSet = wire.NewSet(server.NewHTTPServer)
 
+// build App
 // build App
 func newApp(
 	httpServer *http.Server,
-	jobServer *server.JobServer,
-
 ) *app.App {
-	return app.NewApp(app.WithServer(httpServer, jobServer), app.WithName("demo-server"))
+	return app.NewApp(app.WithServer(httpServer), app.WithName("demo-server"))
 }

+ 13 - 1
cmd/task/wire/wire.go

@@ -6,6 +6,7 @@ package wire
 import (
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
+	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
 	"github.com/go-nunu/nunu-layout-advanced/internal/task"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/app"
@@ -21,6 +22,7 @@ var repositorySet = wire.NewSet(
 	//repository.NewRedis,
 	repository.NewMongoClient,
 	repository.NewMongoDB,
+	repository.NewRabbitMQ,
 	repository.NewRepository,
 	repository.NewTransaction,
 	repository.NewUserRepository,
@@ -37,12 +39,20 @@ var taskSet = wire.NewSet(
 	task.NewUserTask,
 	task.NewGameShieldTask,
 )
+
+var jobSet = wire.NewSet(
+	job.NewJob,
+	job.NewUserJob,
+	job.NewWhitelistJob,
+)
 var serverSet = wire.NewSet(
 	server.NewTaskServer,
+	server.NewJobServer,
 )
 
 var serviceSet = wire.NewSet(
 	service.NewService,
+	service.NewAoDunService,
 	service.NewGameShieldService,
 	service.NewCrawlerService,
 	service.NewGameShieldPublicIpService,
@@ -59,9 +69,10 @@ var serviceSet = wire.NewSet(
 // build App
 func newApp(
 	task *server.TaskServer,
+	jobServer *server.JobServer,
 ) *app.App {
 	return app.NewApp(
-		app.WithServer(task),
+		app.WithServer(task, jobServer),
 		app.WithName("demo-task"),
 	)
 }
@@ -70,6 +81,7 @@ func NewWire(*viper.Viper, *log.Logger) (*app.App, func(), error) {
 	panic(wire.Build(
 		repositorySet,
 		taskSet,
+		jobSet,
 		serverSet,
 		serviceSet,
 		newApp,

+ 17 - 6
cmd/task/wire/wire_gen.go

@@ -7,6 +7,7 @@
 package wire
 
 import (
+	"github.com/go-nunu/nunu-layout-advanced/internal/job"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
@@ -25,7 +26,8 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	db := repository.NewDB(viperViper, logger)
 	client := repository.NewMongoClient(viperViper)
 	database := repository.NewMongoDB(client, viperViper)
-	repositoryRepository := repository.NewRepository(logger, db, client, database)
+	rabbitMQ, cleanup := repository.NewRabbitMQ(viperViper, logger)
+	repositoryRepository := repository.NewRepository(logger, db, client, database, rabbitMQ)
 	transaction := repository.NewTransaction(repositoryRepository)
 	sidSid := sid.NewSid()
 	taskTask := task.NewTask(transaction, logger, sidSid)
@@ -51,23 +53,32 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	gameShieldBackendService := service.NewGameShieldBackendService(serviceService, gameShieldBackendRepository, gameShieldRepository, crawlerService, gameShieldPublicIpService, duedateService, formatterService, parserService, requiredService, viperViper, gameShieldService, hostService)
 	gameShieldTask := task.NewGameShieldTask(taskTask, gameShieldRepository, gameShieldBackendService)
 	taskServer := server.NewTaskServer(logger, userTask, gameShieldTask)
-	appApp := newApp(taskServer)
+	jobJob := job.NewJob(transaction, logger, sidSid, rabbitMQ)
+	userJob := job.NewUserJob(jobJob, userRepository)
+	aoDunService := service.NewAoDunService(serviceService, viperViper)
+	whitelistJob := job.NewWhitelistJob(jobJob, aoDunService)
+	jobServer := server.NewJobServer(logger, userJob, whitelistJob)
+	appApp := newApp(taskServer, jobServer)
 	return appApp, func() {
+		cleanup()
 	}, nil
 }
 
 // wire.go:
 
-var repositorySet = wire.NewSet(repository.NewDB, repository.NewMongoClient, repository.NewMongoDB, repository.NewRepository, repository.NewTransaction, repository.NewUserRepository, repository.NewGameShieldRepository, repository.NewGameShieldBackendRepository, repository.NewGameShieldPublicIpRepository, repository.NewHostRepository, repository.NewGameShieldUserIpRepository, repository.NewGameShieldSdkIpRepository)
+var repositorySet = wire.NewSet(repository.NewDB, repository.NewMongoClient, repository.NewMongoDB, repository.NewRabbitMQ, repository.NewRepository, repository.NewTransaction, repository.NewUserRepository, repository.NewGameShieldRepository, repository.NewGameShieldBackendRepository, repository.NewGameShieldPublicIpRepository, repository.NewHostRepository, repository.NewGameShieldUserIpRepository, repository.NewGameShieldSdkIpRepository)
 
 var taskSet = wire.NewSet(task.NewTask, task.NewUserTask, task.NewGameShieldTask)
 
-var serverSet = wire.NewSet(server.NewTaskServer)
+var jobSet = wire.NewSet(job.NewJob, job.NewUserJob, job.NewWhitelistJob)
 
-var serviceSet = wire.NewSet(service.NewService, service.NewGameShieldService, service.NewCrawlerService, service.NewGameShieldPublicIpService, service.NewDuedateService, service.NewFormatterService, service.NewParserService, service.NewRequiredService, service.NewHostService, service.NewGameShieldBackendService, service.NewGameShieldSdkIpService, service.NewGameShieldUserIpService)
+var serverSet = wire.NewSet(server.NewTaskServer, server.NewJobServer)
+
+var serviceSet = wire.NewSet(service.NewService, service.NewAoDunService, service.NewGameShieldService, service.NewCrawlerService, service.NewGameShieldPublicIpService, service.NewDuedateService, service.NewFormatterService, service.NewParserService, service.NewRequiredService, service.NewHostService, service.NewGameShieldBackendService, service.NewGameShieldSdkIpService, service.NewGameShieldUserIpService)
 
 // build App
 func newApp(task2 *server.TaskServer,
+	jobServer *server.JobServer,
 ) *app.App {
-	return app.NewApp(app.WithServer(task2), app.WithName("demo-task"))
+	return app.NewApp(app.WithServer(task2, jobServer), app.WithName("demo-task"))
 }

+ 30 - 11
config/prod.yml

@@ -93,17 +93,36 @@ aodun:
   clientId: "bd9d36fc-17e1-11ef-8a72-549f35180370"
   Url: "https://115.238.184.13:16008"
 
-# RabbitMQ Configuration
+
 rabbitmq:
-  host: 127.0.0.1
+  host: "110.42.96.15"
   port: 5672
-  username: "guest"
-  password: "guest"
+  username: "fusu"
+  password: "fusu12332ATQREW"
   vhost: "/"
-  connection_timeout: 5s
-  task:
-    exchange: "task.exchange"
-    queue: "task.queue"
-    routing_key: "task.routing.key"
-    consumer_count: 5
-    prefetch_count: 1
+  connection_timeout: 15s
+  tasks:
+    # IP白名单更新任务
+    ip_white:
+      exchange: "tasks_direct_exchange" # 使用一个统一的direct交换机
+      queue: "ip_white_queue"
+      routing_key: "task.ip_white.update"
+      consumer_count: 2
+      prefetch_count: 1
+
+    # 用户注册后续任务(例如:发送欢迎邮件)
+#    user_register:
+#      exchange: "tasks_direct_exchange"
+#      queue: "user_register_queue"
+#      routing_key: "task.user.register.notify"
+#      consumer_count: 3
+#      prefetch_count: 1
+
+    # 域名白名单任务
+    domain_whitelist:
+      exchange: "whitelist_topic_exchange" # Topic 类型的交换机
+      exchange_type: "topic"              # 显式指定交换机类型
+      queue: "domain_whitelist_queue"
+      routing_key: "whitelist.domain.*"   # 消费者监听的绑定键,能接收所有 domain 相关的任务
+      consumer_count: 3
+      prefetch_count: 1

+ 23 - 15
internal/job/whitelist.go

@@ -80,32 +80,40 @@ func (j *whitelistJob) DomainConsumer(ctx context.Context) {
 				zap.String("routing_key", d.RoutingKey),
 			)
 
+			// Call business logic based on the action
+			var processingErr error
+
 			// Call business logic based on the action
 			switch payload.Action {
 			case "add":
-				if err := j.aoDunService.AddDomainWhiteList(ctx, []string{payload.Domain}); err != nil {
-					j.logger.Error("Failed to handle 'add' domain whitelist task", zap.Error(err), zap.String("domain", payload.Domain))
-					_ = d.Reject(false) // Business failure, reject message
-					continue
+				processingErr = j.aoDunService.AddDomainWhiteList(ctx, []string{payload.Domain})
+				if processingErr == nil {
+					j.logger.Info("Successfully processed 'add' domain whitelist task", zap.String("domain", payload.Domain))
 				}
-				j.logger.Info("Successfully processed 'add' domain whitelist task", zap.String("domain", payload.Domain))
 			case "del":
-				if err := j.aoDunService.DeleteDomainWhiteList(ctx, []string{payload.Domain}); err != nil {
-					j.logger.Error("Failed to handle 'delete' domain whitelist task", zap.Error(err), zap.String("domain", payload.Domain))
-					_ = d.Reject(false) // Business failure, reject message
-					continue
+				processingErr = j.aoDunService.DelDomainWhiteList(ctx, []string{payload.Domain})
+				if processingErr == nil {
+					j.logger.Info("Successfully processed 'delete' domain whitelist task", zap.String("domain", payload.Domain))
 				}
-				j.logger.Info("Successfully processed 'delete' domain whitelist task", zap.String("domain", payload.Domain))
 			default:
 				j.logger.Warn("Received unknown action in domain whitelist task", zap.String("action", payload.Action), zap.String("domain", payload.Domain))
-				_ = d.Reject(false) // Reject message with unknown action
-				continue
+				processingErr = fmt.Errorf("unknown action: %s", payload.Action)
 			}
 
-			// 业务处理完成后,手动发送确认
-			if err := d.Ack(false); err != nil {
-				j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
+			// 在循环的最后,根据 processingErr 的状态统一处理 Ack/Nack
+			if processingErr != nil {
+				j.logger.Error("处理域名白名单任务失败", zap.Error(processingErr), zap.String("domain", payload.Domain))
+				// 业务失败,拒绝消息并不重新入队
+				if err := d.Nack(false, false); err != nil {
+					j.logger.Error("消息 Nack 失败", zap.Error(err))
+				}
+			} else {
+				// 业务处理成功,手动发送确认
+				if err := d.Ack(false); err != nil {
+					j.logger.Error("域名白名单任务消息确认失败", zap.Error(err))
+				}
 			}
+
 		}
 	}
 }

+ 3 - 1
internal/server/task.go

@@ -69,7 +69,9 @@ func (t *TaskServer) Start(ctx context.Context) error {
 	if err != nil {
 		t.log.Error("Register SyncAllExpireTimeFromHost task error", zap.Error(err))
 	}
-	t.scheduler.StartBlocking()
+	// 使用非阻塞方式启动调度器,添加一条日志表明任务服务已启动
+	t.scheduler.StartAsync()
+	t.log.Info("task server starting...")
 	return nil
 }
 func (t *TaskServer) Stop(ctx context.Context) error {

+ 1 - 1
internal/service/aodun.go

@@ -16,7 +16,7 @@ import (
 
 type AoDunService interface {
 	AddDomainWhiteList(ctx context.Context, req []string) error
-	DeleteDomainWhiteList(ctx context.Context, req []string) error
+	DelDomainWhiteList(ctx context.Context, req []string) error
 }
 func NewAoDunService(
     service *Service,

+ 19 - 1
internal/service/webforwarding.go

@@ -344,11 +344,22 @@ func (s *webForwardingService) EditWebForwarding(ctx context.Context, req *v1.We
 	if err != nil {
 		return err
 	}
-
 	_, err = s.wafformatter.sendFormData(ctx, "admin/info/waf_web/edit?&__goadmin_edit_pk="+strconv.Itoa(req.WebForwardingData.WafWebId), "admin/edit/waf_web", formData)
 	if err != nil {
 		return err
 	}
+
+	webData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(req.WebForwardingData.Id))
+	if err != nil {
+		return err
+	}
+	// 异步任务:将域名添加到白名单
+	if webData.Domain != req.WebForwardingData.Domain {
+		go s.publishDomainWhitelistTask(webData.Domain, "del")
+		go s.publishDomainWhitelistTask(req.WebForwardingData.Domain, "add")
+	}
+
+
 	webModel := s.buildWebForwardingModel(&req.WebForwardingData, req.WebForwardingData.WafWebId, require)
 	webModel.Id = req.WebForwardingData.Id
 	if err = s.webForwardingRepository.EditWebForwarding(ctx, webModel); err != nil {
@@ -367,6 +378,13 @@ func (s *webForwardingService) DeleteWebForwarding(ctx context.Context, Ids []in
 		if err != nil {
 			return err
 		}
+		webData, err := s.webForwardingRepository.GetWebForwarding(ctx, int64(Id))
+		if err != nil {
+			return err
+		}
+		if webData.Domain != "" {
+			go s.publishDomainWhitelistTask(webData.Domain, "del")
+		}
 		_, err = s.crawler.DeleteRule(ctx, wafWebId, "admin/delete/waf_web?page=1&__pageSize=10&__sort=waf_web_id&__sort_type=desc")
 		if err != nil {
 			return err

+ 0 - 6
pkg/rabbitmq/rabbitmq.go

@@ -294,7 +294,6 @@ func (r *RabbitMQ) SetupAllTaskQueues() error {
 	}
 
 	for name, taskCfg := range r.config.Tasks {
-		r.logger.Info("正在设置任务队列", zap.String("task_name", name))
 		if err := r.setupQueue(taskCfg); err != nil {
 			return fmt.Errorf("为任务 '%s' 设置队列失败: %w", name, err)
 		}
@@ -370,11 +369,6 @@ func (r *RabbitMQ) setupQueue(taskCfg TaskConfig) error {
 			return fmt.Errorf("绑定死信队列失败: %w", err)
 		}
 
-		r.logger.Info("成功设置任务队列及其死信队列",
-			zap.String("exchange", taskCfg.Exchange),
-			zap.String("queue", taskCfg.Queue),
-			zap.String("routing_key", taskCfg.RoutingKey),
-		)
 		return nil
 	})
 }