# -*- coding: utf-8 -*-
# @Time : 2022/12/5 9:28
# @Author : XuJiakai
# @File : pull_sample_data
# @Software: PyCharm
"""Pull sample company records and publish them to RabbitMQ.

Two sources are used, chosen by :func:`pull` according to the requested size:

* Elasticsearch (:func:`pull_by_es`) for up to ``_ES_MAX_SIZE`` records;
* ODPS (:func:`pull_by_max`) for larger samples, driven by the SQL template
  ``pull_data.sql`` located next to this module.

Each record is serialized as UTF-8 JSON and sent with
``r_sdk.send_by_fanout(RABBITMQ_TOPIC, ...)``.

Usage::

    python pull_sample_data.py [size]     # size defaults to 1000
"""
import json
import os
import sys

from project_const import TOPIC_NAME
from sdk.WinhcElasticSearchSDK import get_new_es
from utils.base_utils import json_path
from utils import map_2_json_str  # noqa: F401  (kept: may be re-exported/used elsewhere)
from utils.category_utils import get_value
from utils.pca_code_utils import get_name
from sdk.WinhcAllClient import get_all_client, get_odps_sdk
from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
from log import get_log

log = get_log('pull_sample_data')

_module_path = os.path.dirname(__file__)

RABBITMQ_TOPIC = TOPIC_NAME

# Largest sample pulled from Elasticsearch; larger requests go through ODPS.
# NOTE(review): presumably chosen to match ES's default max_result_window — confirm.
_ES_MAX_SIZE = 10000

# NOTE: these clients are created at import time, so importing this module
# initializes them (what that entails depends on the SDK — not visible here).
all_sdk = get_all_client()
r_sdk = all_sdk.get_rabbit_mq_sdk()
es_sdk = get_new_es()


def _publish(message: dict) -> None:
    """Serialize *message* as UTF-8 JSON and send it to ``RABBITMQ_TOPIC``."""
    # ensure_ascii=False keeps Chinese text readable in the payload.
    r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(message, ensure_ascii=False).encode())


def _send_rabbit(li: list):
    """Publish every message in *li*, in order."""
    for message in li:
        _publish(message)


def _build_message(company_id, company_name, company_org_type,
                   province_code, city_code, county_code,
                   org_number, credit_code, reg_number,
                   cate_first_code, cate_second_code, cate_third_code) -> dict:
    """Build the outgoing message dict shared by the ES and ODPS paths.

    Category codes are resolved through ``get_value`` and area codes through
    ``get_name``. Both results are indexed [0], [1] and [2].
    NOTE(review): presumably (first, second, third) category names and
    (province, city, county) names respectively — confirm against the helpers.

    The key order is fixed because ``json.dumps`` preserves insertion order,
    so consumers see the same field order as before.
    """
    category = get_value(c1=cate_first_code, c2=cate_second_code, c3=cate_third_code)
    area = get_name(province_code=province_code, city_code=city_code, county_code=county_code)
    return {
        "company_id": company_id,
        "company_name": company_name,
        "company_org_type": company_org_type,
        "province": area[0],
        "city": area[1],
        "county": area[2],
        "org_number": org_number,
        "credit_code": credit_code,
        "reg_number": reg_number,
        "cate_first": category[0],
        "cate_second": category[1],
        "cate_third": category[2],
    }


