# -*- coding: utf-8 -*-
# @Time : 2023/7/20 16:27
# @Author : XuJiakai
# @File : company_court_open_announcement
# @Software: PyCharm
"""Row-level cleaning rules for ``company_court_open_announcement`` records.

Each handler is registered on ``dim_handle`` and either returns the (possibly
modified) row or raises ``RulerValidationException`` with a ``ccoa_*`` rule
code to reject it.
"""
import os

from data_clean.api.hbase_api import bulk_get
from data_clean.api.mongo_api import insert_one
from data_clean.dim_handle_registry import get_dim_handle
from data_clean.env.const import mongo_table_prefix
from data_clean.exception.ruler_validation_exception import RulerValidationException
# NOTE: the wildcard import supplies ``get_or_none`` (and possibly more); its
# position relative to the other imports is kept unchanged on purpose.
from data_clean.utils.base_utils import *
from data_clean.utils.case_utils import case_no_year_datetime
from data_clean.utils.date_utils import str_2_date_time, get_update_time, establish_state_time
from data_clean.utils.party_name_verify_utils import person_name_list_verify
from data_clean.utils.str_utils import json_str_2_list
from data_clean.utils.case_utils import get_case_party

# Must be named ``dim_handle``.
dim_handle = get_dim_handle(os.path.basename(__file__))


# @dim_handle.registry_prefix_func
async def prefix_func(dim_data: list):
    """Debug-only prefix hook; currently NOT registered (decorator is commented out).

    Prints the incoming batch and then always raises ``ValueError``.
    """
    print("前置程序:", dim_data)
    raise ValueError("前置程序错误")


@dim_handle.registry_postfix_func()
async def post_func(dim_data: list):
    """Postfix hook: stamp every row in ``dim_data`` with a fresh ``update_time``."""
    # print("后置程序:", dim_data)
    for row in dim_data:
        row['update_time'] = get_update_time()


@dim_handle.registry_row_func
async def party_intersect(row_data: dict) -> dict:
    """Reject rows where the same name appears as both plaintiff and defendant.

    :param row_data: record carrying ``plaintiff_info`` and ``defendant_info``.
    :return: ``row_data`` unchanged when the two name sets are disjoint.
    :raises RulerValidationException: rule ``ccoa_001`` when the sets overlap.
    """
    plaintiff_names = json_str_2_list(row_data['plaintiff_info'], "name")
    defendant_names = json_str_2_list(row_data['defendant_info'], "name")
    inter = list(set(plaintiff_names).intersection(set(defendant_names)))
    if inter:
        raise RulerValidationException("ccoa_001", "当事人有交叉:%s" % inter)
    return row_data


async def _get_max_establish_date(company_ids: list):
    """Return the latest establishment time among the given companies.

    Falsy ids are dropped first. Rows are fetched from the ``ng_rt_company``
    table and only those with a non-empty ``ESTIBLISH_TIME`` are considered.

    :param company_ids: company ids to look up.
    :return: the maximum parsed establishment time, or ``None`` when there is
        nothing to look up or no row carries an establishment time.
    """
    company_ids = [company_id for company_id in company_ids if company_id]
    if not company_ids:
        # Nothing to look up: skip the storage round-trip entirely.
        return None
    rows = await bulk_get('ng_rt_company', company_ids)
    establish_times = [
        str_2_date_time(row['ESTIBLISH_TIME'])
        for row in rows
        if 'ESTIBLISH_TIME' in row and row['ESTIBLISH_TIME']
    ]
    if not establish_times:
        return None
    return max(establish_times)


