job.go 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package server
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/go-nunu/nunu-layout-advanced/internal/job"
  6. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  7. "go.uber.org/zap"
  8. )
  9. type JobServer struct {
  10. log *log.Logger
  11. userJob job.UserJob
  12. whitelistJob job.WhitelistJob
  13. wg sync.WaitGroup
  14. wafLogJob job.WafLogJob
  15. }
  16. func NewJobServer(
  17. log *log.Logger,
  18. userJob job.UserJob,
  19. whitelistJob job.WhitelistJob,
  20. wafLogJob job.WafLogJob,
  21. ) *JobServer {
  22. return &JobServer{
  23. log: log,
  24. userJob: userJob,
  25. whitelistJob: whitelistJob,
  26. wafLogJob: wafLogJob,
  27. }
  28. }
  29. func (j *JobServer) Start(ctx context.Context) error {
  30. j.log.Info("job server starting...")
  31. // 启动 UserJob 的消费者
  32. //j.wg.Add(1)
  33. //go func() {
  34. // defer j.wg.Done()
  35. // j.userJob.RegisterConsumer(ctx)
  36. //}()
  37. // 启动 WhitelistJob 的消费者
  38. j.wg.Add(1)
  39. go func() {
  40. defer func() {
  41. if r := recover(); r != nil {
  42. j.log.Error("whitelistJob domain consumer panic", zap.Any("error", r))
  43. }
  44. j.wg.Done()
  45. }()
  46. j.whitelistJob.DomainConsumer(ctx)
  47. }()
  48. j.wg.Add(1)
  49. go func() {
  50. defer func() {
  51. if r := recover(); r != nil {
  52. j.log.Error("whitelistJob ip consumer panic", zap.Any("error", r))
  53. }
  54. j.wg.Done()
  55. }()
  56. j.whitelistJob.IpConsumer(ctx)
  57. }()
  58. j.wg.Add(1)
  59. go func() {
  60. defer func() {
  61. if r := recover(); r != nil {
  62. j.log.Error("wafLogJob domain consumer panic", zap.Any("error", r))
  63. }
  64. j.wg.Done()
  65. }()
  66. j.wafLogJob.AddWafLogConsumer(ctx)
  67. }()
  68. j.wg.Wait()
  69. return nil
  70. }
  71. func (j *JobServer) Stop(ctx context.Context) error {
  72. return nil
  73. }