123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778 |
- # -*- coding: utf-8 -*-
- # @Time : 2022/2/22 11:44
- # @Author : XuJiakai
- # @File : mysql_utils
- # @Software: PyCharm
- import pymysql
- import psycopg2
- from psycopg2 import extras
- def exec_sql(db_client, sql):
- with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
- cursor.execute(sql)
- result = cursor.fetchall()
- return result
- pass
- def exec_sql_by_holo(db_client, sql):
- with db_client.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
- cursor.execute(sql)
- result = cursor.fetchall()
- return result
- pass
- def get_mysql_fields(db_client, table_schema, table_name):
- with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
- cursor.execute(
- f"select column_name, column_comment from information_schema.columns where table_schema ='{table_schema}' and table_name = '{table_name}'")
- result = cursor.fetchall()
- return result
- def get_mysql_tables(db_client, table_schema):
- with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
- cursor.execute(
- f"select table_name , table_comment from information_schema.tables where table_schema ='{table_schema}' ")
- result = cursor.fetchall()
- return result
- def insert(json: map, table_name, db_client):
- keys = list(json.keys())
- return insert_many([json], keys, table_name, db_client)
- pass
- def insert_many(data: list, key_list: list, table_name, db_client):
- keys = ','.join(key_list)
- vs = ','.join(['%s'] * len(key_list))
- data = tuple([tuple(i.values()) for i in data])
- sql = f"INSERT INTO {table_name}({keys}) \nvalues({vs}) "
- # print(sql)
- with db_client.cursor() as cursor:
- try:
- num = cursor.executemany(sql, data)
- db_client.commit()
- return num
- except Exception as e:
- print(e)
- db_client.rollback()
- return -1
- pass
- if __name__ == '__main__':
- from sdk.WinhcAllClient import get_all_client
- all_client = get_all_client()
- holo_client = all_client.get_holo_client(db='winhc_biz')
- HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
- pass
|