Selaa lähdekoodia

fix: 解决mq重连丢失数据问题

许家凯 2 vuotta sitten
vanhempi
commit
a81b629c00
2 muutettua tiedostoa jossa 7 lisäystä ja 5 poistoa
  1. 6 4
      sdk/RabbitMQ.py
  2. 1 1
      spider/winhc_job.py

+ 6 - 4
sdk/RabbitMQ.py

@@ -62,20 +62,23 @@ class RabbitMQ(object):
         pass
 
     @retry(tries=5, delay=1)
-    def consumer_by_fanout(self, exchange, callback=_default_callback):
+    def consumer_by_fanout(self, exchange, queue='', callback=_default_callback):
         """
         消费、扇出模式
+        :param queue:
         :param callback:
         :param exchange:
         :return:
         """
+        auto_delete = queue is None or queue == ''
+
         try:
             self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
-            result = self.channel.queue_declare(queue='', auto_delete=True)
+            result = self.channel.queue_declare(queue=queue, auto_delete=auto_delete)
             queue_name = result.method.queue
             self.channel.queue_bind(queue_name, exchange, '')
 
-            log.info('消费任务启动,queue: ' + queue_name)
+            log.info(f'消费任务启动,exchange: {exchange}, queue: {queue_name}, auto_delete: {auto_delete} ')
 
             self.channel.basic_consume(
                 queue=queue_name,  # 队列名
@@ -89,7 +92,6 @@ class RabbitMQ(object):
             raise e
             pass
 
-
     pass
 
 

+ 1 - 1
spider/winhc_job.py

@@ -79,7 +79,7 @@ def main():
         ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动应答ack,确保消息真正消费后才应答
         pass
 
-    r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback)
+    r_sdk.consumer_by_fanout(RABBITMQ_TOPIC,queue='cpa_winhc_spider', callback=callback)
 
     pass