Procházet zdrojové kódy

feat: 完善程序

许家凯 před 1 rokem
rodič
revize
44393254fe

+ 20 - 17
data_clean/dim_handle_registry.py

@@ -5,6 +5,16 @@
 # @Software: PyCharm
 from data_clean.exception.exception_handle import exception_handle
 
+# Module-level cache of DimHandleRegistry instances keyed by name;
+# populated lazily by get_dim_handle and exposed through get_class_dict.
+__class_dict = {}
+
+
+def get_dim_handle(tn: str):
+    """
+    Return the DimHandleRegistry cached under ``tn``, creating it on first use.
+
+    :param tn: registry name; a trailing ".py" is stripped before the lookup
+    :return: the DimHandleRegistry stored in ``__class_dict`` for that name
+    """
+    # Handler modules pass os.path.basename(__file__), so drop the ".py"
+    # suffix to key the cache by the bare module name.
+    if tn.endswith(".py"):
+        tn = tn[:-3]
+    # Build a registry only the first time a name is seen; later calls reuse it.
+    if tn not in __class_dict:
+        __class_dict[tn] = DimHandleRegistry(tn)
+    return __class_dict[tn]
+
 
 class DimHandleRegistry:
     def __init__(self, name=None):
@@ -27,30 +37,18 @@ class DimHandleRegistry:
         if name is None:
             name = obj.__name__
 
-        assert (obj.__name__ not in self._obj_list.keys()), "{} already exists in {}".format(obj.__name__, self._name)
+        assert (name not in self._obj_list.keys()), "{} already exists in {}".format(name, self._name)
         self._obj_list[name] = obj
 
-    def registry(self, obj=None):
+    def registry_prefix_func(self, obj=None):
         """
-        # 外部注册函数。注册方法分为两种。
-        # 1.通过装饰器调用
-        # 2.通过函数的方式进行调用
+        外部注册函数。注册方法分为两种。
+        1.通过装饰器调用
+        2.通过函数的方式进行调用
 
         :param obj: 函数或者类的本身
         :return:
         """
-        # 1.通过装饰器调用
-        if obj == None:
-            def _no_obj_registry(func__or__class, *args, **kwargs):
-                self.__registry(func__or__class)
-                # 此时被装饰的函数会被修改为该函数的返回值。
-                return func__or__class
-
-            return _no_obj_registry
-        # 2.通过函数的方式进行调用
-        self.__registry(obj)
-
-    def registry_prefix_func(self, obj=None):
         if obj == None:
             def _no_obj_registry(func__or__class, *args, **kwargs):
                 self.__registry(func__or__class, name="prefix_func")
@@ -111,3 +109,8 @@ class DimHandleRegistry:
             pass
 
         return row_data
+
+
+def get_class_dict():
+    """
+    Return the module-level registry cache.
+
+    :return: the ``__class_dict`` dict itself (not a copy), mapping name -> DimHandleRegistry
+    """
+    return __class_dict
+    # NOTE(review): unreachable placeholder left after the return statement.
+    pass

+ 9 - 7
data_clean/exception/exception_handle.py

@@ -3,6 +3,8 @@
 # @Author : XuJiakai
 # @File : exception_handle
 # @Software: PyCharm
+from functools import update_wrapper
+
 from data_clean.api.mongo_api import insert_one
 from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
@@ -19,8 +21,9 @@ async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data
     :param data:
     :return:
     """
-    col = "a_data_clean_ruler_valid_exception"
+    col = "a_data_clean_ruler_valid_error"
     doc = {
+        "ruler_code": ex.ruler_code,
         "tn": tn,
         "exception": str(ex),
         "data": data
@@ -37,7 +40,7 @@ async def fetch_exception_sink(ex: FetchException, tn: str, data: list):
     :param data:
     :return:
     """
