#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: Deepcold
@file: fromEs.py
@time: 2019/8/3 14:49
"""
  8. import datetime
  9. import json
  10. from bin.common.esQuery import es_query
  11. from bin.common.handleDate import set_date
  12. from bin.db.parseDate.parseData import parse_data
  13. from bin.db.parseDate.parseData_YG import parse_data_yg
  14. class GetDataFromEs(object):
  15. def __init__(self, db_config, push_date):
  16. self.db_config = db_config
  17. self.save_items = []
  18. self.push_date = push_date
  19. self.table_name = self.db_config.table_name
  20. self.query_condition = self.db_config.query_condition
  21. def get_data(self, start_date):
  22. # 遍历数据表信息
  23. print("正在读取表" + self.table_name)
  24. table_info = self.db_config[self.table_name]
  25. date = start_date[self.db_config.event_type + self.db_config.event_sub_type + self.table_name]
  26. end_date = datetime.datetime.strftime(self.push_date, "%Y-%m-%d")
  27. fields = [x for x in table_info.ALL_FIELDS.keys()]
  28. es_type = table_info.TYPE
  29. query_condition = json.dumps(self.query_condition)
  30. query_condition = query_condition.replace("start", date).replace("end", end_date)
  31. # 查询审判结果里包含终结执行程序的案件
  32. query = {
  33. "_source": fields,
  34. "query": json.loads(query_condition)}
  35. print(query)
  36. query_data = es_query(query, self.table_name, es_type)
  37. if query_data:
  38. # 查询有结果,将日期更新至最新
  39. set_date(self.db_config.event_type + self.db_config.event_sub_type + self.table_name, datetime.datetime.strftime(self.push_date, "%Y-%m-%d"))
  40. # 遍历查询结果
  41. for each in query_data:
  42. _id = each["_id"] # 案件id
  43. each = each["_source"] # es内容主体
  44. each["_id"] = _id
  45. party = self.db_config.party
  46. if party == "PLAINTIFF":
  47. parse_data_yg(self.db_config, each, self.push_date, self.save_items)
  48. else:
  49. parse_data(self.db_config, each, self.push_date, self.save_items)
  50. return self.save_items