1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/8/22 10:37
- # @Author : XuJiakai
- # @File : data_schema_utils
- # @Software: PyCharm
- import asyncio
- import json
- from data_clean.api.http_api import get
- def _cast_by_type(val, val_type: list):
- if val == '':
- return None
- val_type = [i for i in val_type if i != 'null']
- val_type = val_type[0]
- if val_type == 'array' or val_type == 'object':
- return json.loads(val)
- if val_type == 'number':
- return int(val)
- return val
- pass
class BusinessDataSchema:
    """Loads business data schemas on demand and keeps them in memory.

    Schemas are stored in ``org_data_schema`` keyed by the ``tn`` argument
    they were requested with.
    """

    def __init__(self):
        # tn -> parsed schema document
        self.org_data_schema = {}
        # Held while a missing schema is being fetched and stored.
        self._lock = asyncio.Lock()

    async def get_data_schema(self, tn: str):
        """Return the parsed schema for ``tn``, fetching it on first use.

        A schema already present in ``org_data_schema`` is returned
        directly. Otherwise the lock is taken, the cache is checked once
        more, and only then is ``<tn>.schema`` downloaded, parsed with
        ``json.loads`` and stored.
        """
        try:
            return self.org_data_schema[tn]
        except KeyError:
            pass

        async with self._lock:
            # Another coroutine may have stored it while we waited.
            try:
                return self.org_data_schema[tn]
            except KeyError:
                pass

            url = 'https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/' + tn + '.schema'
            raw_text = await get(url, result_json=False)
            schema = json.loads(raw_text)
            self.org_data_schema[tn] = schema
            return schema

    async def hbase_record_to_json(self, tn, record_json):
        """Build a dict of schema-typed values from ``record_json``.

        Each key of ``record_json`` is lower-cased; keys not listed in the
        schema's ``properties`` are dropped, and the remaining values are
        converted with ``_cast_by_type`` using the property's ``type``.
        """
        schema = await self.get_data_schema(tn)
        properties = schema['properties']

        converted = {}
        for raw_key in record_json:
            field = raw_key.lower()
            if field not in properties:
                continue
            converted[field] = _cast_by_type(record_json[raw_key], properties[field]['type'])
        return converted
|