Przeglądaj źródła

fix: rabbitmq reconnect

许家凯 2 lat temu
rodzic
commit
ed5686d27f
1 zmienionych plików z 36 dodań i 27 usunięć
  1. 36 27
      sdk/RabbitMQ.py

+ 36 - 27
sdk/RabbitMQ.py

@@ -43,20 +43,22 @@ class RabbitMQ(object):
         :param exchange: 发送的topic
         :return:
         """
-        self.channel.exchange_declare(
-            exchange=exchange,
-            exchange_type=ExchangeType.fanout.value,
-        )
-        # 声明队列, 并且队列持久化
-        # self.channel.queue_declare(queue=queue, durable=False, auto_delete=True)
-        # 通过routing_key 绑定 消息交换机和队列
-        # self.channel.queue_bind(queue, exchange, routing_key)
-        # 发消息
-        self.channel.basic_publish(
-            exchange=exchange,
-            routing_key='',
-            body=body,  # 发送的数据
-        )
+        try:
+            self.channel.exchange_declare(
+                exchange=exchange,
+                exchange_type=ExchangeType.fanout.value,
+            )
+            self.channel.basic_publish(
+                exchange=exchange,
+                routing_key='',
+                body=body,  # 发送的数据
+            )
+            pass
+        except Exception as e:
+            self.get_connect()
+            raise e
+            pass
+
         pass
 
     @retry(delay=5, jitter=(1, 3))
@@ -67,19 +69,26 @@ class RabbitMQ(object):
         :param exchange:
         :return:
         """
-        self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
-        result = self.channel.queue_declare(queue='', auto_delete=True)
-        queue_name = result.method.queue
-        self.channel.queue_bind(queue_name, exchange, '')
-
-        log.info('消费任务启动,queue: ' + queue_name)
-
-        self.channel.basic_consume(
-            queue=queue_name,  # 队列名
-            on_message_callback=callback,  # 指定回调函数
-            auto_ack=False,  # 关闭自动ack采用手动应答
-        )
-        self.channel.start_consuming()
+        try:
+            self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
+            result = self.channel.queue_declare(queue='', auto_delete=True)
+            queue_name = result.method.queue
+            self.channel.queue_bind(queue_name, exchange, '')
+
+            log.info('消费任务启动,queue: ' + queue_name)
+
+            self.channel.basic_consume(
+                queue=queue_name,  # 队列名
+                on_message_callback=callback,  # 指定回调函数
+                auto_ack=False,  # 关闭自动ack采用手动应答
+            )
+            self.channel.start_consuming()
+            pass
+        except Exception as e:
+            self.get_connect()
+            raise e
+            pass
+
 
     pass