Ver Fonte

feat: 添加开庭公告当事人修正

- 开庭公告当事人修正
- 开庭时间置空时记录日志
- 添加一些工具方法、依赖
许家凯 há 1 ano atrás
pai
commit
3b7d937faa

+ 9 - 3
data_clean/api/es_api.py

@@ -6,11 +6,15 @@
 
 from data_clean.exception.fetch_exception import FetchException
 from data_clean.utils.async_client import get_aio_elasticsearch
+from data_clean.utils.base_utils import parse_env_and_name
 
-_es = get_aio_elasticsearch()
+_new_es = get_aio_elasticsearch()
+_old_es = get_aio_elasticsearch(es_name='old')
 
 
-async def search(index: str, body: dict, doc_type: str = '_doc') -> dict:
+async def search(index: str, body: dict, doc_type: str = '_doc', env='new') -> dict:
+    env, index = parse_env_and_name(index, env)
+    _es = _new_es if env == 'new' else _old_es
     try:
         return await _es.search(index=index, doc_type=doc_type, body=body)
         pass
@@ -19,7 +23,9 @@ async def search(index: str, body: dict, doc_type: str = '_doc') -> dict:
     pass
 
 
-async def get(index: str, id: str, doc_type: str = '_doc') -> dict:
+async def get(index: str, id: str, doc_type: str = '_doc', env='new') -> dict:
+    env, index = parse_env_and_name(index, env)
+    _es = _new_es if env == 'new' else _old_es
     try:
         return await _es.get(index=index, doc_type=doc_type, id=id)
         pass

+ 2 - 1
data_clean/api/hbase_api.py

@@ -42,5 +42,6 @@ async def bulk_get(table_name, rowkey: list, query_key: list = None):
 if __name__ == '__main__':
     import asyncio
 
-    asyncio.run(bulk_get("ng_rt_company", ["a33f3cd172f8f9bd61d5f3cd84a4ffd9", "bc702f0f5202342a9c1c75fbf9be9aff"]))
+    asyncio.run(get("ng_rt_company","bc702f0f5202342a9c1c75fbf9be9aff"))
+    # asyncio.run(bulk_get("ng_rt_company", ["a33f3cd172f8f9bd61d5f3cd84a4ffd9", "bc702f0f5202342a9c1c75fbf9be9aff"]))
     pass

+ 6 - 0
data_clean/api/mongo_api.py

@@ -22,6 +22,12 @@ async def insert_many(collection, data: list):
     pass
 
 
+async def delete_one(collection, object_id):
+    """
+    Delete the document whose ``_id`` equals ``object_id`` from ``collection``.
+
+    :param collection: collection name, used to index the module-level ``db``
+    :param object_id: value matched against the ``_id`` field
+    :return: the ``deleted_count`` reported by the delete result
+    """
+    query = {"_id": object_id}
+    outcome = await db[collection].delete_one(query)
+    return outcome.deleted_count
+
+
 async def test():
     # res = await db["a_xjk_test_aio"].insert_one({"a": "b"})
     # print(res.inserted_id)

+ 11 - 0
data_clean/env/const.py

@@ -0,0 +1,11 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/21 15:21
+# @Author : XuJiakai
+# @File : const
+# @Software: PyCharm
+from data_clean.utils.base_utils import is_windows
+
+mongo_table_prefix = 'a_data_clean_' if not is_windows() else 'a_test_data_clean_'
+
+if __name__ == '__main__':
+    pass

+ 1 - 3
data_clean/exception/exception_handle.py

@@ -7,17 +7,15 @@ import traceback
 from functools import update_wrapper
 
 from data_clean.api.mongo_api import insert_one
+from data_clean.env.const import mongo_table_prefix
 from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
 from data_clean.statistic.data_clean_statistic import ruler_error, error
 from data_clean.utils import get_log
 from data_clean.utils.date_utils import get_now_datetime
-from data_clean.utils.base_utils import is_windows
 
 log = get_log("exception_handler")
 
