123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- # -*- coding: utf-8 -*-
- # @Time : 2021/5/24 17:15
- # @Author : XuJiakai
- # @File : WinhcElasticSearchSDK
- # @Software: PyCharm
- from log import get_log
- from elasticsearch.exceptions import NotFoundError
- from sdk.WinhcAllClient import WinhcAllClient
- log = get_log('WinhcElasticSearchSDK')
class WinhcElasticSearchSDK:
    """Convenience wrapper around an Elasticsearch client.

    The underlying client is obtained from
    ``WinhcAllClient().get_es_client(es_name)`` and stored on ``self.es``.
    """

    def __init__(self, es_name):
        """Look up the Elasticsearch client registered under *es_name*."""
        self.es = WinhcAllClient().get_es_client(es_name)

    def exists(self, index, doc_type, _id):
        """Send a HEAD request for ``/<index>/<doc_type>/<_id>``.

        Returns whatever the client's transport returns for that request.
        ``%s`` formatting is used so a non-string ``_id`` is coerced rather
        than raising ``TypeError``.
        """
        path = "/%s/%s/%s" % (index, doc_type, _id)
        return self.es.transport.perform_request("HEAD", path)

    def get(self, index, doc_type, _id):
        """Return the ``_source`` of one document, or ``None`` if not found."""
        try:
            response = self.es.get(index=index, doc_type=doc_type, id=_id)
        except NotFoundError:
            return None
        return response['_source']

    def query_record_num(self, index, doc_type, dsl):
        """Return ``hits.total`` for *dsl* without fetching any documents.

        The search is executed with ``size`` forced to 0.  A shallow copy of
        *dsl* is modified so the caller's dictionary is left untouched and
        can be reused for a subsequent ``query``/``scan``.
        """
        count_dsl = dict(dsl)
        count_dsl['size'] = 0
        response = self.es.search(body=count_dsl, index=index, doc_type=doc_type)
        return response['hits']['total']

    def query(self, index, doc_type, dsl):
        """Run *dsl* and return the matching documents as a list of dicts.

        Each returned dict holds the hit's ``_source`` fields plus an
        ``'_id'`` key carrying the hit's document id.  An empty list is
        returned when nothing matches.
        """
        hits = self.es.search(body=dsl, index=index, doc_type=doc_type)['hits']['hits']
        return [dict(hit['_source'], _id=hit['_id']) for hit in hits]

    def perform_get(self, url):
        """Send a raw GET request to *url* through the client's transport."""
        return self.es.transport.perform_request("GET", url)

    def perform_post(self, url, body):
        """Send a raw POST request with *body* to *url* through the transport."""
        return self.es.transport.perform_request("POST", url, body=body)

    def scan(self, index, doc_type, query_dsl, func, scroll='5m', timeout='5m', size=500):
        """Scroll through every hit of *query_dsl*, passing each page to *func*.

        :param index: index to search.
        :param doc_type: document type to search.
        :param query_dsl: search body.
        :param func: callback invoked once per non-empty page with the list
            of raw hits (``response["hits"]["hits"]``).
        :param scroll: keep-alive passed to the search and scroll calls.
        :param timeout: timeout passed to the initial search.
        :param size: page size passed to the initial search.

        Paging continues until a page comes back empty instead of relying on
        ``total / size``, so no trailing empty batch is handed to *func* and
        the loop does not depend on ``hits.total`` being an integer.
        Exceptions raised by *func* or by the client propagate to the caller.
        """
        response = self.es.search(
            index=index,
            doc_type=doc_type,
            scroll=scroll,
            timeout=timeout,
            size=size,
            body=query_dsl,
        )
        log.info("total record : %s", response["hits"]["total"])

        scroll_id = response.get("_scroll_id")
        hits = response["hits"]["hits"]
        offset = 0
        try:
            while hits:
                log.info("Processing a batch of data: %s - %s", offset, offset + size)
                func(hits)
                offset += size
                if not scroll_id:
                    break
                response = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
                # Always continue from the most recently returned scroll id;
                # it is not guaranteed to stay the same between pages.
                scroll_id = response.get("_scroll_id", scroll_id)
                hits = response["hits"]["hits"]
        finally:
            if scroll_id:
                # Best-effort release of the server-side scroll context; a
                # failure here must not turn a finished scan into an error.
                try:
                    self.es.clear_scroll(scroll_id=scroll_id)
                except Exception as err:
                    log.info("clear scroll failed : %s", err)
        log.info("scan successful ! ")
def get_es_sdk(env="new"):
    """Build a ``WinhcElasticSearchSDK`` for the client named *env*.

    *env* is forwarded as ``es_name``; it defaults to ``"new"``.
    """
    sdk = WinhcElasticSearchSDK(es_name=env)
    return sdk
def get_old_es():
    """Build a ``WinhcElasticSearchSDK`` with ``es_name`` set to ``'old'``."""
    es_name = 'old'
    return WinhcElasticSearchSDK(es_name=es_name)
def get_new_es():
    """Build a ``WinhcElasticSearchSDK`` with ``es_name`` set to ``'new'``."""
    es_name = 'new'
    return WinhcElasticSearchSDK(es_name=es_name)
if __name__ == '__main__':
    # Manual smoke check: fetch one known document through the 'old'
    # client and print its source (or None when it is not found).
    print(
        get_old_es().get(
            "wenshu_detail3",
            "wenshu_detail_type",
            "1faa94669633b078966da8905c6aa9b5",
        )
    )
|