# cpa_agg.py
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/1 14:46
  3. # @Author : XuJiakai
  4. # @File : cpa_agg
  5. # @Software: PyCharm
  6. import json
  7. import time, queue
  8. from threading import Thread
  9. from utils.datetime_utils import datetime_format
  10. from log import get_log
  11. from sdk.WinhcAllClient import get_all_client
  12. from utils.datetime_utils import get_ds, get_now, datetime_format_transform, get_yesterday_ds
  13. from utils import map_2_json_str, json_path
  14. from utils.base_utils import tuple_max, get_str_intersection
  15. from utils.mysql_utils import insert_many
  16. from utils.xxl_queue import xxl_queue
  17. import re
  18. import sys
  19. import argparse
  20. from project_const import TOPIC_NAME, MONGODB_NAME
  21. date_part = re.compile('\\d{4}年\\d{2}月\\d{2}日')
  22. all_client = get_all_client()
  23. col = all_client.get_mongo_collection(MONGODB_NAME)
  24. del_col = all_client.get_mongo_collection(MONGODB_NAME + '_del')
  25. log = get_log('cpa_agg')
  26. holo_client = all_client.get_holo_client(db='winhc_biz')
  27. HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
  28. all_sdk = get_all_client()
  29. def get_max_data(data: list, key: str, exclude_product_name: list = ['winhc']):
  30. max_data = None
  31. for i in data:
  32. tmp_v = json_path(i, key)
  33. if tmp_v is None:
  34. continue
  35. product_name = i['competitor_product_name']
  36. if product_name in exclude_product_name:
  37. continue
  38. pass
  39. if max_data is None:
  40. max_data = (tmp_v, product_name)
  41. else:
  42. max_data = tuple_max(max_data, (tmp_v, product_name))
  43. if max_data is None:
  44. return None, None
  45. return max_data
  46. def get_all_data_by_item(data: list, key):
  47. result_data = {}
  48. for i in data:
  49. result_data[i['competitor_product_name']] = json_path(i, key)
  50. return result_data
  51. def data_transform(data: list, rabbit_mq):
  52. log.info('input data: {}'.format(data))
  53. deleted_key = [i['_id'] for i in data][0]
  54. deleted_key = deleted_key[:deleted_key.rfind('_')]
  55. base_info = data[0]['base_info']
  56. ds = get_ds()
  57. key_set = set()
  58. winhc_data = None
  59. for i in data:
  60. key_set = key_set | set(i['summary'].keys())
  61. key_set = key_set | set(i['latest_date'].keys())
  62. if i['competitor_product_name'] == 'winhc':
  63. winhc_data = i
  64. pass
  65. pass
  66. if winhc_data is None:
  67. return
  68. li = []
  69. winhc_spider_date = winhc_data['spider_date']
  70. holo_keys = None
  71. for i in key_set:
  72. tmp_json = base_info.copy()
  73. summary_max, summary_max_p_name = get_max_data(data, "$.summary." + i)
  74. latest_date_max, latest_date_max_p_name = get_max_data(data, "$.latest_date." + i)
  75. winhc_dim_num = json_path(winhc_data, '$.summary.' + i)
  76. latest_date_max = datetime_format(latest_date_max)
  77. winhc_dim_date = json_path(winhc_data, '$.latest_date.' + i)
  78. if winhc_dim_date is not None and winhc_dim_date == '':
  79. winhc_dim_date = None
  80. winhc_dim_date = datetime_format(winhc_dim_date)
  81. if (latest_date_max is None or latest_date_max == '') and (
  82. summary_max is None or summary_max == 0) and winhc_dim_date is None and (
  83. winhc_dim_num is None or winhc_dim_num == 0):
  84. # print('这个维度为空...', i, )
  85. continue
  86. pass
  87. if winhc_spider_date is None:
  88. winhc_spider_date = get_now()
  89. other_data = {
  90. "id": tmp_json['company_id'] + "_" + ds + "_" + i,
  91. "dim_name": i,
  92. "dim_max_num": summary_max,
  93. "dim_max_num_business_name": summary_max_p_name,
  94. "winhc_dim_num": winhc_dim_num,
  95. "dim_max_date": latest_date_max,
  96. "dim_max_date_business_name": latest_date_max_p_name,
  97. "winhc_dim_date": winhc_dim_date,
  98. "other_info": json.dumps({"summary": get_all_data_by_item(data, '$.summary.' + i),
  99. 'latest_date': get_all_data_by_item(data, '$.latest_date.' + i)}),
  100. "update_time": winhc_spider_date,
  101. "create_time": winhc_spider_date,
  102. "ds": ds,
  103. }
  104. tmp_json.update(other_data)
  105. li.append(tmp_json)
  106. if holo_keys is None:
  107. holo_keys = list(tmp_json.keys())
  108. pass
  109. log.info('output data: {}'.format(li))
  110. if li is not None and len(li) > 0:
  111. # insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
  112. li = [tuple(i.values()) for i in li]
  113. for i in li:
  114. rabbit_mq.send_by_fanout("cpa_insert_holo", json.dumps(i, ensure_ascii=False).encode())
  115. pass
  116. del_num = 0
  117. try:
  118. del_num = col.delete_many({"_id": {"$regex": "^" + deleted_key}}).deleted_count
  119. del_col.insert_many(data, ordered=False)
  120. except:
  121. pass
  122. log.info("deleted mongo _id: {} , deleted count: {}".format(deleted_key, del_num))
  123. return li
  124. q = queue.Queue(5000)
  125. class Work(Thread):
  126. def run(self):
  127. r_sdk = all_sdk.get_rabbit_mq_sdk()
  128. while True:
  129. data_transform(q.get(), r_sdk)
