exception_handle.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/21 8:43
  3. # @Author : XuJiakai
  4. # @File : exception_handle
  5. # @Software: PyCharm
  6. from data_clean.api.mongo_api import insert_one
  7. from data_clean.exception.fetch_exception import FetchException
  8. from data_clean.exception.ruler_validation_exception import RulerValidationException
  9. from data_clean.utils import get_log
  # Module-level logger; the exception_handle wrapper uses it to report caught exceptions.
  log = get_log("exception_handler")
  async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list):
      """
      Record a rule-validation failure through the ``insert_one`` helper.

      This is a business-logic exception. It usually indicates a problem in the
      upstream data source, which should be notified so it can re-parse the data.

      :param ex: the validation exception; stored as ``str(ex)``
      :param tn: value stored under the ``tn`` key
      :param data: payload stored under the ``data`` key
      :return: None
      """
      await insert_one(
          "a_data_clean_ruler_valid_exception",
          {"tn": tn, "exception": str(ex), "data": data},
      )
  async def fetch_exception_sink(ex: FetchException, tn: str, data: list):
      """
      Record a data-fetch failure through the ``insert_one`` helper.

      A fetch exception is generally a network error. No manual intervention is
      needed: the stored record can be read back, converted to the correct
      format and sent to the queue again.

      :param ex: the fetch exception; stored as ``str(ex)``
      :param tn: value stored under the ``tn`` key
      :param data: payload stored under the ``data`` key
      :return: None
      """
      collection = "a_data_clean_fetch_exception"
      # Key order (tn, data, exception) is kept the same as the stored document layout.
      record = dict(tn=tn, data=data, exception=str(ex))
      await insert_one(collection, record)
  async def error_sink(ex: Exception, tn: str, data: list):
      """
      Record an unknown exception through the ``insert_one`` helper.

      Unknown exceptions require manual investigation of the cause.

      The target collection name depends on the payload type:
      ``a_data_clean_error_dim`` when ``data`` is a list, otherwise
      ``a_data_clean_error_record``.

      :param ex: the exception; stored as ``repr(ex)``
      :param tn: value stored under the ``tn`` key
      :param data: payload stored under the ``data`` key
      :return: None
      """
      if isinstance(data, list):
          suffix = "_dim"
      else:
          suffix = "_record"
      await insert_one(
          "a_data_clean_error" + suffix,
          {"tn": tn, "exception": repr(ex), "data": data},
      )
  def _first_arg_as_list(args: tuple) -> list:
      """Return the first positional argument as a list, or ``[]`` when there is none."""
      if not args:
          # Called without positional arguments: still let the sink record the
          # exception instead of raising IndexError from inside the handler.
          return []
      first = args[0]
      return first if isinstance(first, list) else [first]


  def exception_handle(func):
      """
      Decorate an async method so that its exceptions are logged and persisted.

      The wrapper awaits ``func`` and returns its result. When ``func`` raises,
      the exception is logged, handed to the matching sink together with
      ``self._name`` and the first positional argument (wrapped in a list when
      it is not one already), and the wrapper returns ``None`` instead of
      re-raising:

      * ``RulerValidationException`` -> ``ruler_valid_exception_sink``
      * ``FetchException``           -> ``fetch_exception_sink``
      * any other ``Exception``      -> ``error_sink``

      Exceptions raised by a sink itself are not caught here.

      :param func: async method whose instance exposes a ``_name`` attribute
      :return: the wrapping coroutine function, carrying ``func``'s metadata
      """
      # Local import keeps this block self-contained.
      from functools import wraps

      @wraps(func)  # preserve __name__ / __doc__ of the decorated coroutine
      async def wrapper(self, *args, **kwargs):
          tn = self._name
          try:
              return await func(self, *args, **kwargs)
          except RulerValidationException as ex:
              # Logger.warn is a deprecated alias; warning() is the supported call.
              log.warning("%s", ex)
              await ruler_valid_exception_sink(ex, tn, _first_arg_as_list(args))
          except FetchException as ex:
              log.error("%s", ex)
              await fetch_exception_sink(ex, tn, _first_arg_as_list(args))
          except Exception as ex:
              log.error("出现未知异常:%s", repr(ex))
              # NOTE(review): the payload is always a list at this point, so
              # error_sink always selects its "_dim" collection - confirm intended.
              await error_sink(ex, tn, _first_arg_as_list(args))
          # Reached only after an exception was handled: nothing to return.
          return None

      return wrapper
  # Script entry point: currently a no-op when the module is run directly.
  if __name__ == '__main__':
      pass