job.go 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. package server
  2. import (
  3. "context"
  4. "sync"
  5. "github.com/go-nunu/nunu-layout-advanced/internal/job"
  6. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  7. "go.uber.org/zap"
  8. )
  9. type JobServer struct {
  10. log *log.Logger
  11. userJob job.UserJob
  12. whitelistJob job.WhitelistJob
  13. wg sync.WaitGroup
  14. }
  15. func NewJobServer(
  16. log *log.Logger,
  17. userJob job.UserJob,
  18. whitelistJob job.WhitelistJob,
  19. ) *JobServer {
  20. return &JobServer{
  21. log: log,
  22. userJob: userJob,
  23. whitelistJob: whitelistJob,
  24. }
  25. }
  26. func (j *JobServer) Start(ctx context.Context) error {
  27. j.log.Info("job server starting...")
  28. // 启动 UserJob 的消费者
  29. //j.wg.Add(1)
  30. //go func() {
  31. // defer j.wg.Done()
  32. // j.userJob.RegisterConsumer(ctx)
  33. //}()
  34. // 启动 WhitelistJob 的消费者
  35. j.wg.Add(1)
  36. go func() {
  37. defer func() {
  38. if r := recover(); r != nil {
  39. j.log.Error("whitelistJob domain consumer panic", zap.Any("error", r))
  40. }
  41. j.wg.Done()
  42. }()
  43. j.whitelistJob.DomainConsumer(ctx)
  44. }()
  45. j.wg.Add(1)
  46. go func() {
  47. defer func() {
  48. if r := recover(); r != nil {
  49. j.log.Error("whitelistJob ip consumer panic", zap.Any("error", r))
  50. }
  51. j.wg.Done()
  52. }()
  53. j.whitelistJob.IpConsumer(ctx)
  54. }()
  55. j.wg.Wait()
  56. return nil
  57. }
  58. func (j *JobServer) Stop(ctx context.Context) error {
  59. return nil
  60. }