# -*- coding: utf-8 -*-
# @Time : 2023/7/21 8:43
# @Author : XuJiakai
# @File : exception_handle
# @Software: PyCharm
"""Exception sinks and a decorator that records failures of async handlers.

Each sink builds a small document describing the failure and hands it to
``insert_one`` from ``data_clean.api.mongo_api``.
"""
from functools import update_wrapper

from data_clean.api.mongo_api import insert_one
from data_clean.exception.fetch_exception import FetchException
from data_clean.exception.ruler_validation_exception import RulerValidationException
from data_clean.utils import get_log

log = get_log("exception_handler")


async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list) -> None:
    """Record a rule-validation failure.

    This is a business-logic exception. It usually points at a problem in the
    upstream data source, which needs to be notified so it can re-parse.

    :param ex: the validation exception; its ``ruler_code`` is stored.
    :param tn: name of the dimension being processed.
    :param data: the records that were being processed when ``ex`` was raised.
    :return: None
    """
    col = "a_data_clean_ruler_valid_error"
    doc = {
        "ruler_code": ex.ruler_code,
        "tn": tn,
        "exception": str(ex),
        "data": data,
    }
    await insert_one(col, doc)


async def fetch_exception_sink(ex: FetchException, tn: str, data: list) -> None:
    """Record a data-fetch failure.

    This exception is generally caused by a network error. No manual
    intervention is needed: the stored records can be read back, converted to
    the correct format and sent to the queue again.

    :param ex: the fetch exception.
    :param tn: name of the dimension being processed.
    :param data: the records that were being processed when ``ex`` was raised.
    :return: None
    """
    col = "a_data_clean_fetch_error"
    doc = {
        "tn": tn,
        "data": data,
        "exception": str(ex),
    }
    await insert_one(col, doc)


async def error_sink(ex: Exception, tn: str, data: list) -> None:
    """Record an unknown failure.

    Exceptions that end up here need manual intervention to find the cause,
    so ``repr(ex)`` is stored to keep the exception type alongside the message.

    :param ex: the unexpected exception.
    :param tn: name of the dimension being processed.
    :param data: the records that were being processed when ``ex`` was raised.
    :return: None
    """
    col = "a_data_clean_error"
    doc = {
        "tn": tn,
        "exception": repr(ex),
        "data": data,
    }
    await insert_one(col, doc)


def _first_argument_as_list(args: tuple, kwargs: dict) -> list:
    """Return the wrapped call's first argument, always as a list.

    The first positional argument is preferred. If the call only used keyword
    arguments, the first keyword value is taken instead; if the call had no
    arguments at all, an empty list is returned. This keeps the exception
    handlers from raising ``IndexError`` themselves.
    """
    if args:
        data = args[0]
    elif kwargs:
        # dicts keep insertion order, so this is the first keyword passed.
        data = next(iter(kwargs.values()))
    else:
        return []
    return data if isinstance(data, list) else [data]


def exception_handle(func):
    """Decorate an async method so that its exceptions are logged and stored.

    The decorated method must belong to an object exposing ``_name``, which is
    used as the dimension name in log lines and stored documents.

    :param func: the async method to wrap; called as ``func(self, *args, **kwargs)``.
    :return: an async wrapper carrying ``func``'s metadata. It returns whatever
        ``func`` returns, or ``None`` when an exception was caught and recorded.
    """

    async def wrapper(self, *args, **kwargs):
        # Read outside the try block: a missing ``_name`` is a programming
        # error and should surface instead of being written to a sink.
        tn = self._name
        try:
            return await func(self, *args, **kwargs)
        except RulerValidationException as ex:
            log.warning("维度:%s ,detail: %s", tn, ex)
            await ruler_valid_exception_sink(ex, tn, _first_argument_as_list(args, kwargs))
        except FetchException as ex:
            log.error("维度:%s ,detail: %s", tn, ex)
            await fetch_exception_sink(ex, tn, _first_argument_as_list(args, kwargs))
        except Exception as ex:
            # Deliberate catch-all boundary: the failure is recorded for manual
            # follow-up rather than propagated to the caller.
            log.error("维度:%s ,出现未知异常:%s", tn, repr(ex))
            await error_sink(ex, tn, _first_argument_as_list(args, kwargs))
        return None

    return update_wrapper(wrapper, func)


if __name__ == '__main__':
    pass