
feat: add data aggregation comparison

许家凯 2 years ago
parent
commit ea487ce6f2
3 changed files with 187 additions and 5 deletions
  1. handle/pull_sample_data.py (+13 -4)
  2. main.py (+3 -1)
  3. spider/cpa_agg.py (+171 -0)

+ 13 - 4
handle/pull_sample_data.py

@@ -3,7 +3,7 @@
 # @Author : XuJiakai
 # @File : pull_sample_data
 # @Software: PyCharm
-import json, os
+import json, os, sys
 from project_const import TOPIC_NAME
 from sdk.WinhcElasticSearchSDK import get_new_es
 from utils.base_utils import json_path
@@ -13,6 +13,9 @@ from utils.pca_code_utils import get_name
 from sdk.WinhcAllClient import get_all_client
 from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
 from sdk.WinhcAllClient import get_odps_sdk
+from log import get_log
+
+log = get_log('pull_sample_data')
 
 _module_path = os.path.dirname(__file__)
 
@@ -34,6 +37,7 @@ def _send_rabbit(li: list):
 
 def pull_by_es(size: int = 20):
     assert isinstance(size, int) and 0 < size <= 10000, "size must be in (0, 10000]"
+    log.info(f"pull {size} record for es")
 
     dsl = {
         "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code", "county_code", "org_number",
@@ -102,6 +106,8 @@ odps_sdk = get_odps_sdk()
 
 
 def pull_by_max(size=100000):
+    log.info(f"pull {size} record for max")
+
     f = open(os.path.join(_module_path, 'pull_data.sql'), encoding='utf-8')
     sql = f.read().strip()
 
@@ -118,7 +124,7 @@ def pull_by_max(size=100000):
         instance.wait_for_success()
 
     with odps_sdk.execute_sql(
-            'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit 1000').open_reader(
+            'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds).open_reader(
         tunnel=True) as reader:
         for record in reader:
             c = get_value(
@@ -149,6 +155,9 @@ def pull_by_max(size=100000):
 
 
 if __name__ == '__main__':
-    # pull_by_es(size=100)
-    pull_by_max(size=10000)
+    log.info(f"input args: {sys.argv}")
+    if len(sys.argv) >= 2:
+        pull_by_max(size=int(sys.argv[1]))
+    else:
+        pull_by_max(size=1000)
     pass
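
Note on the new entry point above: pull_by_max now takes its size from the command line, falling back to 1000. A minimal sketch of equivalent parsing, assuming the script is launched as python handle/pull_sample_data.py 5000 (parse_size is a hypothetical helper, for illustration only):

import sys

def parse_size(argv: list, default: int = 1000) -> int:
    # argv[0] is the script path; argv[1], when present, is the pull size
    return int(argv[1]) if len(argv) >= 2 else default

# equivalent to the __main__ block above:
# pull_by_max(size=parse_size(sys.argv))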

+ 3 - 1
main.py

@@ -5,8 +5,10 @@
 
 import sys
 from spider.winhc_job import main as winhc_jab_main
+from log import get_log
 
+log = get_log('main')
 if __name__ == '__main__':
-    print(f"input args: {sys.argv}")
+    log.info(f"input args: {sys.argv}")
 
     winhc_jab_main()
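
The log.get_log factory used here (and in the other two files) is not part of this diff. A minimal sketch of what such a factory might look like, assuming it wraps the standard logging module:

import logging

def get_log(name: str) -> logging.Logger:
    # hypothetical sketch: reuse the named logger, attach one stream handler
    logger = logging.getLogger(name)
    if not logger.handlers:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter(
            '%(asctime)s %(name)s %(levelname)s %(message)s'))
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
    return logger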

+ 171 - 0
spider/cpa_agg.py

@@ -0,0 +1,171 @@
+# -*- coding: utf-8 -*-
+# @Time : 2022/12/1 14:46
+# @Author : XuJiakai
+# @File : cpa_agg
+# @Software: PyCharm
+import json
+import time
+
+from log import get_log
+from sdk.WinhcAllClient import get_all_client
+from utils.datetime_utils import get_ds, get_now, datetime_format_transform
+from utils import map_2_json_str, json_path
+from utils.base_utils import tuple_max
+from utils.mysql_utils import insert_many
+from utils.xxl_queue import xxl_queue
+import re
+import sys
+import argparse
+from project_const import TOPIC_NAME, MONGODB_NAME
+
+date_part = re.compile(r'\d{4}年\d{2}月\d{2}日')
+all_client = get_all_client()
+
+col = all_client.get_mongo_collection(MONGODB_NAME)
+del_col = all_client.get_mongo_collection(MONGODB_NAME + '_del')
+log = get_log('cpa_agg')
+
+holo_client = all_client.get_holo_client(db='winhc_biz')
+HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
+
+
+def get_max_data(data: list, key: str):
+    max_data = None
+    for i in data:
+        tmp_v = json_path(i, key)
+        if tmp_v is None:
+            continue
+        product_name = i['competitor_product_name']
+        if max_data is None:
+            max_data = (tmp_v, product_name)
+        else:
+            max_data = tuple_max(max_data, (tmp_v, product_name))
+
+    if max_data is None:
+        return None, None
+    return max_data
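
utils.base_utils.tuple_max is not shown in this diff. A plausible sketch of the comparison get_max_data relies on, assuming it keeps whichever (value, product_name) pair has the larger first element:

def tuple_max(a: tuple, b: tuple) -> tuple:
    # the metric value decides; the product name rides along
    return a if a[0] >= b[0] else b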
+
+
+def get_all_data_by_item(data: list, key):
+    result_data = {}
+    for i in data:
+        result_data[i['competitor_product_name']] = json_path(i, key)
+    return result_data
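
A worked example for the two helpers above, assuming json_path resolves $.a.b-style paths against a dict (the sample records are invented for illustration):

data = [
    {'competitor_product_name': 'winhc', 'summary': {'court_case': 3}},
    {'competitor_product_name': 'other', 'summary': {'court_case': 7}},
]
# get_max_data(data, '$.summary.court_case')         -> (7, 'other')
# get_all_data_by_item(data, '$.summary.court_case') -> {'winhc': 3, 'other': 7}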
+
+
+def data_transform(data: list):
+    log.info('input data: {}'.format(data))
+    deleted_key = data[0]['_id']
+    deleted_key = deleted_key[:deleted_key.rfind('_')]
+
+    base_info = data[0]['base_info']
+    ds = get_ds()
+    key_set = set()
+    winhc_data = None
+    for i in data:
+        key_set = key_set | set(i['summary'].keys())
+        key_set = key_set | set(i['latest_date'].keys())
+        if i['competitor_product_name'] == 'winhc':
+            winhc_data = i
+            pass
+        pass
+
+    if winhc_data is None:
+        return
+
+    li = []
+
+    holo_keys = None
+
+    for i in key_set:
+        tmp_json = base_info.copy()
+
+        summary_max, summary_max_p_name = get_max_data(data, "$.summary." + i)
+        latest_date_max, latest_date_max_p_name = get_max_data(data, "$.latest_date." + i)
+        if (latest_date_max is None or latest_date_max == '') and (summary_max is None or summary_max == 0):
+            # print('this dimension is empty...', i)
+            continue
+            pass
+        if latest_date_max is not None and date_part.match(latest_date_max):
+            latest_date_max = datetime_format_transform(latest_date_max, '%Y年%m月%d日', "%Y-%m-%d %H:%M:%S")
+            pass
+
+        winhc_dim_date = json_path(winhc_data, '$.latest_date.' + i)
+        if winhc_dim_date is not None and winhc_dim_date == '':
+            winhc_dim_date = None
+
+        if winhc_dim_date is not None and date_part.match(winhc_dim_date):
+            winhc_dim_date = datetime_format_transform(winhc_dim_date, '%Y年%m月%d日', "%Y-%m-%d %H:%M:%S")
+            pass
+        other_data = {
+            "id": tmp_json['company_id'] + "_" + ds + "_" + i,
+            "dim_name": i,
+            "dim_max_num": summary_max,
+            "dim_max_num_business_name": summary_max_p_name,
+            "winhc_dim_num": json_path(winhc_data, '$.summary.' + i),
+            "dim_max_date": latest_date_max,
+            "dim_max_date_business_name": latest_date_max_p_name,
+            "winhc_dim_date": winhc_dim_date,
+            "other_info": json.dumps({"summary": get_all_data_by_item(data, '$.summary.' + i),
+                                      'latest_date': get_all_data_by_item(data, '$.latest_date.' + i)}),
+            "update_time": get_now(),
+            "create_time": get_now(),
+            "ds": ds,
+        }
+        tmp_json.update(other_data)
+        li.append(tmp_json)
+        if holo_keys is None:
+            holo_keys = list(tmp_json.keys())
+        pass
+
+    log.info('output data: {}'.format(li))
+    insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
+    del_num = 0
+    try:
+        del_col.insert_many(data, ordered=False)
+        del_num = col.delete_many({"_id": {"$regex": "^" + deleted_key}}).deleted_count
+    except Exception:
+        pass
+    log.info("deleted mongo _id: {} , deleted count: {}".format(deleted_key, del_num))
+
+    return li
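
utils.datetime_utils.datetime_format_transform, used above to normalize YYYY年MM月DD日 dates, is also outside this diff. A minimal sketch, assuming it simply re-renders one datetime format as another:

from datetime import datetime

def datetime_format_transform(value: str, src_fmt: str, dst_fmt: str) -> str:
    # e.g. ('2022年12月01日', '%Y年%m月%d日', '%Y-%m-%d %H:%M:%S')
    #      -> '2022-12-01 00:00:00'
    return datetime.strptime(value, src_fmt).strftime(dst_fmt)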
+
+
+def main(max_round: int = 2, interval_of_sed: int = 300):
+    round_num = 0
+    while True:
+        round_num += 1
+        ds = get_ds()
+        # ds = '20221205'
+        log.info('{}, polling round {}...'.format(ds, round_num))
+        q = xxl_queue(pop_threshold=2)
+        for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
+            _id = i['_id']
+            key = _id[:_id.rfind('_')]
+            result = q.append(key=key, obj=i)
+            if result:
+                data_transform(result)
+            pass
+        if round_num >= max_round:
+            break
+
+        try:
+            time.sleep(interval_of_sed)
+            pass
+        except Exception:
+            pass
+        pass
+
+    pass
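
utils.xxl_queue.xxl_queue is the piece main() leans on to pair documents sharing an _id prefix, and it is not shown in this diff. A minimal sketch of the assumed behavior, where append() buffers objects per key and returns the batch once pop_threshold items have accumulated:

class xxl_queue:
    def __init__(self, pop_threshold: int = 2):
        self.pop_threshold = pop_threshold
        self._buckets = {}

    def append(self, key, obj):
        # hand back the full batch once the bucket is ready, else None
        bucket = self._buckets.setdefault(key, [])
        bucket.append(obj)
        if len(bucket) >= self.pop_threshold:
            return self._buckets.pop(key)
        return None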
+
+
+if __name__ == '__main__':
+    log.info(f"input args: {sys.argv}")
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("-m", "--max-round", type=int, default=2, help='最大迭代轮次')
+    parser.add_argument("-i", "--interval_of_sed", type=int, default=300, help='每轮间隔时间(秒)')
+    args = parser.parse_args()
+
+    main(max_round=args.max_round, interval_of_sed=args.interval_of_sed)
+    pass
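
Assuming the module is run standalone, the new flags map directly onto main(), e.g.: python spider/cpa_agg.py --max-round 3 --interval-of-sed 600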