# -*- coding: utf-8 -*-
# @Time : 2023/7/20 16:27
# @Author : XuJiakai
# @File : company_court_open_announcement
# @Software: PyCharm
"""Cleaning rules for the ``company_court_open_announcement`` dimension.

The module registers one prefix hook, one postfix hook and three per-row
validation rules on ``dim_handle``.  Row rules either return the (possibly
modified) row or raise ``RulerValidationException`` with a ``ccoa_xxx`` code.
"""
import os

# NOTE: the relative order of the project imports is preserved on purpose;
# the star import below may rebind names depending on where it appears.
from data_clean.api.hbase_api import bulk_get
from data_clean.api.mongo_api import insert_one
from data_clean.dim_handle_registry import get_dim_handle
from data_clean.env.const import mongo_table_prefix
from data_clean.exception.ruler_validation_exception import RulerValidationException
from data_clean.utils.base_utils import *
from data_clean.utils.case_utils import case_no_year_datetime
from data_clean.utils.case_utils import get_case_party
from data_clean.utils.date_utils import str_2_date_time, get_update_time, establish_state_time
from data_clean.utils.party_name_verify_utils import person_name_list_verify, remove_null_party
from data_clean.utils.str_utils import json_str_2_list

# The handle must be named ``dim_handle``.
dim_handle = get_dim_handle(os.path.basename(__file__))


@dim_handle.registry_prefix_func
async def prefix_func(dim_data: list):
    """Run ``remove_null_party`` over the three party-info fields of every row.

    :param dim_data: list of row dicts; each must contain ``plaintiff_info``,
        ``defendant_info`` and ``litigant_info``.  Rows are modified in place.
    """
    for row_data in dim_data:
        for field in ('plaintiff_info', 'defendant_info', 'litigant_info'):
            row_data[field] = remove_null_party(row_data[field])


# NOTE(review): unlike the other registrations, this decorator is invoked with
# parentheses; kept exactly as in the original.
@dim_handle.registry_postfix_func()
async def post_func(dim_data: list):
    """Stamp every row with ``update_time`` taken from ``get_update_time()``.

    :param dim_data: list of row dicts, modified in place.
    """
    for row in dim_data:
        row['update_time'] = get_update_time()


@dim_handle.registry_row_func
async def party_intersect(row_data: dict) -> dict:
    """Reject rows whose plaintiff and defendant name lists overlap.

    Rows whose ``case_no`` contains the character ``破`` (bankruptcy cases) are
    exempt: a debtor may file for its own bankruptcy, so the same party can
    legitimately appear on both sides.

    :param row_data: row dict with ``plaintiff_info`` and ``defendant_info``.
    :return: the unchanged row when no overlap is found.
    :raises RulerValidationException: code ``ccoa_001`` when names overlap.
    """
    case_no = row_data.get('case_no')
    if case_no and '破' in case_no:
        return row_data

    plaintiff_names = json_str_2_list(row_data['plaintiff_info'], "name")
    defendant_names = json_str_2_list(row_data['defendant_info'], "name")
    inter = list(set(plaintiff_names).intersection(set(defendant_names)))
    if inter:
        raise RulerValidationException("ccoa_001", "当事人有交叉:%s" % inter)
    return row_data


async def _get_max_establish_date(company_ids: list):
    """Return the latest ``ESTIBLISH_TIME`` among the given companies.

    :param company_ids: row keys looked up in the ``ng_rt_company`` table;
        falsy entries are ignored.
    :return: the maximum parsed establish time, or ``None`` when there is
        nothing to look up or no record carries a non-empty value.
    """
    company_ids = [i for i in company_ids if i]
    if not company_ids:
        # Nothing to query: skip the storage round trip entirely.
        return None

    records = await bulk_get('ng_rt_company', company_ids)
    establish_dates = [
        str_2_date_time(rec['ESTIBLISH_TIME'])
        for rec in records
        if 'ESTIBLISH_TIME' in rec and rec['ESTIBLISH_TIME']
    ]
    if not establish_dates:
        return None
    return max(establish_dates)


