# -*- coding: utf-8 -*- # @Time : 2022/12/1 14:11 # @Author : XuJiakai # @File : winhc_job # @Software: PyCharm import json import queue from threading import Thread from handle.search_winhc_latest_date import search_latest_date from handle.search_winhc_summary import search_summary from sdk.WinhcAllClient import get_all_client from utils import map_2_json_str from utils.datetime_utils import get_ds, get_now from project_const import TOPIC_NAME, MONGODB_NAME all_client = get_all_client() col = all_client.get_mongo_collection(MONGODB_NAME) def run(data: map): company_id = data['company_id'] summary = search_summary(company_id) latest_date = search_latest_date(company_id) _id = get_ds() + '_' + company_id + '_winhc' output_data = { "_id": _id, "base_info": data, "competitor_product_name": "winhc", "summary": summary, "latest_date": latest_date, "spider_date": get_now() } # print(map_2_json_str(output_data)) try: col.insert_one(output_data) except Exception as e: pass pass r_sdk = all_client.get_rabbit_mq_sdk() RABBITMQ_TOPIC = TOPIC_NAME q = queue.Queue(5000) class Work(Thread): def run(self): while True: run(q.get()) def main(): thread_num = 10 for i in range(thread_num): Work().start() pass def callback(ch, method, properties, body): data = json.loads(body.decode()) print(data) q.put(data) ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答 pass r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback) pass if __name__ == '__main__': main() # data = { # "company_id": "88f04cbfab150fe2bccdeec3aea32750", # "company_name": "", # "company_registered_date": "", # "company_org_type": "", # "province_code": "", # "city_code": "", # "county_code": "", # "org_number": "", # "reg_number": "", # "credit_code": "", # } # # run(data) pass