12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- package job
- import (
- "context"
- "fmt"
- "github.com/go-nunu/nunu-layout-advanced/internal/repository"
- "go.uber.org/zap"
- )
- type UserJob interface {
- // RegisterConsumer 启动消费者,处理用户注册后的任务
- RegisterConsumer(ctx context.Context)
- }
- func NewUserJob(
- job *Job,
- userRepo repository.UserRepository,
- ) UserJob {
- return &userJob{
- userRepo: userRepo,
- Job: job,
- }
- }
- type userJob struct {
- userRepo repository.UserRepository
- *Job
- }
- func (j *userJob) RegisterConsumer(ctx context.Context) {
- taskName := "user_register"
- taskCfg, ok := j.Rabbitmq.GetTaskConfig(taskName)
- if !ok {
- j.logger.Error(fmt.Sprintf("未找到任务 '%s' 的配置", taskName))
- return
- }
- consumerName := "user_register_consumer"
- j.logger.Info("正在启动消费者...",
- zap.String("task", taskName),
- zap.String("queue", taskCfg.Queue),
- zap.String("consumer", consumerName),
- )
- msgs, err := j.Rabbitmq.Consume(taskCfg.Queue, consumerName, taskCfg.PrefetchCount)
- if err != nil {
- j.logger.Error("启动消费者失败", zap.Error(err))
- return
- }
- for {
- select {
- case <-ctx.Done():
- j.logger.Info("消费者正在关闭...", zap.String("task", taskName))
- return
- case d, ok := <-msgs:
- if !ok {
- j.logger.Warn("消息通道已关闭,消费者退出。", zap.String("task", taskName))
- return
- }
- j.logger.Info("收到新消息",
- zap.String("exchange", d.Exchange),
- zap.String("routing_key", d.RoutingKey),
- zap.ByteString("body", d.Body),
- )
- // 在这里处理你的业务逻辑
- // ...
- // 业务处理完成后,手动发送确认
- if err := d.Ack(false); err != nil {
- j.logger.Error("消息确认失败", zap.Error(err))
- // 你可以在这里决定是否需要重试或将消息放入死信队列
- // d.Nack(false, true) // requeue
- }
- }
- }
- }
|