12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- # -*- coding: utf-8 -*-
- # @Time : 2022/2/22 11:44
- # @Author : XuJiakai
- # @File : mysql_utils
- # @Software: PyCharm
- import pymysql
- import psycopg2
- from psycopg2 import extras
- from log import get_log
- log = get_log('exec_sql')
def exec_sql(db_client, sql):
    """Execute *sql* on a pymysql connection and return all fetched rows.

    The statement is sent exactly as given (no parameter binding) and no
    commit is issued here.

    :param db_client: connection whose ``cursor()`` accepts the pymysql
        ``cursor=`` keyword.
    :param sql: complete SQL statement to run.
    :return: whatever ``fetchall()`` yields on a ``DictCursor``.
    """
    dict_cursor = db_client.cursor(cursor=pymysql.cursors.DictCursor)
    with dict_cursor as cur:
        cur.execute(sql)
        return cur.fetchall()
def exec_sql_by_holo(db_client, sql):
    """Execute *sql* through a psycopg2 ``DictCursor`` and return all rows.

    The statement is sent exactly as given (no parameter binding) and no
    commit is issued here.

    :param db_client: connection whose ``cursor()`` accepts the psycopg2
        ``cursor_factory=`` keyword (presumably the "holo" client - confirm
        against callers).
    :param sql: complete SQL statement to run.
    :return: whatever ``fetchall()`` yields on the cursor.
    """
    holo_cursor = db_client.cursor(cursor_factory=psycopg2.extras.DictCursor)
    with holo_cursor as cur:
        cur.execute(sql)
        return cur.fetchall()
def get_mysql_fields(db_client, table_schema, table_name):
    """Return the column names and column comments of one MySQL table.

    Reads ``information_schema.columns`` filtered by schema and table.
    Both filter values are passed as bound parameters instead of being
    spliced into the SQL text, so a quote or other special character in a
    name can neither break nor alter the query.

    :param db_client: pymysql connection.
    :param table_schema: schema (database) name to look in.
    :param table_name: table whose columns are listed.
    :return: the rows from ``fetchall()`` on a ``DictCursor``, one per
        matching column.
    """
    sql = (
        "select column_name, column_comment from information_schema.columns "
        "where table_schema = %s and table_name = %s"
    )
    with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
        # The driver escapes and quotes the bound values itself.
        cursor.execute(sql, (table_schema, table_name))
        return cursor.fetchall()
def get_mysql_tables(db_client, table_schema):
    """Return the table names and table comments of one MySQL schema.

    Reads ``information_schema.tables`` filtered by schema. The schema name
    is passed as a bound parameter instead of being spliced into the SQL
    text, so a quote or other special character in it can neither break nor
    alter the query.

    :param db_client: pymysql connection.
    :param table_schema: schema (database) name whose tables are listed.
    :return: the rows from ``fetchall()`` on a ``DictCursor``, one per
        matching table.
    """
    sql = (
        "select table_name , table_comment from information_schema.tables "
        "where table_schema = %s"
    )
    with db_client.cursor(cursor=pymysql.cursors.DictCursor) as cursor:
        # One-element tuple: the driver escapes and quotes the value itself.
        cursor.execute(sql, (table_schema,))
        return cursor.fetchall()
def insert(json: dict, table_name, db_client):
    """Insert a single row, given as a column-name -> value dict.

    Thin wrapper over ``insert_many``: the dict's keys become the column
    list and the dict itself is the only row.

    :param json: mapping of column name to value for the new row.
    :param table_name: target table, interpolated into the SQL as-is.
    :param db_client: connection used for the insert.
    :return: the result of ``insert_many`` for this one-row batch.
    """
    return insert_many([json], list(json.keys()), table_name, db_client)
def insert_many(data: list, key_list: list, table_name, db_client):
    """Insert several rows into *table_name* with one ``executemany`` call.

    Each row's values are picked in ``key_list`` order, so the values always
    line up with the generated column list regardless of the dict's own key
    order or any extra keys it carries. A row that is not keyed by every
    name in ``key_list`` falls back to its values in insertion order, which
    keeps purely positional callers working.

    ``table_name`` and the names in ``key_list`` are interpolated into the
    SQL text as identifiers and must come from trusted code; only the row
    values are bound as parameters.

    :param data: list of dict rows.
    :param key_list: column names, in the order used for the INSERT.
    :param table_name: target table name.
    :param db_client: connection providing ``cursor()``, ``commit()`` and
        ``rollback()``.
    :return: whatever ``cursor.executemany`` returns after a successful
        commit, or ``-1`` when execution or commit raised (the error is
        logged and the transaction rolled back).
    """
    columns = ','.join(key_list)
    placeholders = ','.join(['%s'] * len(key_list))
    rows = [
        tuple(row[key] for key in key_list)
        if all(key in row for key in key_list)
        else tuple(row.values())  # positional fallback for unkeyed rows
        for row in data
    ]
    sql = f"INSERT INTO {table_name}({columns}) \nvalues({placeholders}) "
    with db_client.cursor() as cursor:
        try:
            num = cursor.executemany(sql, rows)
            db_client.commit()
            return num
        except Exception as e:
            log.error("insert exec error: {}".format(e))
            db_client.rollback()
            return -1
if __name__ == '__main__':
    # Manual scaffold, only run when this file is executed directly: it
    # obtains a holo client and names a table, but runs no query yet.
    from sdk.WinhcAllClient import get_all_client

    all_client = get_all_client()
    holo_client = all_client.get_holo_client(db='winhc_biz')
    HOLO_TABLE_NAME = 'public.ads_waa_dim_info'
|