1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/1/3 15:03
- # @Author : XuJiakai
- # @File : insert_holo
- # @Software: PyCharm
import json
import queue
import threading
import time
from threading import Thread

import tqdm

from log import get_log
from sdk.WinhcAllClient import get_all_client
# Logger shared by the whole module.
log = get_log('cpa_agg')

# Bounded hand-off buffer between the MQ callback (producer) and the Work
# threads (consumers); a plain put() blocks once 50000 items are waiting.
q = queue.Queue(maxsize=50000)

all_client = get_all_client()
# One connection object, shared by every Work thread that calls run().
db_client = all_client.get_holo_client(db='winhc_biz')

# Bulk insert statement: one %s placeholder per column (24 of each); rows that
# hit a conflicting key are silently skipped by ON CONFLICT DO NOTHING.
# NOTE(review): %s paramstyle suggests a psycopg2-style driver — confirm.
sql = (
    "INSERT INTO public.ads_waa_dim_info("
    "company_id,company_name,company_org_type,"
    "province,city,county,"
    "org_number,credit_code,reg_number,"
    "cate_first,cate_second,cate_third,"
    "id,dim_name,"
    "dim_max_num,dim_max_num_business_name,winhc_dim_num,"
    "dim_max_date,dim_max_date_business_name,winhc_dim_date,"
    "other_info,update_time,create_time,ds)\n"
    "values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING "
)
# Serialises use of the single shared ``db_client`` connection across the
# Work threads (see the comment inside ``run``).
_db_lock = threading.Lock()


def run(data):
    """Bulk-insert ``data`` with ``sql`` and commit the transaction.

    Args:
        data: sequence of rows; each row is passed as the parameters for one
            execution of ``sql`` via ``cursor.executemany`` (presumably a
            24-item sequence matching the column list — TODO confirm the
            message schema).

    Returns:
        ``None`` when ``data`` is empty; otherwise whatever the driver's
        ``executemany`` returns on success, or ``-1`` if anything failed
        (the error is logged and the transaction rolled back).
    """
    if not data:
        return None
    # Every Work thread shares this one connection, and commit()/rollback()
    # act on the connection-wide transaction. Without the lock, one thread's
    # rollback can discard rows another thread has executed but not yet
    # committed, or a failed statement can abort the transaction that another
    # thread is about to commit (PostgreSQL-compatible server behaviour).
    with _db_lock:
        try:
            # Cursor creation is inside the try so that a broken connection
            # is logged and reported as -1 instead of killing the worker.
            with db_client.cursor() as cursor:
                num = cursor.executemany(sql, tuple(data))
                db_client.commit()
            return num
        except Exception as e:
            log.error("insert exec error: {}".format(e))
            try:
                db_client.rollback()
            except Exception as rollback_error:
                # A dead connection may refuse to roll back; don't let that
                # escape and terminate the calling worker thread.
                log.error("rollback error: {}".format(rollback_error))
            return -1
class Work(Thread):
    """Worker thread that drains ``q`` and bulk-inserts the rows via ``run``.

    Rows taken from the shared queue are buffered locally and written with a
    single module-level ``run(...)`` call when any of these holds:

    * the buffer holds more than ``BATCH_SIZE`` rows;
    * more than ``MAX_IDLE_POLLS`` polls of ``q`` have timed out since the
      last flush (the queue has gone quiet);
    * the oldest buffered row has waited more than ``MAX_BUFFER_AGE`` seconds
      (so a slow trickle of messages is still written promptly).
    """

    BATCH_SIZE = 40000  # flush once the buffer grows beyond this many rows
    POLL_TIMEOUT = 5  # seconds q.get() blocks before a poll counts as idle
    MAX_IDLE_POLLS = 3  # flush after more than this many timed-out polls
    MAX_BUFFER_AGE = 60  # seconds the oldest buffered row may wait

    def run(self):
        t = tqdm.tqdm()
        bulk_list = []
        flag = 0  # timed-out polls since the last flush
        oldest = None  # monotonic time the first buffered row arrived
        while True:
            try:
                ele = q.get(timeout=self.POLL_TIMEOUT)
            except queue.Empty:
                log.info("等待超时...")
                # Only an empty poll counts towards the idle flush; the old
                # code counted every iteration and so flushed every 4 rows.
                flag += 1
            else:
                if not bulk_list:
                    oldest = time.monotonic()
                bulk_list.append(ele)
                t.update()

            overflow = len(bulk_list) > self.BATCH_SIZE
            idle = flag > self.MAX_IDLE_POLLS
            stale = (oldest is not None
                     and time.monotonic() - oldest > self.MAX_BUFFER_AGE)
            if not (overflow or idle or stale):
                continue

            if overflow:
                log.info(f"溢出触发,缓冲队列剩余:{len(bulk_list)}")
            elif bulk_list:
                log.info(f"手动触发,缓冲队列剩余:{len(bulk_list)}")
            self._flush(bulk_list)
            bulk_list = []
            flag = 0
            oldest = None

    def _flush(self, rows):
        """Write ``rows`` via the module-level ``run``; never raises."""
        if not rows:
            return
        try:
            # ``run`` here resolves to the module-level insert function, not
            # this class's ``run`` method (class attributes are not in scope
            # inside method bodies).
            run(rows)
        except Exception as e:
            # Keep the worker alive: a dead worker silently shrinks the pool
            # and, once all die, q fills up and the MQ callback blocks forever.
            log.error(f"bulk insert raised, {len(rows)} rows dropped: {e}")
r_sdk = all_client.get_rabbit_mq_sdk()


def main():
    """Start the insert workers, then hand incoming MQ messages to them.

    Ten ``Work`` threads are started to drain ``q``. Then a consumer is
    registered through ``r_sdk.consumer_by_fanout`` on ``cpa_insert_holo``
    (presumably the fanout exchange name) with queue ``cpa_save_holo``.
    Each message body is JSON-decoded and put on ``q``.
    NOTE(review): whether ``consumer_by_fanout`` blocks is SDK-defined —
    confirm.
    """
    worker_count = 10
    for _ in range(worker_count):
        log.info("启动一个线程...")
        Work().start()

    def callback(ch, method, properties, body):
        """Decode one message, queue it for the workers, then ack it."""
        payload = json.loads(body.decode())
        q.put(payload)
        # Manual ack. NOTE(review): this acknowledges the message once it is
        # queued in memory, *before* the database insert runs. Rows still
        # buffered are lost if the process dies or the insert fails. Acking
        # after the insert would need the worker to ack back through the
        # consumer's connection thread — TODO.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    r_sdk.consumer_by_fanout(
        "cpa_insert_holo",
        queue='cpa_save_holo',
        callback=callback,
    )
# Script entry point: start the workers and the MQ consumer.
if __name__ == "__main__":
    main()
|