123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- # -*- coding: utf-8 -*-
- # @Time : 2020/12/16 15:25
- # @Author : XuJiakai
- # @File : odps_schema_util
- # @Software: PyCharm
import os

from odps import ODPS
# Shared MaxCompute (ODPS) client used by every helper in this module.
#
# NOTE(review): the literal access key pair below is committed to source
# control. It should be rotated and supplied only through the environment;
# the literals are kept purely as fallbacks so existing callers keep working.
odps = ODPS(
    os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', 'LTAI4G4yiyJV4ggnLyGMduqV'),
    os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'nokDg5HlVIBh80nL2dOXsKa2La4XL5'),
    'winhc_biz',
    endpoint='http://service.odps.aliyun.com/api',
)
def execute_sql(sql):
    """Run *sql* through the module-level ``odps`` client.

    Args:
        sql: SQL statement text handed to ``odps.execute_sql``.

    Returns:
        The value produced by ``reader.to_pandas()`` for the statement's
        result reader.
    """
    with odps.execute_sql(sql).open_reader() as reader:
        return reader.to_pandas()
def exists_tab(tab, project):
    """Report whether table *tab* exists in *project*.

    Delegates to ``odps.exist_table`` and returns its result unchanged.
    """
    found = odps.exist_table(tab, project=project)
    return found
def list_tab(project, prefix=None):
    """List the tables of *project*, optionally restricted by name *prefix*.

    Returns whatever ``odps.list_tables`` returns for these arguments.
    """
    tables = odps.list_tables(project=project, prefix=prefix)
    return tables
def get_tabs(project, prefix=None, owner=None):
    """Return the names of the tables in *project* as a list.

    *prefix* and *owner* are forwarded to ``odps.list_tables`` as filters.
    """
    names = []
    for table in odps.list_tables(project, prefix=prefix, owner=owner):
        names.append(table.name)
    return names
def get_cols(tn, project='winhc_eci_dev'):
    """Return the names of ``schema.columns`` for table *tn* in *project*."""
    table = odps.get_table(tn, project)
    names = []
    for column in table.schema.columns:
        names.append(column.name)
    return names
def get_cols_remove_partition_cols(tn, project='winhc_eci_dev'):
    """Return the column names of table *tn* that are not partition columns.

    Names are taken from ``schema.columns``; any name that also appears in
    ``schema.partitions`` is left out. Column order is preserved.
    """
    table = odps.get_table(tn, project)
    columns = table.schema.columns
    partition_names = [part.name for part in table.schema.partitions]

    kept = []
    for column in columns:
        if column.name in partition_names:
            continue
        kept.append(column.name)
    return kept
def query(sql, column='value'):
    """Run *sql* and return one newline-terminated string per result record.

    Args:
        sql: SQL statement text handed to ``odps.execute_sql``.
        column: Name of the record field to extract. Defaults to ``'value'``,
            which was previously hard-coded.

    Returns:
        A list with ``record[column] + "\\n"`` for every record of the result,
        in reader order.
    """
    with odps.execute_sql(sql).open_reader() as reader:
        return [record[column] + "\n" for record in reader]
def get_cols_type(tn, project='winhc_ng'):
    """Return ``(name, type)`` pairs for the columns of table *tn*.

    The type is rendered with ``str()``; pairs follow ``schema.columns`` order.
    """
    table = odps.get_table(tn, project)
    return [(column.name, str(column.type)) for column in table.schema.columns]
def get_cols_type_desc(tn, project='winhc_ng'):
    """Return ``(name, type, comment)`` triples for the columns of table *tn*.

    The type is rendered with ``str()``; the comment is passed through as-is.
    """
    table = odps.get_table(tn, project)
    return [
        (column.name, str(column.type), column.comment)
        for column in table.schema.columns
    ]
def get_partition_cols(tn, project='winhc_ng'):
    """Return the names of the partition columns of table *tn* in *project*."""
    partitions = odps.get_table(tn, project).schema.partitions
    names = []
    for part in partitions:
        names.append(part.name)
    return names
def get_last_partition_ds(tab, project, default=None):
    """Return the last ``ds`` value listed for *tab*, or *default* if none.

    "Last" means the final element of the list from ``get_partition_ds``;
    no sorting is done here.
    """
    ds_values = get_partition_ds(tab, project)
    return ds_values[-1] if ds_values else default
def get_partition_ds(tab, project):
    """Return the ``ds`` value of every partition of *tab* in *project*.

    Partition lines come from ``get_partition`` (pre-filtered on ``'ds='``),
    are split on ``/`` into ``key=value`` components, and the value of the
    first component whose key is exactly ``ds`` is collected.

    The key is compared for equality rather than by substring, so a component
    such as ``pds=...`` is no longer mistaken for ``ds``. Lines with no ``ds``
    component are skipped.

    Returns:
        A list of ``ds`` value strings, in the order the partitions are listed.
    """
    ds_values = []
    for spec in get_partition(tab, project, 'ds='):
        for component in spec.split('/'):
            # partition() keeps any further '=' inside the value intact.
            key, sep, value = component.partition('=')
            if sep and key.strip() == 'ds':
                ds_values.append(value)
                break
    return ds_values
def get_partition(tab, project, expression=None):
    """Return the non-blank lines printed by ``show partitions`` for a table.

    Args:
        tab: Table name.
        project: Project name; the statement targets ``{project}.{tab}``.
        expression: Optional substring. When truthy, only lines that contain
            it are returned.

    Returns:
        A list of strings taken from the reader's ``raw`` text split on
        newlines, with whitespace-only lines removed.
    """
    with odps.execute_sql(f"show partitions {project}.{tab}").open_reader() as reader:
        lines = reader.raw.split('\n')

    # The original also tested ``li is not None`` (the list, not the element),
    # which was always true; the blank-line test is the only effective filter.
    lines = [line for line in lines if line.strip() != '']
    if expression:
        lines = [line for line in lines if expression in line]
    return lines
def show_partitions(tn, project='winhc_ng'):
    """Return the ``name`` of every entry in ``partitions`` of table *tn*."""
    names = []
    for part in odps.get_table(tn, project).partitions:
        names.append(part.name)
    return names
if __name__ == '__main__':
    # Render every ``ads_``-prefixed table of winhc_ng as
    # '"<name without its first 4 chars>":"<comment>",' and concatenate them.
    # The result lives in ``mapping_text`` rather than ``str`` so the builtin
    # ``str`` used by the helpers above is not rebound at module level.
    mapping_text = "".join(
        '"{a}":"{b}",'.format(a=table.name[4:], b=table.comment)
        for table in list_tab("winhc_ng", prefix='ads_')
    )

    # Project-local helper; presumably copies the text somewhere convenient
    # (e.g. the clipboard) — TODO confirm against utils.set_text.
    from utils import set_text

    set_text(mapping_text)
    print(mapping_text)
|