|
@@ -16,7 +16,7 @@ from data_clean.utils.date_utils import get_now_datetime
|
|
|
log = get_log("exception_handler")
|
|
|
|
|
|
|
|
|
-async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data: list, session_id):
|
|
|
+async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data, session_id):
|
|
|
"""
|
|
|
该异常为业务逻辑异常,出现该异常往往是上游数据源的异常,需要通知到上游数据源重新解析
|
|
|
:param ex:
|
|
@@ -37,7 +37,7 @@ async def ruler_valid_exception_sink(ex: RulerValidationException, tn: str, data
|
|
|
pass
|
|
|
|
|
|
|
|
|
-async def fetch_exception_sink(ex: FetchException, tn: str, data: list, session_id):
|
|
|
+async def fetch_exception_sink(ex: FetchException, tn: str, data, session_id):
|
|
|
"""
|
|
|
拉取数据异常,出现该异常一般为网络异常,无需手动介入,直接读取转换为正确格式重新发回队列。
|
|
|
:param ex:
|
|
@@ -58,7 +58,7 @@ async def fetch_exception_sink(ex: FetchException, tn: str, data: list, session_
|
|
|
pass
|
|
|
|
|
|
|
|
|
-async def error_sink(ex: Exception, tn: str, data: list, session_id):
|
|
|
+async def error_sink(ex: Exception, tn: str, data, session_id):
|
|
|
"""
|
|
|
未知异常写出,出现该异常需要手动介入查看原因。
|
|
|
:param ex:
|
|
@@ -80,30 +80,30 @@ async def error_sink(ex: Exception, tn: str, data: list, session_id):
|
|
|
pass
|
|
|
|
|
|
|
|
|
-def _handle(data, tn, session_id, ex: Exception, original_data):
|
|
|
+def _handle(content, tn, session_id, ex: Exception, original_data):
|
|
|
content_type = 0
|
|
|
- if not isinstance(data, list):
|
|
|
- data = [data]
|
|
|
+ if not isinstance(content, list):
|
|
|
+ content = [content]
|
|
|
content_type = 1
|
|
|
|
|
|
- data = {
|
|
|
+ content = {
|
|
|
"data": {
|
|
|
- tn: data
|
|
|
+ tn: content
|
|
|
},
|
|
|
|
|
|
}
|
|
|
|
|
|
if isinstance(ex, RulerValidationException):
|
|
|
ruler_error(
|
|
|
- data=data,
|
|
|
+ data=content,
|
|
|
session_id=session_id,
|
|
|
ex=ex,
|
|
|
content_type=content_type,
|
|
|
tn=tn
|
|
|
- ,original_data=original_data
|
|
|
+ , original_data=original_data
|
|
|
)
|
|
|
else:
|
|
|
- error(data=data,
|
|
|
+ error(data=content,
|
|
|
session_id=session_id,
|
|
|
ex=ex,
|
|
|
content_type=content_type,
|
|
@@ -111,6 +111,7 @@ def _handle(data, tn, session_id, ex: Exception, original_data):
|
|
|
, original_data=original_data
|
|
|
)
|
|
|
pass
|
|
|
+ return content
|
|
|
pass
|
|
|
|
|
|
|
|
@@ -126,17 +127,17 @@ def exception_handle(func):
|
|
|
except RulerValidationException as ex:
|
|
|
log.warn("session: %s 维度:%s ,detail: %s", session_id, tn, ex)
|
|
|
data = args[0]
|
|
|
- _handle(data, tn, session_id, ex, original_data)
|
|
|
+ data = _handle(data, tn, session_id, ex, original_data)
|
|
|
await ruler_valid_exception_sink(ex, tn, data, session_id)
|
|
|
except FetchException as ex:
|
|
|
log.error("session: %s 维度:%s ,detail: %s", session_id, tn, ex)
|
|
|
data = args[0]
|
|
|
- _handle(data, tn, session_id, ex, original_data)
|
|
|
+ data = _handle(data, tn, session_id, ex, original_data)
|
|
|
await fetch_exception_sink(ex, tn, data, session_id)
|
|
|
except Exception as ex:
|
|
|
log.error("session: %s 维度:%s ,出现未知异常:%s", session_id, tn, repr(ex))
|
|
|
data = args[0]
|
|
|
- _handle(data, tn, session_id, ex, original_data)
|
|
|
+ data = _handle(data, tn, session_id, ex, original_data)
|
|
|
await error_sink(ex, tn, data, session_id)
|
|
|
|
|
|
return result
|