exception_handle.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. import traceback
  7. from functools import update_wrapper
  8. from data_clean.api.mongo_api import insert_one
  9. from data_clean.exception.fetch_exception import FetchException
  10. from data_clean.exception.ruler_validation_exception import RulerValidationException
  11. from data_clean.statistic.data_clean_statistic import ruler_error, error
  12. from data_clean.utils import get_log
  13. from data_clean.utils.date_utils import get_now_datetime
  14. from data_clean.utils.base_utils import is_windows
  15. log = get_log("exception_handler")
  16. mongo_table_prefix = 'a_data_clean_' if not is_windows() else 'a_test_data_clean_'
  17. async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
  18. """
  19. 该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
  20. :param ex:
  21. :param tn:
  22. :param data:
  23. :param session_id:
  24. :return:
  25. """
  26. col = mongo_table_prefix + "ruler_valid_error"
  27. doc = {
  28. "ruler_code": ex.ruler_code,
  29. "tn": tn,
  30. "session_id": session_id,
  31. "exception": str(ex),
  32. "create_time": get_now_datetime(),
  33. "data": data
  34. }
  35. await insert_one(col, doc)
  36. pass
  37. async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id):
  38. """
  39. 拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
  40. :param ex:
  41. :param tn:
  42. :param data:
  43. :param session_id:
  44. :return:
  45. """
  46. col_pre = mongo_table_prefix + "fetch_error"
  47. doc = {
  48. "tn": tn,
  49. "session_id": session_id,
  50. "data": data,
  51. "create_time": get_now_datetime(),
  52. "exception": str(ex),
  53. }
  54. await insert_one(col_pre, doc)
  55. pass
  56. async def error_sink(ex: Exception, tn: str, data, session_id):
  57. """
  58. 未知异常写出,出现该异常需要手动介入查看原因。
  59. :param ex:
  60. :param tn:
  61. :param data:
  62. :param session_id:
  63. :return:
  64. """
  65. col_pre = mongo_table_prefix + "error"
  66. doc = {
  67. "tn": tn,
  68. "session_id": session_id,
  69. "create_time": get_now_datetime(),
  70. "exception": str(ex),
  71. "data": data,
  72. "traceback": "".join(traceback.format_exception(ex))
  73. }
  74. await insert_one(col_pre, doc)
  75. pass
  76. def _handle(content, tn, session_id, ex: Exception, original_data):
  77. content_type = 0
  78. if not isinstance(content, list):
  79. content = [content]
  80. content_type = 1
  81. content = {
  82. "data": {
  83. tn: content
  84. },
  85. }
  86. if isinstance(ex, RulerValidationException):
  87. ruler_error(
  88. data=content,
  89. session_id=session_id,
  90. ex=ex,
  91. content_type=content_type,
  92. tn=tn
  93. , original_data=original_data
  94. )
  95. else:
  96. error(data=content,
  97. session_id=session_id,
  98. ex=ex,
  99. content_type=content_type,
  100. tn=tn
  101. , original_data=original_data
  102. )
  103. pass
  104. return content
  105. pass
  106. def exception_handle(func):
  107. async def wrapper(self, *args, **kwargs):
  108. # tn = pascal_case_to_snake_case(self.__class__.__name__)
  109. tn = self._name
  110. session_id = args[1]
  111. original_data = args[2]
  112. result = None
  113. try:
  114. result = await func(self, *args, **kwargs)
  115. except RulerValidationException as ex:
  116. log.warn("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
  117. data = args[0]
  118. data = _handle(data, tn, session_id, ex, original_data)
  119. await ruler_valid_exception_sink(ex, tn, data, session_id)
  120. except FetchException as ex:
  121. log.error("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
  122. data = args[0]
  123. data = _handle(data, tn, session_id, ex, original_data)
  124. await fetch_exception_sink(ex, tn, data, session_id)
  125. except Exception as ex:
  126. log.error("session_id: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
  127. data = args[0]
  128. data = _handle(data, tn, session_id, ex, original_data)
  129. await error_sink(ex, tn, data, session_id)
  130. return result
  131. pass
  132. return update_wrapper(wrapper, func)
  133. if __name__ == '__main__':
  134. pass