# insert_holo.py (2.7 KB)
# -*- coding: utf-8 -*-
# @Time : 2023/1/3 15:03
# @Author : XuJiakai
# @File : insert_holo
# @Software: PyCharm
  6. from threading import Thread
  7. import json
  8. import queue
  9. from sdk.WinhcAllClient import get_all_client
  10. import time
  11. from log import get_log
  12. import tqdm
  13. log = get_log('cpa_agg')
  14. q = queue.Queue(50000)
  15. all_client = get_all_client()
  16. db_client = all_client.get_holo_client(db='winhc_biz')
  17. sql = """INSERT INTO public.ads_waa_dim_info(company_id,company_name,company_org_type,province,city,county,org_number,credit_code,reg_number,cate_first,cate_second,cate_third,id,dim_name,dim_max_num,dim_max_num_business_name,winhc_dim_num,dim_max_date,dim_max_date_business_name,winhc_dim_date,other_info,update_time,create_time,ds)
  18. values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s) ON CONFLICT DO NOTHING """
  19. def run(data):
  20. if len(data) == 0:
  21. return
  22. with db_client.cursor() as cursor:
  23. try:
  24. # print('start : %s ' % time.time())
  25. num = cursor.executemany(sql, tuple(data))
  26. db_client.commit()
  27. # print('end : %s ' % time.time())
  28. return num
  29. except Exception as e:
  30. log.error("insert exec error: {}".format(e))
  31. db_client.rollback()
  32. return -1
  33. pass
  34. class Work(Thread):
  35. def run(self):
  36. t = tqdm.tqdm()
  37. bulk_list = []
  38. flag = 0
  39. while True:
  40. try:
  41. ele = q.get(timeout=5)
  42. bulk_list.append(ele)
  43. t.update()
  44. except:
  45. log.info("等待超时...")
  46. pass
  47. if len(bulk_list) > 40000:
  48. log.info(f"溢出触发,缓冲队列剩余:{len(bulk_list)}")
  49. run(bulk_list)
  50. bulk_list = []
  51. flag = 0
  52. pass
  53. else:
  54. flag += 1
  55. if flag > 3:
  56. log.info(f"手动触发,缓冲队列剩余:{len(bulk_list)}")
  57. run(bulk_list)
  58. bulk_list = []
  59. flag = 0
  60. pass
  61. r_sdk = all_client.get_rabbit_mq_sdk()
  62. def main():
  63. thread_num = 10
  64. for i in range(thread_num):
  65. log.info("启动一个线程...")
  66. Work().start()
  67. pass
  68. def callback(ch, method, properties, body):
  69. data = json.loads(body.decode())
  70. q.put(data)
  71. ch.basic_ack(delivery_tag=method.delivery_tag) # 手动应答ack,确保消息真正消费后才应答
  72. pass
  73. r_sdk.consumer_by_fanout("cpa_insert_holo", queue='cpa_save_holo', callback=callback)
  74. pass
  75. if __name__ == '__main__':
  76. main()
  77. pass