exception_handle.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. from functools import update_wrapper
  7. from data_clean.api.mongo_api import insert_one
  8. from data_clean.exception.fetch_exception import FetchException
  9. from data_clean.exception.ruler_validation_exception import RulerValidationException
  10. from data_clean.utils import get_log
  11. from data_clean.utils.date_utils import get_now_datetime
  12. log = get_log("exception_handler")
  13. async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list):
  14. """
  15. 该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
  16. :param ex:
  17. :param tn:
  18. :param data:
  19. :return:
  20. """
  21. col = "a_data_clean_ruler_valid_error"
  22. doc = {
  23. "ruler_code": ex.ruler_code,
  24. "tn": tn,
  25. "exception": str(ex),
  26. "create_time": get_now_datetime(),
  27. "data": data
  28. }
  29. await insert_one(col, doc)
  30. pass
  31. async def fetch_exception_sink(ex: FetchException, tn: str, data: list):
  32. """
  33. 拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
  34. :param ex:
  35. :param tn:
  36. :param data:
  37. :return:
  38. """
  39. col_pre = "a_data_clean_fetch_error"
  40. doc = {
  41. "tn": tn,
  42. "data": data,
  43. "create_time": get_now_datetime(),
  44. "exception": str(ex),
  45. }
  46. await insert_one(col_pre, doc)
  47. pass
  48. async def error_sink(ex: Exception, tn: str, data: list):
  49. """
  50. 未知异常写出,出现该异常需要手动介入查看原因。
  51. :param ex:
  52. :param tn:
  53. :param data:
  54. :return:
  55. """
  56. col_pre = f"a_data_clean_error"
  57. doc = {
  58. "tn": tn,
  59. "create_time": get_now_datetime(),
  60. "exception": repr(ex),
  61. "data": data
  62. }
  63. await insert_one(col_pre, doc)
  64. pass
  65. def exception_handle(func):
  66. async def wrapper(self, *args, **kwargs):
  67. # tn = pascal_case_to_snake_case(self.__class__.__name__)
  68. tn = self._name
  69. result = None
  70. try:
  71. result = await func(self, *args, **kwargs)
  72. except RulerValidationException as ex:
  73. log.warn("维度:%s ,detail: %s", tn, ex)
  74. data = args[0]
  75. if not isinstance(data, list):
  76. data = [data]
  77. await ruler_valid_exception_sink(ex, tn, data)
  78. except FetchException as ex:
  79. log.error("维度:%s ,detail: %s", tn, ex)
  80. data = args[0]
  81. if not isinstance(data, list):
  82. data = [data]
  83. await fetch_exception_sink(ex, tn, data)
  84. except Exception as ex:
  85. log.error("维度:%s ,出现未知异常:%s", tn, repr(ex))
  86. data = args[0]
  87. if not isinstance(data, list):
  88. data = [data]
  89. await error_sink(ex, tn, data)
  90. return result
  91. pass
  92. return update_wrapper(wrapper, func)
  93. if __name__ == '__main__':
  94. pass