pull_sample_data.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/5 9:28
  3. # @Author : XuJiakai
  4. # @File : pull_sample_data
  5. # @Software: PyCharm
  6. import json, os, sys
  7. from project_const import TOPIC_NAME
  8. from sdk.WinhcElasticSearchSDK import get_new_es
  9. from utils.base_utils import json_path
  10. from utils import map_2_json_str
  11. from utils.category_utils import get_value
  12. from utils.pca_code_utils import get_name
  13. from sdk.WinhcAllClient import get_all_client
  14. from utils.odps_schema_utils import get_last_partition_ds, get_partition_ds
  15. from sdk.WinhcAllClient import get_odps_sdk
  16. from log import get_log
# Module-level logger for this script.
log = get_log('pull_sample_data')
# Directory containing this module; used to locate the bundled pull_data.sql.
_module_path = os.path.dirname(__file__)
# Fanout topic that downstream consumers listen on (from project constants).
RABBITMQ_TOPIC = TOPIC_NAME
# Shared SDK clients, created once at import time.
all_sdk = get_all_client()
r_sdk = all_sdk.get_rabbit_mq_sdk()
es_sdk = get_new_es()
  23. def _send_rabbit(li: list):
  24. for i in li:
  25. r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(i, ensure_ascii=False).encode())
  26. pass
  27. pass
  28. def pull_by_es(size: int = 20):
  29. assert isinstance(size, int) and 0 < size <= 10000, "数值错误"
  30. log.info(f"pull {size} record for es")
  31. dsl = {
  32. "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code", "county_code", "org_number",
  33. "credit_code", "reg_number", "category_first_code", "category_second_code", "category_third_code"],
  34. "size": size,
  35. "query": {
  36. "bool": {
  37. "must": [
  38. {
  39. "term": {
  40. "company_type": {
  41. "value": "1"
  42. }
  43. }
  44. },
  45. {
  46. "term": {
  47. "deleted": {
  48. "value": "0"
  49. }
  50. }
  51. }, {
  52. "terms": {
  53. "company_org_type_new_std": [
  54. "有限责任公司",
  55. "独资企业"
  56. ]
  57. }
  58. }
  59. ]
  60. }
  61. }
  62. , "sort": [
  63. {
  64. "company_rank_sec": {
  65. "order": "desc"
  66. }
  67. }
  68. ]
  69. }
  70. res = es_sdk.query(index='winhc_index_rt_company', doc_type='company', dsl=dsl)
  71. li = []
  72. for i in res:
  73. c = get_value(
  74. c1=json_path(i, '$.category_first_code'), c2=json_path(i, '$.category_second_code'),
  75. c3=json_path(i, '$.category_third_code'))
  76. a = get_name(province_code=json_path(i, '$.province_code'), city_code=json_path(i, '$.city_code'),
  77. county_code=json_path(i, '$.county_code'))
  78. e = {
  79. "company_id": i['_id'],
  80. "company_name": json_path(i, "$.cname.show"),
  81. "company_org_type": json_path(i, "$.company_org_type_new_std.[0]"),
  82. "province": a[0],
  83. "city": a[1],
  84. "county": a[2],
  85. "org_number": json_path(i, '$.org_number'),
  86. "credit_code": json_path(i, '$.credit_code'),
  87. "reg_number": json_path(i, '$.reg_number'),
  88. "cate_first": c[0],
  89. "cate_second": c[1],
  90. "cate_third": c[2],
  91. }
  92. li.append(e)
  93. pass
  94. _send_rabbit(li)
  95. pass
  96. odps_sdk = get_odps_sdk()
  97. def pull_by_max(size=100000):
  98. log.info(f"pull {size} record for max")
  99. f = open(os.path.join(_module_path, 'pull_data.sql'), encoding='utf-8')
  100. sql = f.read().strip()
  101. company_rank_latest_ds = get_last_partition_ds(tab='calc_company_rank_out', project='winhc_ng')
  102. from utils.datetime_utils import get_ds
  103. latest_ds = get_ds()
  104. sql = sql.format(limit_num=size, company_rank_latest_ds=company_rank_latest_ds, latest_ds=latest_ds)
  105. # print(sql)
  106. all_ds = get_partition_ds(tab='out_winhc_data_analysis_pull_data', project='winhc_ng')
  107. if latest_ds not in all_ds:
  108. log.info("exec sql: {}".format(sql))
  109. instance = odps_sdk.run_sql(sql)
  110. instance.wait_for_success()
  111. log.info("开始推送数据...")
  112. with odps_sdk.execute_sql(
  113. 'select * from out_winhc_data_analysis_pull_data where ds = ' + latest_ds + ' limit ' + str(
  114. size)).open_reader(
  115. tunnel=True) as reader:
  116. for record in reader:
  117. c = get_value(
  118. c1=record['cate_first_code'], c2=record['cate_second_code'],
  119. c3=record['cate_third_code'])
  120. a = get_name(province_code=record['province_code'], city_code=record['city_code'],
  121. county_code=record['county_code'])
  122. ele = {
  123. "company_id": record['company_id'],
  124. "company_name": record['company_name'],
  125. "company_org_type": record['company_org_type'],
  126. "province": a[0],
  127. "city": a[1],
  128. "county": a[2],
  129. "org_number": record['org_number'],
  130. "credit_code": record['credit_code'],
  131. "reg_number": record['reg_number'],
  132. "cate_first": c[0],
  133. "cate_second": c[1],
  134. "cate_third": c[2],
  135. }
  136. # print(map_2_json_str(ele))
  137. r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(ele, ensure_ascii=False).encode())
  138. log.info('数据推送完成.')
  139. pass
  140. def pull(size):
  141. if size > 10000:
  142. pull_by_max(size)
  143. else:
  144. pull_by_es(500)
  145. pass
  146. pass
  147. if __name__ == '__main__':
  148. log.info(f"input args: {sys.argv}")
  149. if len(sys.argv) >= 2:
  150. pull(size=int(sys.argv[1]))
  151. else:
  152. pull(size=1000)
  153. pass