ソースを参照

feat: 添加相关规则

- 添加两年后开庭的过滤
- 添加案号和开庭时间的逻辑判断
- 添加当事人成立日期与开庭时间的关系
- 添加案号和开庭时间同时为空的拦截
- 添加es依赖
- 调整本地测试的日志输出和统计指标输出
许家凯 1 年間 前
コミット
363bf8d7d3

+ 32 - 0
data_clean/api/es_api.py

@@ -0,0 +1,32 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/9 10:41
+# @Author : XuJiakai
+# @File : es_api
+# @Software: PyCharm
+
+from data_clean.exception.fetch_exception import FetchException
+from data_clean.utils.async_client import get_aio_elasticsearch
+
+_es = get_aio_elasticsearch()
+
+
+async def search(index: str, body: dict, doc_type: str = '_doc') -> dict:
+    """
+    Run a search request through the module-level Elasticsearch client.
+
+    :param index: index name forwarded to the client
+    :param body: request body forwarded to the client
+    :param doc_type: document type forwarded to the client
+    :return: whatever the client's ``search`` call returns
+    :raises FetchException: wraps any exception raised by the client call
+    """
+    try:
+        response = await _es.search(index=index, doc_type=doc_type, body=body)
+    except Exception as cause:
+        raise FetchException(cause)
+    return response
+
+
+async def get(index: str, id: str, doc_type: str = '_doc') -> dict:
+    """
+    Fetch one document by id through the module-level Elasticsearch client.
+
+    :param index: index name forwarded to the client
+    :param id: document id forwarded to the client
+    :param doc_type: document type forwarded to the client
+    :return: whatever the client's ``get`` call returns
+    :raises FetchException: wraps any exception raised by the client call
+    """
+    try:
+        document = await _es.get(index=index, doc_type=doc_type, id=id)
+    except Exception as cause:
+        raise FetchException(cause)
+    return document
+
+
+if __name__ == '__main__':
+    pass

+ 46 - 0
data_clean/api/hbase_api.py

@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/14 17:54
+# @Author : XuJiakai
+# @File : hbase_api
+# @Software: PyCharm
+from data_clean.api.http_api import get as http_get
+from data_clean.api.http_api import post
+
+from data_clean.env.environment_switch import project_env
+from data_clean.exception.fetch_exception import FetchException
+
+_hosts = project_env.get_val('winhc_open_api.eci_data.host')
+
+
+async def get(table_name: str, rowkey: str):
+    """
+    Fetch a single row through the HTTP HBase gateway.
+
+    :param table_name: table name, appended to the URL path as-is
+    :param rowkey: row key, appended to the URL path as-is
+    :return: the ``'data'`` entry of the gateway response
+    :raises FetchException: when the response's ``'success'`` entry is falsy
+    """
+    url = _hosts + '/hbase/get/' + table_name + '/' + rowkey
+    response = await http_get(url)
+    if response['success']:
+        return response['data']
+    raise FetchException(200, response, '内容异常')
+
+
+async def bulk_get(table_name, rowkey: list, query_key: list = None):
+    """
+    Fetch several rows with one POST to the HTTP HBase gateway.
+
+    Duplicate row keys are removed before the request is sent. Each returned
+    row has its own key written into it under ``'ROWKEY'``.
+
+    :param table_name: table name, appended to the query string as-is
+    :param rowkey: row keys to fetch
+    :param query_key: sent as ``"query_key"``; an empty list when omitted
+    :return: list of the row dicts found in the response's ``'data'`` mapping
+    :raises FetchException: when the response's ``'success'`` entry is falsy
+    """
+    payload = {
+        "query_key": [] if query_key is None else query_key,
+        "rowkey": list(set(rowkey)),
+    }
+    response = await post(_hosts + '/hbase/bulk-get?tableName=' + table_name, data=payload)
+    if not response['success']:
+        raise FetchException(200, response, '内容异常')
+
+    rows_by_key = response['data']
+    rows = []
+    for key in rows_by_key:
+        row = rows_by_key[key]
+        row['ROWKEY'] = key
+        rows.append(row)
+    return rows
+
+
+if __name__ == '__main__':
+    import asyncio
+
+    asyncio.run(bulk_get("ng_rt_company", ["a33f3cd172f8f9bd61d5f3cd84a4ffd9", "bc702f0f5202342a9c1c75fbf9be9aff"]))
+    pass

