cpa_agg.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/1 14:46
  3. # @Author : XuJiakai
  4. # @File : cpa_agg
  5. # @Software: PyCharm
  6. import json
  7. import time
  8. from log import get_log
  9. from sdk.WinhcAllClient import get_all_client
  10. from utils.datetime_utils import get_ds, get_now, datetime_format_transform
  11. from utils import map_2_json_str, json_path
  12. from utils.base_utils import tuple_max
  13. from utils.mysql_utils import insert_many
  14. from utils.xxl_queue import xxl_queue
  15. import re
  16. import sys
  17. import argparse
  18. from project_const import TOPIC_NAME, MONGODB_NAME
# Matches text that starts with a date in the form "4 digits 年 2 digits 月 2 digits 日".
date_part = re.compile('\\d{4}年\\d{2}月\\d{2}日')
# Shared client factory; all storage handles below are obtained from it.
all_client = get_all_client()
# Source MongoDB collection holding the per-product documents to aggregate.
col = all_client.get_mongo_collection(MONGODB_NAME)
# Sibling collection (same name + '_del'); data_transform copies documents here
# before deleting them from `col`.
del_col = all_client.get_mongo_collection(MONGODB_NAME + '_del')
log = get_log('cpa_agg')
# Client for the 'winhc_biz' database; passed to insert_many as the write target.
holo_client = all_client.get_holo_client(db='winhc_biz')
# Destination table for the aggregated per-dimension rows.
HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
  26. def get_max_data(data: list, key: str):
  27. max_data = None
  28. for i in data:
  29. tmp_v = json_path(i, key)
  30. if tmp_v is None:
  31. continue
  32. product_name = i['competitor_product_name']
  33. if max_data is None:
  34. max_data = (tmp_v, product_name)
  35. else:
  36. max_data = tuple_max(max_data, (tmp_v, product_name))
  37. if max_data is None:
  38. return None, None
  39. return max_data
  40. def get_all_data_by_item(data: list, key):
  41. result_data = {}
  42. for i in data:
  43. result_data[i['competitor_product_name']] = json_path(i, key)
  44. return result_data
  45. def data_transform(data: list):
  46. log.info('input data: {}'.format(data))
  47. deleted_key = [i['_id'] for i in data][0]
  48. deleted_key = deleted_key[:deleted_key.rfind('_')]
  49. base_info = data[0]['base_info']
  50. ds = get_ds()
  51. key_set = set()
  52. winhc_data = None
  53. for i in data:
  54. key_set = key_set | set(i['summary'].keys())
  55. key_set = key_set | set(i['latest_date'].keys())
  56. if i['competitor_product_name'] == 'winhc':
  57. winhc_data = i
  58. pass
  59. pass
  60. if winhc_data is None:
  61. return
  62. li = []
  63. holo_keys = None
  64. for i in key_set:
  65. tmp_json = base_info.copy()
  66. summary_max, summary_max_p_name = get_max_data(data, "$.summary." + i)
  67. latest_date_max, latest_date_max_p_name = get_max_data(data, "$.latest_date." + i)
  68. if (latest_date_max is None or latest_date_max == '') and (summary_max is None or summary_max == 0):
  69. # print('这个维度为空...', i, )
  70. continue
  71. pass
  72. if latest_date_max is not None and date_part.match(latest_date_max):
  73. latest_date_max = datetime_format_transform(latest_date_max, '%Y年%m月%d日', "%Y-%m-%d %H:%M:%S")
  74. pass
  75. winhc_dim_date = json_path(winhc_data, '$.latest_date.' + i)
  76. if winhc_dim_date is not None and winhc_dim_date == '':
  77. winhc_dim_date = None
  78. if winhc_dim_date is not None and date_part.match(winhc_dim_date):
  79. winhc_dim_date = datetime_format_transform(winhc_dim_date, '%Y年%m月%d日', "%Y-%m-%d %H:%M:%S")
  80. pass
  81. other_data = {
  82. "id": tmp_json['company_id'] + "_" + ds + "_" + i,
  83. "dim_name": i,
  84. "dim_max_num": summary_max,
  85. "dim_max_num_business_name": summary_max_p_name,
  86. "winhc_dim_num": json_path(winhc_data, '$.summary.' + i),
  87. "dim_max_date": latest_date_max,
  88. "dim_max_date_business_name": latest_date_max_p_name,
  89. "winhc_dim_date": winhc_dim_date,
  90. "other_info": json.dumps({"summary": get_all_data_by_item(data, '$.summary.' + i),
  91. 'latest_date': get_all_data_by_item(data, '$.latest_date.' + i)}),
  92. "update_time": get_now(),
  93. "create_time": get_now(),
  94. "ds": ds,
  95. }
  96. tmp_json.update(other_data)
  97. li.append(tmp_json)
  98. if holo_keys is None:
  99. holo_keys = list(tmp_json.keys())
  100. pass
  101. log.info('output data: {}'.format(li))
  102. insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
  103. del_num = 0
  104. try:
  105. del_col.insert_many(data, ordered=False)
  106. del_num = col.delete_many({"_id": {"$regex": "^" + deleted_key}}).deleted_count
  107. except:
  108. pass
  109. log.info("deleted mongo _id: {} , deleted count: {}".format(deleted_key, del_num))
  110. return li
  111. def main(max_round: int = 2, interval_of_sed: int = 300):
  112. round_num = 0
  113. while True:
  114. round_num += 1
  115. ds = get_ds()
  116. # ds = '20221205'
  117. log.info('{},第{}遍轮循...'.format(ds, round_num))
  118. q = xxl_queue(pop_threshold=2)
  119. for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
  120. _id = i['_id']
  121. key = _id[:_id.rfind('_')]
  122. result = q.append(key=key, obj=i)
  123. if result:
  124. data_transform(result)
  125. pass
  126. if round_num >= max_round:
  127. break
  128. try:
  129. time.sleep(interval_of_sed)
  130. pass
  131. except:
  132. pass
  133. pass
  134. pass
  135. if __name__ == '__main__':
  136. log.info(f"input args: {sys.argv}")
  137. parser = argparse.ArgumentParser()
  138. parser.add_argument("-m", "--max-round", type=int, default=2, help='最大迭代轮次')
  139. parser.add_argument("-i", "--interval_of_sed", type=int, default=300, help='每轮间隔时间(秒)')
  140. args = parser.parse_args()
  141. main(max_round=args.max_round, interval_of_sed=args.interval_of_sed)
  142. pass