exception_handle.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. import traceback
  7. from functools import update_wrapper
  8. from data_clean.api.mongo_api import insert_one
  9. from data_clean.env.const import mongo_table_prefix
  10. from data_clean.exception.fetch_exception import FetchException
  11. from data_clean.exception.ruler_validation_exception import RulerValidationException
  12. from data_clean.statistic.data_clean_statistic import ruler_error, error
  13. from loguru import logger as log
  14. from data_clean.utils.date_utils import get_now_datetime
  15. async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
  16. """
  17. 该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
  18. :param ex:
  19. :param tn:
  20. :param data:
  21. :param session_id:
  22. :return:
  23. """
  24. col = mongo_table_prefix + "ruler_valid_error"
  25. doc = {
  26. "ruler_code": ex.ruler_code,
  27. "tn": tn,
  28. "session_id": session_id,
  29. "exception": str(ex),
  30. "create_time": get_now_datetime(),
  31. "data": data
  32. }
  33. await insert_one(col, doc)
  34. pass
  35. async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id):
  36. """
  37. 拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
  38. :param ex:
  39. :param tn:
  40. :param data:
  41. :param session_id:
  42. :return:
  43. """
  44. col_pre = mongo_table_prefix + "fetch_error"
  45. doc = {
  46. "tn": tn,
  47. "session_id": session_id,
  48. "data": data,
  49. "create_time": get_now_datetime(),
  50. "exception": str(ex),
  51. }
  52. await insert_one(col_pre, doc)
  53. pass
  54. async def error_sink(ex: Exception, tn: str, data, session_id):
  55. """
  56. 未知异常写出,出现该异常需要手动介入查看原因。
  57. :param ex:
  58. :param tn:
  59. :param data:
  60. :param session_id:
  61. :return:
  62. """
  63. col_pre = mongo_table_prefix + "error"
  64. doc = {
  65. "tn": tn,
  66. "session_id": session_id,
  67. "create_time": get_now_datetime(),
  68. "exception": str(ex),
  69. "data": data,
  70. "traceback": "".join(traceback.format_exception(ex))
  71. }
  72. await insert_one(col_pre, doc)
  73. pass
  74. def _handle(content, tn, session_id, ex: Exception, original_data):
  75. content_type = 0
  76. if not isinstance(content, list):
  77. content = [content]
  78. content_type = 1
  79. content = {
  80. "data": {
  81. tn: content
  82. },
  83. }
  84. if isinstance(ex, RulerValidationException):
  85. ruler_error(
  86. data=content,
  87. session_id=session_id,
  88. ex=ex,
  89. content_type=content_type,
  90. tn=tn
  91. , original_data=original_data
  92. )
  93. else:
  94. error(data=content,
  95. session_id=session_id,
  96. ex=ex,
  97. content_type=content_type,
  98. tn=tn
  99. , original_data=original_data
  100. )
  101. pass
  102. return content
  103. pass
  104. def exception_handle(func):
  105. async def wrapper(self, *args, **kwargs):
  106. # tn = pascal_case_to_snake_case(self.__class__.__name__)
  107. tn = self._name
  108. session_id = args[1]
  109. original_data = args[2]
  110. result = None
  111. try:
  112. result = await func(self, *args, **kwargs)
  113. except RulerValidationException as ex:
  114. log.warning("session_id: {} 维度:{} ,detail: {}", session_id, tn, ex)
  115. data = args[0]
  116. data = _handle(data, tn, session_id, ex, original_data)
  117. await ruler_valid_exception_sink(ex, tn, data, session_id)
  118. except FetchException as ex:
  119. log.error("session_id: {} 维度:{} ,detail: {}", session_id, tn, ex)
  120. data = args[0]
  121. data = _handle(data, tn, session_id, ex, original_data)
  122. await fetch_exception_sink(ex, tn, data, session_id)
  123. except Exception as ex:
  124. log.error("session_id: {} 维度:{} ,出现未知异常:{}", session_id, tn, repr(ex))
  125. data = args[0]
  126. data = _handle(data, tn, session_id, ex, original_data)
  127. await error_sink(ex, tn, data, session_id)
  128. return result
  129. pass
  130. return update_wrapper(wrapper, func)
  131. if __name__ == '__main__':
  132. pass