package job import ( "context" "fmt" "github.com/go-nunu/nunu-layout-advanced/internal/repository" "github.com/go-nunu/nunu-layout-advanced/pkg/jwt" "github.com/go-nunu/nunu-layout-advanced/pkg/log" "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq" "github.com/go-nunu/nunu-layout-advanced/pkg/sid" "github.com/google/uuid" "github.com/rabbitmq/amqp091-go" "go.uber.org/zap" ) // TaskHandler 定义了处理单个消息的函数签名 // 它负责业务逻辑的执行,并返回一个 error 来告知调用者处理是否成功。 type TaskHandler func(ctx context.Context, logger *zap.Logger, delivery amqp091.Delivery) error type Job struct { logger *log.Logger sid *sid.Sid jwt *jwt.JWT tm repository.Transaction Rabbitmq *rabbitmq.RabbitMQ } func NewJob( tm repository.Transaction, logger *log.Logger, sid *sid.Sid, mq *rabbitmq.RabbitMQ, ) *Job { return &Job{ logger: logger, sid: sid, tm: tm, Rabbitmq: mq, } } // Consume 是一个通用的 RabbitMQ 消费者方法,封装了重复的逻辑 func (j *Job) Consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) { taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName) if !ok { j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName)) return } j.logger.Info("正在启动消费者...", zap.String("task", taskName), zap.String("queue", taskCfg.Queue), zap.String("consumer", consumerName), ) msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount) if err != nil { j.logger.Error("启动消费者失败", zap.String("task", taskName), zap.Error(err)) return } for { select { case <-ctx.Done(): j.logger.Info("消费者正在关闭...", zap.String("task", taskName)) return case d, ok := <-msgs: if !ok { j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName)) return } // 尝试从消息头获取 trace_id,如果不存在则生成一个新的 traceID, ok := d.Headers["trace_id"].(string) if !ok || traceID == "" { traceID = uuid.New().String() } // 创建一个带有 trace_id 的 logger,用于本次任务的所有日志记录 scopedLogger := j.logger.With(zap.String("trace_id", traceID)) // 创建一个带有 trace_id 的 context,用于传递给下游服务 ctxWithTrace := context.WithValue(ctx, "trace_id", traceID) // 调用具体的业务处理器 processingErr := handler(ctxWithTrace, scopedLogger, d) // 根据处理结果统一进行 Ack/Nack if processingErr != nil { // 业务失败,拒绝消息并不重新入队 if err := d.Nack(false, false); err != nil { scopedLogger.Error("消息 Nack 失败", zap.Error(err), zap.String("task", taskName)) } } else { // 业务处理成功,手动发送确认 if err := d.Ack(false); err != nil { scopedLogger.Error("消息 Ack 失败", zap.Error(err), zap.String("task", taskName)) } } } } }