# -*- coding: utf-8 -*-
# @Time : 2020/12/16 15:25
# @Author : XuJiakai
# @File : odps_schema_util
# @Software: PyCharm
"""Small helpers around a module-level PyODPS client.

The helpers list tables, inspect table schemas and partitions, and run SQL
against the ODPS service configured below.
"""
import os

from odps import ODPS

# SECURITY NOTE(review): the literal fallbacks below are real credentials that
# were committed to source. They are kept only so existing callers keep
# working; they should be rotated and supplied through the environment
# (ALIBABA_CLOUD_ACCESS_KEY_ID / ALIBABA_CLOUD_ACCESS_KEY_SECRET) instead.
odps = ODPS(
    os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', 'LTAI4G4yiyJV4ggnLyGMduqV'),
    os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'nokDg5HlVIBh80nL2dOXsKa2La4XL5'),
    'winhc_biz',
    endpoint='http://service.odps.aliyun.com/api',
)


def execute_sql(sql):
    """Run ``sql`` and return the reader's result as a pandas DataFrame."""
    with odps.execute_sql(sql).open_reader() as reader:
        return reader.to_pandas()


def exists_tab(tab, project):
    """Return whether table ``tab`` exists in ``project``."""
    return odps.exist_table(tab, project=project)


def list_tab(project, prefix=None):
    """Return the table objects of ``project``, optionally limited by ``prefix``."""
    return odps.list_tables(project=project, prefix=prefix)


def get_tabs(project, prefix=None, owner=None):
    """Return the names of the tables in ``project``.

    ``prefix`` and ``owner`` are passed straight through to
    ``odps.list_tables`` as filters.
    """
    tables = odps.list_tables(project, prefix=prefix, owner=owner)
    return [t.name for t in tables]


def get_cols(tn, project='winhc_eci_dev'):
    """Return the names of every column in the schema of table ``tn``."""
    table = odps.get_table(tn, project)
    return [col.name for col in table.schema.columns]


def get_cols_remove_partition_cols(tn, project='winhc_eci_dev'):
    """Return the column names of table ``tn`` that are not partition columns."""
    schema = odps.get_table(tn, project).schema
    partition_names = {p.name for p in schema.partitions}
    return [col.name for col in schema.columns if col.name not in partition_names]


def query(sql):
    """Run ``sql`` and return the ``value`` field of every record.

    Each returned string has a trailing newline appended, so the list can be
    handed directly to ``file.writelines``. The result set must therefore
    contain a string column named ``value``.
    """
    with odps.execute_sql(sql).open_reader() as reader:
        return [record['value'] + "\n" for record in reader]


def get_cols_type(tn, project='winhc_ng'):
    """Return ``(name, type)`` tuples for the columns of table ``tn``.

    The type is rendered with ``str()``.
    """
    table = odps.get_table(tn, project)
    return [(col.name, str(col.type)) for col in table.schema.columns]


def get_cols_type_desc(tn, project='winhc_ng'):
    """Return ``(name, type, comment)`` tuples for the columns of table ``tn``.

    The type is rendered with ``str()``; the comment is returned as stored.
    """
    table = odps.get_table(tn, project)
    return [(col.name, str(col.type), col.comment) for col in table.schema.columns]


def get_partition_cols(tn, project='winhc_ng'):
    """Return the names of the partition columns of table ``tn``."""
    table = odps.get_table(tn, project)
    return [p.name for p in table.schema.partitions]


def get_last_partition_ds(tab, project, default=None):
    """Return the last ``ds`` value listed for ``tab``, or ``default`` if none.

    "Last" means the final entry in the order returned by
    ``get_partition_ds``; no sorting is applied here.
    """
    ds_values = get_partition_ds(tab, project)
    return ds_values[-1] if ds_values else default


def get_partition_ds(tab, project):
    """Return the ``ds`` value of every partition of ``tab`` that has one.

    Partition specs come from ``get_partition`` already filtered on the
    substring ``'ds='``. A spec may hold several ``/``-separated parts; the
    first part containing ``'ds='`` is used and the text after its first
    ``=`` is returned.
    """
    ds_values = []
    for spec in get_partition(tab, project, 'ds='):
        # Splitting on '/' also covers single-level specs, which simply
        # yield one part.
        ds_part = [part for part in spec.split('/') if 'ds=' in part][0]
        ds_values.append(ds_part.split('=')[1])
    return ds_values


def get_partition(tab, project, expression=None):
    """Return the non-blank lines of ``show partitions {project}.{tab}``.

    :param tab: table name; interpolated into the SQL text verbatim, so it
        must be a trusted identifier.
    :param project: project name; interpolated verbatim as well.
    :param expression: optional substring; when given, only lines containing
        it are returned.
    """
    with odps.execute_sql(f"show partitions {project}.{tab}").open_reader() as reader:
        lines = reader.raw.split('\n')
    # The original filter tested the list itself for None instead of each
    # element; check the element.
    lines = [line for line in lines if line is not None and line.strip() != '']
    if expression:
        lines = [line for line in lines if expression in line]
    return lines


def show_partitions(tn, project='winhc_ng'):
    """Return the name of every partition of table ``tn``."""
    table = odps.get_table(tn, project)
    return [p.name for p in table.partitions]


if __name__ == '__main__':
    # Build `"<name without its first 4 chars>":"<comment>",` for every
    # `ads_` table in winhc_ng, hand it to utils.set_text and print it.
    res = list_tab("winhc_ng", prefix='ads_')
    text = "".join('"{a}":"{b}",'.format(a=i.name[4:], b=i.comment) for i in res)
    from utils import set_text

    set_text(text)
    print(text)

    # li = query(
    #     "SELECT * from winhc_ng.tmp_dynamic_test1 where ds = '20220719' and INSTR(value, 'company_staff') > 0 and INSTR(value, 'company_holder') > 0;")
    # f = open("D://out_dynamic_test1.txt", mode='w', encoding='utf-8')
    # f.writelines(li)
    # print(li)