RabbitMQ.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/2 15:03
  3. # @Author : XuJiakai
  4. # @File : RabbitMQ
  5. # @Software: PyCharm
  6. import pika
  7. import json
  8. import os
  9. from pika.exchange_type import ExchangeType
  10. from log import get_log
  11. log = get_log('RabbitMQ')
  12. def _default_callback(ch, method, properties, body):
  13. print('callback body ', body.decode()) # 在这里对消息内容进行我们想做的处理
  14. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  15. pass
  16. class RabbitMQ(object):
  17. def __init__(self, username, password, host, port, virtual_host):
  18. self.username = username
  19. self.password = password
  20. self.host = host
  21. self.port = port
  22. self.virtual_host = virtual_host
  23. self.get_connect() # 初始化RabbitMQ实例对象时完成连接rabbitmq服务器
  24. def get_connect(self):
  25. credentials = pika.PlainCredentials(username=self.username, password=self.password) # 登录凭证
  26. self.connect = pika.BlockingConnection(
  27. pika.ConnectionParameters(host=self.host, port=self.port, virtual_host=self.virtual_host,
  28. credentials=credentials))
  29. self.channel = self.connect.channel() # 客户端连接rabbitmq服务端后开辟管道,每个channel代表一个会话任务
  30. def send_by_fanout(self, exchange, body):
  31. """
  32. 多消费、重复订阅方式
  33. :param exchange: 发送的topic
  34. :return:
  35. """
  36. self.channel.exchange_declare(
  37. exchange=exchange,
  38. exchange_type=ExchangeType.fanout.value,
  39. )
  40. # 声明队列, 并且队列持久化
  41. # self.channel.queue_declare(queue=queue, durable=False, auto_delete=True)
  42. # 通过routing_key 绑定 消息交换机和队列
  43. # self.channel.queue_bind(queue, exchange, routing_key)
  44. # 发消息
  45. self.channel.basic_publish(
  46. exchange=exchange,
  47. routing_key='',
  48. body=body, # 发送的数据
  49. )
  50. pass
  51. def consumer_by_fanout(self, exchange, callback=_default_callback):
  52. """
  53. 消费、扇出模式
  54. :param callback:
  55. :param exchange:
  56. :return:
  57. """
  58. self.channel.exchange_declare(exchange=exchange, exchange_type=ExchangeType.fanout.value)
  59. result = self.channel.queue_declare(queue='', auto_delete=True)
  60. queue_name = result.method.queue
  61. self.channel.queue_bind(queue_name, exchange, '')
  62. log.info('消费任务启动,queue: ' + queue_name)
  63. self.channel.basic_consume(
  64. queue=queue_name, # 队列名
  65. on_message_callback=callback, # 指定回调函数
  66. auto_ack=False, # 关闭自动ack采用手动应答
  67. )
  68. self.channel.start_consuming()
  69. pass
  70. import threading
  71. if __name__ == '__main__':
  72. mq = RabbitMQ(username='whc', password='whc', host='106.15.78.184', port=5672, virtual_host='/')
  73. # p = threading.Thread(target=mq.consumer, args=('xjk_test', '', callback,))
  74. # p.start()
  75. mq.consumer_by_fanout(exchange='xjk_test')
  76. # mq.send(exchange='xjk_test', body='xjk_test message')
  77. pass