async_client.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 9:01
  3. # @Author : XuJiakai
  4. # @File : async_client
  5. # @Software: PyCharm
  6. import asyncio
  7. import aio_pika
  8. from aio_pika.abc import AbstractRobustConnection
  9. from aiokafka import AIOKafkaConsumer
  10. from aiokafka import AIOKafkaProducer
  11. from motor.motor_asyncio import AsyncIOMotorClient
  12. from data_clean.env.environment_switch import environment_switch
  13. _env = environment_switch()
  14. def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
  15. url = _env.get_val('mongo.' + mongo_name + '.url')
  16. client = AsyncIOMotorClient(url)
  17. return client[db]
  18. pass
  19. def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'):
  20. hosts = _env.get_val('kafka.' + name + '.hosts')
  21. consumer = AIOKafkaConsumer(
  22. topic,
  23. bootstrap_servers=hosts,
  24. group_id=group_id)
  25. return consumer
  26. pass
  27. def get_aio_kafka_producer(name='base'):
  28. hosts = _env.get_val('kafka.' + name + '.hosts')
  29. producer = AIOKafkaProducer(bootstrap_servers=hosts)
  30. return producer
  31. pass
  32. async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection:
  33. host = _env.get_val('rabbit_mq.' + name + '.host')
  34. username = _env.get_val('rabbit_mq.' + name + '.username')
  35. password = _env.get_val('rabbit_mq.' + name + '.password')
  36. port = 32675
  37. virtual_host = '/'
  38. loop = asyncio.get_event_loop()
  39. return await aio_pika.connect_robust(
  40. host=host, password=password, virtualhost=virtual_host
  41. , port=port
  42. , login=username
  43. , loop=loop
  44. )
  45. pass
  46. if __name__ == '__main__':
  47. pass