Przeglądaj źródła

feat: 添加异常处理

许家凯 1 rok temu
rodzic
commit
be198b4acf

+ 7 - 1
data_clean/dim_handle_registry.py

@@ -3,6 +3,8 @@
 # @Author : XuJiakai
 # @File : dim_handle_registry
 # @Software: PyCharm
+from data_clean.exception.exception_handle import exception_handle
+
 
 class DimHandleRegistry:
     def __init__(self, name=None):
@@ -84,6 +86,7 @@ class DimHandleRegistry:
         self._row_func.append(obj)
         pass
 
+    @exception_handle
     async def execute_dim(self, dim_data: list):
         if "prefix_func" in self._obj_list:
             await self._obj_list["prefix_func"](dim_data)
@@ -99,9 +102,12 @@ class DimHandleRegistry:
 
         return result_list
 
-    async def _exec_row(self, row_data):
+    @exception_handle
+    async def _exec_row(self, row_data: dict):
         for func in self._row_func:
             row_data = await func(row_data)
+            if row_data is None:
+                return row_data
             pass
 
         return row_data

+ 76 - 6
data_clean/exception/exception_handle.py

@@ -3,26 +3,96 @@
 # @Author : XuJiakai
 # @File : exception_handle
 # @Software: PyCharm
+from data_clean.api.mongo_api import insert_one
 from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
-from data_clean.utils.str_utils import pascal_case_to_snake_case
 from data_clean.utils import get_log
 
 log = get_log("exception_handler")
 
 
+async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list):
+    """
+    该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
+    :param ex:
+    :param tn:
+    :param data:
+    :return:
+    """
+    col = "a_data_clean_ruler_valid_exception"
+    doc = {
+        "tn": tn,
+        "exception": str(ex),
+        "data": data
+    }
+    await insert_one(col, doc)
+    pass
+
+
+async def fetch_exception_sink(ex: FetchException, tn: str, data: list):
+    """
+    拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
+    :param ex:
+    :param tn:
+    :param data:
+    :return:
+    """
+    col_pre = "a_data_clean_fetch_exception"
+
+    doc = {
+        "tn": tn,
+        "data": data,
+        "exception": str(ex),
+    }
+    await insert_one(col_pre, doc)
+    pass
+
+
+async def error_sink(ex: Exception, tn: str, data: list):
+    """
+    未知异常写出,出现该异常需要手动介入查看原因。
+    :param ex:
+    :param tn:
+    :param data:
+    :return:
+    """
+    col_pre = f"a_data_clean_error"
+    col_pre += "_dim" if isinstance(data, list) else "_record"
+    doc = {
+        "tn": tn,
+        "exception": repr(ex),
+        "data": data
+    }
+
+    await insert_one(col_pre, doc)
+    pass
+
+
 def exception_handle(func):
     async def wrapper(self, *args, **kwargs):
-        tn = pascal_case_to_snake_case(self.__class__.__name__)
+        # tn = pascal_case_to_snake_case(self.__class__.__name__)
+        tn = self._name
         result = None
         try:
             result = await func(self, *args, **kwargs)
-        except (FetchException, RulerValidationException) as ex:
+        except RulerValidationException as ex:
             log.warn("%s", ex)
-            pass
-
+            data = args[0]
+            if not isinstance(data, list):
+                data = [data]
+            await ruler_valid_exception_sink(ex, tn, data)
+        except FetchException as ex:
+            log.error("%s", ex)
+            data = args[0]
+            if not isinstance(data, list):
+                data = [data]
+            await fetch_exception_sink(ex, tn, data)
         except Exception as ex:
-            log.error("出现未知异常:%s", ex)
+            log.error("出现未知异常:%s", repr(ex))
+            data = args[0]
+            if not isinstance(data, list):
+                data = [data]
+            await error_sink(ex, tn, data)
 
         return result
         pass

+ 6 - 0
data_clean/exception/ruler_validation_exception.py

@@ -5,7 +5,13 @@
 # @Software: PyCharm
 
 class RulerValidationException(Exception):
+
     def __init__(self, ruler_code, msg=None, ):
+        """
+        业务逻辑规则校验,不符合数据业务逻辑的,将抛出此异常
+        :param ruler_code:
+        :param msg:
+        """
         self.ruler_code = ruler_code
         self.msg = msg
         pass

+ 2 - 4
data_clean/handle/company_court_open_announcement.py

@@ -6,7 +6,6 @@
 import os
 
 from data_clean.dim_handle_registry import DimHandleRegistry
-from data_clean.exception.exception_handle import exception_handle
 from data_clean.exception.ruler_validation_exception import RulerValidationException
 from data_clean.utils.str_utils import json_str_2_list
 
@@ -15,21 +14,20 @@ dim_handle = DimHandleRegistry(os.path.basename(__file__))
 
 
 @dim_handle.registry_prefix_func
-@exception_handle
 async def prefix_func(dim_data: list):
     print("前置程序:", dim_data)
+    raise ValueError("前置程序错误")
+
     pass
 
 
 @dim_handle.registry_postfix_func()
-@exception_handle
 async def post_func(dim_data: list):
     print("后置程序")
     pass
 
 
 @dim_handle.registry_row_func
-@exception_handle
 async def party_intersect(row_data: dict) -> dict:
     plaintiff_info = json_str_2_list(row_data['plaintiff_info'], "name")
     defendant_info = json_str_2_list(row_data['defendant_info'], "name")

+ 1 - 1
data_clean/task_distributor.py

@@ -26,7 +26,7 @@ async def task_distribute(data: dict):
     for key in set(tmp_data.keys()):
         if key in class_dict:
             result_data = await class_dict[key].execute_dim(tmp_data[key])
-            if len(result_data) == 0:
+            if result_data is None or len(result_data) == 0:
                 del tmp_data[key]
             else:
                 tmp_data[key] = result_data