es_api.py 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/8/9 10:41
  3. # @Author : XuJiakai
  4. # @File : es_api
  5. # @Software: PyCharm
  6. from data_clean.exception.fetch_exception import FetchException
  7. from data_clean.utils.async_client import get_aio_elasticsearch
  8. from data_clean.utils.base_utils import parse_env_and_name
  9. _new_es = get_aio_elasticsearch()
  10. _old_es = get_aio_elasticsearch(es_name='old')
  11. async def search(index: str, body: dict, doc_type: str = '_doc', env='new') -> dict:
  12. env, index = parse_env_and_name(index, env)
  13. _es = _new_es if env == 'new' else _old_es
  14. try:
  15. return await _es.search(index=index, doc_type=doc_type, body=body)
  16. pass
  17. except Exception as ex:
  18. raise FetchException(ex)
  19. pass
  20. async def get(index: str, id: str, doc_type: str = '_doc', env='new') -> dict:
  21. env, index = parse_env_and_name(index, env)
  22. _es = _new_es if env == 'new' else _old_es
  23. try:
  24. return await _es.get(index=index, doc_type=doc_type, id=id)
  25. pass
  26. except Exception as ex:
  27. raise FetchException(ex)
  28. pass
  29. if __name__ == '__main__':
  30. pass