rabbitmq.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. package rabbitmq
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "time"
  8. "github.com/go-nunu/nunu-layout-advanced/pkg/log"
  9. amqp "github.com/rabbitmq/amqp091-go"
  10. "go.uber.org/zap"
  11. )
  12. var (
  13. ErrClosed = errors.New("rabbitmq: client is closed")
  14. )
  15. type Config struct {
  16. Host string `yaml:"host"`
  17. Port int `yaml:"port"`
  18. Username string `yaml:"username"`
  19. Password string `yaml:"password"`
  20. VHost string `yaml:"vhost"`
  21. ConnectionTimeout time.Duration `yaml:"connection_timeout"`
  22. Tasks map[string]TaskConfig `yaml:"tasks"` // 支持多个任务配置
  23. }
  24. type TaskConfig struct {
  25. Exchange string `mapstructure:"exchange"`
  26. ExchangeType string `mapstructure:"exchange_type"`
  27. Queue string `mapstructure:"queue"`
  28. RoutingKey string `mapstructure:"routing_key"`
  29. ConsumerCount int `mapstructure:"consumer_count"`
  30. PrefetchCount int `mapstructure:"prefetch_count"`
  31. }
  32. type RabbitMQ struct {
  33. config Config
  34. conn *amqp.Connection
  35. ch *amqp.Channel
  36. logger *log.Logger
  37. mu sync.RWMutex
  38. closed bool
  39. }
  40. // New 创建新的RabbitMQ客户端
  41. func New(config Config, logger *log.Logger) (*RabbitMQ, error) {
  42. r := &RabbitMQ{
  43. config: config,
  44. logger: logger,
  45. }
  46. if err := r.Connect(); err != nil {
  47. return nil, err
  48. }
  49. if err := r.SetupAllTaskQueues(); err != nil {
  50. _ = r.Close() // Attempt to close the connection if setup fails
  51. return nil, fmt.Errorf("failed to setup task queues: %w", err)
  52. }
  53. go r.reconnectLoop()
  54. return r, nil
  55. }
  56. // Connect 连接到RabbitMQ服务器
  57. func (r *RabbitMQ) Connect() error {
  58. r.mu.Lock()
  59. defer r.mu.Unlock()
  60. if r.conn != nil && !r.conn.IsClosed() {
  61. _ = r.ch.Close()
  62. _ = r.conn.Close()
  63. }
  64. vhost := r.config.VHost
  65. if vhost == "" {
  66. vhost = "/"
  67. } else if vhost[0] != '/' {
  68. vhost = "/" + vhost
  69. }
  70. // 构造完整的连接URL
  71. fullURL := fmt.Sprintf("amqp://%s:%s@%s:%d%s",
  72. r.config.Username,
  73. r.config.Password,
  74. r.config.Host,
  75. r.config.Port,
  76. vhost,
  77. )
  78. r.logger.Info("正在尝试连接到 RabbitMQ...", zap.String("url", fullURL))
  79. var err error
  80. r.conn, err = amqp.Dial(fullURL)
  81. if err != nil {
  82. // 记录详细的底层错误
  83. r.logger.Error("连接RabbitMQ失败", zap.Error(err))
  84. return fmt.Errorf("连接RabbitMQ失败: %w", err)
  85. }
  86. r.ch, err = r.conn.Channel()
  87. if err != nil {
  88. _ = r.conn.Close()
  89. return fmt.Errorf("创建通道失败: %w", err)
  90. }
  91. r.closed = false
  92. r.logger.Info("RabbitMQ连接成功")
  93. return nil
  94. }
  95. // reconnectLoop 监控连接状态并处理重连
  96. func (r *RabbitMQ) reconnectLoop() {
  97. for {
  98. closeChan := make(chan *amqp.Error)
  99. r.mu.RLock()
  100. if r.conn == nil {
  101. r.mu.RUnlock()
  102. time.Sleep(5 * time.Second)
  103. continue
  104. }
  105. r.conn.NotifyClose(closeChan)
  106. isClosed := r.closed
  107. r.mu.RUnlock()
  108. if isClosed {
  109. r.logger.Info("RabbitMQ客户端已关闭,停止重连循环。")
  110. return
  111. }
  112. closeErr := <-closeChan
  113. if closeErr != nil {
  114. r.logger.Error("RabbitMQ连接断开,将尝试重新连接", zap.Error(closeErr))
  115. } else {
  116. r.logger.Info("RabbitMQ连接正常关闭。")
  117. }
  118. r.mu.RLock()
  119. isClosed = r.closed
  120. r.mu.RUnlock()
  121. if isClosed {
  122. r.logger.Info("RabbitMQ客户端已关闭,停止重连。")
  123. return
  124. }
  125. backoff := 1 * time.Second
  126. maxBackoff := 30 * time.Second
  127. for {
  128. if r.isClosed() {
  129. return
  130. }
  131. err := r.Connect()
  132. if err == nil {
  133. r.logger.Info("RabbitMQ重新连接成功")
  134. // 重新设置任务队列
  135. if err := r.SetupAllTaskQueues(); err != nil {
  136. r.logger.Error("重新设置所有任务队列失败", zap.Error(err))
  137. }
  138. break
  139. }
  140. r.logger.Error("RabbitMQ重连失败", zap.Error(err), zap.Duration("backoff", backoff))
  141. time.Sleep(backoff)
  142. backoff *= 2
  143. if backoff > maxBackoff {
  144. backoff = maxBackoff
  145. }
  146. }
  147. }
  148. }
  149. // Close 关闭连接
  150. func (r *RabbitMQ) Close() error {
  151. r.mu.Lock()
  152. defer r.mu.Unlock()
  153. if r.closed {
  154. return nil
  155. }
  156. r.closed = true
  157. var errs []error
  158. if r.ch != nil {
  159. if err := r.ch.Close(); err != nil {
  160. errs = append(errs, fmt.Errorf("关闭channel失败: %w", err))
  161. }
  162. }
  163. if r.conn != nil && !r.conn.IsClosed() {
  164. if err := r.conn.Close(); err != nil {
  165. errs = append(errs, fmt.Errorf("关闭connection失败: %w", err))
  166. }
  167. }
  168. if len(errs) > 0 {
  169. return fmt.Errorf("关闭RabbitMQ时发生错误: %v", errs)
  170. }
  171. return nil
  172. }
  173. func (r *RabbitMQ) isClosed() bool {
  174. r.mu.RLock()
  175. defer r.mu.RUnlock()
  176. return r.closed
  177. }
  178. // GetTaskConfig retrieves a specific task's configuration.
  179. func (r *RabbitMQ) GetTaskConfig(name string) (TaskConfig, bool) {
  180. taskCfg, ok := r.config.Tasks[name]
  181. return taskCfg, ok
  182. }
  183. func (r *RabbitMQ) withChannel(fn func(*amqp.Channel) error) error {
  184. if r.isClosed() {
  185. return ErrClosed
  186. }
  187. r.mu.RLock()
  188. defer r.mu.RUnlock()
  189. if r.ch == nil || r.conn.IsClosed() {
  190. return errors.New("rabbitmq: channel or connection is not available")
  191. }
  192. return fn(r.ch)
  193. }
  194. // Publish sends a message to the specified exchange with the given routing key.
  195. // This is a convenience wrapper around PublishWithCh.
  196. func (r *RabbitMQ) Publish(exchange, routingKey string, body []byte) error {
  197. return r.PublishWithCh(exchange, routingKey, amqp.Publishing{
  198. ContentType: "text/plain",
  199. Body: body,
  200. DeliveryMode: amqp.Persistent, // Default to persistent
  201. })
  202. }
  203. // PublishWithCh sends a message to the specified exchange with the given routing key using a custom amqp.Publishing struct.
  204. // It creates a new channel for each publication to ensure thread safety, as amqp.Channel is not safe for concurrent use.
  205. func (r *RabbitMQ) PublishWithCh(exchange, routingKey string, msg amqp.Publishing) error {
  206. r.mu.RLock()
  207. // Check if the connection is alive and well.
  208. if r.closed || r.conn == nil || r.conn.IsClosed() {
  209. r.mu.RUnlock()
  210. return fmt.Errorf("rabbitmq: connection is not available")
  211. }
  212. // We must get the connection under the lock, but we can release the lock before creating the channel
  213. // because the connection object itself is safe for concurrent use.
  214. conn := r.conn
  215. r.mu.RUnlock()
  216. // Create a new channel for this specific publication. This is the key to thread safety.
  217. ch, err := conn.Channel()
  218. if err != nil {
  219. return fmt.Errorf("rabbitmq: failed to open a channel: %w", err)
  220. }
  221. defer ch.Close() // Ensure the channel is closed after the operation.
  222. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  223. defer cancel()
  224. // Publish the message using the temporary channel.
  225. return ch.PublishWithContext(ctx,
  226. exchange,
  227. routingKey,
  228. false, // mandatory
  229. false, // immediate
  230. msg,
  231. )
  232. }
  233. // Consume 获取消息消费通道. 注意: Qos的设置需要调用方在获取channel后自行处理,或者为Consume方法增加prefetchCount参数
  234. func (r *RabbitMQ) Consume(queue, consumer string, prefetchCount int) (<-chan amqp.Delivery, error) {
  235. var deliveries <-chan amqp.Delivery
  236. err := r.withChannel(func(ch *amqp.Channel) error {
  237. if err := ch.Qos(prefetchCount, 0, false); err != nil {
  238. return fmt.Errorf("设置Qos失败: %w", err)
  239. }
  240. var err error
  241. deliveries, err = ch.Consume(
  242. queue,
  243. consumer,
  244. false, // auto-ack: false, 手动确认
  245. false, // exclusive
  246. false, // no-local
  247. false, // no-wait
  248. nil, // args
  249. )
  250. return err
  251. })
  252. return deliveries, err
  253. }
  254. // SetupAllTaskQueues 遍历配置中的所有任务,并为每个任务设置队列
  255. func (r *RabbitMQ) SetupAllTaskQueues() error {
  256. if len(r.config.Tasks) == 0 {
  257. r.logger.Info("在配置中未找到任何任务队列定义。")
  258. return nil
  259. }
  260. for name, taskCfg := range r.config.Tasks {
  261. if err := r.setupQueue(taskCfg); err != nil {
  262. return fmt.Errorf("为任务 '%s' 设置队列失败: %w", name, err)
  263. }
  264. }
  265. return nil
  266. }
  267. // setupQueue 为单个任务配置设置交换机、队列和绑定
  268. func (r *RabbitMQ) setupQueue(taskCfg TaskConfig) error {
  269. if taskCfg.Exchange == "" {
  270. r.logger.Warn("任务队列的交换机名称为空,将使用默认交换机。这在多任务场景下可能导致问题。", zap.String("queue", taskCfg.Queue))
  271. return r.withChannel(func(ch *amqp.Channel) error {
  272. _, err := ch.QueueDeclare(taskCfg.Queue, true, false, false, false, nil)
  273. if err != nil {
  274. return fmt.Errorf("声明队列失败 (默认交换机): %w", err)
  275. }
  276. r.logger.Info("成功声明队列并绑定到默认交换机", zap.String("queue", taskCfg.Queue))
  277. return nil
  278. })
  279. }
  280. return r.withChannel(func(ch *amqp.Channel) error {
  281. // 声明主交换机
  282. exchangeType := taskCfg.ExchangeType
  283. if exchangeType == "" {
  284. exchangeType = "direct" // 默认为 direct 类型,兼容旧配置
  285. }
  286. err := ch.ExchangeDeclare(
  287. taskCfg.Exchange, // name
  288. exchangeType, // type
  289. true, // durable
  290. false, // autoDelete
  291. false, // internal
  292. false, // noWait
  293. nil, // args
  294. )
  295. if err != nil {
  296. return fmt.Errorf("声明主交换机 '%s' 失败: %w", taskCfg.Exchange, err)
  297. }
  298. // 为主队列设置死信交换机参数
  299. dlxExchange := taskCfg.Exchange + ".dlx"
  300. args := amqp.Table{
  301. "x-dead-letter-exchange": dlxExchange,
  302. }
  303. // 声明主队列
  304. _, err = ch.QueueDeclare(taskCfg.Queue, true, false, false, false, args)
  305. if err != nil {
  306. return fmt.Errorf("声明主队列 '%s' 失败: %w", taskCfg.Queue, err)
  307. }
  308. // 绑定主队列到主交换机
  309. if err := ch.QueueBind(taskCfg.Queue, taskCfg.RoutingKey, taskCfg.Exchange, false, nil); err != nil {
  310. return fmt.Errorf("绑定主队列失败: %w", err)
  311. }
  312. // --- 设置死信队列 ---
  313. // 声明死信交换机 (DLX)
  314. if err := ch.ExchangeDeclare(dlxExchange, "direct", true, false, false, false, nil); err != nil {
  315. return fmt.Errorf("声明死信交换机 '%s' 失败: %w", dlxExchange, err)
  316. }
  317. // 声明死信队列 (DLQ)
  318. dlq := taskCfg.Queue + ".dlq"
  319. _, err = ch.QueueDeclare(dlq, true, false, false, false, nil)
  320. if err != nil {
  321. return fmt.Errorf("声明死信队列 '%s' 失败: %w", dlq, err)
  322. }
  323. // 绑定DLQ到DLX,使用与主队列相同的路由键
  324. if err := ch.QueueBind(dlq, taskCfg.RoutingKey, dlxExchange, false, nil); err != nil {
  325. return fmt.Errorf("绑定死信队列失败: %w", err)
  326. }
  327. return nil
  328. })
  329. }