-mongo_table_prefix = 'a_data_clean_' if not is_windows() else 'a_test_data_clean_'
-
 
 async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
     """

+ 27 - 4
data_clean/handle/company_court_open_announcement.py

@@ -6,13 +6,16 @@
 import os
 
 from data_clean.api.hbase_api import bulk_get
+from data_clean.api.mongo_api import insert_one
 from data_clean.dim_handle_registry import get_dim_handle
+from data_clean.env.const import mongo_table_prefix
 from data_clean.exception.ruler_validation_exception import RulerValidationException
 from data_clean.utils.base_utils import *
 from data_clean.utils.case_utils import case_no_year_datetime
 from data_clean.utils.date_utils import str_2_date_time, get_update_time, establish_state_time
-from data_clean.utils.party_name_verify_utils import person_name_verify
+from data_clean.utils.party_name_verify_utils import person_name_list_verify
 from data_clean.utils.str_utils import json_str_2_list
+from data_clean.utils.case_utils import get_case_party
 
 # 必须命名为dim_handle
 dim_handle = get_dim_handle(os.path.basename(__file__))
@@ -102,6 +105,14 @@ async def open_ann_date(row_data: dict) -> dict:
     except RulerValidationException as ex:
         if case_no is None:
             raise ex
+        await insert_one(mongo_table_prefix + 'info_cooa_start_date_set_none', {
+            "content": {
+                "data": {
+                    "company_court_open_announcement": [row_data]
+                },
+
+            }
+        })
         row_data['start_date'] = None
         pass
 
@@ -115,9 +126,21 @@ async def party_unknown(row_data: dict) -> dict:
     plaintiff_info = json_str_2_list(row_data['plaintiff_info'], "name")
     defendant_info = json_str_2_list(row_data['defendant_info'], "name")
     li = plaintiff_info + defendant_info
-    for i in li:
-        if not person_name_verify(i):
-            raise RulerValidationException("ccoa_003", "人名不符合规范:%s" % i)
+    flag, error_name = person_name_list_verify(li)
+    if not flag:
+        result = await get_case_party(row_data['case_no'], source='open_court')
+        if result:
+            row_data['plaintiff_info'] = to_string(result['plaintiff_info'], is_format=False)
+            row_data['defendant_info'] = to_string(result['defendant_info'], is_format=False)
+            row_data['litigant_info'] = to_string(result['litigant_info'], is_format=False)
+            row_data['plaintiff'] = result['plaintiff']
+            row_data['defendant'] = result['defendant']
+            row_data['litigant'] = result['litigant']
+            pass
+        else:
+            raise RulerValidationException("ccoa_003", "人名不符合规范:%s" % error_name)
+            pass
+
     return row_data
 
 

+ 2 - 1
data_clean/utils/__init__.py

@@ -5,11 +5,12 @@
 # @Software: PyCharm
 
 from data_clean.utils.log import get_log
-from data_clean.utils.base_utils import is_windows
+from data_clean.utils.base_utils import is_windows, to_string
 
 __all__ = [
     'get_log',
     'is_windows',
+    'to_string',
 ]
 if __name__ == '__main__':
     pass

+ 23 - 3
data_clean/utils/asyncio_pool.py

@@ -9,17 +9,35 @@ import signal
 import sys
 from typing import Coroutine
 
+from data_clean.utils import get_log
+
+log = get_log("JobMain")
+
 
 class AsyncPool(object):
     def __init__(self, max_concurrency: int):
         self._semaphore: asyncio.Semaphore = asyncio.Semaphore(max_concurrency)
+        self._running_task_num = 0
 
     async def create_task(self, coro: Coroutine) -> asyncio.Task:
         await self._semaphore.acquire()
+        self._running_task_num += 1
         task: asyncio.Task = asyncio.create_task(coro)
-        task.add_done_callback(lambda t: self._semaphore.release())
+        task.add_done_callback(lambda t: self._release(t))
         return task
 
+    def is_done(self):
+        return self._running_task_num == 0
+
+    async def wait_all_done(self):
+        while self._running_task_num > 0:
+            log.info("wait done...")
+            await asyncio.sleep(1)
+
+    def _release(self, t):
+        self._running_task_num -= 1
+        self._semaphore.release()
+
 
 class GracefulExit(SystemExit):
     code = 1
@@ -111,9 +129,11 @@ async def callback(msg):
 
 
 async def main():
-    pool = AsyncPoolTest(5, callback)
+    pool = AsyncPool(5)
     for i in range(10):
-        await pool.create_task(run1(), str(i))
+        await pool.create_task(run1())
+    await pool.wait_all_done()
+
     pass
 
 

+ 17 - 0
data_clean/utils/base_utils.py

@@ -3,6 +3,7 @@
 # @Author : XuJiakai
 # @File : base_utils
 # @Software: PyCharm
+import json
 import platform
 
 
@@ -21,5 +22,21 @@ def get_or_none(json: dict, key: str):
     pass
 
 
+def parse_env_and_name(val, env):
+    """
+    Split an optional ``"<env>.<name>"`` prefix off ``val``.
+
+    :param val: a name, optionally prefixed with an environment and a dot (e.g. ``"old.my_index"``)
+    :param env: environment to use when ``val`` carries no prefix
+    :return: ``(env, name)``; a prefix found in ``val`` overrides ``env``
+    """
+    if '.' in val:
+        # Split only on the first dot so the remaining name may itself contain dots
+        # instead of being silently truncated to its first segment.
+        env, val = val.split('.', 1)
+    return env, val
+
+
+def to_string(obj, is_format=True):
+    """
+    Serialize ``obj`` to a JSON string with sorted keys and non-ASCII characters kept as-is.
+
+    :param obj: JSON-serializable object
+    :param is_format: truthy for pretty output indented by 4 spaces, falsy for a single line
+    :return: the JSON text
+    """
+    indent = 4 if is_format else None
+    return json.dumps(obj, indent=indent, ensure_ascii=False, sort_keys=True)
+
+
 if __name__ == '__main__':
     pass

+ 142 - 5
data_clean/utils/case_utils.py

@@ -3,9 +3,15 @@
 # @Author : XuJiakai
 # @File : case_utils
 # @Software: PyCharm
-import re
+import asyncio
 import datetime
+import json
+import re
+
 from data_clean.api.es_api import search
+from data_clean.api.hbase_api import get as hbase_get
+from data_clean.utils.json_utils import json_path, del_key
+from data_clean.utils.party_name_verify_utils import person_name_list_verify
 
 _case_no_year_pattern = re.compile("^[((](\\d{4}?)[))].+$")
 
@@ -23,27 +29,158 @@ def case_no_year_datetime(case_no, add_year: int = 0):
     pass
 
 
-async def get_open_court_case_party(case_no: str):
+async def _general_query_case_party(case_no: str, index: str, doc_type: str = '_doc', case_no_key: str = 'case_no',
+                                    hbase_table_name: str = None):
+    """
+    Look up the parties of a case by case number in one Elasticsearch index.
+
+    Only an unambiguous match is used: anything other than exactly one hit yields ``None``.
+    When the hit has no ``litigant_info`` it is read from the ``LITIGANT_INFO`` column of
+    ``hbase_table_name`` (row key = the hit's ``_id``) if a table is given; otherwise it is
+    built as plaintiffs followed by defendants.
+
+    :param case_no: case number to search for; falsy values return ``None`` without querying
+    :param index: index name passed to ``search``
+    :param doc_type: document type passed to ``search``
+    :param case_no_key: field used in the ``term`` clause on the case number
+    :param hbase_table_name: optional HBase table used as a fallback source for ``litigant_info``
+    :return: dict with the ``plaintiff_info`` / ``defendant_info`` / ``litigant_info`` lists and the
+             comma-joined ``plaintiff`` / ``defendant`` / ``litigant`` names, or ``None`` when the
+             case is not matched exactly once, any party list is missing or empty, or a litigant
+             name fails ``person_name_list_verify``
+    """
+    if not case_no:
+        return None
+
+    query = {
+        "query": {
+            "bool": {
+                "must": [
+                    {
+                        "term": {
+                            case_no_key: {
+                                "value": case_no
+                            }
+                        }
+                    }, {
+                        "terms": {
+                            "deleted": [
+                                "0",
+                                "1"
+                            ]
+                        }
+                    }
+                ]
+            }
+        }
+    }
+    response = await search(index=index, doc_type=doc_type, body=query)
+    hits = response['hits']
+    total = hits['total']
+    # Elasticsearch 7 reports the total as {"value": n, "relation": ...}; older servers
+    # return a plain int. Normalise so the "exactly one hit" test works on both.
+    if isinstance(total, dict):
+        total = total.get('value')
+    if total != 1:
+        return None
+
+    record = hits['hits'][0]
+    plaintiff_info = json_path(record, '$._source.plaintiff_info')
+    defendant_info = json_path(record, '$._source.defendant_info')
+    litigant_info = json_path(record, '$._source.litigant_info')
+    if not litigant_info:
+        if hbase_table_name:
+            rowkey = json_path(record, '$._id')
+            # Named hbase_row rather than "re" so the module-level ``re`` import is not shadowed.
+            hbase_row = await hbase_get(hbase_table_name, rowkey)
+            # NOTE(review): assumes hbase_get returns a column -> value mapping, or a falsy
+            # value when the row is missing — confirm against hbase_api.
+            if hbase_row and 'LITIGANT_INFO' in hbase_row:
+                litigant_info = json.loads(hbase_row['LITIGANT_INFO'])
+        else:
+            litigant_info = (plaintiff_info or []) + (defendant_info or [])
+
+    # Check for missing/empty lists before iterating: litigant_info is still None here
+    # when the HBase fallback found nothing.
+    if not plaintiff_info or not defendant_info or not litigant_info:
+        return None
+
+    names = [party['name'] for party in litigant_info]
+    flag, _ = person_name_list_verify(names)
+    if not flag:
+        return None
+
+    # Drop the fields that are not carried over into the corrected record.
+    for party_list in (plaintiff_info, defendant_info, litigant_info):
+        for party in party_list:
+            del_key(party, 'lawyer_info')
+            del_key(party, 'judge_tendency')
+
+    return {
+        "plaintiff_info": plaintiff_info,
+        "defendant_info": defendant_info,
+        "litigant_info": litigant_info,
+        "plaintiff": ','.join([party['name'] for party in plaintiff_info]),
+        "defendant": ','.join([party['name'] for party in defendant_info]),
+        "litigant": ','.join([party['name'] for party in litigant_info]),
+    }
 
+
+async def get_open_court_case_party(case_no: str):
+    return await _general_query_case_party(case_no, "winhc_index_rt_company_court_open_announcement",
+                                           hbase_table_name="NG_RT_COMPANY_COURT_OPEN_ANNOUNCEMENT")
     pass
 
 
 async def get_register_case_party(case_no: str):
+    return await _general_query_case_party(case_no, "winhc_index_rt_company_court_register")
     pass
 
 
 async def get_judgement_document_party(case_no: str):
+    return await _general_query_case_party(case_no, "old.wenshu_detail2", doc_type='wenshu_detail_type',
+                                           case_no_key='case_no.keyword')
     pass
 
 
-async def get_case_party(case_no: str):
+async def get_case_party(case_no: str, source: str = None):
     if not case_no:
         return None
 
+    if source == 'case_register':
+        result = await get_open_court_case_party(case_no)
+        if result:
+            return result
+        result = await get_judgement_document_party(case_no)
+        if result:
+            return result
+    elif source == 'open_court':
+        result = await get_register_case_party(case_no)
+        if result:
+            return result
+        result = await get_judgement_document_party(case_no)
+        if result:
+            return result
+    elif source == 'judgement_doc':
+        result = await get_register_case_party(case_no)
+        if result:
+            return result
+        result = await get_open_court_case_party(case_no)
+        if result:
+            return result
+    else:
+        result = await get_register_case_party(case_no)
+        if result:
+            return result
+        result = await get_open_court_case_party(case_no)
+        if result:
+            return result
+        result = await get_judgement_document_party(case_no)
+        if result:
+            return result
+
+    return None
+    pass
+
+
+async def test():
+    from base_utils import to_string
+    # print(await get_case_party("(2021)粤05民初2014号"))
+    # print(await get_case_party("(2020)冀0181民初1940号"))
+    # print(await get_case_party("(2020)冀0181民初2477号"))
+    # print(to_string(await get_case_party("(2020)冀09民终5209号"), is_format=False))
+    print(to_string(await get_case_party("(2020)苏0481民初1925号", source='open_court')))
+    # print(await get_register_case_party("(2017)津0119民初2784号"))
+    # print(await get_open_court_case_party("(2017)津0119民初2784号"))
+    # print(await get_judgement_document_party("(2017)津0119民初2784号"))
     pass
 
 
 if __name__ == '__main__':
-    year_datetime = case_no_year_datetime("(2102)豫1681 民初535 号", 1)
-    print(year_datetime)
+    asyncio.run(test())
     pass

+ 27 - 0
data_clean/utils/json_utils.py

@@ -0,0 +1,27 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/17 14:06
+# @Author : XuJiakai
+# @File : json_utils
+# @Software: PyCharm
+import jsonpath as jp
+
+
+def json_path(json: dict, path):
+    """
+    Evaluate the JSONPath expression ``path`` against ``json`` and return the first match.
+
+    :param json: object to query
+    :param path: JSONPath expression, e.g. ``'$._source.plaintiff_info'``
+    :return: the first matched value, or ``None`` when the lookup result is falsy (no match)
+    """
+    matches = jp.jsonpath(json, path)
+    return matches[0] if matches else None
+
+
+def del_key(json_obj: dict, key):
+    """
+    Remove ``key`` from ``json_obj`` in place if it is present.
+
+    :param json_obj: dict to modify
+    :param key: key to remove; a missing key is ignored
+    :return: the same ``json_obj`` instance
+    """
+    json_obj.pop(key, None)
+    return json_obj
+
+
+if __name__ == '__main__':
+    pass

+ 2 - 2
data_clean/utils/party_name_verify_utils.py

@@ -11,8 +11,8 @@ __anonymity_name = re.compile("^[a-zA-Z][某xX]+$")
 def person_name_list_verify(names: list):
     for i in names:
         if not person_name_verify(i):
-            return False
-    return True
+            return False, i
+    return True, None
     pass
 
 

+ 16 - 1
poetry.lock

@@ -773,6 +773,21 @@ url = "https://mirror.baidu.com/pypi/simple"
 reference = "douban"
 
 [[package]]
+name = "jsonpath"
+version = "0.82"
+description = "An XPath for JSON"
+optional = false
+python-versions = "*"
+files = [
+    {file = "jsonpath-0.82.tar.gz", hash = "sha256:46d3fd2016cd5b842283d547877a02c418a0fe9aa7a6b0ae344115a2c990fef4"},
+]
+
+[package.source]
+type = "legacy"
+url = "https://mirror.baidu.com/pypi/simple"
+reference = "douban"
+
+[[package]]
 name = "kafka-python"
 version = "2.0.2"
 description = "Pure Python client for Apache Kafka"
@@ -1526,4 +1541,4 @@ reference = "douban"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8"
-content-hash = "5befda9f548e8d2d007ee05aea07246747bff5a7c331d9c8f8fa4374123de40f"
+content-hash = "bb1426ac49b561de757a2d26e11ba9b1542abc16b6dff0bdd5ff083dc66f2ad1"

+ 1 - 0
pyproject.toml

@@ -16,6 +16,7 @@ environs = "^9.5.0"
 aio-pika = "^9.1.5"
 aliyun-log-python-sdk = "^0.8.7"
 elasticsearch7 = {extras = ["async"], version = "^7.17.9"}
+jsonpath = "^0.82"
 
 
 [build-system]