12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/21 9:01
- # @Author : XuJiakai
- # @File : async_client
- # @Software: PyCharm
- import asyncio
- import warnings
- import aio_pika
- from aio_pika.abc import AbstractRobustConnection
- from aiokafka import AIOKafkaConsumer
- from aiokafka import AIOKafkaProducer
- from elasticsearch7 import AsyncElasticsearch
- from motor.motor_asyncio import AsyncIOMotorClient
- from data_clean.env.environment_switch import environment_switch
- _env = environment_switch()
- warnings.filterwarnings('ignore')
- def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
- url = _env.get_val('mongo.' + mongo_name + '.url')
- client = AsyncIOMotorClient(url)
- return client[db]
- pass
- def get_aio_kafka_consumer(topic, name='base', group_id='data-clean-group'):
- hosts = _env.get_val('kafka.' + name + '.hosts')
- consumer = AIOKafkaConsumer(
- topic,
- bootstrap_servers=hosts,
- group_id=group_id)
- return consumer
- pass
- def get_aio_kafka_producer(name='base'):
- hosts = _env.get_val('kafka.' + name + '.hosts')
- producer = AIOKafkaProducer(bootstrap_servers=hosts)
- return producer
- pass
- def get_aio_elasticsearch(es_name='new') -> AsyncElasticsearch:
- hosts = _env.get_val('es.' + es_name + '.hosts')
- username = _env.get_val('es.' + es_name + '.username')
- password = _env.get_val('es.' + es_name + '.pwd')
- use_ssl = False
- return AsyncElasticsearch(hosts=hosts, http_auth=(username, password))
- async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection:
- host = _env.get_val('rabbit_mq.' + name + '.host')
- username = _env.get_val('rabbit_mq.' + name + '.username')
- password = _env.get_val('rabbit_mq.' + name + '.password')
- port = int(_env.get_val('rabbit_mq.' + name + '.port'))
- # port = 32675
- virtual_host = '/'
- loop = asyncio.get_event_loop()
- return await aio_pika.connect_robust(
- host=host, password=password, virtualhost=virtual_host
- , port=port
- , login=username
- , loop=loop
- )
- pass
- if __name__ == '__main__':
- pass
|