dim_handle_registry.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/24 11:03
  3. # @Author : XuJiakai
  4. # @File : dim_handle_registry
  5. # @Software: PyCharm
  6. from data_clean.exception.exception_handle import exception_handle
  7. __class_dict = {}
  8. def get_dim_handle(tn: str):
  9. if tn.endswith(".py"):
  10. tn = tn[:-3]
  11. if tn not in __class_dict:
  12. __class_dict[tn] = DimHandleRegistry(tn)
  13. return __class_dict[tn]
  14. class DimHandleRegistry:
  15. def __init__(self, name=None):
  16. if name is None:
  17. self._name = "DefaultRegistry"
  18. if name.endswith(".py"):
  19. name = name[:-3]
  20. self._name = name
  21. # 创建注册表,以字典的形式。
  22. self._obj_list = {}
  23. self._row_func = []
  24. def __registry(self, obj, name=None):
  25. """
  26. 内部注册函数
  27. :param obj:函数或者类的地址。
  28. :return:
  29. """
  30. # 判断是否目标函数或者类已经注册,如果已经注册过则标错,如果没有则进行注册。
  31. if name is None:
  32. name = obj.__name__
  33. assert (name not in self._obj_list.keys()), "{} already exists in {}".format(name, self._name)
  34. self._obj_list[name] = obj
  35. def registry_prefix_func(self, obj=None):
  36. """
  37. 外部注册函数。注册方法分为两种。
  38. 1.通过装饰器调用
  39. 2.通过函数的方式进行调用
  40. :param obj: 函数或者类的本身
  41. :return:
  42. """
  43. if obj == None:
  44. def _no_obj_registry(func__or__class, *args, **kwargs):
  45. self.__registry(func__or__class, name="prefix_func")
  46. # 此时被装饰的函数会被修改为该函数的返回值。
  47. return func__or__class
  48. return _no_obj_registry
  49. # 2.通过函数的方式进行调用
  50. self.__registry(obj, name="prefix_func")
  51. pass
  52. def registry_postfix_func(self, obj=None):
  53. if obj == None:
  54. def _no_obj_registry(func__or__class, *args, **kwargs):
  55. self.__registry(func__or__class, name="postfix_func")
  56. # 此时被装饰的函数会被修改为该函数的返回值。
  57. return func__or__class
  58. return _no_obj_registry
  59. # 2.通过函数的方式进行调用
  60. self.__registry(obj, name="postfix_func")
  61. pass
  62. def registry_row_func(self, obj=None):
  63. if obj == None:
  64. def _no_obj_registry(func__or__class, *args, **kwargs):
  65. self._row_func.append(func__or__class)
  66. # 此时被装饰的函数会被修改为该函数的返回值。
  67. return func__or__class
  68. return _no_obj_registry
  69. # 2.通过函数的方式进行调用
  70. self._row_func.append(obj)
  71. pass
  72. @exception_handle
  73. async def execute_dim(self, dim_data: list, session_id=None, original_data=None) -> list:
  74. if "prefix_func" in self._obj_list:
  75. await self._obj_list["prefix_func"](dim_data)
  76. result_list = []
  77. for row in dim_data:
  78. row_data = await self._exec_row(row, session_id, original_data)
  79. if row_data is not None:
  80. result_list.append(row_data)
  81. if "postfix_func" in self._obj_list and len(result_list) > 0:
  82. await self._obj_list["postfix_func"](result_list)
  83. return result_list
  84. @exception_handle
  85. async def _exec_row(self, row_data: dict, session_id=None, original_data=None) -> dict:
  86. for func in self._row_func:
  87. row_data = await func(row_data)
  88. if row_data is None:
  89. return row_data
  90. pass
  91. return row_data
  92. def get_class_dict():
  93. return __class_dict
  94. pass