dim_handle_registry.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/24 11:03
  3. # @Author : XuJiakai
  4. # @File : dim_handle_registry
  5. # @Software: PyCharm
  6. from data_clean.exception.exception_handle import exception_handle
  7. __class_dict = {}
  8. def get_dim_handle(tn: str):
  9. if tn.endswith(".py"):
  10. tn = tn[:-3]
  11. if tn not in __class_dict:
  12. __class_dict[tn] = DimHandleRegistry(tn)
  13. return __class_dict[tn]
  14. def _get_deleted(row_data: dict):
  15. del_val = 0
  16. if 'deleted' in row_data:
  17. del_val = row_data['deleted']
  18. if 'DELETED' in row_data:
  19. del_val = row_data["DELETED"]
  20. if type(del_val) is not int:
  21. del_val = int(del_val)
  22. return del_val
  23. pass
  24. class DimHandleRegistry:
  25. def __init__(self, name=None):
  26. if name is None:
  27. self._name = "DefaultRegistry"
  28. if name.endswith(".py"):
  29. name = name[:-3]
  30. self._name = name
  31. # 创建注册表,以字典的形式。
  32. self._obj_list = {}
  33. self._row_func = []
  34. def __registry(self, obj, name=None):
  35. """
  36. 内部注册函数
  37. :param obj:函数或者类的地址。
  38. :return:
  39. """
  40. # 判断是否目标函数或者类已经注册,如果已经注册过则标错,如果没有则进行注册。
  41. if name is None:
  42. name = obj.__name__
  43. assert (name not in self._obj_list.keys()), "{} already exists in {}".format(name, self._name)
  44. self._obj_list[name] = obj
  45. def registry_prefix_func(self, obj=None):
  46. """
  47. 外部注册函数。注册方法分为两种。
  48. 1.通过装饰器调用
  49. 2.通过函数的方式进行调用
  50. :param obj: 函数或者类的本身
  51. :return:
  52. """
  53. if obj == None:
  54. def _no_obj_registry(func__or__class, *args, **kwargs):
  55. self.__registry(func__or__class, name="prefix_func")
  56. # 此时被装饰的函数会被修改为该函数的返回值。
  57. return func__or__class
  58. return _no_obj_registry
  59. # 2.通过函数的方式进行调用
  60. self.__registry(obj, name="prefix_func")
  61. pass
  62. def registry_postfix_func(self, obj=None):
  63. if obj == None:
  64. def _no_obj_registry(func__or__class, *args, **kwargs):
  65. self.__registry(func__or__class, name="postfix_func")
  66. # 此时被装饰的函数会被修改为该函数的返回值。
  67. return func__or__class
  68. return _no_obj_registry
  69. # 2.通过函数的方式进行调用
  70. self.__registry(obj, name="postfix_func")
  71. pass
  72. def registry_row_func(self, obj=None):
  73. if obj == None:
  74. def _no_obj_registry(func__or__class, *args, **kwargs):
  75. self._row_func.append(func__or__class)
  76. # 此时被装饰的函数会被修改为该函数的返回值。
  77. return func__or__class
  78. return _no_obj_registry
  79. # 2.通过函数的方式进行调用
  80. self._row_func.append(obj)
  81. pass
  82. @exception_handle
  83. async def execute_dim(self, dim_data: list, session_id=None, original_data=None) -> list:
  84. if "prefix_func" in self._obj_list:
  85. await self._obj_list["prefix_func"](dim_data)
  86. result_list = []
  87. for row in dim_data:
  88. row_data = await self._exec_row(row, session_id, original_data)
  89. if row_data is not None:
  90. result_list.append(row_data)
  91. if "postfix_func" in self._obj_list and len(result_list) > 0:
  92. await self._obj_list["postfix_func"](result_list)
  93. return result_list
  94. @exception_handle
  95. async def _exec_row(self, row_data: dict, session_id=None, original_data=None) -> dict:
  96. del_val = _get_deleted(row_data)
  97. if del_val == 9:
  98. return row_data
  99. for func in self._row_func:
  100. row_data = await func(row_data)
  101. if row_data is None:
  102. return row_data
  103. pass
  104. return row_data
  105. def get_class_dict():
  106. return __class_dict
  107. pass