# -*- coding: utf-8 -*-
# @Time : 2023/7/20 17:04
# @Author : XuJiakai
# @File : task_distributor
# @Software: PyCharm
"""Dispatch each dimension of an incoming record to its registered handler.

Importing this module imports every handler module found in the sibling
``handle`` directory, then reads the handler registry via
``get_class_dict()``.
"""
import copy
import importlib
import os
import uuid
from typing import Optional

from data_clean.dim_handle_registry import get_class_dict
from data_clean.statistic.data_clean_statistic import success
from loguru import logger as log

scan_path = os.path.join(os.path.dirname(__file__), 'handle')

# Only real ``*.py`` modules are considered.  Slicing three characters off
# every directory entry would turn sub-directories, ``.pyc`` files or editor
# backups into bogus module names and abort the import of this module.
file_name_list = [
    os.path.splitext(file_name)[0]
    for file_name in os.listdir(scan_path)
    if file_name.endswith('.py') and not file_name.startswith("__")
]

# Import every handler module for its side effects.
# NOTE(review): presumably each handler registers itself with
# ``dim_handle_registry`` at import time, which is why the registry is only
# read after this loop — confirm against the handler modules.
for tn in file_name_list:
    importlib.import_module(f"data_clean.handle.{tn}")

class_dict = get_class_dict()


async def task_distribute(data: dict) -> Optional[dict]:
    """Run the registered handler for every dimension in ``data['data']``.

    ``data['data']`` is updated in place: a dimension whose handler returns
    ``None`` or an empty result is removed, otherwise its value is replaced
    by the handler's result.  Dimensions without a registered handler are
    left untouched and a warning is logged.

    :param data: message whose ``'data'`` entry maps dimension names to the
        payload of that dimension.
    :return: ``None`` when no dimension is left after processing; otherwise
        the same ``data`` object, after ``success`` has been called for it.
    :raises KeyError: if ``data`` has no ``'data'`` entry.
    """
    session_id = str(uuid.uuid4())
    tmp_data = data['data']
    # Snapshot taken before any mutation; the same snapshot object is handed
    # to every handler and to ``success``.
    original_data = copy.deepcopy(data)

    # Iterate over a list snapshot of the keys: entries may be deleted while
    # looping, and (unlike a set) the list keeps the mapping's own order so
    # handlers run in a deterministic sequence.
    for key in list(tmp_data):
        if key not in class_dict:
            log.warning(f'{key}维度未实现!直接发送...')
            continue

        result_data = await class_dict[key].execute_dim(
            tmp_data[key], session_id, original_data
        )
        if result_data is None or len(result_data) == 0:
            del tmp_data[key]
        else:
            tmp_data[key] = result_data

    if not tmp_data:
        return None

    success(data, session_id, original_data)
    return data


if __name__ == '__main__':
    print(class_dict)