# -*- coding: utf-8 -*-
# @Time : 2023/7/21 9:01
# @Author : XuJiakai
# @File : async_client
# @Software: PyCharm
"""Factory helpers that build asyncio clients for MongoDB and Kafka.

Connection settings are looked up through the module-level ``_env`` object
using keys of the form ``mongo.<name>.url`` and ``kafka.<name>.hosts``.
"""
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from motor.motor_asyncio import AsyncIOMotorClient

from data_clean.env.environment_switch import environment_switch

# Created once at import time and shared by every factory below.
_env = environment_switch()


def get_aio_mongo_db(db: str = 'itslaw', mongo_name: str = 'itslaw'):
    """Return a Motor database handle for ``db``.

    A new ``AsyncIOMotorClient`` is created on every call; callers that need
    the handle repeatedly should keep the returned object instead of calling
    this function again.

    :param db: name of the database to select on the client.
    :param mongo_name: configuration entry name; the connection URL is read
        from the key ``mongo.<mongo_name>.url``.
    :return: the result of indexing the client with ``db``.
    """
    url = _env.get_val(f'mongo.{mongo_name}.url')
    client = AsyncIOMotorClient(url)
    return client[db]


def get_aio_kafka_consumer(topic, name: str = 'base',
                           group_id: str = 'data-clean-group') -> AIOKafkaConsumer:
    """Build an ``AIOKafkaConsumer`` subscribed to ``topic``.

    This function only constructs the consumer; it does not start it.

    :param topic: topic passed as the consumer's positional argument.
    :param name: configuration entry name; the bootstrap servers are read
        from the key ``kafka.<name>.hosts``.
    :param group_id: consumer group id handed to the consumer.
    :return: the newly constructed consumer.
    """
    hosts = _env.get_val(f'kafka.{name}.hosts')
    return AIOKafkaConsumer(
        topic,
        bootstrap_servers=hosts,
        group_id=group_id,
    )


def get_aio_kafka_producer(name: str = 'base') -> AIOKafkaProducer:
    """Build an ``AIOKafkaProducer`` for the configured cluster.

    This function only constructs the producer; it does not start it.

    :param name: configuration entry name; the bootstrap servers are read
        from the key ``kafka.<name>.hosts``.
    :return: the newly constructed producer.
    """
    hosts = _env.get_val(f'kafka.{name}.hosts')
    return AIOKafkaProducer(bootstrap_servers=hosts)


if __name__ == '__main__':
    pass