task_distributor.py 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. # -*- coding: utf-8 -*-
  2. # @Time : 2023/7/20 17:04
  3. # @Author : XuJiakai
  4. # @File : task_distributor
  5. # @Software: PyCharm
  6. import os
  7. scan_path = os.path.join(os.path.dirname(__file__), 'handle')
  8. file_name_list = [file_name[:-3] for file_name in os.listdir(scan_path) if not file_name.startswith("__")]
  9. class_dict = {}
  10. for tn in file_name_list:
  11. tmp = __import__(f"data_clean.handle.{tn}", fromlist=(tn))
  12. class_dict[tn] = tmp.dim_handle
  13. async def task_distribute(data: dict):
  14. """
  15. 该函数用于维度的分发处理
  16. :param data:
  17. :return:
  18. """
  19. tmp_data = data['data']
  20. for key in set(tmp_data.keys()):
  21. if key in class_dict:
  22. result_data = await class_dict[key].execute_dim(tmp_data[key])
  23. if len(result_data) == 0:
  24. del tmp_data[key]
  25. else:
  26. tmp_data[key] = result_data
  27. else:
  28. raise ValueError(f"{key} 维度未实现!")
  29. if len(tmp_data) == 0:
  30. return None
  31. return data
  32. if __name__ == '__main__':
  33. print(class_dict)
  34. pass