Selaa lähdekoodia

feat(waf): 实现 WafLog 异步日志记录功能

- 新增 WafLogService 的 PublishIpWafLogTask 方法用于发布日志任务
- 实现 WafLogJob 以处理 WafLog 相关的 RabbitMQ 消息
- 在 JobServer 中集成 WafLogJob- 优化了日志记录的异步处理流程,提高了系统性能
fusu 6 päivää sitten
vanhempi
sitoutus
6306b3717f

+ 1 - 1
cmd/server/wire/wire_gen.go

@@ -102,7 +102,7 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	zzybgpService := waf2.NewZzybgpService(serviceService, gatewayipRepository, hostService, aoDunService)
 	globalLimitService := waf2.NewGlobalLimitService(serviceService, globalLimitRepository, duedateService, crawlerService, viperViper, requiredService, parserService, hostService, hostRepository, cdnService, cdnRepository, tcpforwardingRepository, udpForWardingRepository, webForwardingRepository, allowAndDenyIpService, allowAndDenyIpRepository, tcpforwardingService, udpForWardingService, webForwardingService, gatewayipRepository, gatewayipService, buildAudunService, zzybgpService)
 	wafLogRepository := admin.NewWafLogRepository(repositoryRepository)
-	wafLogService := admin2.NewWafLogService(serviceService, wafLogRepository, globalLimitRepository)
+	wafLogService := admin2.NewWafLogService(serviceService, wafLogRepository, globalLimitRepository, rabbitMQ)
 	globalLimitHandler := waf3.NewGlobalLimitHandler(handlerHandler, globalLimitService, wafLogService)
 	adminRepository := admin.NewAdminRepository(repositoryRepository)
 	adminService := admin2.NewAdminService(serviceService, adminRepository)

+ 2 - 1
cmd/task/wire/wire.go

