fromMongo.py 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. """
  4. @author: Deepcold
  5. @file: fromMongo.py
  6. @time: 2019/8/3 14:49
  7. """
  8. import datetime
  9. from pymongo import MongoClient
  10. from bin.common.handleDate import set_date
  11. from bin.db.parseDate.parseData import parse_data
  12. class GetDataFromMongo(object):
  13. def __init__(self, db_config, push_date):
  14. self.db_config = db_config
  15. self.db_info = self.db_config.db_info
  16. # 连接mongodb
  17. self.client = MongoClient(self.db_info.HOST, replicaSet=self.db_info.REPLICASET)
  18. self.client[self.db_info.DBNAME].authenticate(self.db_info.USER, self.db_info.PASSWD)
  19. self.save_items = []
  20. self.push_date = push_date
  21. self.table_name = self.db_config.table_name
  22. def get_data(self, start_date):
  23. # 遍历数据表信息
  24. print("正在读取表" + self.table_name)
  25. table_info = self.db_config[self.table_name]
  26. query_words = table_info.QUERY_WORDS # 查询关键字
  27. date = start_date[self.db_config.event_type + self.db_config.event_sub_type + self.table_name]
  28. end_date = datetime.datetime.strftime(self.push_date, "%Y-%m-%d") # 要查询的结束日期
  29. db = self.client[self.db_info.DBNAME][self.table_name]
  30. limit_key = {}
  31. for temp in table_info.ALL_FIELDS.keys():
  32. limit_key[temp] = 1
  33. query = {query_words["start_date"]: {"$gt": date, "$lt": end_date}}
  34. query_data = db.find(query, limit_key, no_cursor_timeout=True)
  35. if query_data:
  36. # # 查询有结果,将日期更新至最新
  37. set_date(self.db_config.event_type + self.db_config.event_sub_type + self.table_name,
  38. datetime.datetime.strftime(self.push_date, "%Y-%m-%d"))
  39. # 解析数据
  40. for each in query_data:
  41. _id = str(each["_id"])
  42. each["_id"] = str(each["_id"])
  43. parse_data(self.db_config, each, self.push_date, self.save_items)
  44. return self.save_items