123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/20 16:27
- # @Author : XuJiakai
- # @File : company_court_open_announcement
- # @Software: PyCharm
- import os
- from data_clean.api.hbase_api import bulk_get
- from data_clean.api.mongo_api import insert_one
- from data_clean.dim_handle_registry import get_dim_handle
- from data_clean.env.const import mongo_table_prefix
- from data_clean.exception.ruler_validation_exception import RulerValidationException
- from data_clean.utils.base_utils import *
- from data_clean.utils.case_utils import case_no_year_datetime
- from data_clean.utils.date_utils import str_2_date_time, get_update_time, establish_state_time
- from data_clean.utils.party_name_verify_utils import person_name_list_verify
- from data_clean.utils.str_utils import json_str_2_list
- from data_clean.utils.case_utils import get_case_party
- # Must be named dim_handle; it is obtained from get_dim_handle using this module's file name, and the row/postfix functions in this file register on it.
- dim_handle = get_dim_handle(os.path.basename(__file__))
# The registration decorator is commented out, so this hook is not registered through it:
# @dim_handle.registry_prefix_func
async def prefix_func(dim_data: list):
    """Pre-processing hook stub: print the incoming batch, then always fail.

    :param dim_data: the batch of rows handed to the hook.
    :raises ValueError: unconditionally, after printing ``dim_data``.
    """
    print("前置程序:", dim_data)
    failure = ValueError("前置程序错误")
    raise failure
@dim_handle.registry_postfix_func()
async def post_func(dim_data: list):
    """Post-processing hook: stamp every row of the batch with an update time.

    Each row gets its own ``get_update_time()`` call, and the rows are
    modified in place.

    :param dim_data: rows of the batch; each must support item assignment.
    :return: ``None``.
    """
    for record in dim_data:
        stamp = get_update_time()
        record['update_time'] = stamp
@dim_handle.registry_row_func
async def party_intersect(row_data: dict) -> dict:
    """Row rule ccoa_001: reject rows whose plaintiffs and defendants overlap.

    The ``name`` values are extracted from ``plaintiff_info`` and
    ``defendant_info`` with ``json_str_2_list`` and compared as sets.

    :param row_data: the row being validated.
    :return: ``row_data`` unchanged when no name appears on both sides.
    :raises RulerValidationException: code ``ccoa_001`` listing the shared
        names when at least one name appears on both sides.
    """
    plaintiff_names = json_str_2_list(row_data['plaintiff_info'], "name")
    defendant_names = json_str_2_list(row_data['defendant_info'], "name")

    # Kept as a list so the "%s" rendering in the message stays a list repr.
    shared_names = list(set(plaintiff_names) & set(defendant_names))
    if shared_names:
        raise RulerValidationException("ccoa_001", "当事人有交叉:%s" % shared_names)
    return row_data
async def _get_max_establish_date(company_ids: list):
    """Return the latest establishment time among the given companies.

    The ids are looked up in the ``ng_rt_company`` table through
    ``bulk_get``; every non-empty ``ESTIBLISH_TIME`` value found in the
    returned rows is parsed with ``str_2_date_time``.

    :param company_ids: company ids to look up; falsy entries are ignored.
    :return: the maximum parsed establishment time, or ``None`` when no id
        is left after filtering or no returned row carries a non-empty
        ``ESTIBLISH_TIME``.
    """
    valid_ids = [company_id for company_id in company_ids if company_id]
    if not valid_ids:
        # Nothing to look up: skip the storage round trip entirely.
        return None

    rows = await bulk_get('ng_rt_company', valid_ids)
    # 'ESTIBLISH_TIME' (sic) is the key exactly as this code reads it from the rows.
    establish_times = [
        str_2_date_time(row['ESTIBLISH_TIME'])
        for row in rows
        if 'ESTIBLISH_TIME' in row and row['ESTIBLISH_TIME']
    ]
    return max(establish_times, default=None)
