123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- # -*- coding: utf-8 -*-
- # @Time : 2022/12/2 15:03
- # @Author : XuJiakai
- # @File : RabbitMQ
- # @Software: PyCharm
- from retry import retry
- import pika
- import json
- import os
- from pika.exchange_type import ExchangeType
- from log import get_log
- log = get_log('RabbitMQ')
def _default_callback(ch, method, properties, body):
    """Default consumer callback: print the message, then acknowledge it.

    :param ch: channel the message was delivered on; used to send the ack.
    :param method: delivery metadata; its ``delivery_tag`` identifies the message.
    :param properties: message properties (unused).
    :param body: raw message payload as bytes.
    """
    # Place for any custom processing of the message content.
    text = body.decode()
    print('callback body ', text)
    # Manual ack: only acknowledge once the message has really been handled.
    tag = method.delivery_tag
    ch.basic_ack(delivery_tag=tag)
class RabbitMQ(object):
    """Small wrapper around a pika ``BlockingConnection`` for fanout exchanges.

    A connection and a channel are opened when the instance is created. When a
    publish or consume call fails, the connection is re-established before the
    error is re-raised, so the ``@retry``-decorated methods run their next
    attempt on a fresh channel.
    """

    def __init__(self, username, password, host, port, virtual_host, prefetch_count=500):
        """Store the connection settings and connect to the broker.

        :param username: broker login name.
        :param password: broker login password.
        :param host: broker host name or IP address.
        :param port: broker port.
        :param virtual_host: virtual host to connect to.
        :param prefetch_count: value passed to ``basic_qos`` on every new
            channel; defaults to 500.
        """
        self.username = username
        self.password = password
        self.host = host
        self.port = port
        self.virtual_host = virtual_host
        self.prefetch_count = prefetch_count
        self.get_connect()  # connect to the RabbitMQ server on construction

    def get_connect(self):
        """(Re)open the connection and channel, closing any previous connection."""
        # Without this, every reconnect would leave the old connection (and any
        # unacked messages prefetched on it) dangling until process exit.
        self._close_quietly()
        credentials = pika.PlainCredentials(username=self.username, password=self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
        )
        self.connect = pika.BlockingConnection(parameters)
        # Each channel represents one session on top of the connection.
        self.channel = self.connect.channel()
        self.channel.basic_qos(prefetch_count=self.prefetch_count)

    def _close_quietly(self):
        """Close the current connection if it is still open; never raise."""
        # On the first call from __init__ there is no connection attribute yet.
        stale = getattr(self, 'connect', None)
        if stale is None:
            return
        try:
            if stale.is_open:
                stale.close()
        except Exception as e:
            # Best effort only: the connection is being replaced anyway.
            log.info(f'ignoring error while closing stale connection: {e!r}')

    @retry(tries=5, delay=1)
    def send_by_fanout(self, exchange, body):
        """Publish ``body`` to a fanout exchange (every bound queue gets a copy).

        The exchange is declared before publishing. On any error the
        connection is re-established and the error is re-raised so that
        ``@retry`` can try again (up to 5 tries, 1 second apart).

        :param exchange: name of the fanout exchange to publish to.
        :param body: message payload to send.
        :return: None
        """
        try:
            self.channel.exchange_declare(
                exchange=exchange,
                exchange_type=ExchangeType.fanout.value,
            )
            self.channel.basic_publish(
                exchange=exchange,
                routing_key='',
                body=body,
            )
        except Exception:
            self.get_connect()
            raise

    @retry(tries=5, delay=1)
    def consumer_by_fanout(self, exchange, queue='', callback=_default_callback):
        """Consume messages from a fanout exchange; blocks while consuming.

        On any error the connection is re-established and the error is
        re-raised so that ``@retry`` can try again (up to 5 tries, 1 second
        apart).

        :param exchange: name of the fanout exchange to bind to.
        :param queue: queue name. ``''`` or ``None`` declares a server-named
            queue that is marked ``auto_delete``.
        :param callback: ``on_message_callback`` invoked for each message; it
            must ack the message itself because auto-ack is disabled.
        :return: None
        """
        # queue_declare needs a string; treat None like the empty name.
        if queue is None:
            queue = ''
        auto_delete = queue == ''
        try:
            self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
            result = self.channel.queue_declare(queue=queue, auto_delete=auto_delete)
            # For an empty name this is the name generated by the broker.
            queue_name = result.method.queue
            self.channel.queue_bind(queue_name, exchange, '')
            log.info(f'消费任务启动,exchange: {exchange}, queue: {queue_name}, auto_delete: {auto_delete} ')
            self.channel.basic_consume(
                queue=queue_name,
                on_message_callback=callback,
                auto_ack=False,  # manual acknowledgement by the callback
            )
            self.channel.start_consuming()
        except Exception:
            self.get_connect()
            raise
- import threading
if __name__ == '__main__':
    # Manual smoke test: consume from the 'xjk_test' fanout exchange using the
    # default callback. Connection settings can be overridden through the
    # RABBITMQ_* environment variables; the defaults are the values that were
    # previously hard-coded here.
    mq = RabbitMQ(
        username=os.environ.get('RABBITMQ_USERNAME', 'whc'),
        password=os.environ.get('RABBITMQ_PASSWORD', 'whc'),
        host=os.environ.get('RABBITMQ_HOST', '106.15.78.184'),
        port=int(os.environ.get('RABBITMQ_PORT', '5672')),
        virtual_host=os.environ.get('RABBITMQ_VHOST', '/'),
    )
    mq.consumer_by_fanout(exchange='xjk_test')
    # To publish instead:
    #   mq.send_by_fanout(exchange='xjk_test', body='xjk_test message')
|