# -*- coding: utf-8 -*-
# @Time : 2023/7/21 8:43
# @Author : XuJiakai
# @File : exception_handle
# @Software: PyCharm
import traceback
from functools import update_wrapper

from loguru import logger as log

from data_clean.api.mongo_api import insert_one
from data_clean.env.const import mongo_table_prefix
from data_clean.exception.fetch_exception import FetchException
from data_clean.exception.ruler_validation_exception import RulerValidationException
from data_clean.statistic.data_clean_statistic import ruler_error, error
from data_clean.utils.date_utils import get_now_datetime


async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id) -> None:
    """
    Write a rule-validation failure to the ``ruler_valid_error`` collection.

    This is a business-logic exception. It usually points to a problem in the
    upstream data source, which needs to be notified so it can re-parse.

    :param ex: the validation exception; its ``ruler_code`` and ``str(ex)`` are stored
    :param tn: name stored under the ``tn`` key
    :param data: payload stored under the ``data`` key
    :param session_id: session identifier stored with the record
    :return: None
    """
    col = mongo_table_prefix + "ruler_valid_error"
    doc = {
        "ruler_code": ex.ruler_code,
        "tn": tn,
        "session_id": session_id,
        "exception": str(ex),
        "create_time": get_now_datetime(),
        "data": data,
    }
    await insert_one(col, doc)


async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id) -> None:
    """
    Write a data-fetch failure to the ``fetch_error`` collection.

    This exception is generally a network error. No manual intervention is
    needed: the record can be read back, converted to the correct format and
    re-sent to the queue.

    :param ex: the fetch exception; ``str(ex)`` is stored
    :param tn: name stored under the ``tn`` key
    :param data: payload stored under the ``data`` key
    :param session_id: session identifier stored with the record
    :return: None
    """
    col_pre = mongo_table_prefix + "fetch_error"
    doc = {
        "tn": tn,
        "session_id": session_id,
        "data": data,
        "create_time": get_now_datetime(),
        "exception": str(ex),
    }
    await insert_one(col_pre, doc)


async def error_sink(ex: Exception, tn: str, data, session_id) -> None:
    """
    Write an unknown exception to the ``error`` collection.

    These records need manual intervention to find the cause, so the full
    formatted traceback is stored alongside the message.

    :param ex: the caught exception
    :param tn: name stored under the ``tn`` key
    :param data: payload stored under the ``data`` key
    :param session_id: session identifier stored with the record
    :return: None
    """
    col_pre = mongo_table_prefix + "error"
    # The three-argument form is used because the single-argument
    # ``format_exception(ex)`` only exists on Python 3.10+.
    formatted_tb = "".join(
        traceback.format_exception(type(ex), ex, ex.__traceback__)
    )
    doc = {
        "tn": tn,
        "session_id": session_id,
        "create_time": get_now_datetime(),
        "exception": str(ex),
        "data": data,
        "traceback": formatted_tb,
    }
    await insert_one(col_pre, doc)


def _handle(content, tn, session_id, ex: Exception, original_data) -> dict:
    """
    Wrap the failed payload and report it to the statistics module.

    A non-list ``content`` is wrapped into a one-element list and flagged with
    ``content_type=1``; a list is passed through with ``content_type=0``. The
    payload is then nested as ``{"data": {tn: [...]}}`` and handed to
    ``ruler_error`` (for ``RulerValidationException``) or ``error`` (for
    anything else).

    :param content: the payload that was being processed when ``ex`` was raised
    :param tn: name used as the key inside ``data``
    :param session_id: session identifier forwarded to the reporter
    :param ex: the caught exception
    :param original_data: forwarded unchanged to the reporter
    :return: the wrapped payload ``{"data": {tn: [...]}}``
    """
    if isinstance(content, list):
        content_type = 0
    else:
        content = [content]
        content_type = 1

    wrapped = {
        "data": {
            tn: content,
        },
    }

    # Both reporters take the same keyword arguments, so pick one and call once.
    report = ruler_error if isinstance(ex, RulerValidationException) else error
    report(
        data=wrapped,
        session_id=session_id,
        ex=ex,
        content_type=content_type,
        tn=tn,
        original_data=original_data,
    )
    return wrapped


def exception_handle(func):
    """
    Decorate an async method so that its exceptions are logged and persisted.

    The wrapped method must be called with at least three positional arguments
    after ``self``: ``args[0]`` is the data, ``args[1]`` the session id and
    ``args[2]`` the original data. The instance must expose ``_name``.

    :param func: the coroutine method to wrap
    :return: the wrapping coroutine function, carrying ``func``'s metadata
    """

    async def wrapper(self, *args, **kwargs):
        tn = self._name
        # Read before the call, so a missing positional argument raises
        # IndexError here rather than being swallowed below.
        session_id = args[1]
        original_data = args[2]

        result = None
        try:
            result = await func(self, *args, **kwargs)
        except RulerValidationException as ex:
            log.warning("session_id: {} 维度:{} ,detail: {}", session_id, tn, ex)
            data = _handle(args[0], tn, session_id, ex, original_data)
            await ruler_valid_exception_sink(ex, tn, data, session_id)
        except FetchException as ex:
            log.error("session_id: {} 维度:{} ,detail: {}", session_id, tn, ex)
            data = _handle(args[0], tn, session_id, ex, original_data)
            await fetch_exception_sink(ex, tn, data, session_id)
        except Exception as ex:
            log.error("session_id: {} 维度:{} ,出现未知异常:{}", session_id, tn, repr(ex))
            data = _handle(args[0], tn, session_id, ex, original_data)
            await error_sink(ex, tn, data, session_id)
        # None when any of the handlers above ran.
        return result

    return update_wrapper(wrapper, func)


if __name__ == '__main__':
    pass