async_client.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 9:01
  3. # @Author : XuJiakai
  4. # @File : async_client
  5. # @Software: PyCharm
  6. import asyncio
  7. import warnings
  8. import aio_pika
  9. from aio_pika.abc import AbstractRobustConnection
  10. from aiokafka import AIOKafkaConsumer
  11. from aiokafka import AIOKafkaProducer
  12. from elasticsearch7 import AsyncElasticsearch
  13. from motor.motor_asyncio import AsyncIOMotorClient
  14. from data_clean.env.environment_switch import environment_switch
  15. _env = environment_switch()
  16. warnings.filterwarnings('ignore')
  17. def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
  18. url = _env.get_val('mongo.' + mongo_name + '.url')
  19. client = AsyncIOMotorClient(url)
  20. return client[db]
  21. pass
  22. def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'):
  23. hosts = _env.get_val('kafka.' + name + '.hosts')
  24. consumer = AIOKafkaConsumer(
  25. topic,
  26. bootstrap_servers=hosts,
  27. group_id=group_id)
  28. return consumer
  29. pass
  30. def get_aio_kafka_producer(name='base'):
  31. hosts = _env.get_val('kafka.' + name + '.hosts')
  32. producer = AIOKafkaProducer(bootstrap_servers=hosts)
  33. return producer
  34. pass
  35. def get_aio_elasticsearch(es_name='new') -> AsyncElasticsearch:
  36. hosts = _env.get_val('es.' + es_name + '.hosts')
  37. username = _env.get_val('es.' + es_name + '.username')
  38. password = _env.get_val('es.' + es_name + '.pwd')
  39. use_ssl = False
  40. return AsyncElasticsearch(hosts=hosts, http_auth=(username, password))
  41. async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection:
  42. host = _env.get_val('rabbit_mq.' + name + '.host')
  43. username = _env.get_val('rabbit_mq.' + name + '.username')
  44. password = _env.get_val('rabbit_mq.' + name + '.password')
  45. port = int(_env.get_val('rabbit_mq.' + name + '.port'))
  46. # port = 32675
  47. virtual_host = '/'
  48. loop = asyncio.get_event_loop()
  49. return await aio_pika.connect_robust(
  50. host=host, password=password, virtualhost=virtual_host
  51. , port=port
  52. , login=username
  53. , loop=loop
  54. )
  55. pass
  56. if __name__ == '__main__':
  57. pass