# Hearing-date related filtering.
@dim_handle.registry_row_func
async def open_ann_date(row_data: dict) -> dict:
    """Validate ``start_date`` (the hearing time) of a row.

    Checks, in order:

    * ``ccoa_007`` - both ``case_no`` and ``start_date`` are empty;
    * ``ccoa_002`` - hearing time earlier than ``establish_state_time``;
    * ``ccoa_006`` - hearing time more than 730 days in the future;
    * ``ccoa_004`` - hearing time earlier than the latest establish date of
      the plaintiff/defendant companies (32-character ``litigant_id`` only);
    * ``ccoa_005`` - hearing time earlier than the value derived from the
      case number by ``case_no_year_datetime``.

    When one of ``ccoa_002/004/005/006`` fires and the row has a case number,
    the row is not rejected: it is written to the
    ``info_cooa_start_date_set_none`` Mongo collection and ``start_date`` is
    set to ``None``.  Without a case number the exception propagates.

    :param row_data: row dict, may be modified in place.
    :return: the row.
    :raises RulerValidationException: see the list above.
    """
    # Kept function-local as in the original; a module-level ``datetime`` name
    # could interact with the star import above.
    import datetime

    # Upper bound for an acceptable hearing time: two years from now.
    max_date = datetime.datetime.now() + datetime.timedelta(days=730)

    start_date = get_or_none(row_data, 'start_date')
    case_no = get_or_none(row_data, 'case_no')
    if case_no is None and start_date is None:
        raise RulerValidationException("ccoa_007", "案号和开庭时间均为空")
    if start_date is None:
        return row_data

    try:
        this_date = str_2_date_time(row_data['start_date'])
        if this_date < establish_state_time:
            raise RulerValidationException("ccoa_002", "开庭时间早于建国时间:%s" % row_data['start_date'])
        if this_date > max_date:
            raise RulerValidationException("ccoa_006", "开庭时间在两年后:%s" % row_data['start_date'])

        party_ids = json_str_2_list(row_data['plaintiff_info'], 'litigant_id') + json_str_2_list(
            row_data['defendant_info'], 'litigant_id')
        # Only 32-character ids are looked up as company keys.
        party_ids = [i for i in party_ids if i and len(i) == 32]
        max_establish_date = await _get_max_establish_date(party_ids)
        if max_establish_date and this_date < max_establish_date:
            raise RulerValidationException("ccoa_004", "开庭时有公司未成立,最晚一个公司成立日期:%s,开庭时间:%s" % (
                max_establish_date, row_data['start_date']))

        # NOTE(review): case_no may be None here (start_date set, case_no
        # empty); confirm case_no_year_datetime tolerates that.
        case_no_year_dt = case_no_year_datetime(case_no)
        if case_no_year_dt and this_date < case_no_year_dt:
            raise RulerValidationException("ccoa_005", "案号大于开庭时间年份,案号:%s,开庭时间:%s" % (
                case_no, row_data['start_date']))
    except RulerValidationException:
        if case_no is None:
            # No case number to fall back on: reject the row.
            raise
        # Record the row (still carrying the bad date) before blanking it.
        await insert_one(mongo_table_prefix + 'info_cooa_start_date_set_none', {
            "content": {
                "data": {
                    "company_court_open_announcement": [row_data]
                },
            }
        })
        row_data['start_date'] = None
    return row_data


@dim_handle.registry_row_func
async def party_unknown(row_data: dict) -> dict:
    """Filter rows with abnormal party names (e.g. ``Z某某``, ``xxx``).

    The plaintiff and defendant names are checked with
    ``person_name_list_verify``.  When the check fails, the parties are looked
    up via ``get_case_party`` using the row's case number; if that returns a
    result, the row's party fields are replaced by the looked-up ones and the
    before/after values are written to the ``info_cooa_party_set_value`` Mongo
    collection.  Otherwise the row is rejected.

    A missing ``case_no`` key is treated like an empty case number: no lookup
    is attempted and the row is rejected with ``ccoa_003``.

    :param row_data: row dict, may be modified in place.
    :return: the row.
    :raises RulerValidationException: code ``ccoa_003`` when names are invalid
        and no replacement parties are available.
    """
    plaintiff_names = json_str_2_list(row_data['plaintiff_info'], "name")
    defendant_names = json_str_2_list(row_data['defendant_info'], "name")
    flag, error_name = person_name_list_verify(plaintiff_names + defendant_names)
    if flag:
        return row_data

    case_no = row_data.get('case_no')
    result = None
    if case_no is not None:
        result = await get_case_party(case_no, source='open_court')
    if not result:
        raise RulerValidationException("ccoa_003", "人名不符合规范:%s" % error_name)

    new_party = {
        'plaintiff_info': remove_null_party(result['plaintiff_info']),
        'defendant_info': remove_null_party(result['defendant_info']),
        'litigant_info': remove_null_party(result['litigant_info']),
        'plaintiff': result['plaintiff'],
        'defendant': result['defendant'],
        'litigant': result['litigant'],
    }
    # Audit record: original party fields next to their replacements.
    await insert_one(mongo_table_prefix + 'info_cooa_party_set_value', {
        "rowkey": row_data.get('rowkey'),
        "org_party": {
            'plaintiff_info': row_data['plaintiff_info'],
            'defendant_info': row_data['defendant_info'],
            'litigant_info': row_data['litigant_info'],
            'plaintiff': row_data['plaintiff'],
            'defendant': row_data['defendant'],
            'litigant': row_data['litigant'],
        },
        "new_party": new_party,
    })
    row_data.update(new_party)
    return row_data


async def test():
    """Ad-hoc manual check: print the establish date for one sample company id."""
    res = await _get_max_establish_date(
        ['b13955ebaa99b06456803041e73c3888', ])
    print(res)


if __name__ == '__main__':
    row_data = {"rowkey": "a"}
    print(row_data.get('rowkey'))
    # To run the manual check instead: ``import asyncio; asyncio.run(test())``.