Browse Source

feat: 替换队列,以提高性能

许家凯 2 years ago
parent
commit
552f160e6b
2 changed files with 43 additions and 25 deletions
  1. 4 0
      spider/cpa_agg.py
  2. 39 25
      utils/xxl_queue.py

+ 4 - 0
spider/cpa_agg.py

@@ -133,6 +133,10 @@ def data_transform(data: list):
         pass
 
     log.info('output data: {}'.format(li))
+
+    if li is None or len(li) == 0:
+        return li
+
     insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
     del_num = 0
     try:

+ 39 - 25
utils/xxl_queue.py

@@ -66,28 +66,35 @@ class xxl_queue:
         self.hand = 0
         self.pop_threshold = pop_threshold
         self.overwrite_handle = overwrite_handle
+        self.index_data = {}
 
         pass
 
     def append(self, key, obj):
         result_list = None
         flag = False
-        for i in range(len(self.data)):
-            if self.data[i] is None:
-                continue
-            _key = self.data[i][0]
-            _obj_list = self.data[i][1]
-            if _key == key:
-                if len(_obj_list) + 1 >= self.pop_threshold:
-                    r = _obj_list.copy()
-                    r.append(obj)
-                    result_list = r
-                    self.data[i] = None
-                    pass
-                else:
-                    self.data[i][1].append(obj)
+
+        if key in self.index_data:
+            tmp_index = self.index_data[key]
+            if self.data[tmp_index] is None:
+                ##todo
+                pass
+            else:
+                _key = self.data[tmp_index][0]
+                _obj_list = self.data[tmp_index][1]
+                if _key == key:
+                    if len(_obj_list) + 1 >= self.pop_threshold:
+                        r = _obj_list.copy()
+                        r.append(obj)
+                        result_list = r
+                        del self.index_data[key]
+                        self.data[tmp_index] = None
+                        pass
+                    else:
+                        self.data[tmp_index][1].append(obj)
+                        pass
+                    flag = True
                     pass
-                flag = True
                 pass
             pass
 
@@ -101,6 +108,7 @@ class xxl_queue:
     def _put(self, index, key, obj):
         if self.data[index] is None:
             self.data[index] = (key, [obj])
+            self.index_data[key] = index
             self.hand = (self.hand + 1) % self.len  # 指针后移
             pass
         elif self.data[index][0] == key:
@@ -108,9 +116,11 @@ class xxl_queue:
         else:
             if self.overwrite_handle is not None:
                 _key, _obj_list = self.data[index]
+                del self.index_data[_key]
                 self.overwrite_handle(_key, _obj_list)
                 pass
 
+            self.index_data[key] = index
             self.data[index] = (key, [obj])  # 覆盖当前index
             self.hand = (self.hand + 1) % self.len  # 指针后移
             pass
@@ -121,22 +131,26 @@ class xxl_queue:
         ret.extend(self.data[:self.hand])
         return ret
 
+    def print_info(self):
+        print(f"--info-- \n  index_data: {self.index_data} \n  data: {self.data} \n")
+        pass
+
 
 if __name__ == '__main__':
     q = xxl_queue(pop_threshold=2, buff_size=3, overwrite_handle=default_overwrite_handle)
 
-    print(q.append('a', '1'))
-    print(q.append('a', '2'))
+    log.info('%s' % q.append('a', '1'))
+    log.info('%s' % q.append('a', '2'))
     # print(q.append('a', '3'))
-    print(q.append('b', '1'))
-    print(q.append('b', '2'))
+    log.info('%s' % q.append('b', '1'))
+    log.info('%s' % q.append('b', '2'))
     # print(q.append('b', '3'))
-    print(q.append('a', '3'))
-    print(q.append('b', '3'))
-    print(q.append('c', '3'))
-    print(q.append('d', '3'))
-    print(q.append('e', '3'))
-    print(q.append('f', '3'))
+    log.info('%s' % q.append('a', '3'))
+    log.info('%s' % q.append('b', '3'))
+    log.info('%s' % q.append('c', '3'))
+    log.info('%s' % q.append('d', '3'))
+    log.info('%s' % q.append('e', '3'))
+    log.info('%s' % q.append('f', '3'))
     # print(q.append('a', '1'))
     # print(q.append('a', '2'))