data_schema_utils.py 1.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/8/22 10:37
  3. # @Author : XuJiakai
  4. # @File : data_schema_utils
  5. # @Software: PyCharm
  6. import asyncio
  7. import json
  8. from data_clean.api.http_api import get
  9. def _cast_by_type(val, val_type: list):
  10. if val == '':
  11. return None
  12. val_type = [i for i in val_type if i != 'null']
  13. val_type = val_type[0]
  14. if val_type == 'array' or val_type == 'object':
  15. return json.loads(val)
  16. if val_type == 'number':
  17. return int(val)
  18. return val
  19. pass
  20. class BusinessDataSchema:
  21. def __init__(self):
  22. self.org_data_schema = {}
  23. self._lock = asyncio.Lock()
  24. pass
  25. async def get_data_schema(self, tn: str):
  26. if tn in self.org_data_schema:
  27. return self.org_data_schema[tn]
  28. pass
  29. async with self._lock:
  30. if tn in self.org_data_schema:
  31. return self.org_data_schema[tn]
  32. pass
  33. res = await get('https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/' + tn + '.schema',
  34. result_json=False)
  35. res = json.loads(res)
  36. self.org_data_schema[tn] = res
  37. return res
  38. pass
  39. async def hbase_record_to_json(self, tn, record_json):
  40. json_schema = await self.get_data_schema(tn)
  41. json_schema = json_schema['properties']
  42. result_json = {}
  43. for key in record_json:
  44. key_lower = key.lower()
  45. if key_lower in json_schema:
  46. result_json[key_lower] = _cast_by_type(record_json[key], json_schema[key_lower]['type'])
  47. pass
  48. return result_json
  49. pass