task_distributor.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 17:04
  3. # @Author : XuJiakai
  4. # @File : task_distributor
  5. # @Software: PyCharm
  6. import os
  7. import uuid
  8. import copy
  9. from data_clean.dim_handle_registry import get_class_dict
  10. from data_clean.statistic.data_clean_statistic import success
  11. from data_clean.utils import get_log
  12. scan_path = os.path.join(os.path.dirname(__file__), 'handle')
  13. file_name_list = [file_name[:-3] for file_name in os.listdir(scan_path) if not file_name.startswith("__")]
  14. log = get_log("task_distribute")
  15. # class_dict = {}
  16. for tn in file_name_list:
  17. tmp = __import__(f"data_clean.handle.{tn}", fromlist=(tn))
  18. # class_dict[tn] = tmp.dim_handle
  19. class_dict = get_class_dict()
  20. async def task_distribute(data: dict):
  21. """
  22. 该函数用于维度的分发处理
  23. :param data:
  24. :return:
  25. """
  26. session_id = str(uuid.uuid4())
  27. tmp_data = data['data']
  28. original_data = copy.deepcopy(data)
  29. for key in set(tmp_data.keys()):
  30. if key in class_dict:
  31. result_data = await class_dict[key].execute_dim(tmp_data[key], session_id, original_data)
  32. if result_data is None or len(result_data) == 0:
  33. del tmp_data[key]
  34. else:
  35. tmp_data[key] = result_data
  36. else:
  37. # raise ValueError(f"{key} 维度未实现!")
  38. log.warn(f'{key}维度未实现!直接发送...')
  39. pass
  40. if len(tmp_data) == 0:
  41. return None
  42. else:
  43. success(data, session_id, original_data)
  44. pass
  45. return data
  46. if __name__ == '__main__':
  47. print(class_dict)
  48. pass