package job import ( "context" "fmt" "github.com/go-nunu/nunu-layout-advanced/internal/repository" "go.uber.org/zap" ) type UserJob interface { // RegisterConsumer 启动消费者,处理用户注册后的任务 RegisterConsumer(ctx context.Context) } func NewUserJob( job *Job, userRepo repository.UserRepository, ) UserJob { return &userJob{ userRepo: userRepo, Job: job, } } type userJob struct { userRepo repository.UserRepository *Job } func (j *userJob) RegisterConsumer(ctx context.Context) { taskName := "user_register" taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName) if !ok { j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName)) return } consumerName := "user_register_consumer" j.logger.Info("正在启动消费者...", zap.String("task", taskName), zap.String("queue", taskCfg.Queue), zap.String("consumer", consumerName), ) msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount) if err != nil { j.logger.Error("启动消费者失败", zap.Error(err)) return } for { select { case <-ctx.Done(): j.logger.Info("消费者正在关闭...", zap.String("task", taskName)) return case d, ok := <-msgs: if !ok { j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName)) return } j.logger.Info("收到新消息", zap.String("exchange", d.Exchange), zap.String("routing_key", d.RoutingKey), zap.ByteString("body", d.Body), ) // 在这里处理你的业务逻辑 // ... // 业务处理完成后,手动发送确认 if err := d.Ack(false); err != nil { j.logger.Error("消息确认失败", zap.Error(err)) // 你可以在这里决定是否需要重试或将消息放入死信队列 // d.Nack(false, true) // requeue } } } }