Browse Source

fix: 开庭公告字段格式问题

- 开庭公告字段格式问题
- 测试环境根据维度schema生成测试数据
许家凯 1 year ago
parent
commit
67918f0640

+ 5 - 2
data_clean/api/http_api.py

@@ -9,10 +9,13 @@ import aiohttp
 from data_clean.exception.fetch_exception import FetchException
 
 
-async def get(url: str):
+async def get(url: str, result_json: bool = True):
     async with aiohttp.ClientSession() as session:
         async with session.get(url) as response:
-            result = await response.json()
+            if result_json:
+                result = await response.json()
+            else:
+                result = await response.text()
             if response.status != 200:
                 raise FetchException(response.status, result)
             return result

+ 3 - 3
data_clean/handle/company_court_open_announcement.py

@@ -130,9 +130,9 @@ async def party_unknown(row_data: dict) -> dict:
     if not flag:
         result = await get_case_party(row_data['case_no'], source='open_court')
         if result:
-            row_data['plaintiff_info'] = to_string(result['plaintiff_info'], is_format=False)
-            row_data['defendant_info'] = to_string(result['defendant_info'], is_format=False)
-            row_data['litigant_info'] = to_string(result['litigant_info'], is_format=False)
+            row_data['plaintiff_info'] = result['plaintiff_info']
+            row_data['defendant_info'] = result['defendant_info']
+            row_data['litigant_info'] = result['litigant_info']
             row_data['plaintiff'] = result['plaintiff']
             row_data['defendant'] = result['defendant']
             row_data['litigant'] = result['litigant']

+ 59 - 0
data_clean/utils/data_schema_utils.py

@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/22 10:37
+# @Author : XuJiakai
+# @File : data_schema_utils
+# @Software: PyCharm
+import asyncio
+import json
+
+from data_clean.api.http_api import get
+
+
+def _cast_by_type(val, val_type: list):
+    """Convert a raw record value according to its schema ``type`` entry.
+
+    :param val: raw value taken from the record
+    :param val_type: the schema ``type`` entry: a list of type names
+        (``'null'`` entries are ignored) or a single type name
+    :return: ``None`` for ``None``/empty-string input; decoded JSON for
+        ``array``/``object``; an ``int`` (or ``float`` when the text is not
+        an integer literal) for ``number``; otherwise ``val`` unchanged
+    :raises ValueError: if the text cannot be decoded as the declared type
+    """
+    if val is None or val == '':
+        return None
+
+    # JSON Schema allows ``type`` to be a bare string; iterating it directly
+    # would walk its characters instead of the type names.
+    if isinstance(val_type, str):
+        val_type = [val_type]
+
+    concrete_types = [name for name in val_type if name != 'null']
+    if not concrete_types:
+        # Only 'null' was declared: there is nothing to cast to.
+        return val
+
+    target = concrete_types[0]
+    if target in ('array', 'object'):
+        # Values that are already decoded (dict/list) are passed through.
+        if isinstance(val, (str, bytes, bytearray)):
+            return json.loads(val)
+        return val
+    if target == 'number':
+        try:
+            return int(val)
+        except ValueError:
+            # Decimal text such as '1.5' is a valid number but not an int.
+            return float(val)
+
+    return val
+
+
+async def get_data_schema(tn: str):
+    """Download and decode the schema document published for table ``tn``.
+
+    :param tn: table name; the document fetched is ``<tn>.schema``
+    :return: the decoded schema document
+    :raises: whatever ``get`` raises for a failed request, and
+        ``json.JSONDecodeError`` if the body is not valid JSON
+    """
+    # Request the raw text: json.loads() below only accepts str/bytes, so an
+    # already-decoded response body would make it raise TypeError.
+    res = await get('https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/' + tn + '.schema',
+                    result_json=False)
+    res = json.loads(res)
+
+    print(res)
+
+    return res
+
+
+async def record_to_json(tn, record_json):
+    """Build a schema-typed copy of ``record_json`` for table ``tn``.
+
+    Keys are lower-cased; a key is kept only when its lower-cased form appears
+    in the schema's ``properties``, and its value is cast with
+    ``_cast_by_type`` using that property's ``type``.
+
+    :param tn: table name used to look up the schema
+    :param record_json: mapping of column name to raw value
+    :return: new dict keyed by lower-cased column name
+    """
+    schema = await get_data_schema(tn)
+    properties = schema['properties']
+
+    name_pairs = ((name, name.lower()) for name in record_json)
+    return {
+        lowered: _cast_by_type(record_json[name], properties[lowered]['type'])
+        for name, lowered in name_pairs
+        if lowered in properties
+    }
+
+
+async def test():
+    """Manual smoke check: fetch the court-open-announcement schema."""
+    table_name = "company_court_open_announcement"
+    await get_data_schema(table_name)
+
+
+# Run the manual smoke check when this module is executed directly.
+if __name__ == '__main__':
+    asyncio.run(test())

