123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/24 11:03
- # @Author : XuJiakai
- # @File : dim_handle_registry
- # @Software: PyCharm
- from data_clean.exception.exception_handle import exception_handle
# Module-level cache: normalized table/module name -> its DimHandleRegistry.
__class_dict = {}


def get_dim_handle(tn: str):
    """Return the registry cached under ``tn``, creating it on first use.

    A trailing ``.py`` is stripped from ``tn`` before the lookup, so a module
    file name and the bare name resolve to the same registry.

    :param tn: registry name, optionally ending in ``.py``.
    :return: the ``DimHandleRegistry`` stored for the normalized name.
    """
    key = tn[:-3] if tn.endswith(".py") else tn
    if key in __class_dict:
        return __class_dict[key]
    handle = DimHandleRegistry(key)
    __class_dict[key] = handle
    return handle
- def _get_deleted(row_data: dict):
- del_val = 0
- if 'deleted' in row_data:
- del_val = row_data['deleted']
- if 'DELETED' in row_data:
- del_val = row_data["DELETED"]
- if type(del_val) is not int:
- del_val = int(del_val)
- return del_val
- pass
class DimHandleRegistry:
    """Registry of the callbacks used to process a batch of rows.

    A registry holds at most one ``prefix_func`` (awaited once with the whole
    input batch), at most one ``postfix_func`` (awaited once with the rows
    that were kept) and an ordered list of row functions (awaited one after
    another on every row).

    Every ``registry_*`` method supports three call styles::

        @registry.registry_row_func()      # decorator factory
        async def step(row): ...

        @registry.registry_row_func        # bare decorator
        async def step(row): ...

        registry.registry_row_func(step)   # plain function call
    """

    def __init__(self, name=None):
        """Create an empty registry.

        :param name: registry name; a trailing ``.py`` is stripped. ``None``
            selects the name ``"DefaultRegistry"``.
        """
        if name is None:
            name = "DefaultRegistry"
        elif name.endswith(".py"):
            name = name[:-3]
        self._name = name
        # Named callbacks ("prefix_func" / "postfix_func"), stored in a dict.
        self._obj_list = {}
        # Row-level callbacks, applied in registration order.
        self._row_func = []

    def __registry(self, obj, name=None):
        """Store ``obj`` in the named-callback table.

        :param obj: the function or class to register.
        :param name: key to register under; defaults to ``obj.__name__``.
        :raises AssertionError: if ``name`` is already registered.
        """
        if name is None:
            name = obj.__name__
        # Raised explicitly (rather than via ``assert``) so the duplicate check
        # still runs under ``python -O``; the exception type is unchanged.
        if name in self._obj_list:
            raise AssertionError("{} already exists in {}".format(name, self._name))
        self._obj_list[name] = obj

    def registry_prefix_func(self, obj=None):
        """Register the callback awaited before any row is processed.

        :param obj: the callback, or ``None`` to obtain a decorator.
        :return: a decorator when ``obj`` is ``None``, otherwise ``obj`` itself
            so that the bare-decorator form keeps the decorated name bound.
        :raises AssertionError: if a prefix callback is already registered.
        """
        if obj is None:
            def _no_obj_registry(func__or__class, *args, **kwargs):
                self.__registry(func__or__class, name="prefix_func")
                # The decorated name stays bound to the original object.
                return func__or__class

            return _no_obj_registry
        self.__registry(obj, name="prefix_func")
        return obj

    def registry_postfix_func(self, obj=None):
        """Register the callback awaited after all rows are processed.

        :param obj: the callback, or ``None`` to obtain a decorator.
        :return: a decorator when ``obj`` is ``None``, otherwise ``obj`` itself
            so that the bare-decorator form keeps the decorated name bound.
        :raises AssertionError: if a postfix callback is already registered.
        """
        if obj is None:
            def _no_obj_registry(func__or__class, *args, **kwargs):
                self.__registry(func__or__class, name="postfix_func")
                # The decorated name stays bound to the original object.
                return func__or__class

            return _no_obj_registry
        self.__registry(obj, name="postfix_func")
        return obj

    def registry_row_func(self, obj=None):
        """Append a per-row callback; callbacks run in registration order.

        :param obj: the callback, or ``None`` to obtain a decorator.
        :return: a decorator when ``obj`` is ``None``, otherwise ``obj`` itself
            so that the bare-decorator form keeps the decorated name bound.
        """
        if obj is None:
            def _no_obj_registry(func__or__class, *args, **kwargs):
                self._row_func.append(func__or__class)
                # The decorated name stays bound to the original object.
                return func__or__class

            return _no_obj_registry
        self._row_func.append(obj)
        return obj

    @exception_handle
    async def execute_dim(self, dim_data: list, session_id=None, original_data=None) -> list:
        """Run the registered callbacks over a batch of rows.

        The prefix callback (if any) is awaited with ``dim_data``; each row is
        then passed through ``_exec_row`` and kept unless the result is
        ``None``; finally the postfix callback (if any) is awaited with the
        kept rows, but only when at least one row was kept.

        NOTE(review): the method is wrapped by ``exception_handle``, whose
        behaviour is not visible here - confirm how errors are reported.

        :param dim_data: rows to process.
        :param session_id: forwarded to ``_exec_row``.
        :param original_data: forwarded to ``_exec_row``.
        :return: the rows for which ``_exec_row`` returned a non-``None`` value.
        """
        if "prefix_func" in self._obj_list:
            await self._obj_list["prefix_func"](dim_data)

        result_list = []
        for row in dim_data:
            row_data = await self._exec_row(row, session_id, original_data)
            if row_data is not None:
                result_list.append(row_data)

        if "postfix_func" in self._obj_list and result_list:
            await self._obj_list["postfix_func"](result_list)
        return result_list

    @exception_handle
    async def _exec_row(self, row_data: dict, session_id=None, original_data=None) -> dict:
        """Pass one row through every registered row callback.

        :param row_data: the row to process.
        :param session_id: accepted but not used by this method.
        :param original_data: accepted but not used by this method.
        :return: the row returned by the last callback, the unchanged input
            row when its deleted flag equals 9, or ``None`` as soon as a
            callback returns ``None``.
        """
        # Rows flagged 9 bypass all row callbacks.
        # NOTE(review): the meaning of the value 9 is not shown here - confirm.
        if _get_deleted(row_data) == 9:
            return row_data
        for func in self._row_func:
            row_data = await func(row_data)
            # A callback returning None drops the row; stop the chain early.
            if row_data is None:
                return None
        return row_data
def get_class_dict():
    """Return the module-level cache mapping names to their registries.

    The dictionary itself is returned, not a copy.
    """
    return __class_dict
|