def pull_by_es(size: int = 20):
    """Query up to *size* companies from Elasticsearch and publish them.

    The query selects ``company_type == "1"`` and ``deleted == "0"`` with
    ``company_org_type_new_std`` in {有限责任公司, 独资企业}, sorted by
    ``company_rank_sec`` descending.

    :param size: number of documents to request, 1..``_ES_MAX_SIZE``.
    :raises AssertionError: if *size* is not an int in that range. This is an
        explicit check rather than ``assert``, so it also runs under ``python -O``.
    """
    if not (isinstance(size, int) and 0 < size <= _ES_MAX_SIZE):
        raise AssertionError("数值错误")
    log.info(f"pull {size} record for es")
    dsl = {
        "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code",
                    "county_code", "org_number", "credit_code", "reg_number",
                    "category_first_code", "category_second_code", "category_third_code"],
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {"term": {"company_type": {"value": "1"}}},
                    {"term": {"deleted": {"value": "0"}}},
                    {"terms": {"company_org_type_new_std": ["有限责任公司", "独资企业"]}},
                ]
            }
        },
        "sort": [
            {"company_rank_sec": {"order": "desc"}}
        ],
    }
    res = es_sdk.query(index='winhc_index_rt_company', doc_type='company', dsl=dsl)
    messages = [
        _build_message(
            company_id=doc['_id'],
            company_name=json_path(doc, "$.cname.show"),
            company_org_type=json_path(doc, "$.company_org_type_new_std.[0]"),
            province_code=json_path(doc, '$.province_code'),
            city_code=json_path(doc, '$.city_code'),
            county_code=json_path(doc, '$.county_code'),
            org_number=json_path(doc, '$.org_number'),
            credit_code=json_path(doc, '$.credit_code'),
            reg_number=json_path(doc, '$.reg_number'),
            cate_first_code=json_path(doc, '$.category_first_code'),
            cate_second_code=json_path(doc, '$.category_second_code'),
            cate_third_code=json_path(doc, '$.category_third_code'),
        )
        for doc in res
    ]
    _send_rabbit(messages)


odps_sdk = get_odps_sdk()


def pull_by_max(size=100000):
    """Pull up to *size* companies through ODPS and publish them.

    The function fills ``pull_data.sql`` with ``limit_num``,
    ``company_rank_latest_ds`` and ``latest_ds``. The SQL runs only when
    ``out_winhc_data_analysis_pull_data`` has no ``latest_ds`` partition yet.
    NOTE(review): presumably the template writes that partition — confirm.
    Every row of partition ``latest_ds`` is then streamed and published one
    by one.

    :param size: value substituted for ``limit_num`` in the SQL template.
    """
    log.info(f"pull {size} record for max")
    # `with` ensures the template file handle is closed.
    with open(os.path.join(_module_path, 'pull_data.sql'), encoding='utf-8') as f:
        sql_template = f.read().strip()
    company_rank_latest_ds = get_last_partition_ds(tab='calc_company_rank_out', project='winhc_ng')
    from utils.datetime_utils import get_ds
    latest_ds = get_ds()
    sql = sql_template.format(limit_num=size,
                              company_rank_latest_ds=company_rank_latest_ds,
                              latest_ds=latest_ds)
    all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
    # Skip the (potentially expensive) SQL if today's partition already exists.
    if latest_ds not in all_ds:
        log.info("exec sql: {}".format(sql))
        instance = odps_sdk.run_sql(sql)
        instance.wait_for_success()
    log.info("开始推送数据...")
    query = f'select * from out_winhc_data_analysis_pull_data where ds = {latest_ds} '
    with odps_sdk.execute_sql(query).open_reader(tunnel=True) as reader:
        # Stream rows and publish each one immediately instead of buffering all of them.
        for record in reader:
            _publish(_build_message(
                company_id=record['company_id'],
                company_name=record['company_name'],
                company_org_type=record['company_org_type'],
                province_code=record['province_code'],
                city_code=record['city_code'],
                county_code=record['county_code'],
                org_number=record['org_number'],
                credit_code=record['credit_code'],
                reg_number=record['reg_number'],
                cate_first_code=record['cate_first_code'],
                cate_second_code=record['cate_second_code'],
                cate_third_code=record['cate_third_code'],
            ))
    log.info('数据推送完成.')


def pull(size):
    """Publish *size* sample records, choosing the source by size.

    Requests above ``_ES_MAX_SIZE`` go to ODPS; smaller ones go to ES. The
    ES path now honors *size*; previously it was hard-coded to 500.
    """
    if size > _ES_MAX_SIZE:
        pull_by_max(size)
    else:
        pull_by_es(size)


if __name__ == '__main__':
    log.info(f"input args: {sys.argv}")
    if len(sys.argv) >= 2:
        pull(size=int(sys.argv[1]))
    else:
        pull(size=1000)