exception_handle.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. import traceback
  7. from functools import update_wrapper
  8. from data_clean.api.mongo_api import insert_one
  9. from data_clean.env.const import mongo_table_prefix
  10. from data_clean.exception.fetch_exception import FetchException
  11. from data_clean.exception.ruler_validation_exception import RulerValidationException
  12. from data_clean.statistic.data_clean_statistic import ruler_error, error
  13. from data_clean.utils import get_log
  14. from data_clean.utils.date_utils import get_now_datetime
# Module-level logger named "exception_handler"; used by the exception_handle wrapper below.
log = get_log("exception_handler")
  16. async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
  17. """
  18. 该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
  19. :param ex:
  20. :param tn:
  21. :param data:
  22. :param session_id:
  23. :return:
  24. """
  25. col = mongo_table_prefix + "ruler_valid_error"
  26. doc = {
  27. "ruler_code": ex.ruler_code,
  28. "tn": tn,
  29. "session_id": session_id,
  30. "exception": str(ex),
  31. "create_time": get_now_datetime(),
  32. "data": data
  33. }
  34. await insert_one(col, doc)
  35. pass
  36. async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id):
  37. """
  38. 拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
  39. :param ex:
  40. :param tn:
  41. :param data:
  42. :param session_id:
  43. :return:
  44. """
  45. col_pre = mongo_table_prefix + "fetch_error"
  46. doc = {
  47. "tn": tn,
  48. "session_id": session_id,
  49. "data": data,
  50. "create_time": get_now_datetime(),
  51. "exception": str(ex),
  52. }
  53. await insert_one(col_pre, doc)
  54. pass
  55. async def error_sink(ex: Exception, tn: str, data, session_id):
  56. """
  57. 未知异常写出,出现该异常需要手动介入查看原因。
  58. :param ex:
  59. :param tn:
  60. :param data:
  61. :param session_id:
  62. :return:
  63. """
  64. col_pre = mongo_table_prefix + "error"
  65. doc = {
  66. "tn": tn,
  67. "session_id": session_id,
  68. "create_time": get_now_datetime(),
  69. "exception": str(ex),
  70. "data": data,
  71. "traceback": "".join(traceback.format_exception(ex))
  72. }
  73. await insert_one(col_pre, doc)
  74. pass
  75. def _handle(content, tn, session_id, ex: Exception, original_data):
  76. content_type = 0
  77. if not isinstance(content, list):
  78. content = [content]
  79. content_type = 1
  80. content = {
  81. "data": {
  82. tn: content
  83. },
  84. }
  85. if isinstance(ex, RulerValidationException):
  86. ruler_error(
  87. data=content,
  88. session_id=session_id,
  89. ex=ex,
  90. content_type=content_type,
  91. tn=tn
  92. , original_data=original_data
  93. )
  94. else:
  95. error(data=content,
  96. session_id=session_id,
  97. ex=ex,
  98. content_type=content_type,
  99. tn=tn
  100. , original_data=original_data
  101. )
  102. pass
  103. return content
  104. pass
  105. def exception_handle(func):
  106. async def wrapper(self, *args, **kwargs):
  107. # tn = pascal_case_to_snake_case(self.__class__.__name__)
  108. tn = self._name
  109. session_id = args[1]
  110. original_data = args[2]
  111. result = None
  112. try:
  113. result = await func(self, *args, **kwargs)
  114. except RulerValidationException as ex:
  115. log.warn("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
  116. data = args[0]
  117. data = _handle(data, tn, session_id, ex, original_data)
  118. await ruler_valid_exception_sink(ex, tn, data, session_id)
  119. except FetchException as ex:
  120. log.error("session_id: %s 维度:%s ,detail: %s", session_id, tn, ex)
  121. data = args[0]
  122. data = _handle(data, tn, session_id, ex, original_data)
  123. await fetch_exception_sink(ex, tn, data, session_id)
  124. except Exception as ex:
  125. log.error("session_id: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
  126. data = args[0]
  127. data = _handle(data, tn, session_id, ex, original_data)
  128. await error_sink(ex, tn, data, session_id)
  129. return result
  130. pass
  131. return update_wrapper(wrapper, func)
# Script entry point: intentionally a no-op; this module only provides helpers.
if __name__ == '__main__':
    pass