pull_sample_data.py

# -*- coding: utf-8 -*-
# @Time : 2022/12/5 9:28
# @Author : XuJiakai
# @File : pull_sample_data
# @Software: PyCharm
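"""
Pull sample company records and push them to the RabbitMQ fanout exchange
named by project_const.TOPIC_NAME.

Two entry points:
  * pull_by_es(size)  - sample up to `size` companies directly from the
    winhc_index_rt_company Elasticsearch index.
  * pull_by_max(size) - materialize a larger sample in ODPS (MaxCompute) via
    pull_data.sql and stream the result partition through a tunnel reader.
"""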
import json, os, sys

from project_const import TOPIC_NAME
from sdk.WinhcElasticSearchSDK import get_new_es
from utils.base_utils import json_path
from utils import map_2_json_str
from utils.category_utils import get_value
from utils.pca_code_utils import get_name
from sdk.WinhcAllClient import get_all_client
from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
from sdk.WinhcAllClient import get_odps_sdk
from log import get_log

log = get_log('pull_sample_data')

_module_path = os.path.dirname(__file__)

RABBITMQ_TOPIC = TOPIC_NAME

all_sdk = get_all_client()
r_sdk = all_sdk.get_rabbit_mq_sdk()
es_sdk = get_new_es()


def _send_rabbit(li: list):
    """Publish each record in `li` to the RabbitMQ fanout exchange as UTF-8 JSON."""
    for i in li:
        r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(i, ensure_ascii=False).encode())


def pull_by_es(size: int = 20):
    """Sample up to `size` companies from Elasticsearch and push them to RabbitMQ."""
    assert isinstance(size, int) and 0 < size <= 10000, "size must be an integer in (0, 10000]"
    log.info(f"pull {size} records from es")
    dsl = {
        "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code", "county_code",
                    "org_number", "credit_code", "reg_number", "category_first_code", "category_second_code",
                    "category_third_code"],
        "size": size,
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {
                            "deleted": {
                                "value": "0"
                            }
                        }
                    },
                    {
                        "terms": {
                            "company_org_type_new_std": [
                                "有限责任公司",
                                "独资企业"
                            ]
                        }
                    }
                ]
            }
        },
        "sort": [
            {
                "company_rank_sec": {
                    "order": "desc"
                }
            }
        ]
    }
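    # The DSL above keeps only non-deleted companies (deleted = 0) of the two listed
    # org types and returns the top `size` hits ordered by company_rank_sec descending.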
    res = es_sdk.query(index='winhc_index_rt_company', doc_type='company', dsl=dsl)
    li = []
    for i in res:
        # resolve category codes and province/city/county codes to display names
        c = get_value(
            c1=json_path(i, '$.category_first_code'), c2=json_path(i, '$.category_second_code'),
            c3=json_path(i, '$.category_third_code'))
        a = get_name(province_code=json_path(i, '$.province_code'), city_code=json_path(i, '$.city_code'),
                     county_code=json_path(i, '$.county_code'))
        e = {
            "company_id": i['_id'],
            "company_name": json_path(i, "$.cname.show"),
            "company_org_type": json_path(i, "$.company_org_type_new_std.[0]"),
            "province": a[0],
            "city": a[1],
            "county": a[2],
            "org_number": json_path(i, '$.org_number'),
            "credit_code": json_path(i, '$.credit_code'),
            "reg_number": json_path(i, '$.reg_number'),
            "cate_first": c[0],
            "cate_second": c[1],
            "cate_third": c[2],
        }
        li.append(e)
    _send_rabbit(li)
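
# Example (hypothetical call; assumes the ES index and RabbitMQ exchange above are reachable):
#   pull_by_es(size=50)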


odps_sdk = get_odps_sdk()


def pull_by_max(size=100000):
    """Materialize up to `size` sample companies in ODPS and push them to RabbitMQ."""
    log.info(f"pull {size} records from max")
    with open(os.path.join(_module_path, 'pull_data.sql'), encoding='utf-8') as f:
        sql = f.read().strip()
    company_rank_latest_ds = get_last_partition_ds(tab='calc_company_rank_out', project='winhc_ng')
    from utils.datetime_utils import get_ds
    latest_ds = get_ds()
    sql = sql.format(limit_num=size, company_rank_latest_ds=company_rank_latest_ds, latest_ds=latest_ds)
    # print(sql)
    all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
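    # Run the ODPS job only if today's partition has not been produced yet;
    # otherwise the existing out_winhc_data_analysis_pull_data partition is reused.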
    if latest_ds not in all_ds:
        log.info("exec sql: {}".format(sql))
        instance = odps_sdk.run_sql(sql)
        instance.wait_for_success()

    log.info("start pushing data...")
    with odps_sdk.execute_sql(
            'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds).open_reader(
            tunnel=True) as reader:
        for record in reader:
            c = get_value(
                c1=record['cate_first_code'], c2=record['cate_second_code'],
                c3=record['cate_third_code'])
            a = get_name(province_code=record['province_code'], city_code=record['city_code'],
                         county_code=record['county_code'])
            ele = {
                "company_id": record['company_id'],
                "company_name": record['company_name'],
                "company_org_type": record['company_org_type'],
                "province": a[0],
                "city": a[1],
                "county": a[2],
                "org_number": record['org_number'],
                "credit_code": record['credit_code'],
                "reg_number": record['reg_number'],
                "cate_first": c[0],
                "cate_second": c[1],
                "cate_third": c[2],
            }
            # print(map_2_json_str(ele))
            r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
    log.info('data push finished.')
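

# CLI: an optional first argument overrides the default pull size for pull_by_max,
# e.g. `python pull_sample_data.py 5000` (assumes the ODPS and RabbitMQ SDKs are configured).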
if __name__ == '__main__':
    log.info(f"input args: {sys.argv}")
    if len(sys.argv) >= 2:
        pull_by_max(size=int(sys.argv[1]))
    else:
        pull_by_max(size=1000)