job.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. package job
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/go-nunu/nunu-layout-advanced/internal/repository"
  6. "github.com/go-nunu/nunu-layout-advanced/pkg/jwt"
  7. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
  9. "github.com/go-nunu/nunu-layout-advanced/pkg/sid"
  10. "github.com/google/uuid"
  11. "github.com/rabbitmq/amqp091-go"
  12. "go.uber.org/zap"
  13. )
  14. // TaskHandler 定义了处理单个消息的函数签名
  15. // 它负责业务逻辑的执行,并返回一个 error 来告知调用者处理是否成功。
  16. type TaskHandler func(ctx context.Context, logger *zap.Logger, delivery amqp091.Delivery) error
  17. type Job struct {
  18. logger *log.Logger
  19. sid *sid.Sid
  20. jwt *jwt.JWT
  21. tm repository.Transaction
  22. Rabbitmq *rabbitmq.RabbitMQ
  23. }
  24. func NewJob(
  25. tm repository.Transaction,
  26. logger *log.Logger,
  27. sid *sid.Sid,
  28. mq *rabbitmq.RabbitMQ,
  29. ) *Job {
  30. return &Job{
  31. logger: logger,
  32. sid: sid,
  33. tm: tm,
  34. Rabbitmq: mq,
  35. }
  36. }
  37. // Consume 是一个通用的 RabbitMQ 消费者方法,封装了重复的逻辑
  38. func (j *Job) Consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
  39. taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
  40. if !ok {
  41. j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
  42. return
  43. }
  44. j.logger.Info("正在启动消费者...",
  45. zap.String("task", taskName),
  46. zap.String("queue", taskCfg.Queue),
  47. zap.String("consumer", consumerName),
  48. )
  49. msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
  50. if err != nil {
  51. j.logger.Error("启动消费者失败", zap.String("task", taskName), zap.Error(err))
  52. return
  53. }
  54. for {
  55. select {
  56. case <-ctx.Done():
  57. j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
  58. return
  59. case d, ok := <-msgs:
  60. if !ok {
  61. j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
  62. return
  63. }
  64. // 尝试从消息头获取 trace_id,如果不存在则生成一个新的
  65. traceID, ok := d.Headers["trace_id"].(string)
  66. if !ok || traceID == "" {
  67. traceID = uuid.New().String()
  68. }
  69. // 创建一个带有 trace_id 的 logger,用于本次任务的所有日志记录
  70. scopedLogger := j.logger.With(zap.String("trace_id", traceID))
  71. // 创建一个带有 trace_id 的 context,用于传递给下游服务
  72. ctxWithTrace := context.WithValue(ctx, "trace_id", traceID)
  73. // 调用具体的业务处理器
  74. processingErr := handler(ctxWithTrace, scopedLogger, d)
  75. // 根据处理结果统一进行 Ack/Nack
  76. if processingErr != nil {
  77. // 业务失败,拒绝消息并不重新入队
  78. if err := d.Nack(false, false); err != nil {
  79. scopedLogger.Error("消息 Nack 失败", zap.Error(err), zap.String("task", taskName))
  80. }
  81. } else {
  82. // 业务处理成功,手动发送确认
  83. if err := d.Ack(false); err != nil {
  84. scopedLogger.Error("消息 Ack 失败", zap.Error(err), zap.String("task", taskName))
  85. }
  86. }
  87. }
  88. }
  89. }