-    col_pre = "a_data_clean_fetch_exception"
+    col_pre = "a_data_clean_fetch_error"
 
     doc = {
         "tn": tn,
@@ -57,7 +60,6 @@ async def error_sink(ex: Exception, tn: str, data: list):
     :return:
     """
     col_pre = f"a_data_clean_error"
-    col_pre += "_dim" if isinstance(data, list) else "_record"
     doc = {
         "tn": tn,
         "exception": repr(ex),
@@ -76,19 +78,19 @@ def exception_handle(func):
         try:
             result = await func(self, *args, **kwargs)
         except RulerValidationException as ex:
-            log.warn("%s", ex)
+            log.warn("维度:%s ,detail: %s", tn, ex)
             data = args[0]
             if not isinstance(data, list):
                 data = [data]
             await ruler_valid_exception_sink(ex, tn, data)
         except FetchException as ex:
-            log.error("%s", ex)
+            log.error("维度:%s ,detail: %s", tn, ex)
             data = args[0]
             if not isinstance(data, list):
                 data = [data]
             await fetch_exception_sink(ex, tn, data)
         except Exception as ex:
-            log.error("出现未知异常:%s", repr(ex))
+            log.error("维度:%s ,出现未知异常:%s", tn, repr(ex))
             data = args[0]
             if not isinstance(data, list):
                 data = [data]
@@ -97,7 +99,7 @@ def exception_handle(func):
         return result
         pass
 
-    return wrapper
+    return update_wrapper(wrapper, func)
 
 
 if __name__ == '__main__':

+ 34 - 3
data_clean/handle/company_court_open_announcement.py

@@ -5,18 +5,20 @@
 # @Software: PyCharm
 import os
 
-from data_clean.dim_handle_registry import DimHandleRegistry
+from data_clean.dim_handle_registry import get_dim_handle
 from data_clean.exception.ruler_validation_exception import RulerValidationException
+from data_clean.utils.date_utils import *
+from data_clean.utils.party_name_verify_utils import person_name_verify
 from data_clean.utils.str_utils import json_str_2_list
 
 # 必须命名为dim_handle
-dim_handle = DimHandleRegistry(os.path.basename(__file__))
+dim_handle = get_dim_handle(os.path.basename(__file__))
 
 
 @dim_handle.registry_prefix_func
 async def prefix_func(dim_data: list):
     print("前置程序:", dim_data)
-    raise ValueError("前置程序错误")
+    # raise ValueError("前置程序错误")
 
     pass
 
@@ -29,6 +31,7 @@ async def post_func(dim_data: list):
 
 @dim_handle.registry_row_func
 async def party_intersect(row_data: dict) -> dict:
+    # 判断当事人有交叉
     plaintiff_info = json_str_2_list(row_data['plaintiff_info'], "name")
     defendant_info = json_str_2_list(row_data['defendant_info'], "name")
 
@@ -40,3 +43,31 @@ async def party_intersect(row_data: dict) -> dict:
         raise RulerValidationException("ccoa_001", "当事人有交叉:%s" % inter)
 
     pass
+
+
+@dim_handle.registry_row_func
+async def open_ann_date(row_data: dict) -> dict:
+    """
+    Reject rows whose ``start_date`` is earlier than ``establish_state_time``.
+
+    :param row_data: row being validated; the check only runs when the
+        ``start_date`` key is present
+    :return: ``row_data`` unchanged
+    :raises RulerValidationException: with code "ccoa_002" when ``start_date``
+        precedes ``establish_state_time``
+    """
+    # Filter out court session times that are earlier than the founding of the state.
+    if 'start_date' in row_data:
+        # Parsed with str_2_date_time's default format.
+        this_date = str_2_date_time(row_data['start_date'])
+        if this_date < establish_state_time:
+            raise RulerValidationException("ccoa_002", "开庭时间早于建国时间:%s" % row_data['start_date'])
+
+    return row_data
+    # NOTE(review): unreachable placeholder left after the return statement.
+    pass
+
+
+@dim_handle.registry_row_func
+async def party_unknown(row_data: dict) -> dict:
+    """
+    Reject rows in which any plaintiff or defendant name fails ``person_name_verify``.
+
+    :param row_data: row being validated; must contain ``plaintiff_info`` and
+        ``defendant_info``
+    :return: ``row_data`` unchanged
+    :raises RulerValidationException: with code "ccoa_003" for the first name
+        that ``person_name_verify`` rejects
+    """
+    # Filter out abnormal party names, e.g. "Z某某" or "xxx".
+    # presumably json_str_2_list extracts the "name" field of each JSON entry — TODO confirm
+    plaintiff_info = json_str_2_list(row_data['plaintiff_info'], "name")
+    defendant_info = json_str_2_list(row_data['defendant_info'], "name")
+    # Check both sides together; plaintiffs are examined before defendants.
+    li = plaintiff_info + defendant_info
+    for i in li:
+        if not person_name_verify(i):
+            raise RulerValidationException("ccoa_003", "人名不符合规范:%s" % i)
+    return row_data
+
+
+if __name__ == '__main__':
+    pass

+ 5 - 2
data_clean/task_distributor.py

@@ -4,15 +4,18 @@
 # @File : task_distributor
 # @Software: PyCharm
 import os
+from data_clean.dim_handle_registry import get_class_dict
 
 scan_path = os.path.join(os.path.dirname(__file__), 'handle')
 
 file_name_list = [file_name[:-3] for file_name in os.listdir(scan_path) if not file_name.startswith("__")]
 
-class_dict = {}
+# class_dict = {}
 for tn in file_name_list:
     tmp = __import__(f"data_clean.handle.{tn}", fromlist=(tn))
-    class_dict[tn] = tmp.dim_handle
+    # class_dict[tn] = tmp.dim_handle
+
+class_dict = get_class_dict()
 
 
 async def task_distribute(data: dict):

+ 18 - 0
data_clean/utils/date_utils.py

@@ -0,0 +1,18 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/7/25 15:35
+# @Author : XuJiakai
+# @File : data_utils
+# @Software: PyCharm
+
+from datetime import datetime
+
+
+def str_2_date_time(date_str, format="%Y-%m-%d %H:%M:%S"):
+    """
+    Parse ``date_str`` into a ``datetime`` using ``datetime.strptime``.
+
+    :param date_str: text to parse
+    :param format: strptime format string; defaults to "%Y-%m-%d %H:%M:%S"
+    :return: the parsed ``datetime``
+    """
+    return datetime.strptime(date_str, format)
+    # NOTE(review): unreachable placeholder left after the return statement.
+    pass
+
+
+# Reference date 1 October 1949, built as a naive datetime (no tzinfo);
+# intended as a lower bound for date validation.
+establish_state_time = datetime(year=1949, month=10, day=1)
+
+if __name__ == '__main__':
+    pass

+ 24 - 0
data_clean/utils/party_name_verify_utils.py

@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/7/26 11:19
+# @Author : XuJiakai
+# @File : party_name_verify_utils
+# @Software: PyCharm
+import re
+
+# Matches anonymised names: one ASCII letter followed by one or more of
+# "某", "x" or "X" (for example "Z某某").
+__anonymity_name = re.compile("^[a-zA-Z][某xX]+$")
+
+
+def person_name_verify(name: str):
+    """
+    Check whether a person name conforms to the expected form.
+
+    :param name: the name to check
+    :return: True if the name conforms, False if it does not
+    """
+    # An empty or missing name never conforms.
+    if not name:
+        return False
+    # A name matching the anonymised pattern (e.g. "Z某某") does not conform.
+    return __anonymity_name.match(name) is None
+
+
+if __name__ == '__main__':
+    print(person_name_verify("Z某某"))
+    pass

+ 4 - 1
tests/TestMain.py

@@ -16,7 +16,9 @@ async def get_test_data():
     res = await get_json(url)
     res = json.loads(json.dumps(res['data']).lower())
 
-    res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
+    # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
+    res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
+    # res["start_date"] = '1948-10-01 00:00:00'
     data = {
         "data": {
             tn: [
@@ -40,6 +42,7 @@ async def test_send_kafka():
 
 async def test_for_url():
     data = await get_test_data()
+    asyncio.get_running_loop()
 
     print("receive : ", data)
     from data_clean.task_distributor import task_distribute