# Hearing-date related filtering.
@dim_handle.registry_row_func
async def open_ann_date(row_data: dict) -> dict:
    """Validate the hearing date (``start_date``) of a row.

    Checks, in order: the date is not before ``establish_state_time``
    (``ccoa_002``), not more than 730 days in the future (``ccoa_006``), not
    before the latest establishment date of any party company (``ccoa_004``),
    and not before the year derived from the case number (``ccoa_005``).

    When a check fails and the row has a case number, the row is archived to
    Mongo and ``start_date`` is set to ``None`` instead of rejecting the row.

    :param row_data: record to validate; may be mutated (``start_date``).
    :return: ``row_data``.
    :raises RulerValidationException: ``ccoa_007`` when both case number and
        hearing date are missing; otherwise the failed rule when the row has
        no case number to fall back on.
    """
    # Imported locally (as in the original) so the name cannot be shadowed by
    # the wildcard import at module level.
    import datetime

    # Upper bound: two years (730 days) from now.
    max_date = datetime.datetime.now() + datetime.timedelta(days=730)

    start_date = get_or_none(row_data, 'start_date')
    case_no = get_or_none(row_data, 'case_no')
    if case_no is None and start_date is None:
        raise RulerValidationException("ccoa_007", "案号和开庭时间均为空")
    if start_date is None:
        return row_data

    try:
        this_date = str_2_date_time(row_data['start_date'])
        if this_date < establish_state_time:
            raise RulerValidationException("ccoa_002", "开庭时间早于建国时间:%s" % row_data['start_date'])
        if this_date > max_date:
            raise RulerValidationException("ccoa_006", "开庭时间在两年后:%s" % row_data['start_date'])

        part_keyno = json_str_2_list(row_data['plaintiff_info'], 'litigant_id') + json_str_2_list(
            row_data['defendant_info'], 'litigant_id')
        # Keep only 32-character ids (presumably company keys - verify).
        part_keyno = [i for i in part_keyno if i and len(i) == 32]
        max_establish_date = await _get_max_establish_date(part_keyno)
        if max_establish_date and this_date < max_establish_date:
            raise RulerValidationException("ccoa_004", "开庭时有公司未成立,最晚一个公司成立日期:%s,开庭时间:%s" % (
                max_establish_date, row_data['start_date']))

        # A row may reach this point without a case number; only derive the
        # case-number year when there is one to parse.
        case_no_year_dt = case_no_year_datetime(case_no) if case_no is not None else None
        if case_no_year_dt and this_date < case_no_year_dt:
            raise RulerValidationException("ccoa_005", "案号大于开庭时间年份,案号:%s,开庭时间:%s" % (
                case_no, row_data['start_date']))
    except RulerValidationException:
        if case_no is None:
            # No case number to keep the row identifiable: reject it.
            raise
        # Archive the offending row (with its original start_date) before
        # blanking the date.
        await insert_one(mongo_table_prefix + 'info_cooa_start_date_set_none', {
            "content": {
                "data": {
                    "company_court_open_announcement": [row_data]
                },
            }
        })
        row_data['start_date'] = None
    return row_data


@dim_handle.registry_row_func
async def party_unknown(row_data: dict) -> dict:
    """Filter rows whose party names are abnormal (e.g. "Z某某", "xxx").

    When the names fail verification, the parties are looked up again by case
    number (``source='open_court'``); if that lookup yields a result, the
    party fields of the row are replaced with it.

    :param row_data: record to validate; party fields may be overwritten.
    :return: ``row_data``.
    :raises RulerValidationException: rule ``ccoa_003`` when the names are
        invalid and no replacement parties were found.
    """
    plaintiff_names = json_str_2_list(row_data['plaintiff_info'], "name")
    defendant_names = json_str_2_list(row_data['defendant_info'], "name")
    flag, error_name = person_name_list_verify(plaintiff_names + defendant_names)
    if flag:
        return row_data

    result = await get_case_party(row_data['case_no'], source='open_court')
    if not result:
        raise RulerValidationException("ccoa_003", "人名不符合规范:%s" % error_name)

    for field in ('plaintiff_info', 'defendant_info', 'litigant_info',
                  'plaintiff', 'defendant', 'litigant'):
        row_data[field] = result[field]
    return row_data


async def test():
    """Manual smoke test: print the latest establishment date for sample ids."""
    res = await _get_max_establish_date(
        ['bc702f0f5202342a9c1c75fbf9be9aff', 'b79d862faef595f33b166562bb3c18b6', '24cb269450f9262051dfcaa3dc389844'])
    print(res)


if __name__ == '__main__':
    import asyncio

    asyncio.run(test())