exception_handle.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. import traceback
  7. from functools import update_wrapper
  8. from data_clean.api.mongo_api import insert_one
  9. from data_clean.exception.fetch_exception import FetchException
  10. from data_clean.exception.ruler_validation_exception import RulerValidationException
  11. from data_clean.statistic.data_clean_statistic import ruler_error, error
  12. from data_clean.utils import get_log
  13. from data_clean.utils.date_utils import get_now_datetime
  14. log = get_log("exception_handler")
  15. async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list, session_id):
  16. """
  17. 该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
  18. :param ex:
  19. :param tn:
  20. :param data:
  21. :return:
  22. """
  23. col = "a_data_clean_ruler_valid_error"
  24. doc = {
  25. "ruler_code": ex.ruler_code,
  26. "tn": tn,
  27. "session_id": session_id,
  28. "exception": str(ex),
  29. "create_time": get_now_datetime(),
  30. "data": data
  31. }
  32. await insert_one(col, doc)
  33. pass
  34. async def fetch_exception_sink(ex: FetchException, tn: str, data: list, session_id):
  35. """
  36. 拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
  37. :param ex:
  38. :param tn:
  39. :param data:
  40. :return:
  41. """
  42. col_pre = "a_data_clean_fetch_error"
  43. doc = {
  44. "tn": tn,
  45. "session_id": session_id,
  46. "data": data,
  47. "create_time": get_now_datetime(),
  48. "exception": str(ex),
  49. }
  50. await insert_one(col_pre, doc)
  51. pass
  52. async def error_sink(ex: Exception, tn: str, data: list, session_id):
  53. """
  54. 未知异常写出,出现该异常需要手动介入查看原因。
  55. :param ex:
  56. :param tn:
  57. :param data:
  58. :return:
  59. """
  60. col_pre = f"a_data_clean_error"
  61. doc = {
  62. "tn": tn,
  63. "session_id": session_id,
  64. "create_time": get_now_datetime(),
  65. "exception": str(ex),
  66. "data": data,
  67. "traceback": "".join(traceback.format_exception(ex))
  68. }
  69. await insert_one(col_pre, doc)
  70. pass
  71. def _handle(data, tn, session_id, ex: Exception):
  72. content_type = 0
  73. if not isinstance(data, list):
  74. data = [data]
  75. content_type = 1
  76. data = {
  77. "data": {
  78. tn: data
  79. },
  80. }
  81. if isinstance(ex, RulerValidationException):
  82. ruler_error(
  83. data=data,
  84. session_id=session_id,
  85. ex=ex,
  86. content_type=content_type,
  87. tn=tn
  88. )
  89. else:
  90. error(data=data,
  91. session_id=session_id,
  92. ex=ex,
  93. content_type=content_type,
  94. tn=tn
  95. )
  96. pass
  97. pass
  98. def exception_handle(func):
  99. async def wrapper(self, *args, **kwargs):
  100. # tn = pascal_case_to_snake_case(self.__class__.__name__)
  101. tn = self._name
  102. session_id = args[1]
  103. result = None
  104. try:
  105. result = await func(self, *args, **kwargs)
  106. except RulerValidationException as ex:
  107. log.warn("session: %s 维度:%s ,detail: %s", session_id, tn, ex)
  108. data = args[0]
  109. _handle(data, tn, session_id, ex)
  110. await ruler_valid_exception_sink(ex, tn, data, session_id)
  111. except FetchException as ex:
  112. log.error("session: %s 维度:%s ,detail: %s", session_id, tn, ex)
  113. data = args[0]
  114. _handle(data, tn, session_id, ex)
  115. await fetch_exception_sink(ex, tn, data, session_id)
  116. except Exception as ex:
  117. log.error("session: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
  118. data = args[0]
  119. _handle(data, tn, session_id, ex)
  120. await error_sink(ex, tn, data, session_id)
  121. return result
  122. pass
  123. return update_wrapper(wrapper, func)
  124. if __name__ == '__main__':
  125. pass