
feat: add foundational components

- switch the logging component to loguru
- add a reusable HTTP client
- switch the HBase API to the reusable HTTP client
- add a prefix function for court open announcements to filter out empty parties
- add an ES scan SDK
- add caching when reading the online schema
许家凯 1 year ago
parent commit 926fb9abaf

+ 2 - 3
JobMain.py

@@ -12,7 +12,7 @@ from aio_pika import IncomingMessage
 from environs import Env
 
 from data_clean.task_distributor import task_distribute
-from data_clean.utils import get_log
+from loguru import logger as log
 from data_clean.utils.async_client import get_aio_kafka_consumer, get_aio_kafka_producer, get_rabbitmq_connection
 from data_clean.utils.asyncio_pool import AsyncPool
 
@@ -26,7 +26,6 @@ source_topic = env.str("source_topic", base_topic)  # "rt_company_dim"
 target_topic = env.str("target_topic", base_topic)  # "rt_company_dim"
 
 max_concurrency = env.int("concurrency", 1)
-log = get_log("JobMain")
 
 
 async def handle(producer, data: dict):
@@ -48,7 +47,7 @@ async def on_message_received(producer, msg: IncomingMessage):
 
 
 async def main_for_rabbitmq():
-    log.info("start job. Listening queue : %s , send topic: %s , max concurrency: %s", source_topic, target_topic,
+    log.info("start job. Listening queue : {} , send topic: {} , max concurrency: {}", source_topic, target_topic,
              max_concurrency)
 
     pool = AsyncPool(max_concurrency)
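
Note: loguru interpolates positional arguments with `str.format`-style `{}` placeholders rather than the `%s` placeholders of the standard `logging` module, which is why the call site above changes. A minimal sketch of the new style (values are illustrative, taken from the defaults in the diff):

from loguru import logger as log

source_topic, target_topic, max_concurrency = "rt_company_dim", "rt_company_dim", 1

# loguru formats positional arguments via str.format, so "{}" replaces "%s"
log.info("start job. Listening queue : {} , send topic: {} , max concurrency: {}",
         source_topic, target_topic, max_concurrency)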

+ 5 - 5
data_clean/api/hbase_api.py

@@ -3,17 +3,17 @@
 # @Author : XuJiakai
 # @File : hbase_api
 # @Software: PyCharm
-from data_clean.api.http_api import get as http_get
-from data_clean.api.http_api import post
+from data_clean.api.http_api import HttpSessionReuse
 
 from data_clean.env.environment_switch import project_env
 from data_clean.exception.fetch_exception import FetchException
 
 _hosts = project_env.get_val('winhc_open_api.eci_data.host')
+http_session_reuse = HttpSessionReuse(20)
 
 
 async def get(table_name: str, rowkey: str):
-    result = await http_get(_hosts + '/hbase/get/' + table_name + '/' + rowkey)
+    result = await http_session_reuse.get(_hosts + '/hbase/get/' + table_name + '/' + rowkey)
     if not result['success']:
         raise FetchException(200, result, '内容异常')
     return result['data']
@@ -24,7 +24,7 @@ async def bulk_get(table_name, rowkey: list, query_key: list = None):
     if query_key is None:
         query_key = []
     rowkey = list(set(rowkey))
-    result = await post(_hosts + '/hbase/bulk-get?tableName=' + table_name, data={
+    result = await http_session_reuse.post(_hosts + '/hbase/bulk-get?tableName=' + table_name, data={
         "query_key": query_key,
         "rowkey": rowkey
     })
@@ -42,6 +42,6 @@ async def bulk_get(table_name, rowkey: list, query_key: list = None):
 if __name__ == '__main__':
     import asyncio
 
-    asyncio.run(get("ng_rt_company","bc702f0f5202342a9c1c75fbf9be9aff"))
+    asyncio.run(get("ng_rt_company", "bc702f0f5202342a9c1c75fbf9be9aff"))
     # asyncio.run(bulk_get("ng_rt_company", ["a33f3cd172f8f9bd61d5f3cd84a4ffd9", "bc702f0f5202342a9c1c75fbf9be9aff"]))
     pass
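
Note: the module now holds a single `HttpSessionReuse(20)` instance, so `get` and `bulk_get` reuse at most 20 pooled aiohttp sessions instead of opening a new session per request. A minimal usage sketch (rowkeys are taken from the `__main__` block above; the calls assume the internal `winhc_open_api.eci_data.host` endpoint is reachable):

import asyncio

from data_clean.api import hbase_api


async def demo():
    # concurrent requests share the pooled client sessions
    single, bulk = await asyncio.gather(
        hbase_api.get("ng_rt_company", "bc702f0f5202342a9c1c75fbf9be9aff"),
        hbase_api.bulk_get("ng_rt_company", ["a33f3cd172f8f9bd61d5f3cd84a4ffd9",
                                             "bc702f0f5202342a9c1c75fbf9be9aff"]),
    )
    print(single, bulk)


asyncio.run(demo())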

+ 67 - 1
data_clean/api/http_api.py

@@ -3,9 +3,12 @@
 # @Author : XuJiakai
 # @File : http_api
 # @Software: PyCharm
+import asyncio
+import contextlib
 
 import aiohttp
-
+from aiohttp import ClientSession
+from loguru import logger as log
 from data_clean.exception.fetch_exception import FetchException
 
 
@@ -32,5 +35,68 @@ async def post(url: str, data: dict):
     pass
 
 
+class HttpSessionReuse:
+    def __init__(self, max_pool: int = 1):
+        self._session_pool: list[ClientSession] = [None] * max_pool
+        self._index = 0
+        self._max_pool = max_pool
+        self._lock = asyncio.Lock()
+        pass
+
+    async def _create_session(self, index: int):
+        async with self._lock:
+            if self._session_pool[index]:
+                return
+            self._session_pool[index] = aiohttp.ClientSession()
+            log.info("init client session, session index : {}", index)
+            pass
+        pass
+
+    @contextlib.asynccontextmanager
+    async def get_session(self) -> ClientSession:
+        self._index = (self._index + 1) % self._max_pool
+        tmp_index = self._index
+        if self._session_pool[tmp_index]:
+            yield self._session_pool[tmp_index]
+            pass
+        else:
+            await self._create_session(tmp_index)
+            yield self._session_pool[tmp_index]
+            pass
+
+    async def post(self, url: str, data: dict):
+        async with self.get_session() as session:
+            async with session.post(url, json=data) as response:
+                result = await response.json()
+                if response.status != 200:
+                    raise FetchException(response.status, result)
+                return result
+
+        pass
+
+    async def get(self, url: str, result_json: bool = True):
+        async with self.get_session() as session:
+            async with session.get(url) as response:
+                if result_json:
+                    result = await response.json()
+                else:
+                    result = await response.text()
+                if response.status != 200:
+                    raise FetchException(response.status, result)
+                return result
+
+    async def __aenter__(self) -> "HttpSessionReuse":
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
+        await self.release()
+
+    async def release(self):
+        for i, session in enumerate(self._session_pool):
+            if session is not None:  # sessions are created lazily, unused slots stay None
+                await session.close()
+                log.info('close client session, session index : {}', i)
+
+
 if __name__ == '__main__':
     pass
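
Note: `HttpSessionReuse` creates its `ClientSession` objects lazily, hands them out round-robin through the `get_session` context manager, and can itself be used as an async context manager so that `release()` closes every opened session on exit. A minimal sketch, reusing the schema URL that appears later in this commit:

import asyncio

from data_clean.api.http_api import HttpSessionReuse


async def demo():
    async with HttpSessionReuse(max_pool=2) as client:
        # sessions are created on first use and reused for subsequent calls
        schema_text = await client.get(
            'https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/company_court_open_announcement.schema',
            result_json=False)
        print(len(schema_text))
    # leaving the block triggers __aexit__ -> release(), closing the opened sessions


asyncio.run(demo())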

+ 4 - 6
data_clean/exception/exception_handle.py

@@ -11,11 +11,9 @@ from data_clean.env.const import mongo_table_prefix
 from data_clean.exception.fetch_exception import FetchException
 from data_clean.exception.ruler_validation_exception import RulerValidationException
 from data_clean.statistic.data_clean_statistic import ruler_error, error
-from data_clean.utils import get_log
+from loguru import logger as log
 from data_clean.utils.date_utils import get_now_datetime
 
-log = get_log("exception_handler")
-
 
 async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
     """
@@ -129,17 +127,17 @@ def exception_handle(func):
         try:
             result = await func(self, *args, **kwargs)
         except RulerValidationException as ex:
-            log.warn("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
+            log.warning("session_id: {} 维度:{} ,detail: {}", session_id, tn, ex)
             data = args[0]
             data = _handle(data, tn, session_id, ex, original_data)
             await ruler_valid_exception_sink(ex, tn, data, session_id)
         except FetchException as ex:
-            log.error("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
+            log.error("session_id: {} 维度:{} ,detail: {}", session_id, tn, ex)
             data = args[0]
             data = _handle(data, tn, session_id, ex, original_data)
             await fetch_exception_sink(ex, tn, data, session_id)
         except Exception as ex:
-            log.error("session_id: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
+            log.error("session_id: {} 维度:{} ,出现未知异常:{}", session_id, tn, repr(ex))
             data = args[0]
             data = _handle(data, tn, session_id, ex, original_data)
             await error_sink(ex, tn, data, session_id)

+ 8 - 6
data_clean/handle/company_court_open_announcement.py

@@ -21,10 +21,12 @@ from data_clean.utils.case_utils import get_case_party
 dim_handle = get_dim_handle(os.path.basename(__file__))
 
 
-# @dim_handle.registry_prefix_func
+@dim_handle.registry_prefix_func
 async def prefix_func(dim_data: list):
-    print("前置程序:", dim_data)
-    raise ValueError("前置程序错误")
+    for row_data in dim_data:
+        row_data['plaintiff_info'] = remove_null_party(row_data['plaintiff_info'])
+        row_data['defendant_info'] = remove_null_party(row_data['defendant_info'])
+        row_data['litigant_info'] = remove_null_party(row_data['litigant_info'])
 
     pass
 
@@ -129,9 +131,9 @@ async def open_ann_date(row_data: dict) -> dict:
 @dim_handle.registry_row_func
 async def party_unknown(row_data: dict) -> dict:
     # 过滤当事人名字异常,Z某某、xxx
-    row_data['plaintiff_info'] = remove_null_party(row_data['plaintiff_info'])
-    row_data['defendant_info'] = remove_null_party(row_data['defendant_info'])
-    row_data['litigant_info'] = remove_null_party(row_data['litigant_info'])
+    # row_data['plaintiff_info'] = remove_null_party(row_data['plaintiff_info'])
+    # row_data['defendant_info'] = remove_null_party(row_data['defendant_info'])
+    # row_data['litigant_info'] = remove_null_party(row_data['litigant_info'])
 
     plaintiff_info = json_str_2_list(row_data['plaintiff_info'], "name")
     defendant_info = json_str_2_list(row_data['defendant_info'], "name")
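
Note: the empty-party filtering moves out of the per-row `party_unknown` handler and into the registered prefix function, so each batch is cleaned once before the row handlers run. A minimal sketch of the intended behavior, assuming `remove_null_party` drops party entries whose `name` field is empty (only its `__main__` example appears in this commit):

from data_clean.utils.party_name_verify_utils import remove_null_party

parties = [{"name": "", "litigant_id": ""}, {"name": "某某公司", "litigant_id": "1"}]
# expected to keep only the entry with a non-empty name
print(remove_null_party(parties, "name"))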

+ 8 - 8
data_clean/statistic/data_clean_statistic.py

@@ -10,9 +10,9 @@ from data_clean.exception.ruler_validation_exception import RulerValidationExcep
 from data_clean.statistic.sls_log import get_logger
 from data_clean.statistic.statistic_filter import filter_data
 from data_clean.utils import is_windows
-from data_clean.utils.log import get_log as get_local_log
+from loguru import logger as log
 
-_log = get_logger() if not is_windows() else get_local_log("local_log")
+_log = get_logger() if not is_windows() else log
 
 '''
 log template:
@@ -36,7 +36,7 @@ def error(data, session_id, ex: Exception, content_type: int, tn: str, original_
     error_type = 1 if isinstance(ex, FetchException) else 9
     filter_data(data)
     filter_data(original_data)
-    _log.error(msg={
+    _log.error({
         "session_id": session_id,
         "content": data,
         "original_content": original_data,
@@ -50,14 +50,14 @@ def error(data, session_id, ex: Exception, content_type: int, tn: str, original_
             "traceback": "".join(traceback.format_exception(ex))
         }
 
-    }, stacklevel=2)
+    })
     pass
 
 
 def ruler_error(data, session_id, ex: RulerValidationException, content_type: int, tn: str, original_data):
     filter_data(data)
     filter_data(original_data)
-    _log.warn(msg={
+    _log.warning({
         "session_id": session_id,
         "content": data,
         "original_content": original_data,
@@ -71,19 +71,19 @@ def ruler_error(data, session_id, ex: RulerValidationException, content_type: in
             "error_msg": ex.get_msg()
         }
 
-    }, stacklevel=2)
+    })
     pass
 
 
 def success(data, session_id, original_data):
     filter_data(data)
     filter_data(original_data)
-    _log.info(msg={
+    _log.info({
         "session_id": session_id,
         "content": data,
         "original_content": original_data,
         "successful": 1,
         "error": 0,
-    }, stacklevel=1)
+    })
 
     pass

+ 2 - 4
data_clean/task_distributor.py

@@ -8,17 +8,15 @@ import uuid
 import copy
 from data_clean.dim_handle_registry import get_class_dict
 from data_clean.statistic.data_clean_statistic import success
-from data_clean.utils import get_log
+from loguru import logger as log
 
 scan_path = os.path.join(os.path.dirname(__file__), 'handle')
 
 file_name_list = [file_name[:-3] for file_name in os.listdir(scan_path) if not file_name.startswith("__")]
-log = get_log("task_distribute")
 # class_dict = {}
 for tn in file_name_list:
     tmp = __import__(f"data_clean.handle.{tn}", fromlist=(tn))
     # class_dict[tn] = tmp.dim_handle
-
 class_dict = get_class_dict()
 
 
@@ -43,7 +41,7 @@ async def task_distribute(data: dict):
 
         else:
             # raise ValueError(f"{key} 维度未实现!")
-            log.warn(f'{key}维度未实现!直接发送...')
+            log.warning(f'{key}维度未实现!直接发送...')
             pass
 
     if len(tmp_data) == 0:

+ 48 - 0
data_clean/utils/asyncio_es_scan.py

@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# @Time : 2023/8/28 17:32
+# @Author : XuJiakai
+# @File : asyncio_es_scan
+# @Software: PyCharm
+
+from loguru import logger as log
+
+from data_clean.utils.async_client import get_aio_elasticsearch
+from data_clean.utils.base_utils import parse_env_and_name
+
+
+class AsyncioElasticsearchFastScan:
+    def __init__(self, index, query_dsl, env='new', doc_type: str = '_doc'):
+        env, index = parse_env_and_name(index, env)
+        self.es = get_aio_elasticsearch(env)
+        self.env = env
+        self.index = index
+        self.query_dsl = query_dsl
+        self.doc_type = doc_type
+        pass
+
+    async def scan(self, scroll='5m', timeout='5m', size=500):
+        result_data = await self.es.search(index=self.index
+                                           , doc_type=self.doc_type
+                                           , body=self.query_dsl
+                                           , scroll=scroll
+                                           , timeout=timeout
+                                           , size=size,
+                                           )
+
+        hits = result_data["hits"]["hits"]
+        total = result_data["hits"]["total"]
+        if isinstance(total, dict):  # ES7 may return hits.total as {"value": N, "relation": ...}
+            total = total["value"]
+        log.info("total record : {}", total)
+
+        log.info("Processing a batch of data: {} - {}", 0, size)
+        yield hits
+
+        scroll_id = result_data["_scroll_id"]
+
+        for i in range(int(total / size)):
+            res = await self.es.scroll(scroll_id=scroll_id, scroll=scroll)
+            log.info("Processing a batch of data: {} - {}", (i + 1) * size, (i + 2) * size)
+            yield res["hits"]["hits"]
+        log.info("scan successful ! ")
+        pass
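
Note: `AsyncioElasticsearchFastScan.scan` is an async generator that yields the first page from `search` and then keeps scrolling until roughly `total / size` batches have been fetched. A minimal usage sketch (the index name and query are illustrative):

import asyncio

from data_clean.utils.asyncio_es_scan import AsyncioElasticsearchFastScan


async def demo():
    query_dsl = {"query": {"match_all": {}}}
    scanner = AsyncioElasticsearchFastScan("company_court_open_announcement", query_dsl, env='new')
    async for batch in scanner.scan(size=500):
        for hit in batch:
            print(hit["_id"])


asyncio.run(demo())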

+ 2 - 4
data_clean/utils/asyncio_pool.py

@@ -9,9 +9,7 @@ import signal
 import sys
 from typing import Coroutine
 
-from data_clean.utils import get_log
-
-log = get_log("JobMain")
+from loguru import logger as log
 
 
 class AsyncPool(object):
@@ -31,7 +29,7 @@ class AsyncPool(object):
 
     async def wait_all_done(self):
         while self._running_task_num > 0:
-            log.info("wait done...")
+            log.info("running task num : {} , wait done...", self._running_task_num)
             await asyncio.sleep(1)
 
     def _release(self, t):

+ 31 - 28
data_clean/utils/data_schema_utils.py

@@ -23,37 +23,40 @@ def _cast_by_type(val, val_type: list):
     pass
 
 
-async def get_data_schema(tn: str):
-    res = await get('https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/' + tn + '.schema',
-                    result_json=False)
-    res = json.loads(res)
-
-    # print(res)
-
-    return res
-    pass
-
-
-async def record_to_json(tn, record_json):
-    json_schema = await get_data_schema(tn)
-
-    json_schema = json_schema['properties']
-    result_json = {}
-    for key in record_json:
-        key_lower = key.lower()
-        if key_lower in json_schema:
-            result_json[key_lower] = _cast_by_type(record_json[key], json_schema[key_lower]['type'])
+class BusinessDataSchema:
+    def __init__(self):
+        self.org_data_schema = {}
+        self._lock = asyncio.Lock()
 
         pass
-    return result_json
-    pass
 
+    async def get_data_schema(self, tn: str):
+        if tn in self.org_data_schema:
+            return self.org_data_schema[tn]
+            pass
+
+        async with self._lock:
+            if tn in self.org_data_schema:
+                return self.org_data_schema[tn]
+                pass
+            res = await get('https://bigdata-rt.oss-cn-shanghai.aliyuncs.com/business-schema/' + tn + '.schema',
+                            result_json=False)
+            res = json.loads(res)
+            self.org_data_schema[tn] = res
+
+        return res
+        pass
 
-async def test():
-    await get_data_schema("company_court_open_announcement")
-    pass
+    async def hbase_record_to_json(self, tn, record_json):
+        json_schema = await self.get_data_schema(tn)
 
+        json_schema = json_schema['properties']
+        result_json = {}
+        for key in record_json:
+            key_lower = key.lower()
+            if key_lower in json_schema:
+                result_json[key_lower] = _cast_by_type(record_json[key], json_schema[key_lower]['type'])
 
-if __name__ == '__main__':
-    asyncio.run(test())
-    pass
+            pass
+        return result_json
+        pass
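
Note: the module-level schema helpers are folded into a `BusinessDataSchema` class that caches each schema in memory and double-checks the cache under an `asyncio.Lock`, so concurrent callers trigger at most one HTTP fetch per table. A minimal sketch of the caching behavior:

import asyncio

from data_clean.utils.data_schema_utils import BusinessDataSchema


async def demo():
    schema_cache = BusinessDataSchema()
    first = await schema_cache.get_data_schema("company_court_open_announcement")   # HTTP fetch
    second = await schema_cache.get_data_schema("company_court_open_announcement")  # served from cache
    assert first is second


asyncio.run(demo())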

+ 1 - 1
data_clean/utils/party_name_verify_utils.py

@@ -37,5 +37,5 @@ def remove_null_party(party: list, key: str = "name"):
 
 if __name__ == '__main__':
     # print(person_name_verify("Z某某"))
-    print(remove_null_party([{"name": "", "litigant_id": ""}], "litigant_id"))
+    print(remove_null_party([{"name": "", "litigant_id": ""}], "name"))
     pass

+ 59 - 1
poetry.lock

@@ -451,6 +451,22 @@ url = "https://mirror.baidu.com/pypi/simple"
 reference = "douban"
 
 [[package]]
+name = "colorama"
+version = "0.4.6"
+description = "Cross-platform colored terminal text."
+optional = false
+python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
+files = [
+    {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
+    {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
+]
+
+[package.source]
+type = "legacy"
+url = "https://mirror.baidu.com/pypi/simple"
+reference = "douban"
+
+[[package]]
 name = "dateparser"
 version = "1.1.8"
 description = "Date parsing library designed to parse dates from HTML pages"
@@ -807,6 +823,29 @@ url = "https://mirror.baidu.com/pypi/simple"
 reference = "douban"
 
 [[package]]
+name = "loguru"
+version = "0.7.0"
+description = "Python logging made (stupidly) simple"
+optional = false
+python-versions = ">=3.5"
+files = [
+    {file = "loguru-0.7.0-py3-none-any.whl", hash = "sha256:b93aa30099fa6860d4727f1b81f8718e965bb96253fa190fab2077aaad6d15d3"},
+    {file = "loguru-0.7.0.tar.gz", hash = "sha256:1612053ced6ae84d7959dd7d5e431a0532642237ec21f7fd83ac73fe539e03e1"},
+]
+
+[package.dependencies]
+colorama = {version = ">=0.3.4", markers = "sys_platform == \"win32\""}
+win32-setctime = {version = ">=1.0.0", markers = "sys_platform == \"win32\""}
+
+[package.extras]
+dev = ["Sphinx (==5.3.0)", "colorama (==0.4.5)", "colorama (==0.4.6)", "freezegun (==1.1.0)", "freezegun (==1.2.2)", "mypy (==v0.910)", "mypy (==v0.971)", "mypy (==v0.990)", "pre-commit (==3.2.1)", "pytest (==6.1.2)", "pytest (==7.2.1)", "pytest-cov (==2.12.1)", "pytest-cov (==4.0.0)", "pytest-mypy-plugins (==1.10.1)", "pytest-mypy-plugins (==1.9.3)", "sphinx-autobuild (==2021.3.14)", "sphinx-rtd-theme (==1.2.0)", "tox (==3.27.1)", "tox (==4.4.6)"]
+
+[package.source]
+type = "legacy"
+url = "https://mirror.baidu.com/pypi/simple"
+reference = "douban"
+
+[[package]]
 name = "marshmallow"
 version = "3.20.1"
 description = "A lightweight library for converting complex datatypes to and from native Python datatypes."
@@ -1447,6 +1486,25 @@ url = "https://mirror.baidu.com/pypi/simple"
 reference = "douban"
 
 [[package]]
+name = "win32-setctime"
+version = "1.1.0"
+description = "A small Python utility to set file creation time on Windows"
+optional = false
+python-versions = ">=3.5"
+files = [
+    {file = "win32_setctime-1.1.0-py3-none-any.whl", hash = "sha256:231db239e959c2fe7eb1d7dc129f11172354f98361c4fa2d6d2d7e278baa8aad"},
+    {file = "win32_setctime-1.1.0.tar.gz", hash = "sha256:15cf5750465118d6929ae4de4eb46e8edae9a5634350c01ba582df868e932cb2"},
+]
+
+[package.extras]
+dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"]
+
+[package.source]
+type = "legacy"
+url = "https://mirror.baidu.com/pypi/simple"
+reference = "douban"
+
+[[package]]
 name = "yarl"
 version = "1.9.2"
 description = "Yet another URL library"
@@ -1541,4 +1599,4 @@ reference = "douban"
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.8"
-content-hash = "bb1426ac49b561de757a2d26e11ba9b1542abc16b6dff0bdd5ff083dc66f2ad1"
+content-hash = "cfa7cd3ff3de117bcdf7df3974b5d0f52536c085acf9567d1dc5474c004a4c7a"

+ 1 - 0
pyproject.toml

@@ -17,6 +17,7 @@ aio-pika = "^9.1.5"
 aliyun-log-python-sdk = "^0.8.7"
 elasticsearch7 = {extras = ["async"], version = "^7.17.9"}
 jsonpath = "^0.82"
+loguru = "^0.7.0"
 
 
 [build-system]