pull_sample_data.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/5 9:28
  3. # @Author : XuJiakai
  4. # @File : pull_sample_data
  5. # @Software: PyCharm
  6. import json, os
  7. from project_const import TOPIC_NAME
  8. from sdk.WinhcElasticSearchSDK import get_new_es
  9. from utils.base_utils import json_path
  10. from utils import map_2_json_str
  11. from utils.category_utils import get_value
  12. from utils.pca_code_utils import get_name
  13. from sdk.WinhcAllClient import get_all_client
  14. from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
  15. from sdk.WinhcAllClient import get_odps_sdk
  16. _module_path = os.path.dirname(__file__)
  17. RABBITMQ_TOPIC = TOPIC_NAME
  18. all_sdk = get_all_client()
  19. r_sdk = all_sdk.get_rabbit_mq_sdk()
  20. es_sdk = get_new_es()
  21. def _send_rabbit(li: list):
  22. for i in li:
  23. r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(i, ensure_ascii=False).encode())
  24. pass
  25. pass
  26. def pull_by_es(size: int = 20):
  27. assert isinstance(size, int) and 0 < size <= 10000, "数值错误"
  28. dsl = {
  29. "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code", "county_code", "org_number",
  30. "credit_code", "reg_number", "category_first_code", "category_second_code", "category_third_code"],
  31. "size": size,
  32. "query": {
  33. "bool": {
  34. "must": [
  35. {
  36. "term": {
  37. "deleted": {
  38. "value": "0"
  39. }
  40. }
  41. }, {
  42. "terms": {
  43. "company_org_type_new_std": [
  44. "有限责任公司",
  45. "独资企业"
  46. ]
  47. }
  48. }
  49. ]
  50. }
  51. }
  52. , "sort": [
  53. {
  54. "company_rank_sec": {
  55. "order": "desc"
  56. }
  57. }
  58. ]
  59. }
  60. res = es_sdk.query(index='winhc_index_rt_company', doc_type='company', dsl=dsl)
  61. li = []
  62. for i in res:
  63. c = get_value(
  64. c1=json_path(i, '$.category_first_code'), c2=json_path(i, '$.category_second_code'),
  65. c3=json_path(i, '$.category_third_code'))
  66. a = get_name(province_code=json_path(i, '$.province_code'), city_code=json_path(i, '$.city_code'),
  67. county_code=json_path(i, '$.county_code'))
  68. e = {
  69. "company_id": i['_id'],
  70. "company_name": json_path(i, "$.cname.show"),
  71. "company_org_type": json_path(i, "$.company_org_type_new_std.[0]"),
  72. "province": a[0],
  73. "city": a[1],
  74. "county": a[2],
  75. "org_number": json_path(i, '$.org_number'),
  76. "credit_code": json_path(i, '$.credit_code'),
  77. "reg_number": json_path(i, '$.reg_number'),
  78. "cate_first": c[0],
  79. "cate_second": c[1],
  80. "cate_third": c[2],
  81. }
  82. li.append(e)
  83. pass
  84. _send_rabbit(li)
  85. pass
  86. odps_sdk = get_odps_sdk()
  87. def pull_by_max(size=100000):
  88. f = open(os.path.join(_module_path, 'pull_data.sql'), encoding='utf-8')
  89. sql = f.read().strip()
  90. company_rank_latest_ds = get_last_partition_ds(tab='calc_company_rank_out', project='winhc_ng')
  91. from utils.datetime_utils import get_ds
  92. latest_ds = get_ds()
  93. sql = sql.format(limit_num=size, company_rank_latest_ds=company_rank_latest_ds, latest_ds=latest_ds)
  94. # print(sql)
  95. all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
  96. if latest_ds not in all_ds:
  97. instance = odps_sdk.run_sql(sql)
  98. instance.wait_for_success()
  99. with odps_sdk.execute_sql(
  100. 'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit 1000').open_reader(
  101. tunnel=True) as reader:
  102. for record in reader:
  103. c = get_value(
  104. c1=record['cate_first_code'], c2=record['cate_second_code'],
  105. c3=record['cate_third_code'])
  106. a = get_name(province_code=record['province_code'], city_code=record['city_code'],
  107. county_code=record['county_code'])
  108. ele = {
  109. "company_id": record['company_id'],
  110. "company_name": record['company_name'],
  111. "company_org_type": record['company_org_type'],
  112. "province": a[0],
  113. "city": a[1],
  114. "county": a[2],
  115. "org_number": record['org_number'],
  116. "credit_code": record['credit_code'],
  117. "reg_number": record['reg_number'],
  118. "cate_first": c[0],
  119. "cate_second": c[1],
  120. "cate_third": c[2],
  121. }
  122. # print(map_2_json_str(ele))
  123. r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
  124. pass
  125. if __name__ == '__main__':
  126. # pull_by_es(size=100)
  127. pull_by_max(size=10000)
  128. pass