# -*- coding: utf-8 -*-
# @Time : 2023/8/28 17:32
# @Author : XuJiakai
# @File : asyncio_es_scan
# @Software: PyCharm
from loguru import logger as log

from data_clean.utils.async_client import get_aio_elasticsearch
from data_clean.utils.base_utils import parse_env_and_name
class AsyncioElasticsearchFastScan:
    """Asynchronously iterate over every document matching a query using the
    Elasticsearch scroll API, yielding results in batches.

    Usage: ``async for batch in AsyncioElasticsearchFastScan(idx, dsl).scan(): ...``
    where each ``batch`` is the list of raw hit dicts from one scroll round-trip.
    """

    def __init__(self, index, query_dsl, env='new', doc_type: str = '_doc'):
        """
        :param index: index name; may embed an environment prefix that
            parse_env_and_name splits out (overriding ``env``).
        :param query_dsl: the query body passed verbatim to ``search``.
        :param env: default environment used to pick the ES client.
        :param doc_type: document type (legacy ES concept; default ``_doc``).
        """
        # The index string may carry its own env; let the helper resolve both.
        env, index = parse_env_and_name(index, env)
        self.es = get_aio_elasticsearch(env)
        self.env = env
        self.index = index
        self.query_dsl = query_dsl
        self.doc_type = doc_type

    async def scan(self, scroll='5m', timeout='5m', size=500):
        """Yield lists of hits until the query result set is exhausted.

        :param scroll: how long ES keeps the scroll context alive between batches.
        :param timeout: search timeout for the initial query.
        :param size: number of documents per batch.
        """
        try:
            result_data = await self.es.search(
                index=self.index,
                doc_type=self.doc_type,
                body=self.query_dsl,
                scroll=scroll,
                timeout=timeout,
                size=size,
            )
            hits = result_data["hits"]["hits"]
            total = result_data["hits"]["total"]
            # ES 7+ reports total as {"value": N, "relation": ...}; older
            # versions return a plain int. Normalize so logging never breaks.
            if isinstance(total, dict):
                total = total.get("value", 0)
            log.info(
                "total record : {}", total
            )
            log.info("Processing a batch of data: {} - {}", 0, size)
            yield hits
            scroll_id = result_data["_scroll_id"]
            batch_index = 0
            # Loop until ES returns an empty batch instead of precomputing
            # int(total / size): the old count over-scrolled by one when total
            # was an exact multiple of size, and crashed on dict-shaped totals.
            while True:
                res = await self.es.scroll(scroll_id=scroll_id, scroll=scroll)
                hits = res["hits"]["hits"]
                if not hits:
                    break
                # Refresh the scroll id each round — ES may rotate it between
                # calls; the original reused the initial id for every request.
                scroll_id = res.get("_scroll_id", scroll_id)
                batch_index += 1
                log.info("Processing a batch of data: {} - {}",
                         batch_index * size, (batch_index + 1) * size)
                yield hits
            log.info("scan successful ! ")
        finally:
            # Always release the client, even if the consumer abandons the
            # generator early or an ES call raises mid-scan.
            await self.es.close()
|