# -*- coding: utf-8 -*-
# @Time : 2021/5/24 17:15
# @Author : XuJiakai
# @File : WinhcElasticSearchSDK
# @Software: PyCharm
"""Convenience wrapper around an Elasticsearch client obtained from ``WinhcAllClient``."""
from elasticsearch.exceptions import NotFoundError, TransportError

from log import get_log
from sdk.WinhcAllClient import WinhcAllClient

log = get_log('WinhcElasticSearchSDK')


class WinhcElasticSearchSDK:
    """Helper methods over a named Elasticsearch client.

    Every call goes through ``self.es``, the client returned by
    ``WinhcAllClient().get_es_client(es_name)``. All methods take a
    ``doc_type`` argument, so this presumably targets a cluster/client
    version that still supports mapping types (TODO confirm).
    """

    def __init__(self, es_name):
        """Create the SDK for the client registered under ``es_name``.

        :param es_name: client name passed to ``WinhcAllClient.get_es_client``
            (this module itself uses ``'old'`` and ``'new'``).
        """
        self.es = WinhcAllClient().get_es_client(es_name)

    def exists(self, index, doc_type, _id):
        """Return whether the document ``index/doc_type/_id`` exists.

        Delegates to the client's ``exists`` (a HEAD request) instead of
        concatenating the URL by hand, so path segments are URL-escaped. An
        ``_id`` containing ``/``, ``?``, ``#`` or spaces would otherwise build
        the wrong path.
        """
        return self.es.exists(index=index, doc_type=doc_type, id=_id)

    def get(self, index, doc_type, _id):
        """Return the ``_source`` of one document, or ``None`` if it is not found."""
        try:
            return self.es.get(index=index, doc_type=doc_type, id=_id)['_source']
        except NotFoundError:
            return None

    def query_record_num(self, index, doc_type, dsl):
        """Return ``hits.total`` for ``dsl`` without fetching any documents.

        The query is sent with ``size`` forced to 0. The caller's ``dsl`` is
        left untouched. The value is returned exactly as found in the
        response: an int on older clusters, or a ``{"value", "relation"}``
        object on Elasticsearch 7+.
        """
        # Shallow copy so the caller's DSL does not silently get size=0.
        body = dict(dsl, size=0)
        return self.es.search(body=body, index=index, doc_type=doc_type)['hits']['total']

    def query(self, index, doc_type, dsl):
        """Run ``dsl`` once and return one dict per hit in that response.

        Each dict is the hit's ``_source`` plus the hit's ``_id`` under the
        ``'_id'`` key. There is no pagination beyond what ``dsl`` requests.
        Hits returned without a ``_source`` (e.g. ``"_source": false``) become
        ``{'_id': ...}``.
        """
        hits = self.es.search(body=dsl, index=index, doc_type=doc_type)['hits']['hits']
        return [dict(hit.get('_source') or {}, _id=hit['_id']) for hit in hits]

    def perform_get(self, url):
        """Send a raw GET to ``url`` through the client's transport and return its result."""
        return self.es.transport.perform_request(
            "GET", url,
        )

    def perform_post(self, url, body):
        """Send a raw POST of ``body`` to ``url`` through the client's transport."""
        return self.es.transport.perform_request(
            "POST", url, body=body
        )

    def scan(self, index, doc_type, query_dsl, func, scroll='5m', timeout='5m', size=500):
        """Scroll through all hits matching ``query_dsl`` and hand them to ``func`` in batches.

        :param func: called once per non-empty page with the list of raw hit dicts.
        :param scroll: keep-alive for the scroll context between requests.
        :param timeout: forwarded to the initial search request.
        :param size: number of hits requested per page.

        Iteration ends when the server returns an empty page, rather than
        being computed from ``hits.total``. That avoids calling ``func`` with
        an empty trailing batch and works whether ``total`` is an int or an
        object. The scroll context is cleared afterwards, even if ``func``
        raises.
        """
        query_data = self.es.search(
            index=index,
            doc_type=doc_type,
            scroll=scroll,
            timeout=timeout,
            size=size,
            body=query_dsl,
        )
        log.info("total record : %s", query_data["hits"]["total"])
        hits = query_data["hits"]["hits"]
        scroll_id = query_data["_scroll_id"]
        start = 0
        try:
            while hits:
                log.info("Processing a batch of data: %s - %s", start, start + size)
                func(hits)
                start += size
                res = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
                # The server may issue a new scroll id; always continue from the latest one.
                scroll_id = res.get("_scroll_id", scroll_id)
                hits = res["hits"]["hits"]
        finally:
            self._clear_scroll(scroll_id)
        log.info("scan successful ! ")

    def _clear_scroll(self, scroll_id):
        """Best-effort release of the server-side scroll context ``scroll_id``."""
        try:
            self.es.clear_scroll(scroll_id=scroll_id)
        except TransportError as e:
            # Don't let cleanup failure mask the scan's own result or exception;
            # the context also expires on the server after its keep-alive.
            log.info("clear scroll failed (%s): %s", scroll_id, e)


def get_es_sdk(env="new"):
    """Return a ``WinhcElasticSearchSDK`` bound to the client named ``env``."""
    return WinhcElasticSearchSDK(es_name=env)


def get_old_es():
    """Return the SDK bound to the ``'old'`` client."""
    return get_es_sdk('old')


def get_new_es():
    """Return the SDK bound to the ``'new'`` client."""
    return get_es_sdk('new')


if __name__ == '__main__':
    old_es = get_old_es()
    res = old_es.get("wenshu_detail3", "wenshu_detail_type", "1faa94669633b078966da8905c6aa9b5")
    print(res)