RabbitMQ.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/2 15:03
  3. # @Author : XuJiakai
  4. # @File : RabbitMQ
  5. # @Software: PyCharm
  6. from retry import retry
  7. import pika
  8. import json
  9. import os
  10. from pika.exchange_type import ExchangeType
  11. from log import get_log
  12. log = get_log('RabbitMQ')
  13. def _default_callback(ch, method, properties, body):
  14. print('callback body ', body.decode()) # 在这里对消息内容进行我们想做的处理
  15. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  16. pass
  17. class RabbitMQ(object):
  18. def __init__(self, username, password, host, port, virtual_host):
  19. self.username = username
  20. self.password = password
  21. self.host = host
  22. self.port = port
  23. self.virtual_host = virtual_host
  24. self.get_connect() # 初始化RabbitMQ实例对象时完成连接rabbitmq服务器
  25. def get_connect(self):
  26. credentials = pika.PlainCredentials(username=self.username, password=self.password) # 登录凭证
  27. self.connect = pika.BlockingConnection(
  28. pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
  29. credentials=credentials))
  30. self.channel = self.connect.channel() # 客户端连接rabbitmq服务端后开辟管道,每个channel代表一个会话任务
  31. @retry(delay=5, jitter=(1, 3))
  32. def send_by_fanout(self, exchange, body):
  33. """
  34. 多消费、重复订阅方式
  35. :param exchange: 发送的topic
  36. :return:
  37. """
  38. self.channel.exchange_declare(
  39. exchange=exchange,
  40. exchange_type=ExchangeType.fanout.value,
  41. )
  42. # 声明队列, 并且队列持久化
  43. # self.channel.queue_declare(queue=queue, durable=False, auto_delete=True)
  44. # 通过routing_key 绑定 消息交换机和队列
  45. # self.channel.queue_bind(queue, exchange, routing_key)
  46. # 发消息
  47. self.channel.basic_publish(
  48. exchange=exchange,
  49. routing_key='',
  50. body=body, # 发送的数据
  51. )
  52. pass
  53. @retry(delay=5, jitter=(1, 3))
  54. def consumer_by_fanout(self, exchange, callback=_default_callback):
  55. """
  56. 消费、扇出模式
  57. :param callback:
  58. :param exchange:
  59. :return:
  60. """
  61. self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
  62. result = self.channel.queue_declare(queue='', auto_delete=True)
  63. queue_name = result.method.queue
  64. self.channel.queue_bind(queue_name, exchange, '')
  65. log.info('消费任务启动,queue: ' + queue_name)
  66. self.channel.basic_consume(
  67. queue=queue_name, # 队列名
  68. on_message_callback=callback, # 指定回调函数
  69. auto_ack=False, # 关闭自动ack采用手动应答
  70. )
  71. self.channel.start_consuming()
  72. pass
  73. import threading
  74. if __name__ == '__main__':
  75. mq = RabbitMQ(username='whc', password='whc', host='106.15.78.184', port=5672, virtual_host='/')
  76. # p = threading.Thread(target=mq.consumer, args=('xjk_test', '', callback,))
  77. # p.start()
  78. mq.consumer_by_fanout(exchange='xjk_test')
  79. # mq.send(exchange='xjk_test', body='xjk_test message')
  80. pass