# -*- coding: utf-8 -*-
# @Time : 2022/12/5 9:28
# @Author : XuJiakai
# @File : pull_sample_data
# @Software: PyCharm
"""Pull sample company documents from Elasticsearch and publish them to RabbitMQ."""
import json

from sdk.WinhcElasticSearchSDK import get_new_es
from utils.base_utils import json_path
from utils import map_2_json_str
from utils.category_utils import get_value
from utils.pca_code_utils import get_name
from sdk.WinhcAllClient import get_all_client

# First argument handed to ``send_by_fanout`` for every published record.
RABBITMQ_TOPIC = "xjk_test"

# Module-level SDK handles, created once at import time and shared by the
# functions below.
all_sdk = get_all_client()
r_sdk = all_sdk.get_rabbit_mq_sdk()
es_sdk = get_new_es()

# Fields requested from Elasticsearch for each company document.
_SOURCE_FIELDS = [
    "cname.show",
    "company_org_type_new_std",
    "province_code",
    "city_code",
    "county_code",
    "org_number",
    "credit_code",
    "reg_number",
    "category_first_code",
    "category_second_code",
    "category_third_code",
]


def _send_rabbit(li: list):
    """Publish every element of *li* as a UTF-8 JSON payload.

    Each element is serialized with ``json.dumps(..., ensure_ascii=False)``
    so non-ASCII text is sent as-is rather than as ``\\uXXXX`` escapes, then
    encoded to bytes and sent via ``r_sdk.send_by_fanout`` with
    ``RABBITMQ_TOPIC``.

    :param li: JSON-serializable records to publish, one message per record.
    """
    for record in li:
        payload = json.dumps(record, ensure_ascii=False).encode()
        r_sdk.send_by_fanout(RABBITMQ_TOPIC, payload)


def _build_dsl(size: int) -> dict:
    """Return the search DSL selecting up to *size* non-deleted companies."""
    return {
        "_source": list(_SOURCE_FIELDS),
        # Honor the caller-supplied size (this used to be hard-coded to 20,
        # which made the ``size`` argument of ``pull_by_es`` a no-op).
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {"term": {"deleted": {"value": "0"}}},
                    {
                        "terms": {
                            "company_org_type_new_std": [
                                "有限责任公司",
                                "独资企业",
                            ]
                        }
                    },
                ]
            }
        },
        "sort": [{"company_score_weight": {"order": "desc"}}],
    }


def _hit_to_record(hit) -> dict:
    """Convert one query result item into the flat record that gets published."""
    # NOTE(review): get_value / get_name presumably map the codes to display
    # names; only their positional results [0], [1], [2] are relied on here.
    category = get_value(
        c1=json_path(hit, '$.category_first_code'),
        c2=json_path(hit, '$.category_second_code'),
        c3=json_path(hit, '$.category_third_code'),
    )
    area = get_name(
        province_code=json_path(hit, '$.province_code'),
        city_code=json_path(hit, '$.city_code'),
        county_code=json_path(hit, '$.county_code'),
    )
    return {
        "company_id": hit['_id'],
        "company_name": json_path(hit, "$.cname.show"),
        "company_org_type": json_path(hit, "$.company_org_type_new_std.[0]"),
        "province": area[0],
        "city": area[1],
        "county": area[2],
        "org_number": json_path(hit, '$.org_number'),
        "credit_code": json_path(hit, '$.credit_code'),
        "reg_number": json_path(hit, '$.reg_number'),
        "cate_first": category[0],
        "cate_second": category[1],
        "cate_third": category[2],
    }


def pull_by_es(size: int = 20):
    """Query companies from Elasticsearch and publish them to RabbitMQ.

    Runs a search against index ``winhc_index_rt_company`` (doc type
    ``company``) for documents with ``deleted == "0"`` whose
    ``company_org_type_new_std`` is one of the two configured types, sorted
    by ``company_score_weight`` descending. Each result is flattened into a
    record and sent through ``_send_rabbit``.

    :param size: number of documents to request; must be an int with
        ``0 < size <= 10000``.
    :raises AssertionError: if *size* is not an int in the allowed range.
    """
    # Raised explicitly (rather than via ``assert``) so the check still runs
    # under ``python -O``; the exception type and message are unchanged for
    # existing callers.
    if not (isinstance(size, int) and 0 < size <= 10000):
        raise AssertionError("数值错误")

    res = es_sdk.query(
        index='winhc_index_rt_company',
        doc_type='company',
        dsl=_build_dsl(size),
    )
    _send_rabbit([_hit_to_record(hit) for hit in res])


if __name__ == '__main__':
    pull_by_es()