# -*- coding: utf-8 -*- # @Time : 2022/2/22 11:44 # @Author : XuJiakai # @File : mysql_utils # @Software: PyCharm import pymysql import psycopg2 from psycopg2 import extras from log import get_log log = get_log('exec_sql') def exec_sql(db_client, sql): with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor: cursor.execute(sql) result = cursor.fetchall() return result pass def exec_sql_by_holo(db_client, sql): with db_client.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor: cursor.execute(sql) result = cursor.fetchall() return result pass def get_mysql_fields(db_client, table_schema, table_name): with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor: cursor.execute( f"select column_name, column_comment from information_schema.columns where table_schema ='{table_schema}' and table_name = '{table_name}'") result = cursor.fetchall() return result def get_mysql_tables(db_client, table_schema): with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor: cursor.execute( f"select table_name , table_comment from information_schema.tables where table_schema ='{table_schema}' ") result = cursor.fetchall() return result def insert(json: map, table_name, db_client): keys = list(json.keys()) return insert_many([json], keys, table_name, db_client) pass def insert_many(data: list, key_list: list, table_name, db_client): keys = ','.join(key_list) vs = ','.join(['%s'] * len(key_list)) data = tuple([tuple(i.values()) for i in data]) sql = f"INSERT INTO {table_name}({keys}) \nvalues({vs}) " # print(sql) with db_client.cursor() as cursor: try: num = cursor.executemany(sql, data) db_client.commit() return num except Exception as e: log.error("insert exec error: {}".format(e)) db_client.rollback() return -1 pass if __name__ == '__main__': from sdk.WinhcAllClient import get_all_client all_client = get_all_client() holo_client = all_client.get_holo_client(db='winhc_biz') HOLO_TABLE_NAME = 'public.ads_waa_dim_info' pass