waflog.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. package waf
  2. import (
  3. "context"
  4. "encoding/json"
  5. "fmt"
  6. adminApi "github.com/go-nunu/nunu-layout-advanced/api/v1/admin"
  7. "github.com/go-nunu/nunu-layout-advanced/internal/service"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  9. amqp "github.com/rabbitmq/amqp091-go"
  10. "go.uber.org/zap"
  11. )
  12. type WaflogService interface {
  13. PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog)
  14. }
  15. func NewWaflogService(
  16. service *service.Service,
  17. mq *rabbitmq.RabbitMQ,
  18. ) WaflogService {
  19. return &waflogService{
  20. Service: service,
  21. mq : mq,
  22. }
  23. }
  24. type waflogService struct {
  25. *service.Service
  26. mq *rabbitmq.RabbitMQ
  27. }
  28. func (s *waflogService) PublishIpWafLogTask(ctx context.Context, req adminApi.WafLog) {
  29. payload := &req
  30. msgBody, err := json.Marshal(payload)
  31. if err != nil {
  32. s.Logger.Error("序列化 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  33. return
  34. }
  35. taskCfg, ok := s.mq.GetTaskConfig("waf_log")
  36. if !ok {
  37. s.Logger.Error("无法获取“waf_Log”任务配置")
  38. return
  39. }
  40. routingKey := fmt.Sprintf("wafLog.%s", "add")
  41. publishingMsg := amqp.Publishing{
  42. ContentType: "application/json",
  43. Body: msgBody,
  44. DeliveryMode: amqp.Persistent,
  45. }
  46. err = s.mq.PublishWithCh(taskCfg.Exchange, routingKey, publishingMsg)
  47. if err != nil {
  48. s.Logger.Error("发布 WafLog 任务消息失败", zap.Error(err), zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  49. } else {
  50. s.Logger.Info("已成功发布 WafLog 任务消息", zap.Int("hostId", payload.HostId), zap.Int("uid", payload.Uid), zap.Any("req", req))
  51. }
  52. }