winhc_job.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/1 14:11
  3. # @Author : XuJiakai
  4. # @File : winhc_job
  5. # @Software: PyCharm
  6. import json
  7. import queue
  8. from threading import Thread
  9. from log import get_log
  10. from handle.search_winhc_latest_date import search_latest_date
  11. from handle.search_winhc_summary import search_summary
  12. from sdk.WinhcAllClient import get_all_client
  13. from utils import map_2_json_str
  14. from utils.datetime_utils import get_ds, get_now
  15. from project_const import TOPIC_NAME, MONGODB_NAME
  # Shared client factory; the Mongo collection and the RabbitMQ SDK used in
  # this module are both obtained from it.
  all_client = get_all_client()

  # Mongo collection that receives the documents built by run().
  col = all_client.get_mongo_collection(MONGODB_NAME)

  # Module-wide logger.
  log = get_log("winhc_job")
  def run(data: dict):
      """Look up winhc data for one company and store the result in MongoDB.

      The summary and latest date are fetched for ``data['company_id']``,
      wrapped together with the incoming payload into one document, and
      inserted into ``col``.

      Args:
          data: Message payload. Must contain a ``company_id`` key; the whole
              dict is stored unchanged under ``base_info``.

      Returns:
          None. A failed lookup or a failed insert is logged and the function
          returns without raising.

      Raises:
          KeyError: If ``data`` has no ``company_id`` key (the lookup happens
              before any error handling).
      """
      company_id = data['company_id']

      try:
          summary = search_summary(company_id)
          latest_date = search_latest_date(company_id)
      except Exception as e:
          # `Exception` rather than a bare `except:` so KeyboardInterrupt and
          # SystemExit still propagate; the cause is included in the log line.
          log.error('error company_id :{}, error: {!r}'.format(company_id, e))
          return

      _id = get_ds() + '_' + company_id + '_winhc'
      output_data = {
          "_id": _id,
          "base_info": data,
          "competitor_product_name": "winhc",
          "summary": summary,
          "latest_date": latest_date,
          "spider_date": get_now(),
      }
      log.info('insert mongo: {}'.format(output_data))

      try:
          col.insert_one(output_data)
      except Exception as e:
          # Still best-effort (no re-raise), but a lost document is now visible
          # in the log instead of being dropped silently.
          log.error('insert mongo failed, _id: {}, error: {!r}'.format(_id, e))
  # RabbitMQ SDK handle obtained from the shared client factory.
  r_sdk = all_client.get_rabbit_mq_sdk()

  # Topic that main() subscribes to.
  RABBITMQ_TOPIC = TOPIC_NAME

  # Bounded hand-off queue between the consumer callback and the worker
  # threads; put() blocks once 5000 items are waiting.
  q = queue.Queue(maxsize=5000)
  class Work(Thread):
      """Worker thread that takes payloads from the queue ``q`` and processes them."""

      def run(self):
          """Process queued payloads forever, one at a time.

          A failure on one payload is logged and the loop continues, so a
          single bad message (for example one without ``company_id``) cannot
          terminate the worker thread.
          """
          while True:
              item = q.get()
              try:
                  # Inside a method body the bare name `run` resolves to the
                  # module-level function, not to this method.
                  run(item)
              except Exception as e:
                  log.error('worker failed on data: {}, error: {!r}'.format(item, e))
  def main():
      """Start the worker threads, then consume the RabbitMQ topic.

      Ten ``Work`` threads are started first. Each received message is decoded
      as JSON, logged, placed on ``q`` for the workers, and then acknowledged.
      """
      worker_count = 10
      for _ in range(worker_count):
          worker = Work()
          worker.start()

      def callback(ch, method, properties, body):
          # The message body arrives as bytes containing a JSON document.
          payload = json.loads(body.decode())
          log.info('receive data: {}'.format(payload))
          q.put(payload)
          # Manual ack, sent once the payload has been placed on the work queue.
          ch.basic_ack(delivery_tag=method.delivery_tag)

      r_sdk.consumer_by_fanout(
          RABBITMQ_TOPIC,
          queue='cpa_winhc_spider',
          callback=callback,
      )
  def test(company_id):
      """Run the job once for ``company_id`` with all other payload fields blank.

      Args:
          company_id: Identifier passed to ``run`` under the ``company_id`` key.
      """
      blank_fields = (
          "company_name",
          "company_registered_date",
          "company_org_type",
          "province_code",
          "city_code",
          "county_code",
          "org_number",
          "reg_number",
          "credit_code",
      )
      # company_id goes first so the key order matches a hand-written payload.
      data = {"company_id": company_id}
      data.update(dict.fromkeys(blank_fields, ""))
      run(data)
  if __name__ == "__main__":
      # Script entry point: start the consumer.
      # For a one-off local check of a single company, call
      # test('3667556dcfeda7c18c9745f6b8133165') here instead.
      main()