Quellcode durchsuchen

feat: 添加sls前后数据一同输出,以观察数据变化

许家凯 vor 1 Jahr
Ursprung
Commit
d17bfebdfa

+ 3 - 3
data_clean/dim_handle_registry.py

@@ -85,13 +85,13 @@ class DimHandleRegistry:
         pass
 
     @exception_handle
-    async def execute_dim(self, dim_data: list, session_id=None) -> list:
+    async def execute_dim(self, dim_data: list, session_id=None, original_data=None) -> list:
         if "prefix_func" in self._obj_list:
             await self._obj_list["prefix_func"](dim_data)
 
         result_list = []
         for row in dim_data:
-            row_data = await self._exec_row(row, session_id)
+            row_data = await self._exec_row(row, session_id, original_data)
             if row_data is not None:
                 result_list.append(row_data)
 
@@ -101,7 +101,7 @@ class DimHandleRegistry:
         return result_list
 
     @exception_handle
-    async def _exec_row(self, row_data: dict, session_id=None) -> dict:
+    async def _exec_row(self, row_data: dict, session_id=None, original_data=None) -> dict:
         for func in self._row_func:
             row_data = await func(row_data)
             if row_data is None:

+ 7 - 4
data_clean/exception/exception_handle.py

@@ -80,7 +80,7 @@ async def error_sink(ex: Exception, tn: str, data: list, session_id):
     pass
 
 
-def _handle(data, tn, session_id, ex: Exception):
+def _handle(data, tn, session_id, ex: Exception, original_data):
     content_type = 0
     if not isinstance(data, list):
         data = [data]
@@ -100,6 +100,7 @@ def _handle(data, tn, session_id, ex: Exception):
             ex=ex,
             content_type=content_type,
             tn=tn
+            ,original_data=original_data
         )
     else:
         error(data=data,
@@ -107,6 +108,7 @@ def _handle(data, tn, session_id, ex: Exception):
               ex=ex,
               content_type=content_type,
               tn=tn
+              , original_data=original_data
               )
         pass
     pass
@@ -117,23 +119,24 @@ def exception_handle(func):
         # tn = pascal_case_to_snake_case(self.__class__.__name__)
         tn = self._name
         session_id = args[1]
+        original_data = args[2]
         result = None
         try:
             result = await func(self, *args, **kwargs)
         except RulerValidationException as ex:
             log.warn("session: %s 维度:%s ,detail: %s", session_id, tn, ex)
             data = args[0]
-            _handle(data, tn, session_id, ex)
+            _handle(data, tn, session_id, ex, original_data)
             await ruler_valid_exception_sink(ex, tn, data, session_id)
         except FetchException as ex:
             log.error("session: %s 维度:%s ,detail: %s", session_id, tn, ex)
             data = args[0]
