# -*- coding: utf-8 -*-
# @Time : 2022/12/2 15:03
# @Author : XuJiakai
# @File : RabbitMQ
# @Software: PyCharm
"""Thin wrapper around pika for fanout-style publishing and consuming."""
import json
import os
import threading

import pika
from pika.exchange_type import ExchangeType
from retry import retry

from log import get_log

log = get_log('RabbitMQ')


def _default_callback(ch, method, properties, body):
    """Print the decoded message body and acknowledge the delivery.

    This is the default ``on_message_callback``; replace it with a callback
    that does the real message processing.

    :param ch: channel the message was delivered on
    :param method: delivery metadata; its ``delivery_tag`` is used for the ack
    :param properties: message properties (unused)
    :param body: raw message payload as bytes
    """
    print('callback body ', body.decode())
    # Manual ack: only acknowledge once the message has really been handled.
    ch.basic_ack(delivery_tag=method.delivery_tag)


class RabbitMQ(object):
    """Blocking RabbitMQ client that connects on construction.

    Both public operations are retried up to 5 times with a 1 second delay;
    after every failure the connection and channel are rebuilt before the
    error is re-raised to the retry decorator.
    """

    def __init__(self, username, password, host, port, virtual_host,
                 prefetch_count=500):
        """Store the connection settings and connect immediately.

        :param username: login user
        :param password: login password
        :param host: broker host
        :param port: broker port
        :param virtual_host: virtual host to connect to
        :param prefetch_count: value passed to ``basic_qos`` on every
            (re)connect; defaults to 500
        """
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.prefetch_count = prefetch_count
        self.connect = None
        self.channel = None
        self.get_connect()

    def _close_stale_connection(self):
        """Best-effort close of the previous connection, if there is one."""
        stale = getattr(self, 'connect', None)
        if stale is None:
            return
        try:
            if stale.is_open:
                stale.close()
        except Exception as close_error:
            # The connection is being replaced anyway; a failed close must not
            # prevent the reconnect, so record it and move on.
            log.info(f'ignoring error while closing stale connection: {close_error}')

    def get_connect(self):
        """(Re)create ``self.connect`` and ``self.channel``.

        Any connection left over from an earlier call is closed first so that
        repeated reconnects do not leak sockets.
        """
        self._close_stale_connection()
        credentials = pika.PlainCredentials(username=self.username,
                                            password=self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
        )
        self.connect = pika.BlockingConnection(parameters)
        # Every operation of this client goes through this single channel.
        self.channel = self.connect.channel()
        self.channel.basic_qos(prefetch_count=self.prefetch_count)

    @retry(tries=5, delay=1)
    def send_by_fanout(self, exchange, body):
        """Publish ``body`` to a fanout exchange.

        The exchange is declared before every publish.

        :param exchange: name of the fanout exchange to publish to
        :param body: message payload
        :return: None
        :raises Exception: whatever the last failed attempt raised, once the
            retries are exhausted
        """
        try:
            self.channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.fanout.value,
            )
            self.channel.basic_publish(
                exchange=exchange,
                routing_key='',
                body=body,
            )
        except Exception:
            # Rebuild the connection so the next retry starts from a fresh
            # channel, then hand the original error to the retry decorator.
            self.get_connect()
            raise

    @retry(tries=5, delay=1)
    def consumer_by_fanout(self, exchange, queue='', callback=_default_callback):
        """Bind a queue to a fanout exchange and consume from it (blocking).

        :param exchange: name of the fanout exchange to bind to
        :param queue: queue name; ``''`` or ``None`` declares a queue with an
            empty name (the name actually used is read back from the declare
            result) and marks it ``auto_delete``
        :param callback: ``on_message_callback``; it must ack messages itself
            because ``auto_ack`` is disabled
        :return: None
        :raises Exception: whatever the last failed attempt raised, once the
            retries are exhausted
        """
        # None is treated as "no name given"; normalise it to '' before it is
        # passed on as the queue name.
        queue = queue or ''
        auto_delete = queue == ''
        try:
            self.channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.fanout.value,
            )
            declared = self.channel.queue_declare(queue=queue,
                                                  auto_delete=auto_delete)
            queue_name = declared.method.queue
            self.channel.queue_bind(queue=queue_name, exchange=exchange,
                                    routing_key='')
            log.info(f'消费任务启动,exchange: {exchange}, queue: {queue_name}, auto_delete: {auto_delete} ')
            self.channel.basic_consume(
                queue=queue_name,
                on_message_callback=callback,
                auto_ack=False,  # manual acknowledgement by the callback
            )
            self.channel.start_consuming()
        except Exception:
            # Rebuild the connection so the next retry starts from a fresh
            # channel, then hand the original error to the retry decorator.
            self.get_connect()
            raise


if __name__ == '__main__':
    # NOTE(review): credentials and host are hard-coded in source; consider
    # loading them from the environment or a config file.
    mq = RabbitMQ(username='whc', password='whc', host='106.15.78.184', port=5672, virtual_host='/')
    # To consume in a background thread instead:
    #   threading.Thread(target=mq.consumer_by_fanout, args=('xjk_test',)).start()
    mq.consumer_by_fanout(exchange='xjk_test')
    # To publish:
    #   mq.send_by_fanout(exchange='xjk_test', body='xjk_test message')