+ 5 - 2
data_clean/utils/str_utils.py

@@ -7,11 +7,14 @@ import json
 import re
 
 
-def json_str_2_list(json_str: str, key: str):
+def json_str_2_list(json_str, key: str):
     if json_str is None:
         return []
     try:
-        j = json.loads(json_str)
+        if type(json_str) == dict or type(json_str) == list:
+            j = json_str
+        else:
+            j = json.loads(json_str)
         result = []
         for i in j:
             result.append(i[key])

+ 48 - 10
tests/TestMain.py

@@ -5,20 +5,32 @@
 # @Software: PyCharm
 import asyncio
 import json
+from functools import partial
 
-from data_clean.api.http_api import get_json
-from data_clean.utils.async_client import get_aio_kafka_producer
+from aio_pika import Message
+
+from JobMain import source_topic
+from data_clean.api.http_api import get
+from data_clean.utils.async_client import get_aio_kafka_producer, get_rabbitmq_connection
+from data_clean.utils import to_string
+
+json.dumps = partial(json.dumps, ensure_ascii=False)
+from data_clean.utils.data_schema_utils import record_to_json
 
 
 async def get_test_data():
     tn = "company_court_open_announcement"
-    url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/8fd218fc8461789c4c401eb1eaa3d723"
-    res = await get_json(url)
-    res = json.loads(json.dumps(res['data']).lower())
+    url = f"http://47.101.221.131:8288/hbase/get/ng_rt_{tn}/4c957482789a8218461079215b4d239b"
+    res = await get(url)
+    res = await record_to_json(tn, res['data'])
+
+    # res = json.loads(json.dumps(res['data']).lower())
 
     # res["plaintiff_info"] = '[{"name":"季韩旭","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
-    res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
-    # res["start_date"] = '1948-10-01 00:00:00'
+    # res["plaintiff_info"] = '[{"name":"Z某某","litigant_id":""},{"name":"戴莉","litigant_id":""}]'
+    # res["deleted"] = 9
+    # res["start_date"] = '1959-08-16 00:00:00'
+    # del res['case_no']
     data = {
         "data": {
             tn: [
@@ -30,11 +42,26 @@ async def get_test_data():
     return data
 
 
+async def test_send_rabbitmq():
+    """Publish one generated test message to ``source_topic``.
+
+    The message is the JSON-encoded result of ``get_test_data`` and is sent
+    through the channel's default exchange. The channel and the connection
+    are closed even when building or publishing the message fails.
+    """
+    connection = await get_rabbitmq_connection()
+    try:
+        channel = await connection.channel()
+        try:
+            data = await get_test_data()
+            await channel.default_exchange.publish(
+                Message(json.dumps(data).encode('utf-8')),
+                routing_key=source_topic,
+            )
+        finally:
+            await channel.close()
+    finally:
+        await connection.close()
+
+
 async def test_send_kafka():
     producer = get_aio_kafka_producer()
     await producer.start()
     data = await get_test_data()
-    res = await producer.send_and_wait("source_topic", json.dumps(data).encode())
+    res = await producer.send_and_wait(source_topic, json.dumps(data).encode())
     print(res)
     await producer.stop()
     pass
@@ -44,14 +71,25 @@ async def test_for_url():
     data = await get_test_data()
     asyncio.get_running_loop()
 
-    print("receive : ", data)
+    print("receive : ", to_string(data))
     from data_clean.task_distributor import task_distribute
     result_data = await task_distribute(data)
-    print("send    : ", result_data)
+    print("send    : ", to_string(result_data))
+    pass
+
+
+async def test():
+    """Print the generated test data before and after ``filter_data`` runs on it."""
+    from data_clean.statistic.statistic_filter import filter_data
+
+    payload = await get_test_data()
+    print(payload)
+    # The return value of filter_data is ignored; the same object is printed
+    # again afterwards.
+    filter_data(payload)
+    print(payload)
 
 
 if __name__ == '__main__':
     # asyncio.run(test_send_kafka())
+    # asyncio.run(test_send_rabbitmq())
+    # asyncio.run(test())
     asyncio.run(test_for_url())
     pass