# -*- coding: utf-8 -*-
# @Time : 2022/12/1 14:11
# @Author : XuJiakai
# @File : winhc_job
# @Software: PyCharm
import json
import logging
import queue
from threading import Thread

from handle.search_winhc_latest_date import search_latest_date
from handle.search_winhc_summary import search_summary
from project_const import TOPIC_NAME, MONGODB_NAME
from sdk.WinhcAllClient import get_all_client
from utils import map_2_json_str
from utils.datetime_utils import get_ds, get_now
# Shared client factory, created once at import time and reused by the whole module.
all_client = get_all_client()
# MongoDB collection that run() inserts its output documents into.
col = all_client.get_mongo_collection(MONGODB_NAME)
  17. def run(data: map):
  18. company_id = data['company_id']
  19. summary = search_summary(company_id)
  20. latest_date = search_latest_date(company_id)
  21. _id = get_ds() + '_' + company_id + '_winhc'
  22. output_data = {
  23. "_id": _id,
  24. "base_info": data,
  25. "competitor_product_name": "winhc",
  26. "summary": summary,
  27. "latest_date": latest_date,
  28. "spider_date": get_now()
  29. }
  30. # print(map_2_json_str(output_data))
  31. try:
  32. col.insert_one(output_data)
  33. except Exception as e:
  34. pass
  35. pass
# RabbitMQ SDK handle obtained from the shared client; main() consumes through it.
r_sdk = all_client.get_rabbit_mq_sdk()
# Topic the consumer subscribes to (alias of the project-wide TOPIC_NAME constant).
RABBITMQ_TOPIC = TOPIC_NAME
# Bounded hand-off queue (capacity 5000) between the consumer callback and the Work threads.
q = queue.Queue(5000)
  39. class Work(Thread):
  40. def run(self):
  41. while True:
  42. run(q.get())
  43. def main():
  44. thread_num = 10
  45. for i in range(thread_num):
  46. Work().start()
  47. pass
  48. def callback(ch, method, properties, body):
  49. data = json.loads(body.decode())
  50. print(data)
  51. q.put(data)
  52. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  53. pass
  54. r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback)
  55. pass
  56. if __name__ == '__main__':
  57. main()
  58. # data = {
  59. # "company_id": "88f04cbfab150fe2bccdeec3aea32750",
  60. # "company_name": "",
  61. # "company_registered_date": "",
  62. # "company_org_type": "",
  63. # "province_code": "",
  64. # "city_code": "",
  65. # "county_code": "",
  66. # "org_number": "",
  67. # "reg_number": "",
  68. # "credit_code": "",
  69. # }
  70. #
  71. # run(data)
  72. pass