WinhcElasticSearchSDK.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2021/5/24 17:15
  3. # @Author : XuJiakai
  4. # @File : WinhcElasticSearchSDK
  5. # @Software: PyCharm
  6. from log import get_log
  7. from elasticsearch.exceptions import NotFoundError
  8. from sdk.WinhcAllClient import WinhcAllClient
# Module logger; WinhcElasticSearchSDK.scan() writes its progress messages here.
log = get_log('WinhcElasticSearchSDK')
  10. class WinhcElasticSearchSDK:
  11. def __init__(self, es_name):
  12. self.es = WinhcAllClient().get_es_client(es_name)
  13. def exists(self, index, doc_type, _id):
  14. return self.es.transport.perform_request("HEAD", "/" + index + "/" + doc_type + "/" + _id)
  15. def get(self, index, doc_type, _id):
  16. try:
  17. res = self.es.get(index=index, doc_type=doc_type, id=_id)['_source']
  18. return res
  19. except NotFoundError:
  20. return None
  21. def query_record_num(self, index, doc_type, dsl):
  22. dsl['size'] = 0
  23. return self.es.search(body=dsl, index=index, doc_type=doc_type)['hits']['total']
  24. pass
  25. def query(self, index, doc_type, dsl):
  26. res = self.es.search(body=dsl, index=index, doc_type=doc_type)['hits']['hits']
  27. if len(res) < 1:
  28. return []
  29. docs = []
  30. for i in range(len(res)):
  31. _id = res[i]['_id']
  32. doc = res[i]['_source']
  33. doc['_id'] = _id
  34. docs.append(doc)
  35. return docs
  36. def perform_get(self, url):
  37. return self.es.transport.perform_request(
  38. "GET",
  39. url,
  40. )
  41. pass
  42. def perform_post(self, url, body):
  43. return self.es.transport.perform_request(
  44. "POST",
  45. url,
  46. body=body
  47. )
  48. pass
  49. def scan(self, index, doc_type, query_dsl, func, scroll='5m', timeout='5m', size=500):
  50. queryData = self.es.search(
  51. index=index,
  52. doc_type=doc_type,
  53. scroll=scroll,
  54. timeout=timeout,
  55. size=size,
  56. body=query_dsl
  57. )
  58. hits = queryData.get("hits").get("hits")
  59. total = queryData["hits"]["total"]
  60. log.info(
  61. "total record : %s", total
  62. )
  63. log.info("Processing a batch of data: %s - %s", 0, size)
  64. func(hits)
  65. scroll_id = queryData["_scroll_id"]
  66. for i in range(int(total / size)):
  67. res = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
  68. log.info("Processing a batch of data: %s - %s", (i + 1) * size, (i + 2) * size)
  69. func(res["hits"]["hits"])
  70. log.info("scan successful ! ")
  71. def get_es_sdk(env="new"):
  72. return WinhcElasticSearchSDK(es_name=env)
  73. pass
  74. def get_old_es():
  75. return WinhcElasticSearchSDK(es_name='old')
  76. def get_new_es():
  77. return WinhcElasticSearchSDK(es_name='new')
  78. if __name__ == '__main__':
  79. old_es = get_old_es()
  80. res = old_es.get("wenshu_detail3", "wenshu_detail_type", "1faa94669633b078966da8905c6aa9b5")
  81. print(res)
  82. pass