# -*- coding: utf-8 -*-
# @Time : 2023/8/9 10:41
# @Author : XuJiakai
# @File : es_api
# @Software: PyCharm
"""Async helpers for querying Elasticsearch.

Two module-level clients are created at import time: the default one
(``_new_es``) and one built with ``es_name='old'`` (``_old_es``).  Every
public coroutine picks a client based on the environment returned by
``parse_env_and_name`` and wraps any failure in ``FetchException``.
"""
from data_clean.exception.fetch_exception import FetchException
from data_clean.utils.async_client import get_aio_elasticsearch
from data_clean.utils.base_utils import parse_env_and_name

_new_es = get_aio_elasticsearch()
_old_es = get_aio_elasticsearch(es_name='old')


def _resolve_client(index: str, env: str):
    """Return ``(client, index)`` for the given index name and environment.

    ``parse_env_and_name`` has the final say on both values; presumably it
    lets the index string itself carry an environment marker -- confirm
    against that helper.  Any resolved environment other than ``'new'``
    selects the old client.
    """
    env, index = parse_env_and_name(index, env)
    client = _new_es if env == 'new' else _old_es
    return client, index


async def search(index: str, body: dict, doc_type: str = '_doc', env: str = 'new') -> dict:
    """Run a search request against *index* and return the raw response.

    :param index: index name, passed through ``parse_env_and_name`` first.
    :param body: request body forwarded unchanged to the client's ``search``.
    :param doc_type: document type forwarded to the client.
    :param env: requested environment; ``'new'`` selects the default client,
        anything else the old one (after ``parse_env_and_name`` resolution).
    :return: whatever the client's ``search`` coroutine returns.
    :raises FetchException: if the client call raises any ``Exception``; the
        original error is attached as ``__cause__``.
    """
    _es, index = _resolve_client(index, env)
    try:
        return await _es.search(index=index, doc_type=doc_type, body=body)
    except Exception as ex:
        # Chain explicitly so the underlying client error stays visible.
        raise FetchException(ex) from ex


async def get(index: str, id: str, doc_type: str = '_doc', env: str = 'new') -> dict:
    """Fetch a single document by *id* from *index* and return the raw response.

    :param index: index name, passed through ``parse_env_and_name`` first.
    :param id: document id forwarded to the client's ``get`` (the name
        shadows the builtin but is kept for caller compatibility).
    :param doc_type: document type forwarded to the client.
    :param env: requested environment; ``'new'`` selects the default client,
        anything else the old one (after ``parse_env_and_name`` resolution).
    :return: whatever the client's ``get`` coroutine returns.
    :raises FetchException: if the client call raises any ``Exception``; the
        original error is attached as ``__cause__``.
    """
    _es, index = _resolve_client(index, env)
    try:
        return await _es.get(index=index, doc_type=doc_type, id=id)
    except Exception as ex:
        # Chain explicitly so the underlying client error stays visible.
        raise FetchException(ex) from ex


if __name__ == '__main__':
    pass