cpa_agg.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/1 14:46
  3. # @Author : XuJiakai
  4. # @File : cpa_agg
  5. # @Software: PyCharm
  6. import json
  7. import time, queue
  8. from threading import Thread
  9. from utils.datetime_utils import datetime_format
  10. from log import get_log
  11. from sdk.WinhcAllClient import get_all_client
  12. from utils.datetime_utils import get_ds, get_now, datetime_format_transform
  13. from utils import map_2_json_str, json_path
  14. from utils.base_utils import tuple_max
  15. from utils.mysql_utils import insert_many
  16. from utils.xxl_queue import xxl_queue
  17. import re
  18. import sys
  19. import argparse
  20. from project_const import TOPIC_NAME, MONGODB_NAME
  21. date_part = re.compile('\\d{4}年\\d{2}月\\d{2}日')
  22. all_client = get_all_client()
  23. col = all_client.get_mongo_collection(MONGODB_NAME)
  24. del_col = all_client.get_mongo_collection(MONGODB_NAME + '_del')
  25. log = get_log('cpa_agg')
  26. holo_client = all_client.get_holo_client(db='winhc_biz')
  27. HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
  28. def get_max_data(data: list, key: str, exclude_product_name: list = ['winhc']):
  29. max_data = None
  30. for i in data:
  31. tmp_v = json_path(i, key)
  32. if tmp_v is None:
  33. continue
  34. product_name = i['competitor_product_name']
  35. if product_name in exclude_product_name:
  36. continue
  37. pass
  38. if max_data is None:
  39. max_data = (tmp_v, product_name)
  40. else:
  41. max_data = tuple_max(max_data, (tmp_v, product_name))
  42. if max_data is None:
  43. return None, None
  44. return max_data
  45. def get_all_data_by_item(data: list, key):
  46. result_data = {}
  47. for i in data:
  48. result_data[i['competitor_product_name']] = json_path(i, key)
  49. return result_data
  50. def data_transform(data: list):
  51. log.info('input data: {}'.format(data))
  52. deleted_key = [i['_id'] for i in data][0]
  53. deleted_key = deleted_key[:deleted_key.rfind('_')]
  54. base_info = data[0]['base_info']
  55. ds = get_ds()
  56. key_set = set()
  57. winhc_data = None
  58. for i in data:
  59. key_set = key_set | set(i['summary'].keys())
  60. key_set = key_set | set(i['latest_date'].keys())
  61. if i['competitor_product_name'] == 'winhc':
  62. winhc_data = i
  63. pass
  64. pass
  65. if winhc_data is None:
  66. return
  67. li = []
  68. winhc_spider_date = winhc_data['spider_date']
  69. holo_keys = None
  70. for i in key_set:
  71. tmp_json = base_info.copy()
  72. summary_max, summary_max_p_name = get_max_data(data, "$.summary." + i)
  73. latest_date_max, latest_date_max_p_name = get_max_data(data, "$.latest_date." + i)
  74. winhc_dim_num = json_path(winhc_data, '$.summary.' + i)
  75. latest_date_max = datetime_format(latest_date_max)
  76. winhc_dim_date = json_path(winhc_data, '$.latest_date.' + i)
  77. if winhc_dim_date is not None and winhc_dim_date == '':
  78. winhc_dim_date = None
  79. winhc_dim_date = datetime_format(winhc_dim_date)
  80. if (latest_date_max is None or latest_date_max == '') and (
  81. summary_max is None or summary_max == 0) and winhc_dim_date is None and (
  82. winhc_dim_num is None or winhc_dim_num == 0):
  83. # print('这个维度为空...', i, )
  84. continue
  85. pass
  86. if winhc_spider_date is None:
  87. winhc_spider_date = get_now()
  88. other_data = {
  89. "id": tmp_json['company_id'] + "_" + ds + "_" + i,
  90. "dim_name": i,
  91. "dim_max_num": summary_max,
  92. "dim_max_num_business_name": summary_max_p_name,
  93. "winhc_dim_num": winhc_dim_num,
  94. "dim_max_date": latest_date_max,
  95. "dim_max_date_business_name": latest_date_max_p_name,
  96. "winhc_dim_date": winhc_dim_date,
  97. "other_info": json.dumps({"summary": get_all_data_by_item(data, '$.summary.' + i),
  98. 'latest_date': get_all_data_by_item(data, '$.latest_date.' + i)}),
  99. "update_time": winhc_spider_date,
  100. "create_time": winhc_spider_date,
  101. "ds": ds,
  102. }
  103. tmp_json.update(other_data)
  104. li.append(tmp_json)
  105. if holo_keys is None:
  106. holo_keys = list(tmp_json.keys())
  107. pass
  108. log.info('output data: {}'.format(li))
  109. insert_many(li, holo_keys, HOLO_TABLE_NAME, holo_client)
  110. del_num = 0
  111. try:
  112. del_col.insert_many(data, ordered=False)
  113. del_num = col.delete_many({"_id": {"$regex": "^" + deleted_key}}).deleted_count
  114. except:
  115. pass
  116. log.info("deleted mongo _id: {} , deleted count: {}".format(deleted_key, del_num))
  117. return li
  118. q = queue.Queue(5000)
  119. class Work(Thread):
  120. def run(self):
  121. while True:
  122. data_transform(q.get())
  123. today_ds = get_ds()
  124. scan_ds = today_ds[:-2]
  125. def overwrite_handle(key, obj_list):
  126. if obj_list is None or len(obj_list) == 0:
  127. return
  128. _id = obj_list[0]['_id']
  129. if not key.startswith(today_ds) and len(obj_list) == 1:
  130. deleted_count = col.delete_one({'_id': _id}).deleted_count
  131. log.info(f"delete id: {_id} , {deleted_count}")
  132. else:
  133. # log.info(f"skip :{_id}")
  134. pass
  135. pass
  136. def main(max_round: int = 2, interval_of_sed: int = 300):
  137. thread_num = 10
  138. for i in range(thread_num):
  139. w = Work()
  140. w.setDaemon(True)
  141. w.start()
  142. pass
  143. round_num = 0
  144. while True:
  145. round_num += 1
  146. ds = get_ds()
  147. # ds = '20221205'
  148. log.info('{},第{}遍轮循...'.format(ds, round_num))
  149. xxl_q = xxl_queue(pop_threshold=2, overwrite_handle=overwrite_handle)
  150. # for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
  151. for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
  152. # for i in col.find().batch_size(200):
  153. _id = i['_id']
  154. key = _id[:_id.rfind('_')]
  155. result = xxl_q.append(key=key, obj=i)
  156. if result:
  157. q.put(result)
  158. pass
  159. if round_num >= max_round:
  160. # sys.exit(0)
  161. break
  162. try:
  163. log.info('{},第{}遍轮循结束.'.format(ds, round_num))
  164. time.sleep(interval_of_sed)
  165. pass
  166. except:
  167. pass
  168. pass
  169. pass
  170. # tmp_data = {
  171. # "_id": "20221214_0000b94de6aa5fba1f4daa0f2c353815_winhc",
  172. # "base_info": {
  173. # "cate_first": "租赁和商务服务业",
  174. # "cate_second": "商务服务业",
  175. # "cate_third": "旅行社及相关服务",
  176. # "city": "衢州市",
  177. # "company_id": "0000b94de6aa5fba1f4daa0f2c353815",
  178. # "company_name": "龙游县文化旅游发展有限公司",
  179. # "company_org_type": "有限责任公司(非自然人投资或控股的法人独资)",
  180. # "county": "龙游县",
  181. # "credit_code": "91330825573984254D",
  182. # "org_number": "573984254",
  183. # "province": "浙江省",
  184. # "reg_number": "330825000024620"
  185. # },
  186. # "competitor_product_name": "winhc",
  187. # "latest_date": {
  188. # "严重违法": None,
  189. # "公示催告": None,
  190. # "历史变更": "2022-04-15 00:00:00",
  191. # "双随机抽查": None,
  192. # "司法拍卖": None,
  193. # "土地公示": "2022-06-14 00:00:00",
  194. # "基本信息": "2021-12-17 00:00:00",
  195. # "失信信息": None,
  196. # "开庭公告": "2022-10-24 09:00:00",
  197. # "抽查检查": None,
  198. # "招投标": None,
  199. # "欠税公告": None,
  200. # "法院公告": None,
  201. # "环保处罚": None,
  202. # "税收违法": None,
  203. # "立案信息": "2019-08-21 00:00:00",
  204. # "终本案件": None,
  205. # "经营异常": None,
  206. # "行政处罚": None,
  207. # "行政许可": "2022-08-17 00:00:00",
  208. # "被执行人": None,
  209. # "裁判文书": "2020-05-29 00:00:00",
  210. # "诉前调解": None,
  211. # "询价评估": None,
  212. # "购地信息": "2022-06-27 00:00:00",
  213. # "送达公告": None,
  214. # "限制消费": None
  215. # },
  216. # "spider_date": "2022-12-14 10:09:43",
  217. # "summary": {
  218. # "主要成员": 2,
  219. # "企业年报": 9,
  220. # "历史主要成员": 1,
  221. # "历史变更": 32,
  222. # "历史对外投资": 0,
  223. # "历史股东信息": 0,
  224. # "商标": 97,
  225. # "土地公示": 11,
  226. # "对外投资": 21,
  227. # "股东信息": 1,
  228. # "行政许可": 10,
  229. # "裁判文书": 5,
  230. # "购地信息": 12
  231. # }
  232. # }
  233. #
  234. # tmp_data_2 = {
  235. # "_id": "20221215_0000b94de6aa5fba1f4daa0f2c353815_winhc",
  236. # "base_info": {
  237. # "cate_first": "租赁和商务服务业",
  238. # "cate_second": "商务服务业",
  239. # "cate_third": "旅行社及相关服务",
  240. # "city": "衢州市",
  241. # "company_id": "0000b94de6aa5fba1f4daa0f2c353815",
  242. # "company_name": "龙游县文化旅游发展有限公司",
  243. # "company_org_type": "有限责任公司(非自然人投资或控股的法人独资)",
  244. # "county": "龙游县",
  245. # "credit_code": "91330825573984254D",
  246. # "org_number": "573984254",
  247. # "province": "浙江省",
  248. # "reg_number": "330825000024620"
  249. # },
  250. # "competitor_product_name": "qcc",
  251. # "latest_date": {
  252. # "严重违法": None,
  253. # "公示催告": None,
  254. # "双随机抽查": None,
  255. # "司法拍卖": None,
  256. # "失信信息": None,
  257. # "抽查检查": None,
  258. # "招投标": None,
  259. # "欠税公告": None,
  260. # "法院公告": None,
  261. # "环保处罚": None,
  262. # "税收违法": None,
  263. # "终本案件": None,
  264. # "经营异常": None,
  265. # "行政处罚": None,
  266. # "被执行人": None,
  267. # "诉前调解": None,
  268. # "询价评估": None,
  269. # "送达公告": None,
  270. # "限制消费": None
  271. # },
  272. # "spider_date": "2022-12-14 10:09:43",
  273. # "summary": {
  274. # "主要成员": 0,
  275. # "企业年报": 0,
  276. # "历史主要成员": 0,
  277. # "历史变更": 0,
  278. # "历史对外投资": 0,
  279. # "历史股东信息": 0,
  280. # "商标": 0,
  281. # "土地公示": 0,
  282. # "对外投资": 0,
  283. # "股东信息": 0,
  284. # "行政许可": 0,
  285. # "裁判文书": 0,
  286. # "购地信息": None
  287. # }
  288. # }
  289. # def test():
  290. # ds = get_ds()
  291. # for i in col.find({"_id": {"$regex": "^" + ds}}).batch_size(200):
  292. # print(map_2_json_str(i))
  293. # break
  294. # pass
  295. #
  296. # data_transform([tmp_data,tmp_data_2])
  297. #
  298. # pass
  299. if __name__ == '__main__':
  300. # test()
  301. #
  302. log.info(f"input args: {sys.argv}")
  303. parser = argparse.ArgumentParser()
  304. parser.add_argument("-m", "--max-round", type=int, default=2, help='最大迭代轮次')
  305. parser.add_argument("-i", "--interval_of_sed", type=int, default=300, help='每轮间隔时间(秒)')
  306. args = parser.parse_args()
  307. main(max_round=args.max_round, interval_of_sed=args.interval_of_sed)
  308. pass