123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- # -*- coding: utf-8 -*-
- # @Time : 2023/7/20 17:04
- # @Author : XuJiakai
- # @File : task_distributor
- # @Software: PyCharm
- import os
- import uuid
- import copy
- from data_clean.dim_handle_registry import get_class_dict
- from data_clean.statistic.data_clean_statistic import success
- from data_clean.utils import get_log
- scan_path = os.path.join(os.path.dirname(__file__), 'handle')
- file_name_list = [file_name[:-3] for file_name in os.listdir(scan_path) if not file_name.startswith("__")]
- log = get_log("task_distribute")
- # class_dict = {}
- for tn in file_name_list:
- tmp = __import__(f"data_clean.handle.{tn}", fromlist=(tn))
- # class_dict[tn] = tmp.dim_handle
- class_dict = get_class_dict()
- async def task_distribute(data: dict):
- """
- 该函数用于维度的分发处理
- :param data:
- :return:
- """
- session_id = str(uuid.uuid4())
- tmp_data = data['data']
- original_data = copy.deepcopy(data)
- for key in set(tmp_data.keys()):
- if key in class_dict:
- result_data = await class_dict[key].execute_dim(tmp_data[key], session_id, original_data)
- if result_data is None or len(result_data) == 0:
- del tmp_data[key]
- else:
- tmp_data[key] = result_data
- else:
- # raise ValueError(f"{key} 维度未实现!")
- log.warn(f'{key}维度未实现!直接发送...')
- pass
- if len(tmp_data) == 0:
- return None
- else:
- success(data, session_id, original_data)
- pass
- return data
- if __name__ == '__main__':
- print(class_dict)
- pass
|