user.go 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. package job
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-nunu/nunu-layout-advanced/internal/repository/admin"
  6. "go.uber.org/zap"
  7. )
  8. type UserJob interface {
  9. // RegisterConsumer 启动消费者,处理用户注册后的任务
  10. RegisterConsumer(ctx context.Context)
  11. }
  12. func NewUserJob(
  13. job *Job,
  14. userRepo admin.UserRepository,
  15. ) UserJob {
  16. return &userJob{
  17. userRepo: userRepo,
  18. Job: job,
  19. }
  20. }
  21. type userJob struct {
  22. userRepo admin.UserRepository
  23. *Job
  24. }
  25. func (j *userJob) RegisterConsumer(ctx context.Context) {
  26. taskName := "user_register"
  27. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  28. if !ok {
  29. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  30. return
  31. }
  32. consumerName := "user_register_consumer"
  33. j.logger.Info("正在启动消费者...",
  34. zap.String("task", taskName),
  35. zap.String("queue", taskCfg.Queue),
  36. zap.String("consumer", consumerName),
  37. )
  38. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  39. if err != nil {
  40. j.logger.Error("启动消费者失败", zap.Error(err))
  41. return
  42. }
  43. for {
  44. select {
  45. case <-ctx.Done():
  46. j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
  47. return
  48. case d, ok := <-msgs:
  49. if !ok {
  50. j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
  51. return
  52. }
  53. j.logger.Info("收到新消息",
  54. zap.String("exchange", d.Exchange),
  55. zap.String("routing_key", d.RoutingKey),
  56. zap.ByteString("body", d.Body),
  57. )
  58. // 在这里处理你的业务逻辑
  59. // ...
  60. // 业务处理完成后,手动发送确认
  61. if err := d.Ack(false); err != nil {
  62. j.logger.Error("消息确认失败", zap.Error(err))
  63. // 你可以在这里决定是否需要重试或将消息放入死信队列
  64. // d.Nack(false, true) // requeue
  65. }
  66. }
  67. }
  68. }