# -*- coding: utf-8 -*-
# @Time : 2022/12/1 14:11
# @Author : XuJiakai
# @File : winhc_job
# @Software: PyCharm
"""Consume company messages from RabbitMQ, look up winhc data, store it in MongoDB."""
import json
import queue
import traceback
from threading import Thread

from log import get_log
from handle.search_winhc_latest_date import search_latest_date
from handle.search_winhc_summary import search_summary
from sdk.WinhcAllClient import get_all_client
from utils import map_2_json_str
from utils.datetime_utils import get_ds, get_now
from project_const import TOPIC_NAME, MONGODB_NAME

all_client = get_all_client()
col = all_client.get_mongo_collection(MONGODB_NAME)
log = get_log('winhc_job')


def run(data: dict):
    """Look up the winhc summary and latest date for one company and insert them into MongoDB.

    :param data: company message; must contain ``company_id``. The whole dict
        is stored unchanged under ``base_info``.
    :raises KeyError: if ``data`` has no ``company_id`` key.

    A failing search is logged and the company is skipped. A failing insert is
    logged and otherwise ignored (best effort).
    """
    company_id = data['company_id']
    try:
        summary = search_summary(company_id)
        latest_date = search_latest_date(company_id)
    except Exception:
        # Catch Exception rather than everything so SystemExit and
        # KeyboardInterrupt still propagate; log the cause as well.
        log.error('error company_id :{}'.format(company_id))
        log.error(traceback.format_exc())
        return

    # One document per company per get_ds() value.
    _id = get_ds() + '_' + company_id + '_winhc'
    output_data = {
        "_id": _id,
        "base_info": data,
        "competitor_product_name": "winhc",
        "summary": summary,
        "latest_date": latest_date,
        "spider_date": get_now()
    }
    log.info('insert mongo: {}'.format(output_data))
    # Debug aid: print(map_2_json_str(output_data))
    try:
        col.insert_one(output_data)
    except Exception:
        # Still best effort, but no longer silent.
        log.error('insert mongo failed, _id: {}\n{}'.format(_id, traceback.format_exc()))


r_sdk = all_client.get_rabbit_mq_sdk()
RABBITMQ_TOPIC = TOPIC_NAME

# Bounded hand-off between the consumer callback and the worker threads;
# q.put() blocks once 5000 messages are waiting.
q = queue.Queue(5000)


class Work(Thread):
    """Worker thread that takes messages off ``q`` and processes them forever."""

    def run(self):
        while True:
            data = q.get()
            try:
                # Inside a method, the name ``run`` resolves to the
                # module-level function, not to this method.
                run(data)
            except Exception:
                # An uncaught exception would end this thread for good and
                # shrink the pool, so log it and keep consuming.
                log.error('unhandled error, data: {}\n{}'.format(data, traceback.format_exc()))


def main():
    """Start the worker threads, then consume messages from the fanout topic."""
    thread_num = 10
    for _ in range(thread_num):
        Work().start()

    def callback(ch, method, properties, body):
        """Decode one message, enqueue it for the workers and acknowledge it."""
        data = json.loads(body.decode())
        log.info('receive data: {}'.format(data))
        q.put(data)
        # Manual ack. NOTE: this runs once the message is on the in-memory
        # queue, i.e. before a worker has processed it, so messages still
        # waiting in ``q`` are not redelivered if the process stops.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    r_sdk.consumer_by_fanout(RABBITMQ_TOPIC, callback=callback)


def test(company_id):
    """Process a single company synchronously, with every other field left empty."""
    data = {
        "company_id": company_id,
        "company_name": "",
        "company_registered_date": "",
        "company_org_type": "",
        "province_code": "",
        "city_code": "",
        "county_code": "",
        "org_number": "",
        "reg_number": "",
        "credit_code": "",
    }
    run(data)


if __name__ == '__main__':
    main()
    # test('3667556dcfeda7c18c9745f6b8133165')