Explorar o código

feat: holo程序独立写入

许家凯 hai 2 anos
pai
achega
d565085827
Modificáronse 5 ficheiros con 162 adicións e 13 borrados
  1. 3 2
      log.py
  2. 25 10
      spider/cpa_agg.py
  3. 95 0
      spider/insert_holo.py
  4. 34 1
      utils/base_utils.py
  5. 5 0
      utils/datetime_utils.py

+ 3 - 2
log.py

@@ -10,12 +10,13 @@ import time
 import platform
 
 AUTO_WRITE_LOG_FILE = False if platform.system() == "Linux" else True
-
+AUTO_WRITE_LOG_FILE = False
 
 def get_log(log_name="my_log", write_file=AUTO_WRITE_LOG_FILE):
     logging.basicConfig()
     logger = logging.getLogger(log_name)
-    logger.setLevel(logging.INFO)
+    logger.setLevel(logging.WARNING)
+    # logger.setLevel(logging.INFO)
     logger.propagate = False
     # 日志输出格式
     formatter = logging.Formatter('[%(asctime)s - ' + log_name + ':%(lineno)d] - %(levelname)s: %(message)s')

+ 25 - 10
spider/cpa_agg.py

@@ -9,9 +9,9 @@ from threading import Thread
 from utils.datetime_utils import datetime_format
 from log import get_log
 from sdk.WinhcAllClient import get_all_client
-from utils.datetime_utils import get_ds, get_now, datetime_format_transform
+from utils.datetime_utils import get_ds, get_now, datetime_format_transform, get_yesterday_ds
 from utils import map_2_json_str, json_path
-from utils.base_utils import tuple_max
+from utils.base_utils import tuple_max, get_str_intersection
 from utils.mysql_utils import insert_many
 from utils.xxl_queue import xxl_queue
 import re
@@ -28,6 +28,7 @@ log = get_log('cpa_agg')
 
 holo_client = all_client.get_holo_client(db='winhc_biz')
 HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
+all_sdk = get_all_client()
 
 
 def get_max_data(data: list, key: str, exclude_product_name: list = ['winhc']):
@@ -59,7 +60,7 @@ def get_all_data_by_item(data: list, key):
     return result_data
 
 
-def data_transform(data: list):
+def data_transform(data: list, rabbit_mq):
     log.info('input data: {}'.format(data))
     deleted_key = [i['_id'] for i in data][0]
     deleted_key = deleted_key[:deleted_key.rfind('_')]
@@ -135,7 +136,11 @@ def data_transform(data: list):
     log.info('output data: {}'.format(li))
 
     if li is not None and len(li) > 0:
-        insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
+        # insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
+        li = [tuple(i.values()) for i in li]
+        for i in li:
+            rabbit_mq.send_by_fanout("cpa_insert_holo", json.dumps(i, ensure_ascii=False).encode())
+            pass
 
     del_num = 0
     try:
@@ -153,13 +158,19 @@ q = queue.Queue(5000)
 
 class Work(Thread):
     def run(self):
+        r_sdk = all_sdk.get_rabbit_mq_sdk()
         while True:
-            data_transform(q.get())
+            data_transform(q.get(), r_sdk)
 
 
 today_ds = get_ds()
+yesterday_ds = get_yesterday_ds()
+
+# scan_ds = today_ds[:-2]
+scan_ds = get_str_intersection(today_ds, yesterday_ds)
+
 
-scan_ds = today_ds[:-2]
+# scan_ds = '2022'
 
 
 def overwrite_handle(key, obj_list):
@@ -176,8 +187,11 @@ def overwrite_handle(key, obj_list):
     pass
 
 
+import tqdm
+
+
 def main(max_round: int = 2, interval_of_sed: int = 300):
-    thread_num = 10
+    thread_num = 20
 
     for i in range(thread_num):
         w = Work()
@@ -189,10 +203,11 @@ def main(max_round: int = 2, interval_of_sed: int = 300):
     while True:
         round_num += 1
 
-        log.info('{},第{}遍轮循...'.format(scan_ds, round_num))
+        print('{},第{}遍轮循...'.format(scan_ds, round_num))
         xxl_q = xxl_queue(pop_threshold=2, overwrite_handle=overwrite_handle)
         # for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
-        for i in col.find({"_id": {"$regex": "^" + scan_ds}}).batch_size(200):
+
+        for i in tqdm.tqdm(col.find({"_id": {"$regex": "^" + scan_ds}}).batch_size(200)):
             # for i in col.find().batch_size(200):
             _id = i['_id']
             key = _id[:_id.rfind('_')]
@@ -205,7 +220,7 @@ def main(max_round: int = 2, interval_of_sed: int = 300):
             break
 
         try:
