# -*- coding: utf-8 -*-
# @Time : 2022/12/1 14:46
# @Author : XuJiakai
# @File : cpa_agg
# @Software: PyCharm
import json
import time, queue
from threading import Thread
from utils.datetime_utils import datetime_format
from log import get_log
from sdk.WinhcAllClient import get_all_client
from utils.datetime_utils import get_ds, get_now, datetime_format_transform, get_yesterday_ds
from utils import map_2_json_str, json_path
from utils.base_utils import tuple_max, get_str_intersection
from utils.mysql_utils import insert_many
from utils.xxl_queue import xxl_queue
import re
import sys
import argparse
from project_const import TOPIC_NAME, MONGODB_NAME

# Matches dates written as "YYYY年MM月DD日" (4-digit year, 2-digit month/day).
date_part = re.compile('\\d{4}年\\d{2}月\\d{2}日')

all_client = get_all_client()
# Working collection plus a parallel "<name>_del" collection that receives
# documents removed by data_transform (see del_col.insert_many there).
col = all_client.get_mongo_collection(MONGODB_NAME)
del_col = all_client.get_mongo_collection(MONGODB_NAME + '_del')
log = get_log('cpa_agg')
holo_client = all_client.get_holo_client(db='winhc_biz')
HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
# NOTE(review): second client instance alongside all_client — presumably kept
# separate for the RabbitMQ SDK used by worker threads; confirm before merging.
all_sdk = get_all_client()
def get_max_data(data: list, key: str, exclude_product_name=None):
    """Return the maximum value found at JSON path *key* across *data*.

    Each element of *data* is a competitor record; its value at *key* is
    extracted with json_path and compared via tuple_max together with the
    record's 'competitor_product_name'.

    :param data: list of competitor record dicts
    :param key: JSON path (e.g. "$.summary.<dim>") to extract from each record
    :param exclude_product_name: product names to skip; defaults to ['winhc']
        so the caller's own product never wins the comparison
    :return: (max_value, product_name) tuple, or (None, None) if no record
        yields a usable value
    """
    # Avoid the mutable-default-argument pitfall: resolve the default here.
    if exclude_product_name is None:
        exclude_product_name = ('winhc',)
    max_data = None
    for record in data:
        value = json_path(record, key)
        if value is None:
            continue
        product_name = record['competitor_product_name']
        if product_name in exclude_product_name:
            continue
        candidate = (value, product_name)
        max_data = candidate if max_data is None else tuple_max(max_data, candidate)
    if max_data is None:
        return None, None
    return max_data
def get_all_data_by_item(data: list, key):
    """Map each record's competitor_product_name to its value at JSON path *key*.

    :param data: list of competitor record dicts
    :param key: JSON path passed through to json_path
    :return: dict of {product_name: extracted value (may be None)}
    """
    return {record['competitor_product_name']: json_path(record, key) for record in data}
def data_transform(data: list, rabbit_mq):
    """Aggregate one company's competitor records into per-dimension rows.

    For every dimension name appearing in any record's 'summary' or
    'latest_date', builds a row comparing the best competitor value against
    the 'winhc' record, fans the rows out over RabbitMQ, then moves the
    source documents from `col` into the `_del` archive collection.

    :param data: all Mongo documents sharing one "<company>_<ds>" _id prefix;
        must be non-empty
    :param rabbit_mq: SDK object exposing send_by_fanout(queue, bytes)
    :return: list of row tuples sent to the queue, or None if no 'winhc'
        baseline record is present
    """
    log.info('input data: {}'.format(data))
    # _id looks like "<prefix>_<suffix>"; strip the last segment to get the
    # prefix shared by every document of this company/ds group.
    deleted_key = data[0]['_id']
    deleted_key = deleted_key[:deleted_key.rfind('_')]
    base_info = data[0]['base_info']
    ds = get_ds()

    # Union of all dimension names, and locate the winhc baseline record.
    key_set = set()
    winhc_data = None
    for record in data:
        key_set |= set(record['summary'].keys())
        key_set |= set(record['latest_date'].keys())
        if record['competitor_product_name'] == 'winhc':
            winhc_data = record
    if winhc_data is None:
        # Nothing to compare against without the winhc record.
        return

    li = []
    winhc_spider_date = winhc_data['spider_date']
    holo_keys = None
    for dim in key_set:
        tmp_json = base_info.copy()
        summary_max, summary_max_p_name = get_max_data(data, "$.summary." + dim)
        latest_date_max, latest_date_max_p_name = get_max_data(data, "$.latest_date." + dim)
        winhc_dim_num = json_path(winhc_data, '$.summary.' + dim)
        latest_date_max = datetime_format(latest_date_max)
        winhc_dim_date = json_path(winhc_data, '$.latest_date.' + dim)
        if winhc_dim_date == '':
            # Normalize empty string to None before date formatting.
            winhc_dim_date = None
        winhc_dim_date = datetime_format(winhc_dim_date)

        # Skip dimensions that are empty on both the competitor and winhc side.
        if (latest_date_max is None or latest_date_max == '') and (
                summary_max is None or summary_max == 0) and winhc_dim_date is None and (
                winhc_dim_num is None or winhc_dim_num == 0):
            continue

        if winhc_spider_date is None:
            winhc_spider_date = get_now()
        other_data = {
            "id": tmp_json['company_id'] + "_" + ds + "_" + dim,
            "dim_name": dim,
            "dim_max_num": summary_max,
            "dim_max_num_business_name": summary_max_p_name,
            "winhc_dim_num": winhc_dim_num,
            "dim_max_date": latest_date_max,
            "dim_max_date_business_name": latest_date_max_p_name,
            "winhc_dim_date": winhc_dim_date,
            "other_info": json.dumps({"summary": get_all_data_by_item(data, '$.summary.' + dim),
                                      'latest_date': get_all_data_by_item(data, '$.latest_date.' + dim)}),
            "update_time": winhc_spider_date,
            "create_time": winhc_spider_date,
            "ds": ds,
        }
        tmp_json.update(other_data)
        li.append(tmp_json)
        if holo_keys is None:
            # Column order for a (currently disabled) direct Holo insert below.
            holo_keys = list(tmp_json.keys())

    log.info('output data: {}'.format(li))
    if li:
        # insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
        li = [tuple(row.values()) for row in li]
        for row in li:
            rabbit_mq.send_by_fanout("cpa_insert_holo", json.dumps(row, ensure_ascii=False).encode())

    # Best-effort archival: delete the processed docs and copy them into the
    # _del collection. Failures are logged (previously swallowed silently)
    # but do not abort the pipeline.
    del_num = 0
    try:
        del_num = col.delete_many({"_id": {"$regex": "^" + deleted_key}}).deleted_count
        del_col.insert_many(data, ordered=False)
    except Exception:
        log.exception('failed to archive documents for key: {}'.format(deleted_key))
    log.info("deleted mongo _id: {} , deleted count: {}".format(deleted_key, del_num))
    return li