+ 11 - 7
data_clean/api/http_api.py

@@ -3,7 +3,6 @@
 # @Author : XuJiakai
 # @File : http_api
 # @Software: PyCharm
-import json
 
 import aiohttp
 
@@ -13,16 +12,21 @@ from data_clean.exception.fetch_exception import FetchException
 async def get(url: str):
     async with aiohttp.ClientSession() as session:
         async with session.get(url) as response:
-            text = await response.text()
+            result = await response.json()
             if response.status != 200:
-                raise FetchException(response.status, text)
-            return text
+                raise FetchException(response.status, result)
+            return result
     pass
 
 
-async def get_json(url: str):
-    text = await get(url)
-    return json.loads(text)
+async def post(url: str, data: dict):
+    """
+    POST ``data`` as a JSON body to ``url`` and return the decoded JSON reply.
+
+    The status code is checked before the body is decoded. An error reply
+    whose body is not JSON (for example an HTML error page) would otherwise
+    fail inside ``response.json()`` and surface as an aiohttp error instead
+    of a FetchException.
+
+    :param url: target URL
+    :param data: object serialised as the JSON request body
+    :return: the decoded JSON body of a 200 response
+    :raises FetchException: when the status is not 200; carries the status
+        code and the raw response text
+    """
+    async with aiohttp.ClientSession() as session:
+        async with session.post(url, json=data) as response:
+            if response.status != 200:
+                raise FetchException(response.status, await response.text())
+            return await response.json()
 
 
 if __name__ == '__main__':

+ 5 - 5
data_clean/env/env-dev.yaml

@@ -1,8 +1,8 @@
 es:
-#  old:
-#    hosts: http://es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com:9200
-#    username: elastic
-#    pwd: elastic_168
+  #  old:
+  #    hosts: http://es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com:9200
+  #    username: elastic
+  #    pwd: elastic_168
   new:
     hosts: http://es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com:9200
     username: elastic
@@ -25,7 +25,7 @@ kafka:
 
 winhc_open_api:
   eci_data:
-    host: 47.101.221.131
+    host: http://106.15.78.184:30196
 
 
 mysql:

+ 2 - 0
data_clean/env/environment_switch.py

@@ -67,6 +67,8 @@ class environment_switch:
         return None
 
 
+project_env = environment_switch()
+
 if __name__ == '__main__':
     env = environment_switch()
     print(env.is_intranet)

+ 6 - 3
data_clean/exception/exception_handle.py

@@ -12,9 +12,12 @@ from data_clean.exception.ruler_validation_exception import RulerValidationExcep
 from data_clean.statistic.data_clean_statistic import ruler_error, error
 from data_clean.utils import get_log
 from data_clean.utils.date_utils import get_now_datetime
+from data_clean.utils.base_utils import is_windows
 
 log = get_log("exception_handler")
 
+mongo_table_prefix = 'a_data_clean_' if not is_windows() else 'a_test_data_clean_'
+
 
 async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
     """
@@ -25,7 +28,7 @@ async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data
     :param session_id:
     :return:
     """
-    col = "a_data_clean_ruler_valid_error"
+    col = mongo_table_prefix + "ruler_valid_error"
     doc = {
         "ruler_code": ex.ruler_code,
         "tn": tn,
@@ -47,7 +50,7 @@ async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id):
     :param session_id:
     :return:
     """
-    col_pre = "a_data_clean_fetch_error"
+    col_pre = mongo_table_prefix + "fetch_error"
 
     doc = {
         "tn": tn,
@@ -69,7 +72,7 @@ async def error_sink(ex: Exception, tn: str, data, session_id):
     :param session_id:
     :return:
     """
