RabbitMQ.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/2 15:03
  3. # @Author : XuJiakai
  4. # @File : RabbitMQ
  5. # @Software: PyCharm
  6. from retry import retry
  7. import pika
  8. import json
  9. import os
  10. from pika.exchange_type import ExchangeType
  11. from log import get_log
  12. log = get_log('RabbitMQ')
  13. def _default_callback(ch, method, properties, body):
  14. print('callback body ', body.decode()) # 在这里对消息内容进行我们想做的处理
  15. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  16. pass
  17. class RabbitMQ(object):
  18. def __init__(self, username, password, host, port, virtual_host):
  19. self.username = username
  20. self.password = password
  21. self.host = host
  22. self.port = port
  23. self.virtual_host = virtual_host
  24. self.get_connect() # 初始化RabbitMQ实例对象时完成连接rabbitmq服务器
  25. def get_connect(self):
  26. credentials = pika.PlainCredentials(username=self.username, password=self.password) # 登录凭证
  27. self.connect = pika.BlockingConnection(
  28. pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
  29. credentials=credentials))
  30. self.channel = self.connect.channel() # 客户端连接rabbitmq服务端后开辟管道,每个channel代表一个会话任务
  31. @retry(tries=5, delay=1)
  32. def send_by_fanout(self, exchange, body):
  33. """
  34. 多消费、重复订阅方式
  35. :param exchange: 发送的topic
  36. :return:
  37. """
  38. try:
  39. self.channel.exchange_declare(
  40. exchange=exchange,
  41. exchange_type=ExchangeType.fanout.value,
  42. )
  43. self.channel.basic_publish(
  44. exchange=exchange,
  45. routing_key='',
  46. body=body, # 发送的数据
  47. )
  48. pass
  49. except Exception as e:
  50. self.get_connect()
  51. raise e
  52. pass
  53. pass
  54. @retry(tries=5, delay=1)
  55. def consumer_by_fanout(self, exchange, callback=_default_callback):
  56. """
  57. 消费、扇出模式
  58. :param callback:
  59. :param exchange:
  60. :return:
  61. """
  62. try:
  63. self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
  64. result = self.channel.queue_declare(queue='', auto_delete=True)
  65. queue_name = result.method.queue
  66. self.channel.queue_bind(queue_name, exchange, '')
  67. log.info('消费任务启动,queue: ' + queue_name)
  68. self.channel.basic_consume(
  69. queue=queue_name, # 队列名
  70. on_message_callback=callback, # 指定回调函数
  71. auto_ack=False, # 关闭自动ack采用手动应答
  72. )
  73. self.channel.start_consuming()
  74. pass
  75. except Exception as e:
  76. self.get_connect()
  77. raise e
  78. pass
  79. pass
  80. import threading
  81. if __name__ == '__main__':
  82. mq = RabbitMQ(username='whc', password='whc', host='106.15.78.184', port=5672, virtual_host='/')
  83. # p = threading.Thread(target=mq.consumer, args=('xjk_test', '', callback,))
  84. # p.start()
  85. mq.consumer_by_fanout(exchange='xjk_test')
  86. # mq.send(exchange='xjk_test', body='xjk_test message')
  87. pass