# Shared hand-off queue between the Mongo scan loop and the worker threads.
q = queue.Queue(5000)


class Work(Thread):
    """Worker thread: forever drains the shared queue and feeds each batch
    of documents through data_transform."""

    def run(self):
        mq_sdk = all_sdk.get_rabbit_mq_sdk()
        while True:
            batch = q.get()
            data_transform(batch, mq_sdk)
today_ds = get_ds()
yesterday_ds = get_yesterday_ds()
# yesterday_ds = '20221231'
# scan_ds = today_ds[:-2]
# Common leading prefix of today's and yesterday's ds strings, used as the
# Mongo _id prefix for the scan loop so both days' documents are covered.
scan_ds = get_str_intersection(today_ds, yesterday_ds)
# scan_ds = '2022'
def overwrite_handle(key, obj_list):
    """Eviction callback for xxl_queue.

    When an evicted group holds a single document whose key is not from
    today's ds, that orphan is deleted from Mongo; anything else is left
    untouched.
    """
    if not obj_list:
        return
    first_id = obj_list[0]['_id']
    if len(obj_list) == 1 and not key.startswith(today_ds):
        removed = col.delete_one({'_id': first_id}).deleted_count
        log.info(f"delete id: {first_id} , {removed}")
import tqdm


def main(max_round: int = 2, interval_of_sed: int = 300, thread_num: int = 20):
    """Run the scan/aggregate loop.

    Starts *thread_num* daemon worker threads, then repeatedly scans Mongo
    for documents whose _id starts with scan_ds, grouping them through an
    xxl_queue; complete groups are handed to the workers via the shared
    queue.

    :param max_round: number of scan passes before returning
    :param interval_of_sed: sleep between passes, in seconds
    :param thread_num: worker thread count (previously hard-coded to 20)
    """
    for _ in range(thread_num):
        worker = Work()
        # Daemon threads so the process can exit once the driver is done
        # (w.setDaemon(True) is deprecated in favor of the attribute).
        worker.daemon = True
        worker.start()

    round_num = 0
    while True:
        round_num += 1
        print('{},第{}遍轮循...'.format(scan_ds, round_num))
        xxl_q = xxl_queue(pop_threshold=2, overwrite_handle=overwrite_handle)
        # for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
        for doc in tqdm.tqdm(col.find({"_id": {"$regex": "^" + scan_ds}}).batch_size(200)):
            doc_id = doc['_id']
            key = doc_id[:doc_id.rfind('_')]
            result = xxl_q.append(key=key, obj=doc)
            if result:
                q.put(result)
        if round_num >= max_round:
            break
        try:
            print('{},第{}遍轮循结束.'.format(scan_ds, round_num))
            time.sleep(interval_of_sed)
        except KeyboardInterrupt:
            # An interrupt during the long sleep just starts the next round
            # early (the original bare except also swallowed it).
            pass
if __name__ == '__main__':
    # test()
    log.info(f"input args: {sys.argv}")
    parser = argparse.ArgumentParser()
    parser.add_argument("-m", "--max-round", type=int, default=2, help='最大迭代轮次')
    # Accept both dash and underscore spellings for backward compatibility;
    # argparse derives dest 'interval_of_sed' from the first long option.
    parser.add_argument("-i", "--interval-of-sed", "--interval_of_sed", type=int, default=300,
                        help='每轮间隔时间(秒)')
    args = parser.parse_args()
    main(max_round=args.max_round, interval_of_sed=args.interval_of_sed)
    # main() has finished scanning; wait for the daemon workers to drain the
    # queue before letting the process exit (daemon threads die with it).
    while not q.empty():
        log.info(f"遍历未结束,队列剩余:{q.qsize()}")
        time.sleep(300)
    log.info(f"遍历完成,队列剩余:{q.qsize()}")
|