123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- # -*- coding: utf-8 -*-
- # @Time : 2022/12/1 14:11
- # @Author : XuJiakai
- # @File : winhc_job
- # @Software: PyCharm
- import json
- import queue
- from threading import Thread
- from handle.search_winhc_latest_date import search_latest_date
- from handle.search_winhc_summary import search_summary
- from sdk.WinhcAllClient import get_all_client
- from utils import map_2_json_str
- from utils.datetime_utils import get_ds, get_now
- from project_const import TOPIC_NAME, MONGODB_NAME
# Shared client factory: both the MongoDB collection below and the RabbitMQ
# SDK used further down in this module are obtained from it.
all_client = get_all_client()

# MongoDB collection (named by MONGODB_NAME) that receives the documents
# built in run().
col = all_client.get_mongo_collection(MONGODB_NAME)
def run(data: dict):
    """Collect winhc data for one company and store it in MongoDB.

    Looks up the summary and the latest date for ``data['company_id']`` and
    inserts a single document into the module-level collection ``col``.
    The document ``_id`` is ``<get_ds()>_<company_id>_winhc``.

    Args:
        data: Message payload. Must contain the key ``company_id``; the whole
            payload is stored unchanged under ``base_info``.

    Raises:
        KeyError: If ``data`` has no ``company_id`` key.

    Errors raised by the two search helpers propagate to the caller; only a
    failing insert is handled here.
    """
    import logging  # local import so the block does not depend on file-level changes

    company_id = data['company_id']
    summary = search_summary(company_id)
    latest_date = search_latest_date(company_id)
    _id = get_ds() + '_' + company_id + '_winhc'
    output_data = {
        "_id": _id,
        "base_info": data,
        "competitor_product_name": "winhc",
        "summary": summary,
        "latest_date": latest_date,
        "spider_date": get_now(),
    }
    try:
        col.insert_one(output_data)
    except Exception:
        # The write stays best-effort (presumably a duplicate _id when the same
        # company is crawled twice in one period - TODO confirm), but the
        # failure is recorded instead of being silently discarded.
        logging.getLogger(__name__).exception(
            "failed to insert document %s", _id
        )
# Bounded buffer (at most 5000 items) through which the consumer callback
# hands decoded messages to the worker threads.
q = queue.Queue(maxsize=5000)

# Topic the consumer subscribes to.
RABBITMQ_TOPIC = TOPIC_NAME

# RabbitMQ SDK used by main() to start consuming.
r_sdk = all_client.get_rabbit_mq_sdk()
class Work(Thread):
    """Worker thread that takes items from ``q`` and processes them forever."""

    def run(self):
        """Process queued items one at a time with the module-level ``run``.

        A failure on one item is logged and the loop continues. Without this
        guard a single exception would end the thread for good and shrink the
        worker pool.
        """
        import logging  # local import so the block does not depend on file-level changes

        log = logging.getLogger(__name__)
        while True:
            item = q.get()
            try:
                # Inside a method body the bare name resolves to the
                # module-level function run(), not to this method.
                run(item)
            except Exception:
                log.exception("failed to process message: %r", item)
def main(thread_num: int = 10):
    """Start the worker threads and consume messages from RabbitMQ.

    Args:
        thread_num: Number of ``Work`` threads to start. Defaults to 10, the
            value that was previously hard-coded.
    """
    import logging  # local import so the block does not depend on file-level changes

    log = logging.getLogger(__name__)

    for _ in range(thread_num):
        Work().start()

    def callback(ch, method, properties, body):
        """Decode one message, enqueue it for the workers, then ack it."""
        try:
            data = json.loads(body.decode())
        except ValueError:
            # Covers both UnicodeDecodeError and json.JSONDecodeError. Such a
            # message can never be processed, so it is logged and acked rather
            # than raising out of the consumer.
            # NOTE(review): confirm that dropping malformed messages is the
            # desired policy.
            log.exception("dropping undecodable message: %r", body)
            ch.basic_ack(delivery_tag=method.delivery_tag)
            return

        print(data)
        # Blocks while the queue is full, which throttles consumption.
        q.put(data)
        # Manual ack. Note that it is sent once the message is enqueued, not
        # after a worker has finished processing it.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback)
if __name__ == '__main__':
    # Script entry point: start the workers and the consumer.
    main()

    # Manual single-company run, kept for debugging (disabled):
    # data = {
    #     "company_id": "88f04cbfab150fe2bccdeec3aea32750",
    #     "company_name": "",
    #     "company_registered_date": "",
    #     "company_org_type": "",
    #     "province_code": "",
    #     "city_code": "",
    #     "county_code": "",
    #     "org_number": "",
    #     "reg_number": "",
    #     "credit_code": "",
    # }
    #
    # run(data)
|