RabbitMQ.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/2 15:03
  3. # @Author : XuJiakai
  4. # @File : RabbitMQ
  5. # @Software: PyCharm
  6. from retry import retry
  7. import pika
  8. import json
  9. import os
  10. from pika.exchange_type import ExchangeType
  11. from log import get_log
  12. log = get_log('RabbitMQ')
  13. def _default_callback(ch, method, properties, body):
  14. print('callback body ', body.decode()) # 在这里对消息内容进行我们想做的处理
  15. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  16. pass
  17. class RabbitMQ(object):
  18. def __init__(self, username, password, host, port, virtual_host):
  19. self.username = username
  20. self.password = password
  21. self.host = host
  22. self.port = port
  23. self.virtual_host = virtual_host
  24. self.get_connect() # 初始化RabbitMQ实例对象时完成连接rabbitmq服务器
  25. def get_connect(self):
  26. credentials = pika.PlainCredentials(username=self.username, password=self.password) # 登录凭证
  27. self.connect = pika.BlockingConnection(
  28. pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
  29. credentials=credentials))
  30. self.channel = self.connect.channel() # 客户端连接rabbitmq服务端后开辟管道,每个channel代表一个会话任务
  31. self.channel.basic_qos(prefetch_count=500)
  32. @retry(tries=5, delay=1)
  33. def send_by_fanout(self, exchange, body):
  34. """
  35. 多消费、重复订阅方式
  36. :param exchange: 发送的topic
  37. :return:
  38. """
  39. try:
  40. self.channel.exchange_declare(
  41. exchange=exchange,
  42. exchange_type=ExchangeType.fanout.value,
  43. )
  44. self.channel.basic_publish(
  45. exchange=exchange,
  46. routing_key='',
  47. body=body, # 发送的数据
  48. )
  49. pass
  50. except Exception as e:
  51. self.get_connect()
  52. raise e
  53. pass
  54. pass
  55. @retry(tries=5, delay=1)
  56. def consumer_by_fanout(self, exchange, queue='', callback=_default_callback):
  57. """
  58. 消费、扇出模式
  59. :param queue:
  60. :param callback:
  61. :param exchange:
  62. :return:
  63. """
  64. auto_delete = queue is None or queue == ''
  65. try:
  66. self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
  67. result = self.channel.queue_declare(queue=queue, auto_delete=auto_delete)
  68. queue_name = result.method.queue
  69. self.channel.queue_bind(queue_name, exchange, '')
  70. log.info(f'消费任务启动,exchange: {exchange}, queue: {queue_name}, auto_delete: {auto_delete} ')
  71. self.channel.basic_consume(
  72. queue=queue_name, # 队列名
  73. on_message_callback=callback, # 指定回调函数
  74. auto_ack=False, # 关闭自动ack采用手动应答
  75. )
  76. self.channel.start_consuming()
  77. pass
  78. except Exception as e:
  79. self.get_connect()
  80. raise e
  81. pass
  82. pass
  83. import threading
  84. if __name__ == '__main__':
  85. mq = RabbitMQ(username='whc', password='whc', host='106.15.78.184', port=5672, virtual_host='/')
  86. # p = threading.Thread(target=mq.consumer, args=('xjk_test', '', callback,))
  87. # p.start()
  88. mq.consumer_by_fanout(exchange='xjk_test')
  89. # mq.send(exchange='xjk_test', body='xjk_test message')
  90. pass