Browse Source

refactor(rabbitmq): 优化消息队列配置和发送机制- 更新本地配置文件,修改 IP 白名单和域名白名单任务的队列名称
- 重构 RabbitMQ 发送消息方法,确保线程安全和连接可用性

fusu 1 month ago
parent
commit
f95d47baef
2 changed files with 27 additions and 13 deletions
  1. 2 2
      config/local.yml
  2. 25 11
      pkg/rabbitmq/rabbitmq.go

+ 2 - 2
config/local.yml

@@ -109,7 +109,7 @@ rabbitmq:
     # IP白名单更新任务
     ip_white:
       exchange: "tasks_direct_exchange" # 使用一个统一的direct交换机
-      queue: "ip_white_queue"
+      queue: "ip_white_queue_test"
       routing_key: "task.ip_white.update"
       consumer_count: 2
       prefetch_count: 1
@@ -126,7 +126,7 @@ rabbitmq:
     domain_whitelist:
       exchange: "whitelist_topic_exchange" # Topic 类型的交换机
       exchange_type: "topic"              # 显式指定交换机类型
-      queue: "domain_whitelist_queue"
+      queue: "domain_whitelist_queue_test"
       routing_key: "whitelist.domain.*"   # 消费者监听的绑定键,能接收所有 domain 相关的任务
       consumer_count: 3
       prefetch_count: 1

+ 25 - 11
pkg/rabbitmq/rabbitmq.go

@@ -244,23 +244,37 @@ func (r *RabbitMQ) Publish(exchange, routingKey string, body []byte) error {
 }
 
 // PublishWithCh sends a message to the specified exchange with the given routing key using a custom amqp.Publishing struct.
+// It creates a new channel for each publication to ensure thread safety, as amqp.Channel is not safe for concurrent use.
 func (r *RabbitMQ) PublishWithCh(exchange, routingKey string, msg amqp.Publishing) error {
-	if r.isClosed() {
-		return errors.New("rabbitmq connection is closed")
+	r.mu.RLock()
+	// Check if the connection is alive and well.
+	if r.closed || r.conn == nil || r.conn.IsClosed() {
+		r.mu.RUnlock()
+		return fmt.Errorf("rabbitmq: connection is not available")
+	}
+	// We must get the connection under the lock, but we can release the lock before creating the channel
+	// because the connection object itself is safe for concurrent use.
+	conn := r.conn
+	r.mu.RUnlock()
+
+	// Create a new channel for this specific publication. This is the key to thread safety.
+	ch, err := conn.Channel()
+	if err != nil {
+		return fmt.Errorf("rabbitmq: failed to open a channel: %w", err)
 	}
+	defer ch.Close() // Ensure the channel is closed after the operation.
 
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 
-	return r.withChannel(func(ch *amqp.Channel) error {
-		return ch.PublishWithContext(ctx,
-			exchange,
-			routingKey,
-			false, // mandatory
-			false, // immediate
-			msg,
-		)
-	})
+	// Publish the message using the temporary channel.
+	return ch.PublishWithContext(ctx,
+		exchange,
+		routingKey,
+		false, // mandatory
+		false, // immediate
+		msg,
+	)
 }
 
 // Consume 获取消息消费通道. 注意: Qos的设置需要调用方在获取channel后自行处理,或者为Consume方法增加prefetchCount参数