-            log.info('{},第{}遍轮循结束.'.format(scan_ds, round_num))
+            print('{},第{}遍轮循结束.'.format(scan_ds, round_num))
             time.sleep(interval_of_sed)
             pass
         except:

+ 95 - 0
spider/insert_holo.py

@@ -0,0 +1,95 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/1/3 15:03
+# @Author : XuJiakai
+# @File : insert_holo
+# @Software: PyCharm
+from threading import Thread
+import json
+import queue
+from sdk.WinhcAllClient import get_all_client
+import time
+from log import get_log
+import tqdm
+
+log = get_log('cpa_agg')
+q = queue.Queue(50000)
+
+all_client = get_all_client()
+db_client = all_client.get_holo_client(db='winhc_biz')
+
+sql = """INSERT INTO public.ads_waa_dim_info(company_id,company_name,company_org_type,province,city,county,org_number,credit_code,reg_number,cate_first,cate_second,cate_third,id,dim_name,dim_max_num,dim_max_num_business_name,winhc_dim_num,dim_max_date,dim_max_date_business_name,winhc_dim_date,other_info,update_time,create_time,ds) 
+values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING """
+
+
+def run(data):
+    if len(data) == 0:
+        return
+    with db_client.cursor() as cursor:
+        try:
+            # print('start : %s ' % time.time())
+            num = cursor.executemany(sql, tuple(data))
+            db_client.commit()
+            # print('end : %s ' % time.time())
+            return num
+        except Exception as e:
+            log.error("insert exec error: {}".format(e))
+            db_client.rollback()
+            return -1
+    pass
+
+
+class Work(Thread):
+    def run(self):
+        t = tqdm.tqdm()
+        bulk_list = []
+        flag = 0
+        while True:
+            try:
+                ele = q.get(timeout=5)
+                bulk_list.append(ele)
+                t.update()
+            except:
+                log.info("等待超时...")
+                pass
+
+            if len(bulk_list) > 40000:
+                log.info(f"溢出触发,缓冲队列剩余:{len(bulk_list)}")
+                run(bulk_list)
+                bulk_list = []
+                flag = 0
+                pass
+            else:
+                flag += 1
+                if flag > 3:
+                    log.info(f"手动触发,缓冲队列剩余:{len(bulk_list)}")
+                    run(bulk_list)
+                    bulk_list = []
+                    flag = 0
+                    pass
+
+
+r_sdk = all_client.get_rabbit_mq_sdk()
+
+
+def main():
+    thread_num = 10
+
+    for i in range(thread_num):
+        log.info("启动一个线程...")
+        Work().start()
+        pass
+
+    def callback(ch, method, properties, body):
+        data = json.loads(body.decode())
+        q.put(data)
+        ch.basic_ack(delivery_tag=method.delivery_tag)  # 手动应答ack,确保消息真正消费后才应答
+        pass
+
+    r_sdk.consumer_by_fanout("cpa_insert_holo", queue='cpa_save_holo', callback=callback)
+
+    pass
+
+
+if __name__ == '__main__':
+    main()
+    pass

+ 34 - 1
utils/base_utils.py

@@ -59,6 +59,39 @@ def tuple_max(*tu: tuple):
     return result
 
 
+def get_str_intersection(*ele: str):
+    ele = [i for i in ele if i is not None]
+    if len(ele) == 0:
+        return None
+    if len(ele) == 1:
+        return ele[0]
+    res = ''
+    min_len = min([len(i) for i in ele])
+
+    for i in range(min_len):
+        tmp_v = None
+        for j in ele:
+            if tmp_v is None:
+                tmp_v = j[i]
+            else:
+                if not tmp_v == j[i]:
+                    return res
+        res += tmp_v
+        pass
+    return res
+    pass
+
+
 if __name__ == '__main__':
-    print(tuple_max('4', None))
+    # print(tuple_max('4', None))
+    print(get_str_intersection('4a5', '345'))
+    from utils.datetime_utils import get_ds,get_yesterday_ds
+
+    today_ds = get_ds()
+    yesterday_ds = get_yesterday_ds()
+
+    # scan_ds = today_ds[:-2]
+    scan_ds = get_str_intersection(today_ds, yesterday_ds)
+    print(scan_ds)
+
     pass

+ 5 - 0
utils/datetime_utils.py

@@ -12,6 +12,11 @@ def get_ds():
     return time.strftime("%Y%m%d")
 
 
+def get_yesterday_ds():
+    return (datetime.date.today() + datetime.timedelta(days=-1)).strftime("%Y%m%d")
+    pass
+
+
 def get_now():
     return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')