12345678910111213141516171819202122232425262728293031323334353637383940 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/21 9:01
- # @Author : XuJiakai
- # @File : async_client
- # @Software: PyCharm
- from aiokafka import AIOKafkaConsumer
- from aiokafka import AIOKafkaProducer
- from motor.motor_asyncio import AsyncIOMotorClient
- from data_clean.env.environment_switch import environment_switch
- _env = environment_switch()
- def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
- url = _env.get_val('mongo.' + mongo_name + '.url')
- client = AsyncIOMotorClient(url)
- return client[db]
- pass
- def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'):
- hosts = _env.get_val('kafka.' + name + '.hosts')
- consumer = AIOKafkaConsumer(
- topic,
- bootstrap_servers=hosts,
- group_id=group_id)
- return consumer
- pass
- def get_aio_kafka_producer(name='base'):
- hosts = _env.get_val('kafka.' + name + '.hosts')
- producer = AIOKafkaProducer(bootstrap_servers=hosts)
- return producer
- pass
- if __name__ == '__main__':
- pass
|