today_ds = get_ds()
yesterday_ds = get_yesterday_ds()
# yesterday_ds = '20221231'
# scan_ds = today_ds[:-2]
# Common leading prefix of today's and yesterday's ds strings, used as the
# mongo _id regex prefix so documents from both days are scanned.
scan_ds = get_str_intersection(today_ds, yesterday_ds)
# scan_ds = '2022'
  136. def overwrite_handle(key, obj_list):
  137. if obj_list is None or len(obj_list) == 0:
  138. return
  139. _id = obj_list[0]['_id']
  140. if not key.startswith(today_ds) and len(obj_list) == 1:
  141. deleted_count = col.delete_one({'_id': _id}).deleted_count
  142. log.info(f"delete id: {_id} , {deleted_count}")
  143. else:
  144. # log.info(f"skip :{_id}")
  145. pass
  146. pass
  147. import tqdm
  148. def main(max_round: int = 2, interval_of_sed: int = 300):
  149. thread_num = 20
  150. for i in range(thread_num):
  151. w = Work()
  152. w.setDaemon(True)
  153. w.start()
  154. pass
  155. round_num = 0
  156. while True:
  157. round_num += 1
  158. print('{},第{}遍轮循...'.format(scan_ds, round_num))
  159. xxl_q = xxl_queue(pop_threshold=2, overwrite_handle=overwrite_handle)
  160. # for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
  161. for i in tqdm.tqdm(col.find({"_id": {"$regex": "^" + scan_ds}}).batch_size(200)):
  162. # for i in col.find().batch_size(200):
  163. _id = i['_id']
  164. key = _id[:_id.rfind('_')]
  165. result = xxl_q.append(key=key, obj=i)
  166. if result:
  167. q.put(result)
  168. pass
  169. if round_num >= max_round:
  170. # sys.exit(0)
  171. break
  172. try:
  173. print('{},第{}遍轮循结束.'.format(scan_ds, round_num))
  174. time.sleep(interval_of_sed)
  175. pass
  176. except:
  177. pass
  178. pass
  179. pass
  180. if __name__ == '__main__':
  181. # test()
  182. #
  183. log.info(f"input args: {sys.argv}")
  184. parser = argparse.ArgumentParser()
  185. parser.add_argument("-m", "--max-round", type=int, default=2, help='最大迭代轮次')
  186. parser.add_argument("-i", "--interval_of_sed", type=int, default=300, help='每轮间隔时间(秒)')
  187. args = parser.parse_args()
  188. main(max_round=args.max_round, interval_of_sed=args.interval_of_sed)
  189. while not q.empty():
  190. log.info(f"遍历未结束,队列剩余:{q.qsize()}")
  191. time.sleep(300)
  192. pass
  193. log.info(f"遍历完成,队列剩余:{q.qsize()}")
  194. pass