# -*- coding: utf-8 -*- # @Time : 2023/7/24 11:03 # @Author : XuJiakai # @File : dim_handle_registry # @Software: PyCharm from data_clean.exception.exception_handle import exception_handle __class_dict = {} def get_dim_handle(tn: str): if tn.endswith(".py"): tn = tn[:-3] if tn not in __class_dict: __class_dict[tn] = DimHandleRegistry(tn) return __class_dict[tn] def _get_deleted(row_data: dict): del_val = 0 if 'deleted' in row_data: del_val = row_data['deleted'] if 'DELETED' in row_data: del_val = row_data["DELETED"] if type(del_val) is not int: del_val = int(del_val) return del_val pass class DimHandleRegistry: def __init__(self, name=None): if name is None: self._name = "DefaultRegistry" if name.endswith(".py"): name = name[:-3] self._name = name # 创建注册表,以字典的形式。 self._obj_list = {} self._row_func = [] def __registry(self, obj, name=None): """ 内部注册函数 :param obj:函数或者类的地址。 :return: """ # 判断是否目标函数或者类已经注册,如果已经注册过则标错,如果没有则进行注册。 if name is None: name = obj.__name__ assert (name not in self._obj_list.keys()), "{} already exists in {}".format(name, self._name) self._obj_list[name] = obj def registry_prefix_func(self, obj=None): """ 外部注册函数。注册方法分为两种。 1.通过装饰器调用 2.通过函数的方式进行调用 :param obj: 函数或者类的本身 :return: """ if obj == None: def _no_obj_registry(func__or__class, *args, **kwargs): self.__registry(func__or__class, name="prefix_func") # 此时被装饰的函数会被修改为该函数的返回值。 return func__or__class return _no_obj_registry # 2.通过函数的方式进行调用 self.__registry(obj, name="prefix_func") pass def registry_postfix_func(self, obj=None): if obj == None: def _no_obj_registry(func__or__class, *args, **kwargs): self.__registry(func__or__class, name="postfix_func") # 此时被装饰的函数会被修改为该函数的返回值。 return func__or__class return _no_obj_registry # 2.通过函数的方式进行调用 self.__registry(obj, name="postfix_func") pass def registry_row_func(self, obj=None): if obj == None: def _no_obj_registry(func__or__class, *args, **kwargs): self._row_func.append(func__or__class) # 此时被装饰的函数会被修改为该函数的返回值。 return func__or__class return _no_obj_registry # 2.通过函数的方式进行调用 self._row_func.append(obj) pass @exception_handle async def execute_dim(self, dim_data: list, session_id=None, original_data=None) -> list: if "prefix_func" in self._obj_list: await self._obj_list["prefix_func"](dim_data) result_list = [] for row in dim_data: row_data = await self._exec_row(row, session_id, original_data) if row_data is not None: result_list.append(row_data) if "postfix_func" in self._obj_list and len(result_list) > 0: await self._obj_list["postfix_func"](result_list) return result_list @exception_handle async def _exec_row(self, row_data: dict, session_id=None, original_data=None) -> dict: del_val = _get_deleted(row_data) if del_val == 9: return row_data for func in self._row_func: row_data = await func(row_data) if row_data is None: return row_data pass return row_data def get_class_dict(): return __class_dict pass