-    col_pre = f"a_data_clean_error"
+    col_pre = mongo_table_prefix + "error"
     doc = {
         "tn": tn,
         "session_id": session_id,

+ 3 - 2
data_clean/exception/fetch_exception.py

@@ -6,13 +6,14 @@
 
 
 class FetchException(Exception):
-    def __init__(self, response_status=None, response_content=None, ):
+    def __init__(self, response_status=None, response_content=None, msg=None):
         self.__response_status = response_status
         self.__response_content = response_content
+        self.__msg = msg
         pass
 
     def __str__(self):
-        return f"拉取数据异常,response status: {self.__response_status} , content : {self.__response_content}"
+        return f"拉取数据异常,response status: {self.__response_status} , msg:{self.__msg} , content : {self.__response_content}"
 
 
 if __name__ == '__main__':

+ 58 - 7
data_clean/handle/company_court_open_announcement.py

@@ -5,10 +5,12 @@
 # @Software: PyCharm
 import os
 
+from data_clean.api.hbase_api import bulk_get
 from data_clean.dim_handle_registry import get_dim_handle
-from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
-from data_clean.utils.date_utils import *
+from data_clean.utils.base_utils import *
+from data_clean.utils.case_utils import case_no_year_datetime
+from data_clean.utils.date_utils import str_2_date_time, get_update_time, establish_state_time
 from data_clean.utils.party_name_verify_utils import person_name_verify
 from data_clean.utils.str_utils import json_str_2_list
 
@@ -51,18 +53,56 @@ async def party_intersect(row_data: dict) -> dict:
     pass
 
 
+async def _get_max_establish_date(company_ids: list):
+    """
+    Return the latest establishment date among the given companies.
+
+    Rows are fetched from the ``ng_rt_company`` table; rows without a
+    non-empty ``ESTIBLISH_TIME`` value are ignored.
+
+    :param company_ids: company ids to look up; falsy entries are dropped
+    :return: the largest parsed ``ESTIBLISH_TIME``, or None when there is
+        nothing to look up or no row carries a date
+    """
+    company_ids = [i for i in company_ids if i]
+    if not company_ids:
+        # Nothing to look up, so skip the remote call entirely.
+        return None
+    rows = await bulk_get('ng_rt_company', company_ids)
+    dates = [str_2_date_time(row['ESTIBLISH_TIME']) for row in rows
+             if 'ESTIBLISH_TIME' in row and row['ESTIBLISH_TIME']]
+    # max() raises ValueError on an empty sequence; return None instead so
+    # the caller's truthiness check can treat it as "no known date".
+    return max(dates, default=None)
+
+
+# 开庭时间相关过滤
 @dim_handle.registry_row_func
 async def open_ann_date(row_data: dict) -> dict:
-    # 过滤开庭时间早于建国时间问题
-    if 'start_date' in row_data:
-        if not row_data['start_date']:
-            # raise RulerValidationException("ccoa_002", "开庭时间为空")
-            return row_data
+    import datetime
+    now = datetime.datetime.now()
+    delta = datetime.timedelta(days=730)  # 两年后
+    max_date = now + delta
+    start_date = get_or_none(row_data, 'start_date')
+    case_no = get_or_none(row_data, 'case_no')
+
+    if case_no is None and start_date is None:
+        raise RulerValidationException("ccoa_007", "案号和开庭时间均为空")
 
+    if start_date is None:
+        return row_data
+
+    try:
         this_date = str_2_date_time(row_data['start_date'])
         if this_date < establish_state_time:
             raise RulerValidationException("ccoa_002", "开庭时间早于建国时间:%s" % row_data['start_date'])
 
+        if this_date > max_date:
+            raise RulerValidationException("ccoa_006", "开庭时间在两年后:%s" % row_data['start_date'])
+
+        part_keyno = json_str_2_list(row_data['plaintiff_info'], 'litigant_id') + json_str_2_list(
+            row_data['defendant_info'], 'litigant_id')
+        part_keyno = [i for i in part_keyno if i and len(i) == 32]
+        max_establish_date = await _get_max_establish_date(part_keyno)
+        if max_establish_date and this_date < max_establish_date:
+            raise RulerValidationException("ccoa_004", "开庭时有公司未成立,最晚一个公司成立日期:%s,开庭时间:%s" % (
+                max_establish_date, row_data['start_date']))
+
+        case_no_year_dt = case_no_year_datetime(case_no)
+        if case_no_year_dt and this_date < case_no_year_dt:
+            raise RulerValidationException("ccoa_005", "案号大于开庭时间年份,案号:%s,开庭时间:%s" % (
+                case_no, row_data['start_date']))
+    except RulerValidationException as ex:
+        if case_no is None:
+            raise ex
+        row_data['start_date'] = None
+        pass
+
     return row_data
     pass
 
@@ -79,5 +119,16 @@ async def party_unknown(row_data: dict) -> dict:
     return row_data
 
 
+async def test():
+    """Manual check: print the latest establishment date for three sample company ids."""
+    sample_ids = [
+        'bc702f0f5202342a9c1c75fbf9be9aff',
+        'b79d862faef595f33b166562bb3c18b6',
+        '24cb269450f9262051dfcaa3dc389844',
+    ]
+    print(await _get_max_establish_date(sample_ids))
+
+
 if __name__ == '__main__':
+    import asyncio
+
+    asyncio.run(test())
     pass

+ 3 - 1
data_clean/statistic/data_clean_statistic.py

@@ -9,8 +9,10 @@ from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
 from data_clean.statistic.sls_log import get_logger
 from data_clean.statistic.statistic_filter import filter_data
+from data_clean.utils import is_windows
+from data_clean.utils.log import get_log as get_local_log
 
-_log = get_logger()
+_log = get_logger() if not is_windows() else get_local_log("local_log")
 
 '''
 log template:

+ 2 - 0
data_clean/utils/__init__.py

@@ -5,9 +5,11 @@
 # @Software: PyCharm
 
 from data_clean.utils.log import get_log
+from data_clean.utils.base_utils import is_windows
 
 __all__ = [
     'get_log',
+    'is_windows',
 ]
 if __name__ == '__main__':
     pass

+ 11 - 0
data_clean/utils/async_client.py

@@ -4,16 +4,19 @@
 # @File : async_client
 # @Software: PyCharm
 import asyncio
+import warnings
 
 import aio_pika
 from aio_pika.abc import AbstractRobustConnection
 from aiokafka import AIOKafkaConsumer
 from aiokafka import AIOKafkaProducer
+from elasticsearch7 import AsyncElasticsearch
 from motor.motor_asyncio import AsyncIOMotorClient
 
 from data_clean.env.environment_switch import environment_switch
 
 _env = environment_switch()
+warnings.filterwarnings('ignore')
 
 
 def get_aio_mongo_db(db='itslaw', mongo_name='itslaw'):
@@ -40,6 +43,14 @@ def get_aio_kafka_producer(name='base'):
     pass
 
 
+def get_aio_elasticsearch(es_name='new') -> AsyncElasticsearch:
+    """
+    Build an AsyncElasticsearch client from the ``es.<es_name>`` settings.
+
+    Reads ``hosts``, ``username`` and ``pwd`` under ``es.<es_name>`` from the
+    environment configuration and uses the latter two as HTTP auth.
+
+    :param es_name: name of the configuration entry under ``es``
+    :return: a new AsyncElasticsearch client
+    """
+    prefix = 'es.' + es_name
+    hosts = _env.get_val(prefix + '.hosts')
+    username = _env.get_val(prefix + '.username')
+    password = _env.get_val(prefix + '.pwd')
+    return AsyncElasticsearch(hosts=hosts, http_auth=(username, password))
+
+
 async def get_rabbitmq_connection(name: str = "base") -> AbstractRobustConnection:
     host = _env.get_val('rabbit_mq.' + name + '.host')
     username = _env.get_val('rabbit_mq.' + name + '.username')

+ 25 - 0
data_clean/utils/base_utils.py

@@ -0,0 +1,25 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/14 14:49
+# @Author : XuJiakai
+# @File : base_utils
+# @Software: PyCharm
+import platform
+
+
+def is_windows():
+    """Return True when ``platform.system()`` reports ``"Windows"``, else False."""
+    return platform.system() == "Windows"
+
+
+def get_or_none(json: dict, key: str):
+    """
+    Look up ``key`` in ``json`` without raising when it is missing.
+
+    :param json: mapping to read from; may be None or empty
+    :param key: key to look up
+    :return: ``json[key]`` when the key is present, otherwise None
+    """
+    if not json or key not in json:
+        return None
+    return json[key]
+
+
+if __name__ == '__main__':
+    pass

+ 28 - 0
data_clean/utils/case_utils.py

@@ -0,0 +1,28 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/15 9:43
+# @Author : XuJiakai
+# @File : case_utils
+# @Software: PyCharm
+import re
+import datetime
+
+_case_no_year_pattern = re.compile("^[((](\\d{4}?)[))].+$")
+
+
+def case_no_year_datetime(case_no, add_year: int = 0):
+    """
+    Derive a datetime from the four-digit year that opens a case number.
+
+    The case number must start with a parenthesised four-digit year
+    (full-width or ASCII parentheses) followed by at least one more character.
+
+    :param case_no: case number text; may be None or empty
+    :param add_year: number of years added to the parsed year
+    :return: January 1st of the (shifted) year as a naive datetime, or None
+        when the case number is empty, does not match, or the resulting year
+        cannot be represented by ``datetime``
+    """
+    if not case_no:
+        return None
+
+    match = _case_no_year_pattern.match(case_no)
+    if not match:
+        return None
+
+    try:
+        return datetime.datetime(year=int(match.group(1)) + add_year, month=1, day=1)
+    except (ValueError, OverflowError):
+        # e.g. "(0000)..." or a shifted year beyond 9999: no usable date
+        return None
+
+
+if __name__ == '__main__':
+    year_datetime = case_no_year_datetime("(2102)豫1681 民初535 号", 1)
+    print(year_datetime)
+    pass

+ 8 - 0
data_clean/utils/party_name_verify_utils.py

@@ -8,6 +8,14 @@ import re
 __anonymity_name = re.compile("^[a-zA-Z][某xX]+$")
 
 
+def person_name_list_verify(names: list):
+    """
+    Check every name in ``names`` with ``person_name_verify``.
+
+    :param names: names to check
+    :return: False as soon as one name fails the check, True otherwise
+        (including for an empty list)
+    """
+    return all(person_name_verify(name) for name in names)
+
+
 def person_name_verify(name: str):
     """
     检测人名是否符合规范

+ 28 - 1
poetry.lock

@@ -552,6 +552,33 @@ url = "https://mirror.baidu.com/pypi/simple"
 reference = "douban"
 
 [[package]]
+name = "elasticsearch7"
+version = "7.17.9"
+description = "Python client for Elasticsearch"
+optional = false
+python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <4"
+files = [
+    {file = "elasticsearch7-7.17.9-py2.py3-none-any.whl", hash = "sha256:24cfa00438dd1c0328f4c61e064bcfd4bbf5ff7684e2ec49cc46efbb7598b055"},
+    {file = "elasticsearch7-7.17.9.tar.gz", hash = "sha256:4868965d7d6af948c6f31510523f610e9b81299acd2fd35325e46090d786584d"},
+]
+
+[package.dependencies]
+aiohttp = {version = ">=3,<4", optional = true, markers = "extra == \"async\""}
+certifi = "*"
+urllib3 = ">=1.21.1,<2"
+
+[package.extras]
+async = ["aiohttp (>=3,<4)"]
+develop = ["black", "coverage", "jinja2", "mock", "pytest", "pytest-cov", "pyyaml", "requests (>=2.0.0,<3.0.0)", "sphinx (<1.7)", "sphinx-rtd-theme"]
+docs = ["sphinx (<1.7)", "sphinx-rtd-theme"]
+requests = ["requests (>=2.4.0,<3.0.0)"]
+
+[package.source]
+type = "legacy"
+url = "https://mirror.baidu.com/pypi/simple"
+reference = "douban"
+
+[[package]]
 name = "environs"
 version = "9.5.0"
 description = "simplified environment variable parsing"
@@ -1499,4 +1526,4 @@ reference = "douban"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8"
-content-hash = "539f337cb1a462bfdb3b03b44a3923b8bc8899ebec60148f9957470975912aab"
+content-hash = "5befda9f548e8d2d007ee05aea07246747bff5a7c331d9c8f8fa4374123de40f"

+ 1 - 0
pyproject.toml

@@ -15,6 +15,7 @@ pyyaml = "^6.0.1"
 environs = "^9.5.0"
 aio-pika = "^9.1.5"
 aliyun-log-python-sdk = "^0.8.7"
+elasticsearch7 = {extras = ["async"], version = "^7.17.9"}
 
 
 [build-system]