123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- package rabbitmq
- import (
- "context"
- "errors"
- "fmt"
- "sync"
- "time"
- "github.com/go-nunu/nunu-layout-advanced/pkg/log"
- amqp "github.com/rabbitmq/amqp091-go"
- "go.uber.org/zap"
- )
- var (
- ErrClosed = errors.New("rabbitmq: client is closed")
- )
- type Config struct {
- Host string `yaml:"host"`
- Port int `yaml:"port"`
- Username string `yaml:"username"`
- Password string `yaml:"password"`
- VHost string `yaml:"vhost"`
- ConnectionTimeout time.Duration `yaml:"connection_timeout"`
- Tasks map[string]TaskConfig `yaml:"tasks"` // 支持多个任务配置
- }
- type TaskConfig struct {
- Exchange string `mapstructure:"exchange"`
- ExchangeType string `mapstructure:"exchange_type"`
- Queue string `mapstructure:"queue"`
- RoutingKey string `mapstructure:"routing_key"`
- ConsumerCount int `mapstructure:"consumer_count"`
- PrefetchCount int `mapstructure:"prefetch_count"`
- }
- type RabbitMQ struct {
- config Config
- conn *amqp.Connection
- ch *amqp.Channel
- logger *log.Logger
- mu sync.RWMutex
- closed bool
- }
- // New 创建新的RabbitMQ客户端
- func New(config Config, logger *log.Logger) (*RabbitMQ, error) {
- r := &RabbitMQ{
- config: config,
- logger: logger,
- }
- if err := r.Connect(); err != nil {
- return nil, err
- }
- if err := r.SetupAllTaskQueues(); err != nil {
- _ = r.Close() // Attempt to close the connection if setup fails
- return nil, fmt.Errorf("failed to setup task queues: %w", err)
- }
- go r.reconnectLoop()
- return r, nil
- }
- // Connect 连接到RabbitMQ服务器
- func (r *RabbitMQ) Connect() error {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.conn != nil && !r.conn.IsClosed() {
- _ = r.ch.Close()
- _ = r.conn.Close()
- }
- vhost := r.config.VHost
- if vhost == "" {
- vhost = "/"
- } else if vhost[0] != '/' {
- vhost = "/" + vhost
- }
- // 构造完整的连接URL
- fullURL := fmt.Sprintf("amqp://%s:%s@%s:%d%s",
- r.config.Username,
- r.config.Password,
- r.config.Host,
- r.config.Port,
- vhost,
- )
- r.logger.Info("正在尝试连接到 RabbitMQ...", zap.String("url", fullURL))
- var err error
- r.conn, err = amqp.Dial(fullURL)
- if err != nil {
- // 记录详细的底层错误
- r.logger.Error("连接RabbitMQ失败", zap.Error(err))
- return fmt.Errorf("连接RabbitMQ失败: %w", err)
- }
- r.ch, err = r.conn.Channel()
- if err != nil {
- _ = r.conn.Close()
- return fmt.Errorf("创建通道失败: %w", err)
- }
- r.closed = false
- r.logger.Info("RabbitMQ连接成功")
- return nil
- }
- // reconnectLoop 监控连接状态并处理重连
- func (r *RabbitMQ) reconnectLoop() {
- for {
- closeChan := make(chan *amqp.Error)
- r.mu.RLock()
- if r.conn == nil {
- r.mu.RUnlock()
- time.Sleep(5 * time.Second)
- continue
- }
- r.conn.NotifyClose(closeChan)
- isClosed := r.closed
- r.mu.RUnlock()
- if isClosed {
- r.logger.Info("RabbitMQ客户端已关闭,停止重连循环。")
- return
- }
- closeErr := <-closeChan
- if closeErr != nil {
- r.logger.Error("RabbitMQ连接断开,将尝试重新连接", zap.Error(closeErr))
- } else {
- r.logger.Info("RabbitMQ连接正常关闭。")
- }
- r.mu.RLock()
- isClosed = r.closed
- r.mu.RUnlock()
- if isClosed {
- r.logger.Info("RabbitMQ客户端已关闭,停止重连。")
- return
- }
- backoff := 1 * time.Second
- maxBackoff := 30 * time.Second
- for {
- if r.isClosed() {
- return
- }
- err := r.Connect()
- if err == nil {
- r.logger.Info("RabbitMQ重新连接成功")
- // 重新设置任务队列
- if err := r.SetupAllTaskQueues(); err != nil {
- r.logger.Error("重新设置所有任务队列失败", zap.Error(err))
- }
- break
- }
- r.logger.Error("RabbitMQ重连失败", zap.Error(err), zap.Duration("backoff", backoff))
- time.Sleep(backoff)
- backoff *= 2
- if backoff > maxBackoff {
- backoff = maxBackoff
- }
- }
- }
- }
- // Close 关闭连接
- func (r *RabbitMQ) Close() error {
- r.mu.Lock()
- defer r.mu.Unlock()
- if r.closed {
- return nil
- }
- r.closed = true
- var errs []error
- if r.ch != nil {
- if err := r.ch.Close(); err != nil {
- errs = append(errs, fmt.Errorf("关闭channel失败: %w", err))
- }
- }
- if r.conn != nil && !r.conn.IsClosed() {
- if err := r.conn.Close(); err != nil {
- errs = append(errs, fmt.Errorf("关闭connection失败: %w", err))
- }
- }
- if len(errs) > 0 {
- return fmt.Errorf("关闭RabbitMQ时发生错误: %v", errs)
- }
- return nil
- }
- func (r *RabbitMQ) isClosed() bool {
- r.mu.RLock()
- defer r.mu.RUnlock()
- return r.closed
- }
- // GetTaskConfig retrieves a specific task's configuration.
- func (r *RabbitMQ) GetTaskConfig(name string) (TaskConfig, bool) {
- taskCfg, ok := r.config.Tasks[name]
- return taskCfg, ok
- }
- func (r *RabbitMQ) withChannel(fn func(*amqp.Channel) error) error {
- if r.isClosed() {
- return ErrClosed
- }
- r.mu.RLock()
- defer r.mu.RUnlock()
- if r.ch == nil || r.conn.IsClosed() {
- return errors.New("rabbitmq: channel or connection is not available")
- }
- return fn(r.ch)
- }
- // Publish sends a message to the specified exchange with the given routing key.
- // This is a convenience wrapper around PublishWithCh.
- func (r *RabbitMQ) Publish(exchange, routingKey string, body []byte) error {
- return r.PublishWithCh(exchange, routingKey, amqp.Publishing{
- ContentType: "text/plain",
- Body: body,
- DeliveryMode: amqp.Persistent, // Default to persistent
- })
- }
- // PublishWithCh sends a message to the specified exchange with the given routing key using a custom amqp.Publishing struct.
- func (r *RabbitMQ) PublishWithCh(exchange, routingKey string, msg amqp.Publishing) error {
- if r.isClosed() {
- return errors.New("rabbitmq connection is closed")
- }
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- return r.withChannel(func(ch *amqp.Channel) error {
- return ch.PublishWithContext(ctx,
- exchange,
- routingKey,
- false, // mandatory
- false, // immediate
- msg,
- )
- })
- }
- // Consume 获取消息消费通道. 注意: Qos的设置需要调用方在获取channel后自行处理,或者为Consume方法增加prefetchCount参数
- func (r *RabbitMQ) Consume(queue, consumer string, prefetchCount int) (<-chan amqp.Delivery, error) {
- var deliveries <-chan amqp.Delivery
- err := r.withChannel(func(ch *amqp.Channel) error {
- if err := ch.Qos(prefetchCount, 0, false); err != nil {
- return fmt.Errorf("设置Qos失败: %w", err)
- }
- var err error
- deliveries, err = ch.Consume(
- queue,
- consumer,
- false, // auto-ack: false, 手动确认
- false, // exclusive
- false, // no-local
- false, // no-wait
- nil, // args
- )
- return err
- })
- return deliveries, err
- }
- // SetupAllTaskQueues 遍历配置中的所有任务,并为每个任务设置队列
- func (r *RabbitMQ) SetupAllTaskQueues() error {
- if len(r.config.Tasks) == 0 {
- r.logger.Info("在配置中未找到任何任务队列定义。")
- return nil
- }
- for name, taskCfg := range r.config.Tasks {
- r.logger.Info("正在设置任务队列", zap.String("task_name", name))
- if err := r.setupQueue(taskCfg); err != nil {
- return fmt.Errorf("为任务 '%s' 设置队列失败: %w", name, err)
- }
- }
- return nil
- }
- // setupQueue 为单个任务配置设置交换机、队列和绑定
- func (r *RabbitMQ) setupQueue(taskCfg TaskConfig) error {
- if taskCfg.Exchange == "" {
- r.logger.Warn("任务队列的交换机名称为空,将使用默认交换机。这在多任务场景下可能导致问题。", zap.String("queue", taskCfg.Queue))
- return r.withChannel(func(ch *amqp.Channel) error {
- _, err := ch.QueueDeclare(taskCfg.Queue, true, false, false, false, nil)
- if err != nil {
- return fmt.Errorf("声明队列失败 (默认交换机): %w", err)
- }
- r.logger.Info("成功声明队列并绑定到默认交换机", zap.String("queue", taskCfg.Queue))
- return nil
- })
- }
- return r.withChannel(func(ch *amqp.Channel) error {
- // 声明主交换机
- exchangeType := taskCfg.ExchangeType
- if exchangeType == "" {
- exchangeType = "direct" // 默认为 direct 类型,兼容旧配置
- }
- err := ch.ExchangeDeclare(
- taskCfg.Exchange, // name
- exchangeType, // type
- true, // durable
- false, // autoDelete
- false, // internal
- false, // noWait
- nil, // args
- )
- if err != nil {
- return fmt.Errorf("声明主交换机 '%s' 失败: %w", taskCfg.Exchange, err)
- }
- // 为主队列设置死信交换机参数
- dlxExchange := taskCfg.Exchange + ".dlx"
- args := amqp.Table{
- "x-dead-letter-exchange": dlxExchange,
- }
- // 声明主队列
- _, err = ch.QueueDeclare(taskCfg.Queue, true, false, false, false, args)
- if err != nil {
- return fmt.Errorf("声明主队列 '%s' 失败: %w", taskCfg.Queue, err)
- }
- // 绑定主队列到主交换机
- if err := ch.QueueBind(taskCfg.Queue, taskCfg.RoutingKey, taskCfg.Exchange, false, nil); err != nil {
- return fmt.Errorf("绑定主队列失败: %w", err)
- }
- // --- 设置死信队列 ---
- // 声明死信交换机 (DLX)
- if err := ch.ExchangeDeclare(dlxExchange, "direct", true, false, false, false, nil); err != nil {
- return fmt.Errorf("声明死信交换机 '%s' 失败: %w", dlxExchange, err)
- }
- // 声明死信队列 (DLQ)
- dlq := taskCfg.Queue + ".dlq"
- _, err = ch.QueueDeclare(dlq, true, false, false, false, nil)
- if err != nil {
- return fmt.Errorf("声明死信队列 '%s' 失败: %w", dlq, err)
- }
- // 绑定DLQ到DLX,使用与主队列相同的路由键
- if err := ch.QueueBind(dlq, taskCfg.RoutingKey, dlxExchange, false, nil); err != nil {
- return fmt.Errorf("绑定死信队列失败: %w", err)
- }
- r.logger.Info("成功设置任务队列及其死信队列",
- zap.String("exchange", taskCfg.Exchange),
- zap.String("queue", taskCfg.Queue),
- zap.String("routing_key", taskCfg.RoutingKey),
- )
- return nil
- })
- }
|