123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/21 8:43
- # @Author : XuJiakai
- # @File : exception_handle
- # @Software: PyCharm
- import traceback
- from functools import update_wrapper
- from data_clean.api.mongo_api import insert_one
- from data_clean.env.const import mongo_table_prefix
- from data_clean.exception.fetch_exception import FetchException
- from data_clean.exception.ruler_validation_exception import RulerValidationException
- from data_clean.statistic.data_clean_statistic import ruler_error, error
- from data_clean.utils import get_log
- from data_clean.utils.date_utils import get_now_datetime
- log = get_log("exception_handler")
async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
    """
    Store a rule-validation failure in the ``ruler_valid_error`` collection.

    Author's note (translated): this is a business-logic exception. It usually
    indicates a problem in the upstream data source, which has to be notified
    so that it re-parses the data.

    :param ex: the validation exception; its ``ruler_code`` and ``str(ex)`` are stored
    :param tn: value stored under the ``tn`` key
    :param data: payload stored under the ``data`` key
    :param session_id: value stored under the ``session_id`` key
    :return: None
    """
    # Collection name is the shared prefix plus a fixed suffix.
    collection = mongo_table_prefix + "ruler_valid_error"
    record = dict(
        ruler_code=ex.ruler_code,
        tn=tn,
        session_id=session_id,
        exception=str(ex),
        create_time=get_now_datetime(),
        data=data,
    )
    await insert_one(collection, record)
async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id):
    """
    Store a data-fetch failure in the ``fetch_error`` collection.

    Author's note (translated): fetch exceptions are generally caused by
    network errors and need no manual intervention; the stored record can be
    read back, converted to the correct format and re-sent to the queue.

    :param ex: the fetch exception; only ``str(ex)`` is stored
    :param tn: value stored under the ``tn`` key
    :param data: payload stored under the ``data`` key
    :param session_id: value stored under the ``session_id`` key
    :return: None
    """
    # Collection name is the shared prefix plus a fixed suffix.
    collection = mongo_table_prefix + "fetch_error"
    record = dict(
        tn=tn,
        session_id=session_id,
        data=data,
        create_time=get_now_datetime(),
        exception=str(ex),
    )
    await insert_one(collection, record)
async def error_sink(ex: Exception, tn: str, data, session_id):
    """
    Store an unexpected exception, with its traceback, in the ``error`` collection.

    Author's note (translated): unknown exceptions written here require manual
    investigation.

    :param ex: the caught exception; ``str(ex)`` and its formatted traceback are stored
    :param tn: value stored under the ``tn`` key
    :param data: payload stored under the ``data`` key
    :param session_id: value stored under the ``session_id`` key
    :return: None
    """
    col_pre = mongo_table_prefix + "error"
    # Use the explicit (type, value, traceback) form: the single-argument call
    # ``format_exception(ex)`` is only accepted on Python 3.10+, and a TypeError
    # raised here would discard the very error record we are trying to save.
    formatted_tb = "".join(
        traceback.format_exception(type(ex), ex, ex.__traceback__)
    )
    doc = {
        "tn": tn,
        "session_id": session_id,
        "create_time": get_now_datetime(),
        "exception": str(ex),
        "data": data,
        "traceback": formatted_tb,
    }
    await insert_one(col_pre, doc)
def _handle(content, tn, session_id, ex: Exception, original_data):
    """
    Wrap the failed payload in a ``{"data": {tn: [...]}}`` envelope and report it.

    The envelope is passed to ``ruler_error`` when ``ex`` is a
    ``RulerValidationException`` and to ``error`` for every other exception.

    :param content: the failed payload, either a list or a single item
    :param tn: key under which the payload is nested inside ``"data"``
    :param session_id: forwarded unchanged to the reporter
    :param ex: the exception that was caught; selects the reporter
    :param original_data: forwarded unchanged to the reporter
    :return: the envelope dict that was handed to the reporter
    """
    was_list = isinstance(content, list)
    # content_type tells the reporter the input shape:
    # 0 = already a list, 1 = single item that was wrapped into a list here.
    content_type = 0 if was_list else 1
    rows = content if was_list else [content]
    envelope = {"data": {tn: rows}}

    # Both reporters take the same keyword arguments, so pick one and call once.
    report = ruler_error if isinstance(ex, RulerValidationException) else error
    report(
        data=envelope,
        session_id=session_id,
        ex=ex,
        content_type=content_type,
        tn=tn,
        original_data=original_data,
    )
    return envelope
async def _sink_failure(sink, ex, data, tn, session_id, original_data):
    """Normalise the failed payload via ``_handle`` and persist it with ``sink``."""
    envelope = _handle(data, tn, session_id, ex, original_data)
    await sink(ex, tn, envelope, session_id)


def exception_handle(func):
    """
    Decorate an async method so that its exceptions are logged and stored
    instead of propagating to the caller.

    The wrapped method must be called positionally as
    ``method(self, data, session_id, original_data, ...)``: the wrapper reads
    ``args[0]``, ``args[1]`` and ``args[2]`` for those values, and ``self._name``
    for the ``tn`` label. Passing them as keywords raises ``IndexError``
    before the wrapped method runs.

    Handling by exception type:

    * ``RulerValidationException`` - logged as a warning, stored by
      ``ruler_valid_exception_sink``
    * ``FetchException`` - logged as an error, stored by ``fetch_exception_sink``
    * any other ``Exception`` - logged as an error, stored by ``error_sink``

    :param func: the async method to wrap
    :return: the wrapper coroutine function; it returns the result of ``func``,
        or ``None`` when one of the exceptions above was handled
    """

    async def wrapper(self, *args, **kwargs):
        tn = self._name
        session_id = args[1]
        original_data = args[2]
        try:
            return await func(self, *args, **kwargs)
        except RulerValidationException as ex:
            # ``warning`` replaces the deprecated ``warn`` alias.
            log.warning("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
            await _sink_failure(
                ruler_valid_exception_sink, ex, args[0], tn, session_id, original_data
            )
        except FetchException as ex:
            log.error("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
            await _sink_failure(
                fetch_exception_sink, ex, args[0], tn, session_id, original_data
            )
        except Exception as ex:
            # Deliberate catch-all: this decorator is the boundary where
            # failures are recorded rather than raised.
            log.error("session_id: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
            await _sink_failure(error_sink, ex, args[0], tn, session_id, original_data)
        # Reached only after a handled failure; callers see None in that case.
        return None

    return update_wrapper(wrapper, func)
- if __name__ == '__main__':
- pass
|