# -*- coding: utf-8 -*-
# @Time : 2023/7/21 9:01
# @Author : XuJiakai
# @File : async_client
# @Software: PyCharm
"""Factory helpers that build asynchronous clients from environment config.

Every helper looks up its connection settings through the module-level
``_env`` object (``environment_switch()``) using dotted keys such as
``mongo.<name>.url`` or ``kafka.<name>.hosts``, then constructs the matching
async client (MongoDB, Kafka, Elasticsearch, RabbitMQ).
"""
import asyncio
import warnings

import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aiokafka import AIOKafkaConsumer
from aiokafka import AIOKafkaProducer
from elasticsearch7 import AsyncElasticsearch
from motor.motor_asyncio import AsyncIOMotorClient

from data_clean.env.environment_switch import environment_switch

_env = environment_switch()

# NOTE: installs an "ignore" filter for all warnings as soon as this module
# is imported; kept because importers may rely on this side effect.
warnings.filterwarnings('ignore')


def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
    """Return the database ``db`` from an async Mongo client.

    :param db: name of the database to select on the client.
    :param mongo_name: config section name; the connection URL is read from
        ``mongo.<mongo_name>.url``.
    :return: ``client[db]`` of a newly created ``AsyncIOMotorClient``.
    """
    url = _env.get_val(f'mongo.{mongo_name}.url')
    return AsyncIOMotorClient(url)[db]


def get_aio_kafka_consumer(topic, name='base',
                           group_id='data-clean-group') -> AIOKafkaConsumer:
    """Build an ``AIOKafkaConsumer`` subscribed to ``topic``.

    The consumer is only constructed here; this function never starts it.

    :param topic: topic passed to the consumer constructor.
    :param name: config section name; brokers are read from
        ``kafka.<name>.hosts``.
    :param group_id: consumer group id.
    :return: the newly constructed consumer.
    """
    hosts = _env.get_val(f'kafka.{name}.hosts')
    return AIOKafkaConsumer(
        topic,
        bootstrap_servers=hosts,
        group_id=group_id,
    )


def get_aio_kafka_producer(name='base') -> AIOKafkaProducer:
    """Build an ``AIOKafkaProducer``.

    The producer is only constructed here; this function never starts it.

    :param name: config section name; brokers are read from
        ``kafka.<name>.hosts``.
    :return: the newly constructed producer.
    """
    hosts = _env.get_val(f'kafka.{name}.hosts')
    return AIOKafkaProducer(bootstrap_servers=hosts)


def get_aio_elasticsearch(es_name='new') -> AsyncElasticsearch:
    """Build an ``AsyncElasticsearch`` client using HTTP basic auth.

    :param es_name: config section name; ``es.<es_name>.hosts``,
        ``es.<es_name>.username`` and ``es.<es_name>.pwd`` are read.
    :return: the newly constructed client.
    """
    prefix = f'es.{es_name}'
    hosts = _env.get_val(f'{prefix}.hosts')
    username = _env.get_val(f'{prefix}.username')
    password = _env.get_val(f'{prefix}.pwd')
    return AsyncElasticsearch(hosts=hosts, http_auth=(username, password))


async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection:
    """Open a robust RabbitMQ connection on the running event loop.

    :param name: config section name; ``host``, ``username``, ``password``
        and ``port`` are read from ``rabbit_mq.<name>.*``.
    :return: the connection returned by ``aio_pika.connect_robust``.
    :raises ValueError: if the configured port cannot be converted by ``int``
        (a non-string, non-numeric value raises ``TypeError`` instead).
    """
    prefix = f'rabbit_mq.{name}'
    host = _env.get_val(f'{prefix}.host')
    username = _env.get_val(f'{prefix}.username')
    password = _env.get_val(f'{prefix}.password')
    port = int(_env.get_val(f'{prefix}.port'))
    virtual_host = '/'

    # Inside a coroutine the running loop is the one to use; this is the
    # same loop ``asyncio.get_event_loop()`` would return here, without
    # relying on its deprecated implicit-loop behaviour.
    loop = asyncio.get_running_loop()
    return await aio_pika.connect_robust(
        host=host,
        port=port,
        login=username,
        password=password,
        virtualhost=virtual_host,
        loop=loop,
    )


if __name__ == '__main__':
    pass