async_client.py 1006 B

12345678910111213141516171819202122232425262728293031323334353637383940
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 9:01
  3. # @Author : XuJiakai
  4. # @File : async_client
  5. # @Software: PyCharm
  6. from aiokafka import AIOKafkaConsumer
  7. from aiokafka import AIOKafkaProducer
  8. from motor.motor_asyncio import AsyncIOMotorClient
  9. from data_clean.env.environment_switch import environment_switch
  10. _env = environment_switch()
  11. def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
  12. url = _env.get_val('mongo.' + mongo_name + '.url')
  13. client = AsyncIOMotorClient(url)
  14. return client[db]
  15. pass
  16. def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'):
  17. hosts = _env.get_val('kafka.' + name + '.hosts')
  18. consumer = AIOKafkaConsumer(
  19. topic,
  20. bootstrap_servers=hosts,
  21. group_id=group_id)
  22. return consumer
  23. pass
  24. def get_aio_kafka_producer(name='base'):
  25. hosts = _env.get_val('kafka.' + name + '.hosts')
  26. producer = AIOKafkaProducer(bootstrap_servers=hosts)
  27. return producer
  28. pass
  29. if __name__ == '__main__':
  30. pass