-            _handle(data, tn, session_id, ex)
+            _handle(data, tn, session_id, ex, original_data)
             await fetch_exception_sink(ex, tn, data, session_id)
         except Exception as ex:
             log.error("session: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
             data = args[0]
-            _handle(data, tn, session_id, ex)
+            _handle(data, tn, session_id, ex, original_data)
             await error_sink(ex, tn, data, session_id)
 
         return result

+ 3 - 3
data_clean/handle/company_court_open_announcement.py

@@ -16,17 +16,17 @@ from data_clean.utils.str_utils import json_str_2_list
 dim_handle = get_dim_handle(os.path.basename(__file__))
 
 
-@dim_handle.registry_prefix_func
+# @dim_handle.registry_prefix_func
 async def prefix_func(dim_data: list):
     print("前置程序:", dim_data)
-    raise ValueError("前置程序错误")
+    # raise ValueError("前置程序错误")
 
     pass
 
 
 @dim_handle.registry_postfix_func()
 async def post_func(dim_data: list):
-    print("后置程序:", dim_data)
+    # print("后置程序:", dim_data)
 
     for r in dim_data:
         r['update_time'] = get_update_time()

+ 13 - 6
data_clean/statistic/data_clean_statistic.py

@@ -3,13 +3,12 @@
 # @Author : XuJiakai
 # @File : data_clean_statistic
 # @Software: PyCharm
-import sys
 import traceback
-from functools import update_wrapper
 
 from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
 from data_clean.statistic.sls_log import get_logger
+from data_clean.statistic.statistic_filter import filter_data
 
 _log = get_logger()
 
@@ -31,12 +30,14 @@ error_info:
 '''
 
 
-def error(data, session_id, ex: Exception, content_type: int, tn: str):
+def error(data, session_id, ex: Exception, content_type: int, tn: str, original_data):
     error_type = 1 if isinstance(ex, FetchException) else 9
-
+    filter_data(data)
+    filter_data(original_data)
     _log.error(msg={
         "session_id": session_id,
         "content": data,
+        "original_content": original_data,
         "content_type": content_type,
         "tn": tn,
         "successful": 0,
@@ -51,10 +52,13 @@ def error(data, session_id, ex: Exception, content_type: int, tn: str):
     pass
 
 
-def ruler_error(data, session_id, ex: RulerValidationException, content_type: int, tn: str):
+def ruler_error(data, session_id, ex: RulerValidationException, content_type: int, tn: str, original_data):
+    filter_data(data)
+    filter_data(original_data)
     _log.warn(msg={
         "session_id": session_id,
         "content": data,
+        "original_content": original_data,
         "content_type": content_type,
         "tn": tn,
         "successful": 0,
@@ -69,10 +73,13 @@ def ruler_error(data, session_id, ex: RulerValidationException, content_type: in
     pass
 
 
-def success(data, session_id):
+def success(data, session_id, original_data):
+    filter_data(data)
+    filter_data(original_data)
     _log.info(msg={
         "session_id": session_id,
         "content": data,
+        "original_content": original_data,
         "successful": 1,
         "error": 0,
     }, stacklevel=1)

+ 34 - 0
data_clean/statistic/statistic_filter.py

@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/1 11:40
+# @Author : XuJiakai
+# @File : statistic_filter
+# @Software: PyCharm
+
+_filter_conf = {
+    "company_court_open_announcement": {
+        "exclude": []
+    }
+}
+
+
+# _filter_conf_keys = set(_filter_conf.keys())
+
+
+def filter_data(data: dict):
+    if not data:
+        return
+    if 'data' in data:
+        tmp_data = data['data']
+        for key in tmp_data:
+            if key in _filter_conf:
+                exclude_keys = _filter_conf[key]["exclude"]
+                for item in tmp_data[key]:
+                    for exclude_key in exclude_keys:
+                        del item[exclude_key]
+                pass
+            pass
+    pass
+
+
+if __name__ == '__main__':
+    pass

+ 5 - 3
data_clean/task_distributor.py

@@ -5,7 +5,7 @@
 # @Software: PyCharm
 import os
 import uuid
-
+import copy
 from data_clean.dim_handle_registry import get_class_dict
 from data_clean.statistic.data_clean_statistic import success
 
@@ -30,9 +30,11 @@ async def task_distribute(data: dict):
     session_id = str(uuid.uuid4())
     tmp_data = data['data']
 
+    original_data = copy.deepcopy(data)
+
     for key in set(tmp_data.keys()):
         if key in class_dict:
-            result_data = await class_dict[key].execute_dim(tmp_data[key], session_id)
+            result_data = await class_dict[key].execute_dim(tmp_data[key], session_id, original_data)
             if result_data is None or len(result_data) == 0:
                 del tmp_data[key]
             else:
@@ -46,7 +48,7 @@ async def task_distribute(data: dict):
     if len(tmp_data) == 0:
         return None
     else:
-        success(data, session_id)
+        success(data, session_id, original_data)
         pass
 
     return data