# -*- coding: utf-8 -*-
# @Time : 2022/12/1 14:46
# @Author : XuJiakai
# @File : cpa_agg
# @Software: PyCharm
"""Aggregate competitor-product crawl results per company and dimension.

Documents for the current partition day (``_id`` prefixed by ``get_ds()``) are
read from the MongoDB collection ``MONGODB_NAME`` and grouped by the ``_id``
prefix ``{ds}_{company_id}``. Each group is reduced by :func:`data_transform`
into one row per dimension in the Holo table ``HOLO_TABLE_NAME``. The processed
documents are then copied to ``MONGODB_NAME + '_del'`` and removed from the
source collection.
"""
import argparse
import json
import re
import sys
import time

from log import get_log
from sdk.WinhcAllClient import get_all_client
from utils.datetime_utils import get_ds, get_now, datetime_format_transform
from utils import map_2_json_str, json_path
from utils.base_utils import tuple_max
from utils.mysql_utils import insert_many
from utils.xxl_queue import xxl_queue
from project_const import TOPIC_NAME, MONGODB_NAME

# Matches values that start with a date written as e.g. "2022年12月01日".
date_part = re.compile(r'\d{4}年\d{2}月\d{2}日')

# strptime/strftime formats: source Chinese-style date -> normalized datetime string.
_CN_DATE_FORMAT = '%Y年%m月%d日'
_STD_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"

all_client = get_all_client()
col = all_client.get_mongo_collection(MONGODB_NAME)
# Archive collection: processed documents are copied here before being deleted from `col`.
del_col = all_client.get_mongo_collection(MONGODB_NAME + '_del')
log = get_log('cpa_agg')

holo_client = all_client.get_holo_client(db='winhc_biz')
HOLO_TABLE_NAME = 'public.ads_waa_dim_info'

# Product name of our own data source. It is excluded from the competitor maxima
# and reported separately as winhc_dim_num / winhc_dim_date.
WINHC_PRODUCT_NAME = 'winhc'


def _group_key(doc_id: str) -> str:
    """Return ``doc_id`` without its last ``_``-separated segment.

    For ids shaped like ``20221214_<company_id>_winhc`` this strips the product
    name, leaving the ``{ds}_{company_id}`` group key.
    """
    return doc_id[:doc_id.rfind('_')]


def _normalize_date(value):
    """Convert a string starting with a ``%Y年%m月%d日`` date to ``%Y-%m-%d %H:%M:%S``.

    Any other value (None, non-strings, already-normalized strings) is returned unchanged.
    """
    if isinstance(value, str) and date_part.match(value):
        return datetime_format_transform(value, _CN_DATE_FORMAT, _STD_DATETIME_FORMAT)
    return value


def get_max_data(data: list, key: str, exclude_product_name=(WINHC_PRODUCT_NAME,)):
    """Return the largest value found at ``key`` across the documents of one group.

    :param data: documents of one company group. Each must have ``competitor_product_name``.
    :param key: JSON path passed to ``json_path``, e.g. ``"$.summary." + dim``.
    :param exclude_product_name: product names to ignore. Any container supporting
        ``in`` is accepted. The default is a tuple to avoid a shared mutable default.
    :return: ``(max_value, product_name)``, where the maximum is chosen by ``tuple_max``
        over ``(value, product_name)`` pairs. Returns ``(None, None)`` when no
        non-excluded document has a non-None value at ``key``.
    """
    max_data = None
    for item in data:
        value = json_path(item, key)
        if value is None:
            continue
        product_name = item['competitor_product_name']
        if product_name in exclude_product_name:
            continue
        candidate = (value, product_name)
        max_data = candidate if max_data is None else tuple_max(max_data, candidate)
    if max_data is None:
        return None, None
    return max_data


def get_all_data_by_item(data: list, key):
    """Map each document's ``competitor_product_name`` to its value at ``key``.

    If two documents share a product name, the later one wins.
    """
    return {item['competitor_product_name']: json_path(item, key) for item in data}


def _archive_and_delete(data: list, group_key: str) -> int:
    """Copy ``data`` to the archive collection, then delete the group from the source.

    This is best effort: a failure is logged and swallowed so that one group cannot
    stop the polling loop. Because the delete runs only after ``insert_many``
    returns, a failing archive insert means nothing is deleted. (pymongo raises
    on e.g. duplicate ``_id``s even with ``ordered=False``.)

    :return: number of source documents deleted (0 on failure).
    """
    deleted_num = 0
    try:
        del_col.insert_many(data, ordered=False)
        # Anchor on the trailing '_' so this key cannot match a longer key that
        # merely starts with it. re.escape keeps the key literal inside the regex.
        # NOTE(review): this also deletes same-prefix documents that are not in
        # `data` (and were therefore not archived) -- confirm that is intended.
        pattern = "^" + re.escape(group_key) + "_"
        deleted_num = col.delete_many({"_id": {"$regex": pattern}}).deleted_count
    except Exception as e:
        log.error('archive/delete failed for mongo _id prefix: {} , error: {!r}'.format(group_key, e))
    log.info("deleted mongo _id: {} , deleted count: {}".format(group_key, deleted_num))
    return deleted_num


