odps_schema_utils.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2020/12/16 15:25
  3. # @Author : XuJiakai
  4. # @File : odps_schema_util
  5. # @Software: PyCharm
import os

from odps import ODPS
  7. odps = ODPS('LTAI4G4yiyJV4ggnLyGMduqV', 'nokDg5HlVIBh80nL2dOXsKa2La4XL5', 'winhc_biz',
  8. endpoint='http://service.odps.aliyun.com/api')
  9. def execute_sql(sql):
  10. with odps.execute_sql(sql).open_reader() as reader:
  11. # print(type(reader.raw))
  12. # print(reader.raw)
  13. return reader.to_pandas()
  14. return None
  15. def exists_tab(tab, project):
  16. return odps.exist_table(tab, project=project)
  17. def list_tab(project, prefix=None):
  18. return odps.list_tables(project=project, prefix=prefix)
  19. pass
  20. def get_tabs(project, prefix=None, owner=None):
  21. ts = odps.list_tables(project, prefix=prefix, owner=owner)
  22. return [i.name for i in ts]
  23. def get_cols(tn, project='winhc_eci_dev'):
  24. t = odps.get_table(tn, project)
  25. cols = t.schema.columns
  26. return [i.name for i in cols]
  27. def get_cols_remove_partition_cols(tn, project='winhc_eci_dev'):
  28. t = odps.get_table(tn, project)
  29. cols = t.schema.columns
  30. partition_cols = [i.name for i in t.schema.partitions]
  31. return [i.name for i in cols if i.name not in partition_cols]
  32. def query(sql):
  33. with odps.execute_sql(sql).open_reader() as reader:
  34. li = []
  35. for record in reader:
  36. # print(record['value'][0:10])
  37. li.append(record['value'] + "\n")
  38. return li
  39. pass
  40. def get_cols_type(tn, project='winhc_ng'):
  41. t = odps.get_table(tn, project)
  42. cols = t.schema.columns
  43. m = []
  44. for i in cols:
  45. m.append((i.name, str(i.type)))
  46. return m
  47. def get_cols_type_desc(tn, project='winhc_ng'):
  48. t = odps.get_table(tn, project)
  49. cols = t.schema.columns
  50. m = []
  51. for i in cols:
  52. m.append((i.name, str(i.type), i.comment))
  53. return m
  54. def get_partition_cols(tn, project='winhc_ng'):
  55. table = odps.get_table(tn, project)
  56. return [i.name for i in table.schema.partitions]
  57. def get_last_partition_ds(tab, project, default=None):
  58. li = get_partition_ds(tab, project)
  59. if len(li) == 0:
  60. return default
  61. else:
  62. return li[-1]
  63. pass
  64. def get_partition_ds(tab, project):
  65. li = get_partition(tab, project, 'ds=')
  66. l = []
  67. for i in li:
  68. if '/' in i:
  69. l.append([j for j in i.split('/') if 'ds=' in j][0].split('=')[1])
  70. else:
  71. l.append(i.split('=')[1])
  72. return l
  73. pass
  74. def get_partition(tab, project, expression=None):
  75. with odps.execute_sql(f"show partitions {project}.{tab}").open_reader() as reader:
  76. li = reader.raw.split('\n')
  77. li = [i for i in li if li is not None and i.strip() != '']
  78. if expression:
  79. li = [i for i in li if expression in i]
  80. return li
  81. def show_partitions(tn, project='winhc_ng'):
  82. table = odps.get_table(tn, project)
  83. return [i.name for i in table.partitions]
  84. pass
  85. if __name__ == '__main__':
  86. res = list_tab("winhc_ng", prefix='ads_')
  87. str = ""
  88. for i in res:
  89. str += '"{a}":"{b}",'.format(a=i.name[4:], b=i.comment)
  90. from utils import set_text
  91. set_text(str)
  92. print(str)
  93. # li = query(
  94. # "SELECT * from winhc_ng.tmp_dynamic_test1 where ds = '20220719' and INSTR(value, 'company_staff') > 0 and INSTR(value, 'company_holder') > 0;")
  95. # f = open("D://out_dynamic_test1.txt", mode='w', encoding='utf-8')
  96. # f.writelines(li)
  97. # print(li)
  98. pass