# winhc_job.py (2.3 KB)
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2022/12/1 14:11
  3. # @Author : XuJiakai
  4. # @File : winhc_job
  5. # @Software: PyCharm
  6. import json
  7. import queue
  8. from threading import Thread
  9. from log import get_log
  10. from handle.search_winhc_latest_date import search_latest_date
  11. from handle.search_winhc_summary import search_summary
  12. from sdk.WinhcAllClient import get_all_client
  13. from utils import map_2_json_str
  14. from utils.datetime_utils import get_ds, get_now
  15. from project_const import TOPIC_NAME, MONGODB_NAME
# Shared client factory obtained from the project SDK; the MongoDB collection
# below and the RabbitMQ SDK used later in this module both come from it.
all_client = get_all_client()
# MongoDB collection (named by MONGODB_NAME) that run() inserts result documents into.
col = all_client.get_mongo_collection(MONGODB_NAME)
# Module logger, registered under the name 'winhc_job'.
log = get_log('winhc_job')
  19. def run(data: map):
  20. company_id = data['company_id']
  21. try:
  22. summary = search_summary(company_id)
  23. latest_date = search_latest_date(company_id)
  24. pass
  25. except:
  26. log.error('error company_id :{}'.format(company_id))
  27. return
  28. pass
  29. _id = get_ds() + '_' + company_id + '_winhc'
  30. output_data = {
  31. "_id": _id,
  32. "base_info": data,
  33. "competitor_product_name": "winhc",
  34. "summary": summary,
  35. "latest_date": latest_date,
  36. "spider_date": get_now()
  37. }
  38. # print(map_2_json_str(output_data))
  39. try:
  40. col.insert_one(output_data)
  41. except Exception as e:
  42. pass
  43. pass
# RabbitMQ SDK obtained from the shared client; main() consumes messages through it.
r_sdk = all_client.get_rabbit_mq_sdk()
# Topic passed to the consumer; alias of the project-level TOPIC_NAME constant.
RABBITMQ_TOPIC = TOPIC_NAME
# Hand-off queue between the consumer callback and the Work threads.
# maxsize=5000: q.put() blocks once 5000 items are waiting.
q = queue.Queue(5000)
  47. class Work(Thread):
  48. def run(self):
  49. while True:
  50. run(q.get())
  51. def main():
  52. thread_num = 10
  53. for i in range(thread_num):
  54. Work().start()
  55. pass
  56. def callback(ch, method, properties, body):
  57. data = json.loads(body.decode())
  58. print(data)
  59. q.put(data)
  60. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  61. pass
  62. r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback)
  63. pass
  64. def test(company_id):
  65. data = {
  66. "company_id": company_id,
  67. "company_name": "",
  68. "company_registered_date": "",
  69. "company_org_type": "",
  70. "province_code": "",
  71. "city_code": "",
  72. "county_code": "",
  73. "org_number": "",
  74. "reg_number": "",
  75. "credit_code": "",
  76. }
  77. run(data)
  78. pass
# Script entry point: start the consumer and worker threads.
if __name__ == '__main__':
    main()
    # Manual single-company run, kept for debugging:
    # test('88f04cbfab150fe2bccdeec3aea32750')
    pass