rabbitmq.go 9.4 KB


  1. package rabbitmq
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  9. amqp "github.com/rabbitmq/amqp091-go"
  10. "go.uber.org/zap"
  11. )
  12. var (
  13. ErrClosed = errors.New("rabbitmq: client is closed")
  14. )
  15. type Config struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. Username string `yaml:"username"`
  19. Password string `yaml:"password"`
  20. VHost string `yaml:"vhost"`
  21. ConnectionTimeout time.Duration `yaml:"connection_timeout"`
  22. Tasks map[string]TaskConfig `yaml:"tasks"` // 支持多个任务配置
  23. }
  24. type TaskConfig struct {
  25. Exchange string `mapstructure:"exchange"`
  26. ExchangeType string `mapstructure:"exchange_type"`
  27. Queue string `mapstructure:"queue"`
  28. RoutingKey string `mapstructure:"routing_key"`
  29. ConsumerCount int `mapstructure:"consumer_count"`
  30. PrefetchCount int `mapstructure:"prefetch_count"`
  31. }
  32. type RabbitMQ struct {
  33. config Config
  34. conn *amqp.Connection
  35. ch *amqp.Channel
  36. logger *log.Logger
  37. mu sync.RWMutex
  38. closed bool
  39. }
  40. // New 创建新的RabbitMQ客户端
  41. func New(config Config, logger *log.Logger) (*RabbitMQ, error) {
  42. r := &RabbitMQ{
  43. config: config,
  44. logger: logger,
  45. }
  46. if err := r.Connect(); err != nil {
  47. return nil, err
  48. }
  49. if err := r.SetupAllTaskQueues(); err != nil {
  50. _ = r.Close() // Attempt to close the connection if setup fails
  51. return nil, fmt.Errorf("failed to setup task queues: %w", err)
  52. }
  53. go r.reconnectLoop()
  54. return r, nil
  55. }
  56. // Connect 连接到RabbitMQ服务器
  57. func (r *RabbitMQ) Connect() error {
  58. r.mu.Lock()
  59. defer r.mu.Unlock()
  60. if r.conn != nil && !r.conn.IsClosed() {
  61. _ = r.ch.Close()
  62. _ = r.conn.Close()
  63. }
  64. vhost := r.config.VHost
  65. if vhost == "" {
  66. vhost = "/"
  67. } else if vhost[0] != '/' {
  68. vhost = "/" + vhost
  69. }
  70. // 构造完整的连接URL
  71. fullURL := fmt.Sprintf("amqp://%s:%s@%s:%d%s",
  72. r.config.Username,
  73. r.config.Password,
  74. r.config.Host,
  75. r.config.Port,
  76. vhost,
  77. )
  78. r.logger.Info("正在尝试连接到 RabbitMQ...", zap.String("url", fullURL))
  79. var err error
  80. r.conn, err = amqp.Dial(fullURL)
  81. if err != nil {
  82. // 记录详细的底层错误
  83. r.logger.Error("连接RabbitMQ失败", zap.Error(err))
  84. return fmt.Errorf("连接RabbitMQ失败: %w", err)
  85. }
  86. r.ch, err = r.conn.Channel()
  87. if err != nil {
  88. _ = r.conn.Close()
  89. return fmt.Errorf("创建通道失败: %w", err)
  90. }
  91. r.closed = false
  92. r.logger.Info("RabbitMQ连接成功")
  93. return nil
  94. }
  95. // reconnectLoop 监控连接状态并处理重连
  96. func (r *RabbitMQ) reconnectLoop() {
  97. for {
  98. closeChan := make(chan *amqp.Error)
  99. r.mu.RLock()
  100. if r.conn == nil {
  101. r.mu.RUnlock()
  102. time.Sleep(5 * time.Second)
  103. continue
  104. }
  105. r.conn.NotifyClose(closeChan)
  106. isClosed := r.closed
  107. r.mu.RUnlock()
  108. if isClosed {
  109. r.logger.Info("RabbitMQ客户端已关闭,停止重连循环。")
  110. return
  111. }
  112. closeErr := <-closeChan
  113. if closeErr != nil {
  114. r.logger.Error("RabbitMQ连接断开,将尝试重新连接", zap.Error(closeErr))
  115. } else {
  116. r.logger.Info("RabbitMQ连接正常关闭。")
  117. }
  118. r.mu.RLock()
  119. isClosed = r.closed
  120. r.mu.RUnlock()
  121. if isClosed {
  122. r.logger.Info("RabbitMQ客户端已关闭,停止重连。")
  123. return
  124. }
  125. backoff := 1 * time.Second
  126. maxBackoff := 30 * time.Second
  127. for {
  128. if r.isClosed() {
  129. return
  130. }
  131. err := r.Connect()
  132. if err == nil {
  133. r.logger.Info("RabbitMQ重新连接成功")
  134. // 重新设置任务队列
  135. if err := r.SetupAllTaskQueues(); err != nil {
  136. r.logger.Error("重新设置所有任务队列失败", zap.Error(err))
  137. }
  138. break
  139. }
  140. r.logger.Error("RabbitMQ重连失败", zap.Error(err), zap.Duration("backoff", backoff))
  141. time.Sleep(backoff)
  142. backoff *= 2
  143. if backoff > maxBackoff {
  144. backoff = maxBackoff
  145. }
  146. }
  147. }
  148. }
  149. // Close 关闭连接
  150. func (r *RabbitMQ) Close() error {
  151. r.mu.Lock()
  152. defer r.mu.Unlock()
  153. if r.closed {
  154. return nil
  155. }
  156. r.closed = true
  157. var errs []error
  158. if r.ch != nil {
  159. if err := r.ch.Close(); err != nil {
  160. errs = append(errs, fmt.Errorf("关闭channel失败: %w", err))
  161. }
  162. }
  163. if r.conn != nil && !r.conn.IsClosed() {
  164. if err := r.conn.Close(); err != nil {
  165. errs = append(errs, fmt.Errorf("关闭connection失败: %w", err))
  166. }
  167. }
  168. if len(errs) > 0 {
  169. return fmt.Errorf("关闭RabbitMQ时发生错误: %v", errs)
  170. }
  171. return nil
  172. }
  173. func (r *RabbitMQ) isClosed() bool {
  174. r.mu.RLock()
  175. defer r.mu.RUnlock()
  176. return r.closed
  177. }
  178. // GetTaskConfig retrieves a specific task's configuration.
  179. func (r *RabbitMQ) GetTaskConfig(name string) (TaskConfig, bool) {
  180. taskCfg, ok := r.config.Tasks[name]
  181. return taskCfg, ok
  182. }
  183. func (r *RabbitMQ) withChannel(fn func(*amqp.Channel) error) error {
  184. if r.isClosed() {
  185. return ErrClosed
  186. }
  187. r.mu.RLock()
  188. defer r.mu.RUnlock()
  189. if r.ch == nil || r.conn.IsClosed() {
  190. return errors.New("rabbitmq: channel or connection is not available")
  191. }
  192. return fn(r.ch)
  193. }
  194. // Publish sends a message to the specified exchange with the given routing key.
  195. // This is a convenience wrapper around PublishWithCh.
  196. func (r *RabbitMQ) Publish(exchange, routingKey string, body []byte) error {
  197. return r.PublishWithCh(exchange, routingKey, amqp.Publishing{
  198. ContentType: "text/plain",
  199. Body: body,
  200. DeliveryMode: amqp.Persistent, // Default to persistent
  201. })
  202. }
  203. // PublishWithCh sends a message to the specified exchange with the given routing key using a custom amqp.Publishing struct.
  204. func (r *RabbitMQ) PublishWithCh(exchange, routingKey string, msg amqp.Publishing) error {
  205. if r.isClosed() {
  206. return errors.New("rabbitmq connection is closed")
  207. }
  208. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  209. defer cancel()
  210. return r.withChannel(func(ch *amqp.Channel) error {
  211. return ch.PublishWithContext(ctx,
  212. exchange,
  213. routingKey,
  214. false, // mandatory
  215. false, // immediate
  216. msg,
  217. )
  218. })
  219. }
  220. // Consume 获取消息消费通道. 注意: Qos的设置需要调用方在获取channel后自行处理,或者为Consume方法增加prefetchCount参数
  221. func (r *RabbitMQ) Consume(queue, consumer string, prefetchCount int) (<-chan amqp.Delivery, error) {
  222. var deliveries <-chan amqp.Delivery
  223. err := r.withChannel(func(ch *amqp.Channel) error {
  224. if err := ch.Qos(prefetchCount, 0, false); err != nil {
  225. return fmt.Errorf("设置Qos失败: %w", err)
  226. }
  227. var err error
  228. deliveries, err = ch.Consume(
  229. queue,
  230. consumer,
  231. false, // auto-ack: false, 手动确认
  232. false, // exclusive
  233. false, // no-local
  234. false, // no-wait
  235. nil, // args
  236. )
  237. return err
  238. })
  239. return deliveries, err
  240. }
  241. // SetupAllTaskQueues 遍历配置中的所有任务,并为每个任务设置队列
  242. func (r *RabbitMQ) SetupAllTaskQueues() error {
  243. if len(r.config.Tasks) == 0 {
  244. r.logger.Info("在配置中未找到任何任务队列定义。")
  245. return nil
  246. }
  247. for name, taskCfg := range r.config.Tasks {
  248. if err := r.setupQueue(taskCfg); err != nil {
  249. return fmt.Errorf("为任务 '%s' 设置队列失败: %w", name, err)
  250. }
  251. }
  252. return nil
  253. }
  254. // setupQueue 为单个任务配置设置交换机、队列和绑定
  255. func (r *RabbitMQ) setupQueue(taskCfg TaskConfig) error {
  256. if taskCfg.Exchange == "" {
  257. r.logger.Warn("任务队列的交换机名称为空,将使用默认交换机。这在多任务场景下可能导致问题。", zap.String("queue", taskCfg.Queue))
  258. return r.withChannel(func(ch *amqp.Channel) error {
  259. _, err := ch.QueueDeclare(taskCfg.Queue, true, false, false, false, nil)
  260. if err != nil {
  261. return fmt.Errorf("声明队列失败 (默认交换机): %w", err)
  262. }
  263. r.logger.Info("成功声明队列并绑定到默认交换机", zap.String("queue", taskCfg.Queue))
  264. return nil
  265. })
  266. }
  267. return r.withChannel(func(ch *amqp.Channel) error {
  268. // 声明主交换机
  269. exchangeType := taskCfg.ExchangeType
  270. if exchangeType == "" {
  271. exchangeType = "direct" // 默认为 direct 类型,兼容旧配置
  272. }
  273. err := ch.ExchangeDeclare(
  274. taskCfg.Exchange, // name
  275. exchangeType, // type
  276. true, // durable
  277. false, // autoDelete
  278. false, // internal
  279. false, // noWait
  280. nil, // args
  281. )
  282. if err != nil {
  283. return fmt.Errorf("声明主交换机 '%s' 失败: %w", taskCfg.Exchange, err)
  284. }
  285. // 为主队列设置死信交换机参数
  286. dlxExchange := taskCfg.Exchange + ".dlx"
  287. args := amqp.Table{
  288. "x-dead-letter-exchange": dlxExchange,
  289. }
  290. // 声明主队列
  291. _, err = ch.QueueDeclare(taskCfg.Queue, true, false, false, false, args)
  292. if err != nil {
  293. return fmt.Errorf("声明主队列 '%s' 失败: %w", taskCfg.Queue, err)
  294. }
  295. // 绑定主队列到主交换机
  296. if err := ch.QueueBind(taskCfg.Queue, taskCfg.RoutingKey, taskCfg.Exchange, false, nil); err != nil {
  297. return fmt.Errorf("绑定主队列失败: %w", err)
  298. }
  299. // --- 设置死信队列 ---
  300. // 声明死信交换机 (DLX)
  301. if err := ch.ExchangeDeclare(dlxExchange, "direct", true, false, false, false, nil); err != nil {
  302. return fmt.Errorf("声明死信交换机 '%s' 失败: %w", dlxExchange, err)
  303. }
  304. // 声明死信队列 (DLQ)
  305. dlq := taskCfg.Queue + ".dlq"
  306. _, err = ch.QueueDeclare(dlq, true, false, false, false, nil)
  307. if err != nil {
  308. return fmt.Errorf("声明死信队列 '%s' 失败: %w", dlq, err)
  309. }
  310. // 绑定DLQ到DLX,使用与主队列相同的路由键
  311. if err := ch.QueueBind(dlq, taskCfg.RoutingKey, dlxExchange, false, nil); err != nil {
  312. return fmt.Errorf("绑定死信队列失败: %w", err)
  313. }
  314. return nil
  315. })
  316. }