123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- package job
- import (
- "context"
- "fmt"
- "github.com/go-nunu/nunu-layout-advanced/internal/repository"
- "github.com/go-nunu/nunu-layout-advanced/pkg/jwt"
- "github.com/go-nunu/nunu-layout-advanced/pkg/log"
- "github.com/go-nunu/nunu-layout-advanced/pkg/rabbitmq"
- "github.com/go-nunu/nunu-layout-advanced/pkg/sid"
- "github.com/google/uuid"
- "github.com/rabbitmq/amqp091-go"
- "go.uber.org/zap"
- )
- // TaskHandler 定义了处理单个消息的函数签名
- // 它负责业务逻辑的执行,并返回一个 error 来告知调用者处理是否成功。
- type TaskHandler func(ctx context.Context, logger *zap.Logger, delivery amqp091.Delivery) error
- type Job struct {
- logger *log.Logger
- sid *sid.Sid
- jwt *jwt.JWT
- tm repository.Transaction
- Rabbitmq *rabbitmq.RabbitMQ
- }
- func NewJob(
- tm repository.Transaction,
- logger *log.Logger,
- sid *sid.Sid,
- mq *rabbitmq.RabbitMQ,
- ) *Job {
- return &Job{
- logger: logger,
- sid: sid,
- tm: tm,
- Rabbitmq: mq,
- }
- }
- // Consume 是一个通用的 RabbitMQ 消费者方法,封装了重复的逻辑
- func (j *Job) Consume(ctx context.Context, taskName, consumerName string, handler TaskHandler) {
- taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
- if !ok {
- j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
- return
- }
- j.logger.Info("正在启动消费者...",
- zap.String("task", taskName),
- zap.String("queue", taskCfg.Queue),
- zap.String("consumer", consumerName),
- )
- msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
- if err != nil {
- j.logger.Error("启动消费者失败", zap.String("task", taskName), zap.Error(err))
- return
- }
- for {
- select {
- case <-ctx.Done():
- j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
- return
- case d, ok := <-msgs:
- if !ok {
- j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
- return
- }
- // 尝试从消息头获取 trace_id,如果不存在则生成一个新的
- traceID, ok := d.Headers["trace_id"].(string)
- if !ok || traceID == "" {
- traceID = uuid.New().String()
- }
- // 创建一个带有 trace_id 的 logger,用于本次任务的所有日志记录
- scopedLogger := j.logger.With(zap.String("trace_id", traceID))
- // 创建一个带有 trace_id 的 context,用于传递给下游服务
- ctxWithTrace := context.WithValue(ctx, "trace_id", traceID)
- // 调用具体的业务处理器
- processingErr := handler(ctxWithTrace, scopedLogger, d)
- // 根据处理结果统一进行 Ack/Nack
- if processingErr != nil {
- // 业务失败,拒绝消息并不重新入队
- if err := d.Nack(false, false); err != nil {
- scopedLogger.Error("消息 Nack 失败", zap.Error(err), zap.String("task", taskName))
- }
- } else {
- // 业务处理成功,手动发送确认
- if err := d.Ack(false); err != nil {
- scopedLogger.Error("消息 Ack 失败", zap.Error(err), zap.String("task", taskName))
- }
- }
- }
- }
- }
|