async_client.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 9:01
  3. # @Author : XuJiakai
  4. # @File : async_client
  5. # @Software: PyCharm
  6. import asyncio
  7. import aio_pika
  8. from aio_pika.abc import AbstractRobustConnection
  9. from aiokafka import AIOKafkaConsumer
  10. from aiokafka import AIOKafkaProducer
  11. from motor.motor_asyncio import AsyncIOMotorClient
  12. from data_clean.env.environment_switch import environment_switch
  13. _env = environment_switch()
  14. def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
  15. url = _env.get_val('mongo.' + mongo_name + '.url')
  16. client = AsyncIOMotorClient(url)
  17. return client[db]
  18. pass
  19. def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'):
  20. hosts = _env.get_val('kafka.' + name + '.hosts')
  21. consumer = AIOKafkaConsumer(
  22. topic,
  23. bootstrap_servers=hosts,
  24. group_id=group_id)
  25. return consumer
  26. pass
  27. def get_aio_kafka_producer(name='base'):
  28. hosts = _env.get_val('kafka.' + name + '.hosts')
  29. producer = AIOKafkaProducer(bootstrap_servers=hosts)
  30. return producer
  31. pass
  32. async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection:
  33. host = _env.get_val('rabbit_mq.' + name + '.host')
  34. username = _env.get_val('rabbit_mq.' + name + '.username')
  35. password = _env.get_val('rabbit_mq.' + name + '.password')
  36. port = int(_env.get_val('rabbit_mq.' + name + '.port'))
  37. # port = 32675
  38. virtual_host = '/'
  39. loop = asyncio.get_event_loop()
  40. return await aio_pika.connect_robust(
  41. host=host, password=password, virtualhost=virtual_host
  42. , port=port
  43. , login=username
  44. , loop=loop
  45. )
  46. pass
  47. if __name__ == '__main__':
  48. pass