def data_transform(data: list):
    """Aggregate one company's documents into per-dimension rows and archive them.

    ``data`` is the group of MongoDB documents sharing the ``_id`` prefix
    ``{ds}_{company_id}``. The full ``_id`` is ``{ds}_{company_id}_{product}``.
    Each document carries:

    * ``base_info``: company attributes, including ``company_id``;
    * ``competitor_product_name``;
    * ``summary``: dimension -> count;
    * ``latest_date``: dimension -> date string or None.

    For every dimension present in any document, one row is built from the first
    document's ``base_info``, plus:

    * the competitors' maximum count and latest date (excluding winhc), each with
      the product that holds it;
    * winhc's own count and date;
    * every product's raw values, serialized into ``other_info``.

    Dimensions with no count or date anywhere are skipped. The rows are inserted
    into ``HOLO_TABLE_NAME``, after which the group is archived and deleted from Mongo.

    :return: the list of rows, or None when ``data`` is empty or the group contains
        no winhc document. In those cases nothing is written or deleted.
    """
    log.info('input data: {}'.format(data))
    if not data:
        return None

    deleted_key = _group_key(data[0]['_id'])
    base_info = data[0]['base_info']
    ds = get_ds()

    dims = set()
    winhc_data = None
    for doc in data:
        dims.update(doc.get('summary') or {})
        dims.update(doc.get('latest_date') or {})
        if doc['competitor_product_name'] == WINHC_PRODUCT_NAME:
            winhc_data = doc
    if winhc_data is None:
        return None

    rows = []
    holo_keys = None
    # Sorted so that row order (and thus the logged output) is deterministic.
    for dim in sorted(dims):
        summary_path = '$.summary.' + dim
        date_path = '$.latest_date.' + dim

        summary_max, summary_max_p_name = get_max_data(data, summary_path)
        latest_date_max, latest_date_max_p_name = get_max_data(data, date_path)
        latest_date_max = _normalize_date(latest_date_max)

        winhc_dim_num = json_path(winhc_data, summary_path)
        winhc_dim_date = json_path(winhc_data, date_path)
        if winhc_dim_date == '':
            winhc_dim_date = None
        winhc_dim_date = _normalize_date(winhc_dim_date)

        # Skip dimensions that carry neither a count nor a date for any product.
        if (latest_date_max in (None, '') and summary_max in (None, 0)
                and winhc_dim_date is None and winhc_dim_num in (None, 0)):
            continue

        now = get_now()  # one timestamp so create_time == update_time
        row = base_info.copy()
        row.update({
            "id": row['company_id'] + "_" + ds + "_" + dim,
            "dim_name": dim,
            "dim_max_num": summary_max,
            "dim_max_num_business_name": summary_max_p_name,
            "winhc_dim_num": winhc_dim_num,
            "dim_max_date": latest_date_max,
            "dim_max_date_business_name": latest_date_max_p_name,
            "winhc_dim_date": winhc_dim_date,
            "other_info": json.dumps({
                "summary": get_all_data_by_item(data, summary_path),
                'latest_date': get_all_data_by_item(data, date_path),
            }),
            "update_time": now,
            "create_time": now,
            "ds": ds,
        })
        rows.append(row)
        if holo_keys is None:
            holo_keys = list(row.keys())

    log.info('output data: {}'.format(rows))
    # When every dimension was empty there is nothing to insert (and holo_keys is None).
    if rows:
        insert_many(rows, holo_keys, HOLO_TABLE_NAME, holo_client)

    _archive_and_delete(data, deleted_key)
    return rows


def main(max_round: int = 2, interval_of_sed: int = 300):
    """Poll today's partition and aggregate each group that the queue releases.

    Each round scans ``col`` for ``_id``s starting with the current ``ds``. The
    documents are buffered in an ``xxl_queue`` keyed by ``{ds}_{company_id}``, and
    :func:`data_transform` is called whenever ``append`` returns a group. With
    ``pop_threshold=2``, this is presumably once two documents share a key (see xxl_queue).

    At least one round always runs; at most ``max_round`` rounds run, with a sleep
    of ``interval_of_sed`` seconds between consecutive rounds.
    """
    round_num = 0
    while True:
        round_num += 1
        ds = get_ds()
        log.info('{},第{}遍轮循...'.format(ds, round_num))
        queue = xxl_queue(pop_threshold=2)
        for doc in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
            group = queue.append(key=_group_key(doc['_id']), obj=doc)
            if group:
                data_transform(group)
        log.info('{},第{}遍轮循结束.'.format(ds, round_num))
        if round_num >= max_round:
            break
        # Deliberately not wrapped in a bare `except:`, which would also swallow
        # KeyboardInterrupt and make Ctrl-C during the sleep ineffective.
        time.sleep(interval_of_sed)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # Maximum number of polling rounds.
    parser.add_argument("-m", "--max-round", type=int, default=2, help='最大迭代轮次')
    # Seconds to sleep between rounds.
    parser.add_argument("-i", "--interval_of_sed", type=int, default=300, help='每轮间隔时间(秒)')
    args = parser.parse_args()
    main(max_round=args.max_round, interval_of_sed=args.interval_of_sed)