exception_handle.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. from functools import update_wrapper
  7. from data_clean.api.mongo_api import insert_one
  8. from data_clean.exception.fetch_exception import FetchException
  9. from data_clean.exception.ruler_validation_exception import RulerValidationException
  10. from data_clean.utils import get_log
# Module-level logger used by the exception_handle wrapper in this file.
log = get_log("exception_handler")
  12. async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list):
  13. """
  14. 该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
  15. :param ex:
  16. :param tn:
  17. :param data:
  18. :return:
  19. """
  20. col = "a_data_clean_ruler_valid_error"
  21. doc = {
  22. "ruler_code": ex.ruler_code,
  23. "tn": tn,
  24. "exception": str(ex),
  25. "data": data
  26. }
  27. await insert_one(col, doc)
  28. pass
  29. async def fetch_exception_sink(ex: FetchException, tn: str, data: list):
  30. """
  31. 拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
  32. :param ex:
  33. :param tn:
  34. :param data:
  35. :return:
  36. """
  37. col_pre = "a_data_clean_fetch_error"
  38. doc = {
  39. "tn": tn,
  40. "data": data,
  41. "exception": str(ex),
  42. }
  43. await insert_one(col_pre, doc)
  44. pass
  45. async def error_sink(ex: Exception, tn: str, data: list):
  46. """
  47. 未知异常写出,出现该异常需要手动介入查看原因。
  48. :param ex:
  49. :param tn:
  50. :param data:
  51. :return:
  52. """
  53. col_pre = f"a_data_clean_error"
  54. doc = {
  55. "tn": tn,
  56. "exception": repr(ex),
  57. "data": data
  58. }
  59. await insert_one(col_pre, doc)
  60. pass
  61. def exception_handle(func):
  62. async def wrapper(self, *args, **kwargs):
  63. # tn = pascal_case_to_snake_case(self.__class__.__name__)
  64. tn = self._name
  65. result = None
  66. try:
  67. result = await func(self, *args, **kwargs)
  68. except RulerValidationException as ex:
  69. log.warn("维度:%s ,detail: %s", tn, ex)
  70. data = args[0]
  71. if not isinstance(data, list):
  72. data = [data]
  73. await ruler_valid_exception_sink(ex, tn, data)
  74. except FetchException as ex:
  75. log.error("维度:%s ,detail: %s", tn, ex)
  76. data = args[0]
  77. if not isinstance(data, list):
  78. data = [data]
  79. await fetch_exception_sink(ex, tn, data)
  80. except Exception as ex:
  81. log.error("维度:%s ,出现未知异常:%s", tn, repr(ex))
  82. data = args[0]
  83. if not isinstance(data, list):
  84. data = [data]
  85. await error_sink(ex, tn, data)
  86. return result
  87. pass
  88. return update_wrapper(wrapper, func)
# Running this module as a script does nothing; it only provides definitions.
if __name__ == '__main__':
    pass