# -*- coding: utf-8 -*- # @Time : 2023/7/21 9:01 # @Author : XuJiakai # @File : async_client # @Software: PyCharm import asyncio import aio_pika from aio_pika.abc import AbstractRobustConnection from aiokafka import AIOKafkaConsumer from aiokafka import AIOKafkaProducer from motor.motor_asyncio import AsyncIOMotorClient from data_clean.env.environment_switch import environment_switch _env = environment_switch() def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'): url = _env.get_val('mongo.' + mongo_name + '.url') client = AsyncIOMotorClient(url) return client[db] pass def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'): hosts = _env.get_val('kafka.' + name + '.hosts') consumer = AIOKafkaConsumer( topic, bootstrap_servers=hosts, group_id=group_id) return consumer pass def get_aio_kafka_producer(name='base'): hosts = _env.get_val('kafka.' + name + '.hosts') producer = AIOKafkaProducer(bootstrap_servers=hosts) return producer pass async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection: host = _env.get_val('rabbit_mq.' + name + '.host') username = _env.get_val('rabbit_mq.' + name + '.username') password = _env.get_val('rabbit_mq.' + name + '.password') port = int(_env.get_val('rabbit_mq.' + name + '.port')) # port = 32675 virtual_host = '/' loop = asyncio.get_event_loop() return await aio_pika.connect_robust( host=host, password=password, virtualhost=virtual_host , port=port , login=username , loop=loop ) pass if __name__ == '__main__': pass