@@ -11,10 +11,10 @@ import (
 	waf2 "github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
 	"github.com/go-nunu/nunu-layout-advanced/internal/server"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
+	admin2 "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service/api/flexCdn"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service/api/gameShield"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
-	admin2 "github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/task"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/app"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/jwt"
@@ -67,6 +67,7 @@ var jobSet = wire.NewSet(
 	job.NewJob,
 	job.NewUserJob,
 	job.NewWhitelistJob,
+	job.NewWafLogJob,
 )
 var serverSet = wire.NewSet(
 	server.NewTaskServer,

+ 5 - 2
cmd/task/wire/wire_gen.go

@@ -93,7 +93,10 @@ func NewWire(viperViper *viper.Viper, logger *log.Logger) (*app.App, func(), err
 	jobJob := job.NewJob(transaction, logger, sidSid, rabbitMQ)
 	userJob := job.NewUserJob(jobJob, userRepository)
 	whitelistJob := job.NewWhitelistJob(jobJob, aoDunService, wafFormatterService)
-	jobServer := server.NewJobServer(logger, userJob, whitelistJob)
+	wafLogRepository := admin.NewWafLogRepository(repositoryRepository)
+	wafLogService := admin2.NewWafLogService(serviceService, wafLogRepository, globalLimitRepository, rabbitMQ)
+	wafLogJob := job.NewWafLogJob(jobJob, wafLogService)
+	jobServer := server.NewJobServer(logger, userJob, whitelistJob, wafLogJob)
 	appApp := newApp(taskServer, jobServer)
 	return appApp, func() {
 		cleanup()
@@ -106,7 +109,7 @@ var repositorySet = wire.NewSet(repository.NewDB, repository.NewRedis, repositor
 
 var taskSet = wire.NewSet(task.NewTask, task.NewUserTask, task.NewGameShieldTask, task.NewWafTask)
 
-var jobSet = wire.NewSet(job.NewJob, job.NewUserJob, job.NewWhitelistJob)
+var jobSet = wire.NewSet(job.NewJob, job.NewUserJob, job.NewWhitelistJob, job.NewWafLogJob)
 
 var serverSet = wire.NewSet(server.NewTaskServer, server.NewJobServer)
 

+ 1 - 1
config/local.yml

@@ -140,7 +140,7 @@ rabbitmq:
       prefetch_count: 1
 
     # 记录日志任务
-    waf_Log:
+    waf_log:
       exchange: "wafLog_topic_exchange_test" # Topic 类型的交换机
       exchange_type: "topic"              # 显式指定交换机类型
       queue: "wafLog_queue_test"

+ 4 - 11
internal/handler/api/waf/globallimit.go

@@ -1,7 +1,6 @@
 package waf
 
 import (
-	"context"
 	"github.com/gin-gonic/gin"
 	v1 "github.com/go-nunu/nunu-layout-advanced/api/v1"
 	adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
@@ -75,22 +74,16 @@ func (h *GlobalLimitHandler) EditGlobalLimit(ctx *gin.Context) {
 
 	v1.HandleSuccess(ctx, nil)
 
-	logData := adminApi.WafLog{
+
+	// 2. 启动一个新的 goroutine 在后台记录日志
+	go  h.wafLogService.PublishIpWafLogTask(ctx,adminApi.WafLog{
 		Uid:        req.Uid,
 		RequestIp:  ctx.ClientIP(), // 复制 ClientIP
 		UserAgent:  ctx.Request.UserAgent(), // 复制 UserAgent
 		Api:        ctx.Request.URL.Path, // 复制 Path
 		HostId:     req.HostId,
 		ExtraData:  req,
-	}
-
-	// 2. 启动一个新的 goroutine 在后台记录日志
-	go func() {
-		err := h.wafLogService.AddWafLog(context.Background(), logData) // 注意:这里可能需要一个后台 context
-		if err != nil {
-
-		}
-	}()
+	})
 }
 
 func (h *GlobalLimitHandler) DeleteGlobalLimit(ctx *gin.Context) {

+ 71 - 0
internal/job/job.go

@@ -1,13 +1,22 @@
 package job
 
 import (
+	"context"
+	"fmt"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/jwt"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/log"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
 	"github.com/go-nunu/nunu-layout-advanced/pkg/sid"
+	"github.com/google/uuid"
+	"github.com/rabbitmq/amqp091-go"
+	"go.uber.org/zap"
 )
 
+// TaskHandler 定义了处理单个消息的函数签名
+// 它负责业务逻辑的执行,并返回一个 error 来告知调用者处理是否成功。
+type TaskHandler func(ctx context.Context, logger *zap.Logger, delivery amqp091.Delivery) error
+
 type Job struct {
 	logger   *log.Logger
 	sid      *sid.Sid
@@ -29,3 +38,65 @@ func NewJob(
 		Rabbitmq: mq,
 	}
 }
+
+// Consume 是一个通用的 RabbitMQ 消费者方法,封装了重复的逻辑
+func (j *Job) Consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
+	taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
+	if !ok {
+		j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
+		return
+	}
+
+	j.logger.Info("正在启动消费者...",
+		zap.String("task", taskName),
+		zap.String("queue", taskCfg.Queue),
+		zap.String("consumer", consumerName),
+	)
+
+	msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
+	if err != nil {
+		j.logger.Error("启动消费者失败", zap.String("task", taskName), zap.Error(err))
+		return
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
+			return
+		case d, ok := <-msgs:
+			if !ok {
+				j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
+				return
+			}
+
+			// 尝试从消息头获取 trace_id,如果不存在则生成一个新的
+			traceID, ok := d.Headers["trace_id"].(string)
+			if !ok || traceID == "" {
+				traceID = uuid.New().String()
+			}
+
+			// 创建一个带有 trace_id 的 logger,用于本次任务的所有日志记录
+			scopedLogger := j.logger.With(zap.String("trace_id", traceID))
+
+			// 创建一个带有 trace_id 的 context,用于传递给下游服务
+			ctxWithTrace := context.WithValue(ctx, "trace_id", traceID)
+
+			// 调用具体的业务处理器
+			processingErr := handler(ctxWithTrace, scopedLogger, d)
+
+			// 根据处理结果统一进行 Ack/Nack
+			if processingErr != nil {
+				// 业务失败,拒绝消息并不重新入队
+				if err := d.Nack(false, false); err != nil {
+					scopedLogger.Error("消息 Nack 失败", zap.Error(err), zap.String("task", taskName))
+				}
+			} else {
+				// 业务处理成功,手动发送确认
+				if err := d.Ack(false); err != nil {
+					scopedLogger.Error("消息 Ack 失败", zap.Error(err), zap.String("task", taskName))
+				}
+			}
+		}
+	}
+}

+ 71 - 0
internal/job/wafLog.go

@@ -0,0 +1,71 @@
+package job
+
+import (
+	"context"
+	"encoding/json"
+	adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
+	"github.com/go-nunu/nunu-layout-advanced/internal/service/admin"
+	"github.com/rabbitmq/amqp091-go"
+	"go.uber.org/zap"
+)
+
+// 使用公共的 TaskHandler 定义
+
+// WafLogJob 定义了处理白名单相关任务的接口
+type WafLogJob interface {
+	// DomainConsumer 启动消费者,处理域名白名单任务
+	AddWafLogConsumer(ctx context.Context)
+
+}
+
+// NewWafLogJob 创建一个新的 WhitelistJob
+func NewWafLogJob(job *Job,
+	wafLogService admin.WafLogService,
+) WafLogJob {
+	return &wafLogJob{
+		Job:          job,
+		wafLogService: wafLogService,
+	}
+}
+
+type wafLogJob struct {
+	*Job
+	wafLogService admin.WafLogService
+}
+
+// DomainConsumer 启动域名白名单消费者
+func (j *wafLogJob) AddWafLogConsumer(ctx context.Context) {
+	j.consume(ctx, "waf_log", "waf_log_consumer", j.handleDomainMessage)
+}
+
+// consume 调用公共的 Consume 方法
+func (j *wafLogJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
+	j.Job.Consume(ctx, taskName, consumerName, handler)
+}
+
+// handleDomainMessage 是域名白名单任务的具体处理器
+func (j *wafLogJob) handleDomainMessage(ctx context.Context, logger *zap.Logger, d amqp091.Delivery) error {
+
+	var payload adminApi.WafLog
+	if err := json.Unmarshal(d.Body, &payload); err != nil {
+		logger.Error("解析添加日志消息失败", zap.Error(err), zap.ByteString("body", d.Body))
+		return nil // 返回 nil 以避免消息重入队列,因为这是一个格式错误
+	}
+
+	logger.Info("收到添加日志任务",
+		zap.Int("hostId", payload.HostId),
+		zap.Int("uid", payload.Uid),
+		zap.String("routing_key", d.RoutingKey),
+	)
+
+	var processingErr error
+	processingErr = j.wafLogService.AddWafLog(ctx, payload)
+
+	if processingErr != nil {
+		logger.Error("处理域名白名单任务失败", zap.Error(processingErr),zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid),zap.Any("req", payload))
+	} else {
+		logger.Info("已成功处理域名白名单任务", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", payload))
+	}
+
+	return processingErr
+}

+ 4 - 64
internal/job/whitelist.go

@@ -6,7 +6,6 @@ import (
 	"fmt"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service/api/waf"
-	"github.com/google/uuid"
 	"github.com/rabbitmq/amqp091-go"
 	"go.uber.org/zap"
 	"strconv"
@@ -14,9 +13,7 @@ import (
 	"sync"
 )
 
-// taskHandler 定义了处理单个消息的函数签名
-// 它负责业务逻辑的执行,并返回一个 error 来告知调用者处理是否成功。
-type taskHandler func(ctx context.Context, logger *zap.Logger, delivery amqp091.Delivery) error
+// 使用公共的 TaskHandler 定义
 
 // WhitelistJob 定义了处理白名单相关任务的接口
 type WhitelistJob interface {
@@ -55,66 +52,9 @@ func (j *whitelistJob) IpConsumer(ctx context.Context) {
 	j.consume(ctx, "ip_white", "ip_white_consumer", j.handleIpMessage)
 }
 
-// consume 是一个通用的 RabbitMQ 消费者方法,封装了重复的逻辑
-func (j *whitelistJob) consume(ctx context.Context, taskName, consumerName string, handler taskHandler) {
-	taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
-	if !ok {
-		j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
-		return
-	}
-
-	j.logger.Info("正在启动消费者...",
-		zap.String("task", taskName),
-		zap.String("queue", taskCfg.Queue),
-		zap.String("consumer", consumerName),
-	)
-
-	msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
-	if err != nil {
-		j.logger.Error("启动消费者失败", zap.String("task", taskName), zap.Error(err))
-		return
-	}
-
-	for {
-		select {
-		case <-ctx.Done():
-			j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
-			return
-		case d, ok := <-msgs:
-			if !ok {
-				j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
-				return
-			}
-
-			// 尝试从消息头获取 trace_id,如果不存在则生成一个新的
-			traceID, ok := d.Headers["trace_id"].(string)
-			if !ok || traceID == "" {
-				traceID = uuid.New().String()
-			}
-
-			// 创建一个带有 trace_id 的 logger,用于本次任务的所有日志记录
-			scopedLogger := j.logger.With(zap.String("trace_id", traceID))
-
-			// 创建一个带有 trace_id 的 context,用于传递给下游服务
-			ctxWithTrace := context.WithValue(ctx, "trace_id", traceID)
-
-			// 调用具体的业务处理器
-			processingErr := handler(ctxWithTrace, scopedLogger, d)
-
-			// 根据处理结果统一进行 Ack/Nack
-			if processingErr != nil {
-				// 业务失败,拒绝消息并不重新入队
-				if err := d.Nack(false, false); err != nil {
-					scopedLogger.Error("消息 Nack 失败", zap.Error(err), zap.String("task", taskName))
-				}
-			} else {
-				// 业务处理成功,手动发送确认
-				if err := d.Ack(false); err != nil {
-					scopedLogger.Error("消息 Ack 失败", zap.Error(err), zap.String("task", taskName))
-				}
-			}
-		}
-	}
+// consume 调用公共的 Consume 方法
+func (j *whitelistJob) consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
+	j.Job.Consume(ctx, taskName, consumerName, handler)
 }
 
 // handleDomainMessage 是域名白名单任务的具体处理器

+ 14 - 0
internal/server/job.go

@@ -14,17 +14,20 @@ type JobServer struct {
 	userJob      job.UserJob
 	whitelistJob job.WhitelistJob
 	wg           sync.WaitGroup
+	wafLogJob    job.WafLogJob
 }
 
 func NewJobServer(
 	log *log.Logger,
 	userJob job.UserJob,
 	whitelistJob job.WhitelistJob,
+	wafLogJob job.WafLogJob,
 ) *JobServer {
 	return &JobServer{
 		log:          log,
 		userJob:      userJob,
 		whitelistJob: whitelistJob,
+		wafLogJob:    wafLogJob,
 	}
 }
 
@@ -60,6 +63,17 @@ func (j *JobServer) Start(ctx context.Context) error {
 		j.whitelistJob.IpConsumer(ctx)
 	}()
 
+	j.wg.Add(1)
+	go func() {
+		defer func() {
+			if r := recover(); r != nil {
+				j.log.Error("wafLogJob domain consumer panic", zap.Any("error", r))
+			}
+			j.wg.Done()
+		}()
+		j.wafLogJob.AddWafLogConsumer(ctx)
+	}()
+
 	j.wg.Wait()
 
 	return nil

+ 46 - 1
internal/service/admin/waflog.go

@@ -3,11 +3,15 @@ package admin
 import (
 	"context"
 	"encoding/json"
+	"fmt"
+	adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/model"
 	adminRep "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
-	adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
 	"github.com/go-nunu/nunu-layout-advanced/internal/repository/api/waf"
 	"github.com/go-nunu/nunu-layout-advanced/internal/service"
+	"github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
+	amqp "github.com/rabbitmq/amqp091-go"
+	"go.uber.org/zap"
 	"strings"
 )
 
@@ -15,16 +19,19 @@ type WafLogService interface {
 	GetWafLog(ctx context.Context, id int64) (*model.WafLog, error)
 	GetWafLogList(ctx context.Context) ([]model.WafLog, error)
 	AddWafLog(ctx context.Context, req adminApi.WafLog) error
+	PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
 }
 func NewWafLogService(
     service *service.Service,
     wafLogRepository adminRep.WafLogRepository,
 	globalLimitRepository waf.GlobalLimitRepository,
+	mq *rabbitmq.RabbitMQ,
 ) WafLogService {
 	return &wafLogService{
 		Service:        service,
 		wafLogRepository: wafLogRepository,
 		globalLimitRepository: globalLimitRepository,
+		mq : mq,
 	}
 }
 
@@ -32,6 +39,7 @@ type wafLogService struct {
 	*service.Service
 	wafLogRepository adminRep.WafLogRepository
 	globalLimitRepository waf.GlobalLimitRepository
+	mq *rabbitmq.RabbitMQ
 }
 
 var ApiDescriptionMap = map[string]string{
@@ -141,3 +149,40 @@ func (s *wafLogService) AddWafLog(ctx context.Context, req adminApi.WafLog) erro
 
 	})
 }
+
+func (s *wafLogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
+
+	payload := &req
+
+	// Serialize the message
+	msgBody, err := json.Marshal(payload)
+	if err != nil {
+		s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
+		return
+	}
+
+	// Get task configuration
+	taskCfg, ok := s.mq.GetTaskConfig("waf_log")
+	if !ok {
+		s.Logger.Error("无法获取“waf_Log”任务配置")
+		return
+	}
+
+	// Construct the routing key dynamically based on the action
+	routingKey := fmt.Sprintf("wafLog.%s", "add")
+
+	// Construct the amqp.Publishing message
+	publishingMsg := amqp.Publishing{
+		ContentType:  "application/json",
+		Body:         msgBody,
+		DeliveryMode: amqp.Persistent, // Persistent message
+	}
+
+	// Publish the message
+	err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
+	if err != nil {
+		s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
+	} else {
+		s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
+	}
+}