# -*- coding: utf-8 -*-
# @Time : 2023/1/3 15:03
# @Author : XuJiakai
# @File : insert_holo
# @Software: PyCharm
"""Consume messages from RabbitMQ and bulk-insert them into ads_waa_dim_info.

The consumer callback pushes each decoded message onto an in-process queue;
worker threads drain that queue, buffer rows and write them in batches.
"""
from threading import Thread
import json
import queue
from sdk.WinhcAllClient import get_all_client
import time
from log import get_log
import tqdm

log = get_log('cpa_agg')

# Bounded hand-off queue between the consumer callback and the workers.
q = queue.Queue(50000)

# A worker flushes as soon as its buffer holds more than this many rows.
MAX_BATCH_SIZE = 40000
# Seconds a worker blocks on the queue before logging a timeout.
QUEUE_POLL_TIMEOUT = 5
# Maximum seconds between two flushes of a worker (four poll timeouts).
FLUSH_INTERVAL = 4 * QUEUE_POLL_TIMEOUT

all_client = get_all_client()
# NOTE(review): this single client is shared by every worker thread and each
# of them calls commit()/rollback() on it -- confirm the client tolerates that.
db_client = all_client.get_holo_client(db='winhc_biz')

sql = """INSERT INTO public.ads_waa_dim_info(company_id,company_name,company_org_type,province,city,county,org_number,credit_code,reg_number,cate_first,cate_second,cate_third,id,dim_name,dim_max_num,dim_max_num_business_name,winhc_dim_num,dim_max_date,dim_max_date_business_name,winhc_dim_date,other_info,update_time,create_time,ds)
values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
ON CONFLICT DO NOTHING
"""


def run(data):
    """Insert a batch of rows with ``sql`` and commit.

    :param data: sequence of parameter rows, one per record to insert.
    :return: ``None`` when ``data`` is empty, the value returned by
        ``cursor.executemany`` on success, or ``-1`` when the insert raised
        (the transaction is rolled back and the error is logged).
    """
    if len(data) == 0:
        return

    with db_client.cursor() as cursor:
        try:
            num = cursor.executemany(sql, tuple(data))
            db_client.commit()
            return num
        except Exception as e:
            log.error("insert exec error: {}".format(e))
            db_client.rollback()
            return -1


class Work(Thread):
    """Worker thread that drains ``q`` and writes the rows in batches."""

    def run(self):
        """Buffer queue items forever, flushing on size or elapsed time.

        A flush happens when the buffer exceeds ``MAX_BATCH_SIZE`` rows, or
        when ``FLUSH_INTERVAL`` seconds have passed since the previous flush.
        The time-based trigger bounds how long a row can sit in the buffer
        without forcing tiny batches while messages keep arriving.
        """
        progress = tqdm.tqdm()
        bulk_list = []
        last_flush = time.monotonic()

        while True:
            try:
                bulk_list.append(q.get(timeout=QUEUE_POLL_TIMEOUT))
                progress.update()
            except queue.Empty:
                # Nothing arrived within the poll window; fall through so the
                # time-based flush below still gets a chance to run.
                log.info("等待超时...")

            if len(bulk_list) > MAX_BATCH_SIZE:
                log.info(f"溢出触发,缓冲队列剩余:{len(bulk_list)}")
            elif time.monotonic() - last_flush >= FLUSH_INTERVAL:
                log.info(f"手动触发,缓冲队列剩余:{len(bulk_list)}")
            else:
                continue

            # The module-level run() is a no-op for an empty buffer.
            run(bulk_list)
            bulk_list = []
            last_flush = time.monotonic()


r_sdk = all_client.get_rabbit_mq_sdk()


def main():
    """Start the worker threads, then consume messages into the queue."""
    thread_num = 10

    for i in range(thread_num):
        log.info("启动一个线程...")
        Work().start()

    def callback(ch, method, properties, body):
        """Decode one message, enqueue it and acknowledge it."""
        data = json.loads(body.decode())
        # Blocks while the queue is full.
        q.put(data)
        # Manual ack. NOTE(review): the ack is sent once the message is in the
        # in-memory queue, i.e. before it has been inserted; rows still
        # buffered when the process dies are not redelivered.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    r_sdk.consumer_by_fanout("cpa_insert_holo", queue='cpa_save_holo', callback=callback)


if __name__ == '__main__':
    main()