# Filters related to the court-opening (hearing) time
@dim_handle.registry_row_func
async def open_ann_date(row_data: dict) -> dict:
    """Row rule: validate ``start_date`` (the hearing time) of an announcement.

    Checks, in order:

    * ``ccoa_007`` - both ``case_no`` and ``start_date`` are empty
      (always raised to the caller);
    * ``ccoa_002`` - hearing time earlier than ``establish_state_time``;
    * ``ccoa_006`` - hearing time more than 730 days after "now";
    * ``ccoa_004`` - hearing time earlier than the latest establishment date
      of the parties whose ``litigant_id`` is a 32-character id;
    * ``ccoa_005`` - hearing time earlier than the datetime derived from the
      case number by ``case_no_year_datetime``.

    When one of ccoa_002/004/005/006 fires and the row has a case number, the
    row is written to the ``info_cooa_start_date_set_none`` collection and
    its ``start_date`` is reset to ``None`` instead of rejecting the row.

    :param row_data: the row being validated; may be modified in place.
    :return: ``row_data``.
    :raises RulerValidationException: ``ccoa_007``, or any of the date rules
        above when the row has no case number.
    """
    from datetime import datetime, timedelta

    # Upper bound for a plausible hearing time: 730 days (two years) from now.
    latest_allowed = datetime.now() + timedelta(days=730)

    start_date = get_or_none(row_data, 'start_date')
    case_no = get_or_none(row_data, 'case_no')

    if start_date is None:
        if case_no is None:
            raise RulerValidationException("ccoa_007", "案号和开庭时间均为空")
        # No hearing time to validate.
        return row_data

    try:
        raw_start = row_data['start_date']
        hearing_time = str_2_date_time(raw_start)

        if hearing_time < establish_state_time:
            raise RulerValidationException("ccoa_002", "开庭时间早于建国时间:%s" % raw_start)
        if hearing_time > latest_allowed:
            raise RulerValidationException("ccoa_006", "开庭时间在两年后:%s" % raw_start)

        plaintiff_ids = json_str_2_list(row_data['plaintiff_info'], 'litigant_id')
        defendant_ids = json_str_2_list(row_data['defendant_info'], 'litigant_id')
        # Only 32-character ids are looked up as companies.
        company_ids = [
            party_id
            for party_id in plaintiff_ids + defendant_ids
            if party_id and len(party_id) == 32
        ]
        latest_establish = await _get_max_establish_date(company_ids)
        if latest_establish and hearing_time < latest_establish:
            raise RulerValidationException(
                "ccoa_004",
                "开庭时有公司未成立,最晚一个公司成立日期:%s,开庭时间:%s" % (latest_establish, raw_start),
            )

        case_year_start = case_no_year_datetime(case_no)
        if case_year_start and hearing_time < case_year_start:
            raise RulerValidationException(
                "ccoa_005",
                "案号大于开庭时间年份,案号:%s,开庭时间:%s" % (case_no, raw_start),
            )
    except RulerValidationException as violation:
        # Without a case number there is nothing left to keep the row for.
        if case_no is None:
            raise violation
        # Record the row as it was before the hearing time is cleared.
        audit_doc = {
            "content": {
                "data": {
                    "company_court_open_announcement": [row_data]
                },
            }
        }
        await insert_one(mongo_table_prefix + 'info_cooa_start_date_set_none', audit_doc)
        row_data['start_date'] = None

    return row_data
@dim_handle.registry_row_func
async def party_unknown(row_data: dict) -> dict:
    """Row rule ccoa_003: handle rows whose party names fail verification.

    Filters abnormal party names (e.g. "Z某某", "xxx"). The plaintiff and
    defendant names are checked with ``person_name_list_verify``. When the
    check fails, the parties are looked up again via ``get_case_party`` by
    case number; a non-empty result replaces the six party fields of the row.

    :param row_data: the row being validated; may be modified in place.
    :return: ``row_data``.
    :raises RulerValidationException: code ``ccoa_003`` when the names fail
        verification and ``get_case_party`` returns nothing usable.
    """
    plaintiff_names = json_str_2_list(row_data['plaintiff_info'], "name")
    defendant_names = json_str_2_list(row_data['defendant_info'], "name")

    names_ok, error_name = person_name_list_verify(plaintiff_names + defendant_names)
    if names_ok:
        return row_data

    replacement = await get_case_party(row_data['case_no'], source='open_court')
    if not replacement:
        raise RulerValidationException("ccoa_003", "人名不符合规范:%s" % error_name)

    # Same fields, same order as the lookup result is copied over.
    for field in (
        'plaintiff_info',
        'defendant_info',
        'litigant_info',
        'plaintiff',
        'defendant',
        'litigant',
    ):
        row_data[field] = replacement[field]
    return row_data
async def test():
    """Manual smoke check: print the latest establishment date of three sample ids."""
    sample_company_ids = [
        'bc702f0f5202342a9c1c75fbf9be9aff',
        'b79d862faef595f33b166562bb3c18b6',
        '24cb269450f9262051dfcaa3dc389844',
    ]
    latest = await _get_max_establish_date(sample_company_ids)
    print(latest)
if __name__ == '__main__':
    # Script entry point: run the manual smoke check once on a fresh event loop.
    import asyncio

    smoke_check = test()
    asyncio.run(smoke_check)
|