# pull_sample_data.py
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/5 9:28
  3. # @Author : XuJiakai
  4. # @File : pull_sample_data
  5. # @Software: PyCharm
  6. import json
  7. RABBITMQ_TOPIC = "xjk_test"
  8. from sdk.WinhcElasticSearchSDK import get_new_es
  9. from utils.base_utils import json_path
  10. from utils import map_2_json_str
  11. from utils.category_utils import get_value
  12. from utils.pca_code_utils import get_name
  13. from sdk.WinhcAllClient import get_all_client
  14. all_sdk = get_all_client()
  15. r_sdk = all_sdk.get_rabbit_mq_sdk()
  16. es_sdk = get_new_es()
  17. def _send_rabbit(li: list):
  18. for i in li:
  19. r_sdk.send_by_fanout(RABBITMQ_TOPIC, json.dumps(i, ensure_ascii=False).encode())
  20. pass
  21. pass
  22. def pull_by_es(size: int = 20):
  23. assert isinstance(size, int) and 0 < size <= 10000, "数值错误"
  24. dsl = {
  25. "_source": ["cname.show", "company_org_type_new_std", "province_code", "city_code", "county_code", "org_number",
  26. "credit_code", "reg_number", "category_first_code", "category_second_code", "category_third_code"],
  27. "size": 20,
  28. "query": {
  29. "bool": {
  30. "must": [
  31. {
  32. "term": {
  33. "deleted": {
  34. "value": "0"
  35. }
  36. }
  37. }, {
  38. "terms": {
  39. "company_org_type_new_std": [
  40. "有限责任公司",
  41. "独资企业"
  42. ]
  43. }
  44. }
  45. ]
  46. }
  47. }
  48. , "sort": [
  49. {
  50. "company_score_weight": {
  51. "order": "desc"
  52. }
  53. }
  54. ]
  55. }
  56. res = es_sdk.query(index='winhc_index_rt_company', doc_type='company', dsl=dsl)
  57. li = []
  58. for i in res:
  59. c = get_value(
  60. c1=json_path(i, '$.category_first_code'), c2=json_path(i, '$.category_second_code'),
  61. c3=json_path(i, '$.category_third_code'))
  62. a = get_name(province_code=json_path(i, '$.province_code'), city_code=json_path(i, '$.city_code'),
  63. county_code=json_path(i, '$.county_code'))
  64. e = {
  65. "company_id": i['_id'],
  66. "company_name": json_path(i, "$.cname.show"),
  67. "company_org_type": json_path(i, "$.company_org_type_new_std.[0]"),
  68. "province": a[0],
  69. "city": a[1],
  70. "county": a[2],
  71. "org_number": json_path(i, '$.org_number'),
  72. "credit_code": json_path(i, '$.credit_code'),
  73. "reg_number": json_path(i, '$.reg_number'),
  74. "cate_first": c[0],
  75. "cate_second": c[1],
  76. "cate_third": c[2],
  77. }
  78. li.append(e)
  79. pass
  80. _send_rabbit(li)
  81. pass
  82. if __name__ == '__main__':
  83. pull_by_es()
  84. pass