# asyncio_es_scan.py
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/8/28 17:32
  3. # @Author : XuJiakai
  4. # @File : asyncio_es_scan
  5. # @Software: PyCharm
  6. from loguru import logger as log
  7. from data_clean.utils.async_client import get_aio_elasticsearch
  8. from data_clean.utils.base_utils import parse_env_and_name
  9. class AsyncioElasticsearchFastScan:
  10. def __init__(self, index, query_dsl, env='new', doc_type: str = '_doc'):
  11. env, index = parse_env_and_name(index, env)
  12. self.es = get_aio_elasticsearch(env)
  13. self.env = env
  14. self.index = index
  15. self.query_dsl = query_dsl
  16. self.doc_type = doc_type
  17. pass
  18. async def scan(self, scroll='5m', timeout='5m', size=500):
  19. result_data = await self.es.search(index=self.index
  20. , doc_type=self.doc_type
  21. , body=self.query_dsl
  22. , scroll=scroll
  23. , timeout=timeout
  24. , size=size,
  25. )
  26. hits = result_data.get("hits").get("hits")
  27. total = result_data["hits"]["total"]
  28. log.info(
  29. "total record : {}", total
  30. )
  31. log.info("Processing a batch of data: {} - {}", 0, size)
  32. yield hits
  33. scroll_id = result_data["_scroll_id"]
  34. for i in range(int(total / size)):
  35. res = await self.es.scroll(scroll_id=scroll_id, scroll=scroll)
  36. log.info("Processing a batch of data: {} - {}", (i + 1) * size, (i + 2) * size)
  37. yield res["hits"]["hits"]
  38. log.info("scan successful ! ")
  39. await self.es.close()
  40. pass