# -*- coding: utf-8 -*-
# @Time : 2023/8/28 17:32
# @Author : XuJiakai
# @File : asyncio_es_scan
# @Software: PyCharm
from loguru import logger as log

from data_clean.utils.async_client import get_aio_elasticsearch
from data_clean.utils.base_utils import parse_env_and_name


class AsyncioElasticsearchFastScan:
    """Stream the hits of a query out of an index, batch by batch, using the scroll API.

    The client comes from ``get_aio_elasticsearch(env)`` and is closed when
    :meth:`scan` finishes (normally, on error, or when the generator is closed
    early), so an instance is meant to be used for a single scan.
    """

    def __init__(self, index, query_dsl, env='new', doc_type: str = '_doc'):
        """Prepare a scan.

        :param index: index name; passed through ``parse_env_and_name`` together
            with ``env``, which returns the effective ``(env, index)`` pair.
        :param query_dsl: search request body sent as ``body=`` to ``search``.
        :param env: default environment name handed to ``parse_env_and_name``.
        :param doc_type: document type passed to ``search``.
        """
        # NOTE(review): parse_env_and_name may derive env from the index string — confirm format.
        env, index = parse_env_and_name(index, env)
        self.es = get_aio_elasticsearch(env)
        self.env = env
        self.index = index
        self.query_dsl = query_dsl
        self.doc_type = doc_type

    @staticmethod
    def _total_hits(total):
        """Normalise ``hits.total`` to an int.

        Before Elasticsearch 7 it is a plain int. From 7 on it is a dict like
        ``{"value": n, "relation": "eq"|"gte"}``. It may be missing entirely
        when total-hit tracking is disabled.
        """
        if isinstance(total, dict):
            return total.get("value", 0)
        return total or 0

    async def _release(self, scroll_id):
        """Clear the server-side scroll context (best-effort), then close the client."""
        try:
            if scroll_id is not None:
                try:
                    await self.es.clear_scroll(scroll_id=scroll_id)
                except Exception as err:  # cleanup must not mask the scan's own outcome
                    log.warning("failed to clear scroll context: {}", err)
        finally:
            await self.es.close()

    async def scan(self, scroll='5m', timeout='5m', size=500):
        """Asynchronously yield lists of raw hit dicts until the scroll is exhausted.

        :param scroll: how long the scroll context is kept alive between requests.
        :param timeout: ``timeout`` forwarded to the initial ``search`` call.
        :param size: number of hits requested per batch.

        Iteration stops at the first empty batch rather than relying on
        ``hits.total``, which may be a lower bound (``relation == "gte"``) and
        whose shape differs between Elasticsearch versions. Batches are never
        empty, so an empty result yields nothing.
        """
        scroll_id = None
        try:
            result_data = await self.es.search(index=self.index,
                                               doc_type=self.doc_type,
                                               body=self.query_dsl,
                                               scroll=scroll,
                                               timeout=timeout,
                                               size=size,
                                               )
            hits_section = result_data["hits"]
            total = self._total_hits(hits_section.get("total"))
            log.info(
                "total record : {}", total
            )

            hits = hits_section["hits"]
            scroll_id = result_data.get("_scroll_id")
            fetched = 0
            while hits:
                log.info("Processing a batch of data: {} - {}", fetched, fetched + len(hits))
                yield hits
                fetched += len(hits)
                if scroll_id is None:
                    break
                res = await self.es.scroll(scroll_id=scroll_id, scroll=scroll)
                # The scroll id may change between requests; always continue with the latest one.
                scroll_id = res.get("_scroll_id", scroll_id)
                hits = res["hits"]["hits"]

            log.info("scan successful ! ")
        finally:
            # Runs on success, on error, and when the consumer stops iterating early.
            await self._release(scroll_id)