Prechádzať zdrojové kódy

feat: 数据聚合添加多线程

许家凯 2 rokov pred
rodič
commit
0a9983dfdf
1 zmenil súbory, kde vykonal 20 pridanie a 4 odobranie
  1. 20 4
      spider/cpa_agg.py

+ 20 - 4
spider/cpa_agg.py

@@ -4,7 +4,8 @@
 # @File : cpa_agg
 # @Software: PyCharm
 import json
-import time
+import time, queue
+from threading import Thread
 from utils.datetime_utils import datetime_format
 from log import get_log
 from sdk.WinhcAllClient import get_all_client
@@ -139,20 +140,35 @@ def data_transform(data: list):
     return li
 
 
+q = queue.Queue(5000)
+
+
+class Work(Thread):
+    def run(self):
+        while True:
+            data_transform(q.get())
+
+
 def main(max_round: int = 2, interval_of_sed: int = 300):
+    thread_num = 10
+
+    for i in range(thread_num):
+        Work().start()
+        pass
+
     round_num = 0
     while True:
         round_num += 1
         ds = get_ds()
         # ds = '20221205'
         log.info('{},第{}遍轮循...'.format(ds, round_num))
-        q = xxl_queue(pop_threshold=2)
+        xxl_q = xxl_queue(pop_threshold=2)
         for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
             _id = i['_id']
             key = _id[:_id.rfind('_')]
-            result = q.append(key=key, obj=i)
+            result = xxl_q.append(key=key, obj=i)
             if result:
-                data_transform(result)
+                q.put(result)
             pass
         if round_num >= max_round:
             break