123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- # -*- coding: utf-8 -*-
- # @Time : 2022/12/5 9:28
- # @Author : XuJiakai
- # @File : pull_sample_data
- # @Software: PyCharm
- import json, os
- from project_const import TOPIC_NAME
- from sdk.WinhcElasticSearchSDK import get_new_es
- from utils.base_utils import json_path
- from utils import map_2_json_str
- from utils.category_utils import get_value
- from utils.pca_code_utils import get_name
- from sdk.WinhcAllClient import get_all_client
- from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
- from sdk.WinhcAllClient import get_odps_sdk
# Directory containing this module; used to locate files shipped next to it.
_module_path = os.path.dirname(__file__)

# Fanout topic every sample record is published to.
RABBITMQ_TOPIC = TOPIC_NAME

# Shared client handles, created once at import time.
all_sdk = get_all_client()
r_sdk = all_sdk.get_rabbit_mq_sdk()
es_sdk = get_new_es()
def _send_rabbit(li: list):
    """Publish every element of ``li`` to the RabbitMQ fanout topic.

    Each element is serialised with ``json.dumps(..., ensure_ascii=False)``,
    encoded to bytes with ``str.encode()`` and passed to
    ``r_sdk.send_by_fanout`` under ``RABBITMQ_TOPIC``. Elements are sent one
    call at a time, in list order. An empty list sends nothing.
    """
    for payload in li:
        body = json.dumps(payload, ensure_ascii=False).encode()
        r_sdk.send_by_fanout(RABBITMQ_TOPIC, body)
def pull_by_es(size: int = 20):
    """Pull sample companies from Elasticsearch and publish them to RabbitMQ.

    Queries index ``winhc_index_rt_company`` (doc type ``company``) for at
    most ``size`` documents whose ``deleted`` field is ``"0"`` and whose
    ``company_org_type_new_std`` is one of the two listed organisation types,
    sorted by ``company_rank_sec`` descending. Every hit is flattened into a
    dict and the whole batch is handed to ``_send_rabbit``.

    :param size: number of documents to request; must be an ``int`` with
        ``0 < size <= 10000``.
    :raises AssertionError: if ``size`` is not an int in that range. The
        check is an explicit ``raise`` rather than an ``assert`` statement so
        it still runs under ``python -O``; the exception type and message are
        kept for backward compatibility.
    """
    if not (isinstance(size, int) and 0 < size <= 10000):
        raise AssertionError("数值错误")

    dsl = {
        "_source": [
            "cname.show", "company_org_type_new_std", "province_code",
            "city_code", "county_code", "org_number", "credit_code",
            "reg_number", "category_first_code", "category_second_code",
            "category_third_code",
        ],
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {"term": {"deleted": {"value": "0"}}},
                    {
                        "terms": {
                            "company_org_type_new_std": [
                                "有限责任公司",
                                "独资企业",
                            ]
                        }
                    },
                ]
            }
        },
        "sort": [{"company_rank_sec": {"order": "desc"}}],
    }

    res = es_sdk.query(index='winhc_index_rt_company', doc_type='company', dsl=dsl)

    li = []
    for hit in res:
        # get_value / get_name each return a 3-element sequence, indexed 0..2
        # below. Presumably these are the display names for the three
        # category / region codes - confirm against the utils modules.
        cate = get_value(
            c1=json_path(hit, '$.category_first_code'),
            c2=json_path(hit, '$.category_second_code'),
            c3=json_path(hit, '$.category_third_code'),
        )
        area = get_name(
            province_code=json_path(hit, '$.province_code'),
            city_code=json_path(hit, '$.city_code'),
            county_code=json_path(hit, '$.county_code'),
        )
        li.append({
            "company_id": hit['_id'],
            "company_name": json_path(hit, "$.cname.show"),
            "company_org_type": json_path(hit, "$.company_org_type_new_std.[0]"),
            "province": area[0],
            "city": area[1],
            "county": area[2],
            "org_number": json_path(hit, '$.org_number'),
            "credit_code": json_path(hit, '$.credit_code'),
            "reg_number": json_path(hit, '$.reg_number'),
            "cate_first": cate[0],
            "cate_second": cate[1],
            "cate_third": cate[2],
        })

    _send_rabbit(li)
# ODPS client handle, created once at import time.
odps_sdk = get_odps_sdk()


def pull_by_max(size=100000):
    """Pull sample companies from ODPS and publish them to RabbitMQ.

    Steps:

    1. Read the SQL template ``pull_data.sql`` located next to this module
       and fill in ``limit_num``, ``company_rank_latest_ds`` and
       ``latest_ds``.
    2. If ``latest_ds`` is not yet among the partitions of
       ``out_winhc_data_analysis_pull_data``, run the rendered SQL and wait
       for it to succeed.
    3. Read rows of that partition back through the tunnel reader and send
       each one, as a flat JSON document, to ``RABBITMQ_TOPIC``.

    :param size: value substituted for ``{limit_num}`` in the SQL template.
    """
    # Use a context manager so the template file handle is always closed.
    sql_path = os.path.join(_module_path, 'pull_data.sql')
    with open(sql_path, encoding='utf-8') as f:
        sql = f.read().strip()

    company_rank_latest_ds = get_last_partition_ds(tab='calc_company_rank_out', project='winhc_ng')
    from utils.datetime_utils import get_ds
    latest_ds = get_ds()
    sql = sql.format(limit_num=size, company_rank_latest_ds=company_rank_latest_ds, latest_ds=latest_ds)

    all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
    if latest_ds not in all_ds:
        odps_sdk.run_sql(sql).wait_for_success()

    # NOTE(review): the source this was recovered from had lost its
    # indentation; the read-back below is assumed to run whether or not the
    # partition had to be generated above - confirm.
    # NOTE(review): the read-back is capped at 1000 rows regardless of
    # ``size`` - confirm this is intended.
    read_sql = 'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit 1000'
    with odps_sdk.execute_sql(read_sql).open_reader(tunnel=True) as reader:
        for record in reader:
            # get_value / get_name each return a 3-element sequence, indexed
            # 0..2 below.
            c = get_value(
                c1=record['cate_first_code'],
                c2=record['cate_second_code'],
                c3=record['cate_third_code'],
            )
            a = get_name(
                province_code=record['province_code'],
                city_code=record['city_code'],
                county_code=record['county_code'],
            )
            ele = {
                "company_id": record['company_id'],
                "company_name": record['company_name'],
                "company_org_type": record['company_org_type'],
                "province": a[0],
                "city": a[1],
                "county": a[2],
                "org_number": record['org_number'],
                "credit_code": record['credit_code'],
                "reg_number": record['reg_number'],
                "cate_first": c[0],
                "cate_second": c[1],
                "cate_third": c[2],
            }
            r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
if __name__ == '__main__':
    # Script entry point: publish the ODPS-backed sample.
    # The Elasticsearch-backed alternative is pull_by_es(size=100).
    pull_by_max(size=10000)
|