# -*- coding: utf-8 -*-
# @Time : 2023/8/22 10:37
# @Author : XuJiakai
# @File : data_schema_utils
# @Software: PyCharm
"""Fetch per-table business schemas and cast HBase records against them.

Each table's schema is downloaded once from ``_SCHEMA_URL_PREFIX + tn + '.schema'``,
parsed as JSON and cached in memory.  ``hbase_record_to_json`` uses the schema's
``properties`` mapping to lower-case record keys, drop keys the schema does not
declare, and cast raw cell values to the declared JSON types.
"""
import asyncio
import json

from data_clean.api.http_api import get

# Location where each table's schema is published as ``<tn>.schema``.
_SCHEMA_URL_PREFIX = 'https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/'


def _cast_by_type(val, val_type: 'list | str | None'):
    """Cast a raw cell value to the JSON type declared for its schema property.

    :param val: raw value from the record; presumably a string as read from
        HBase (NOTE(review): confirm with callers).
    :param val_type: the property's JSON Schema ``type`` keyword, either a single
        type name (e.g. ``'string'``) or a list such as ``['null', 'number']``.
        ``None`` means no type was declared.
    :return: ``None`` for ``None`` or ``''``; the parsed JSON value for
        ``array``/``object``; an ``int`` (or ``float`` when the value is not
        integral) for ``number``; otherwise ``val`` unchanged.
    :raises ValueError: if ``val`` cannot be parsed as the declared type
        (``json.JSONDecodeError`` is a subclass of ``ValueError``).
    """
    if val is None or val == '':
        return None

    # JSON Schema allows ``type`` to be a bare string; iterating a string would
    # walk its characters, so normalise to a list first.
    type_names = [val_type] if isinstance(val_type, str) else list(val_type or ())
    non_null = [name for name in type_names if name != 'null']
    if not non_null:
        # Only 'null' (or nothing) declared: there is no cast target.
        return val
    target = non_null[0]

    if target in ('array', 'object'):
        return json.loads(val)
    if target == 'number':
        # JSON 'number' covers both integers and fractions.
        if isinstance(val, float):
            return val
        try:
            return int(val)
        except ValueError:
            return float(val)
    return val


class BusinessDataSchema:
    """Lazily download and cache business-table JSON schemas."""

    def __init__(self):
        # table name -> parsed schema, filled on first request for each table.
        self.org_data_schema = {}
        # Guards cache misses so concurrent callers fetch a given schema only once.
        self._lock = asyncio.Lock()

    async def get_data_schema(self, tn: str):
        """Return the parsed schema for table ``tn``, downloading it on first use.

        :param tn: table name; used verbatim as the schema file's base name.
        :return: the schema document parsed by ``json.loads``.
        """
        if tn in self.org_data_schema:
            return self.org_data_schema[tn]
        async with self._lock:
            # Re-check: another coroutine may have filled the cache while we waited.
            if tn in self.org_data_schema:
                return self.org_data_schema[tn]
            raw = await get(_SCHEMA_URL_PREFIX + tn + '.schema', result_json=False)
            schema = json.loads(raw)
            self.org_data_schema[tn] = schema
            return schema

    async def hbase_record_to_json(self, tn, record_json):
        """Project ``record_json`` onto the schema of table ``tn``.

        :param tn: table name whose schema drives the conversion.
        :param record_json: mapping of column name to raw value.
        :return: a new dict keyed by lower-cased column names.  Only columns
            declared in the schema's ``properties`` are kept, and each value is
            cast via ``_cast_by_type``.  If two keys differ only in case, the
            later one wins.
        """
        schema = await self.get_data_schema(tn)
        properties = schema['properties']
        result_json = {}
        for key, raw in record_json.items():
            key_lower = key.lower()
            if key_lower not in properties:
                continue
            prop = properties[key_lower]
            # A property schema may omit 'type' or be a bare boolean; no cast then.
            prop_type = prop.get('type') if isinstance(prop, dict) else None
            result_json[key_lower] = _cast_by_type(raw, prop_type)
        return result_json