Ver Fonte

fix: 数据聚合多线程不中止问题

许家凯 há 2 anos
pai
commit
28f1d5a185
2 ficheiros alterados com 90 adições e 10 exclusões
  1. 33 4
      spider/cpa_agg.py
  2. 57 6
      utils/xxl_queue.py

+ 33 - 4
spider/cpa_agg.py

@@ -81,6 +81,8 @@ def data_transform(data: list):
 
     li = []
 
+    winhc_spider_date = winhc_data['spider_date']
+
     holo_keys = None
 
     for i in key_set:
@@ -106,6 +108,9 @@ def data_transform(data: list):
             continue
             pass
 
+        if winhc_spider_date is None:
+            winhc_spider_date = get_now()
+
         other_data = {
             "id": tmp_json['company_id'] + "_" + ds + "_" + i,
             "dim_name": i,
@@ -117,8 +122,8 @@ def data_transform(data: list):
             "winhc_dim_date": winhc_dim_date,
             "other_info": json.dumps({"summary": get_all_data_by_item(data, '$.summary.' + i),
                                       'latest_date': get_all_data_by_item(data, '$.latest_date.' + i)}),
-            "update_time": get_now(),
-            "create_time": get_now(),
+            "update_time": winhc_spider_date,
+            "create_time": winhc_spider_date,
             "ds": ds,
         }
         tmp_json.update(other_data)
@@ -149,11 +154,32 @@ class Work(Thread):
             data_transform(q.get())
 
 
+today_ds = get_ds()
+
+scan_ds = today_ds[:-2]
+
+
+def overwrite_handle(key, obj_list):
+    if obj_list is None or len(obj_list) == 0:
+        return
+    _id = obj_list[0]['_id']
+    if not key.startswith(today_ds) and len(obj_list) == 1:
+        deleted_count = col.delete_one({'_id': _id}).deleted_count
+        log.info(f"delete id: {_id} , {deleted_count}")
+    else:
+        # log.info(f"skip :{_id}")
+        pass
+
+    pass
+
+
 def main(max_round: int = 2, interval_of_sed: int = 300):
     thread_num = 10
 
     for i in range(thread_num):
-        Work().start()
+        w = Work()
+        w.setDaemon(True)
+        w.start()
         pass
 
     round_num = 0
@@ -162,8 +188,10 @@ def main(max_round: int = 2, interval_of_sed: int = 300):
         ds = get_ds()
         # ds = '20221205'
         log.info('{},第{}遍轮循...'.format(ds, round_num))
-        xxl_q = xxl_queue(pop_threshold=2)
+        xxl_q = xxl_queue(pop_threshold=2, overwrite_handle=overwrite_handle)
+        # for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
         for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
+            # for i in col.find().batch_size(200):
             _id = i['_id']
             key = _id[:_id.rfind('_')]
             result = xxl_q.append(key=key, obj=i)
@@ -171,6 +199,7 @@ def main(max_round: int = 2, interval_of_sed: int = 300):
                 q.put(result)
             pass
         if round_num >= max_round:
+            # sys.exit(0)
             break
 
         try:

+ 57 - 6
utils/xxl_queue.py

@@ -4,11 +4,56 @@
 # @File : xxl_queue
 # @Software: PyCharm
 
+from collections import OrderedDict
+from log import get_log
+
+log = get_log("xxl_queue")
+o = OrderedDict()
 
-class xxl_queue:
+
+class xxl_queue_plus:
 
     def __init__(self, pop_threshold, buff_size=100):
         """
+               类似于开心消消乐的定长队列,攒够阈值,则弹出数据
+
+               注意:该队列目前不能按照最新插入时间,调整覆盖
+               :param pop_threshold: 弹出数据阈值
+               :param buff_size: 缓冲区大小
+        """
+        assert isinstance(buff_size, int) and buff_size > 0, "请给我一个大于零的整数来指定长度"
+        self.buff_size = buff_size
+        self.pop_threshold = pop_threshold
+        self.data = {}
+
+        pass
+
+    def append(self, key, obj):
+        result_list = None
+        flag = False
+
+        if key in self.data:
+            _obj_list = self.data[key]
+            if len(_obj_list) + 1 >= self.pop_threshold:
+                result_list = _obj_list.copy()
+                result_list.append(obj)
+                del self.data[key]
+                return result_list
+                pass
+
+            pass
+
+
+def default_overwrite_handle(key, obj_list):
+    log.info(f"overwrite record , key:{key} ojb_list:{obj_list}")
+
+    pass
+
+
+class xxl_queue:
+
+    def __init__(self, pop_threshold, buff_size=100, overwrite_handle=None):
+        """
         类似于开心消消乐的定长队列,攒够阈值,则弹出数据
 
         注意:该队列目前不能按照最新插入时间,调整覆盖
@@ -20,6 +65,7 @@ class xxl_queue:
         self.data = [None] * self.len
         self.hand = 0
         self.pop_threshold = pop_threshold
+        self.overwrite_handle = overwrite_handle
 
         pass
 
@@ -60,6 +106,11 @@ class xxl_queue:
         elif self.data[index][0] == key:
             self.data[index][1].append(obj)
         else:
+            if self.overwrite_handle is not None:
+                _key, _obj_list = self.data[index]
+                self.overwrite_handle(_key, _obj_list)
+                pass
+
             self.data[index] = (key, [obj])  # 覆盖当前index
             self.hand = (self.hand + 1) % self.len  # 指针后移
             pass
@@ -72,21 +123,21 @@ class xxl_queue:
 
 
 if __name__ == '__main__':
-    q = xxl_queue(pop_threshold=3, buff_size=5)
+    q = xxl_queue(pop_threshold=2, buff_size=3, overwrite_handle=default_overwrite_handle)
 
     print(q.append('a', '1'))
     print(q.append('a', '2'))
-    print(q.append('a', '3'))
+    # print(q.append('a', '3'))
     print(q.append('b', '1'))
     print(q.append('b', '2'))
-    print(q.append('b', '3'))
+    # print(q.append('b', '3'))
     print(q.append('a', '3'))
     print(q.append('b', '3'))
     print(q.append('c', '3'))
     print(q.append('d', '3'))
     print(q.append('e', '3'))
     print(q.append('f', '3'))
-    print(q.append('a', '1'))
-    print(q.append('a', '2'))
+    # print(q.append('a', '1'))
+    # print(q.append('a', '2'))
 
     pass