|
@@ -0,0 +1,323 @@
|
|
|
+package com.winhc.bigdata.task.service.impl;
|
|
|
+
|
|
|
+import cn.hutool.core.lang.Assert;
|
|
|
+import com.winhc.bigdata.task.configuration.ProjectParamInit;
|
|
|
+import com.winhc.bigdata.task.entity.MaxComputeDetailOutput;
|
|
|
+import com.winhc.bigdata.task.entity.PullDataLog;
|
|
|
+import com.winhc.bigdata.task.entity.PullDataTask;
|
|
|
+import com.winhc.bigdata.task.enums.DataWorksTaskFlowEnum;
|
|
|
+import com.winhc.bigdata.task.enums.TaskStatusEnum;
|
|
|
+import com.winhc.bigdata.task.service.*;
|
|
|
+import com.winhc.bigdata.task.util.GenerateIdUtils;
|
|
|
+import com.winhc.bigdata.task.util.JsonUtils;
|
|
|
+import com.winhc.bigdata.task.vo.ResponseVo;
|
|
|
+import com.winhc.bigdata.task.vo.TablePartitionInfoVO;
|
|
|
+import com.winhc.bigdata.task.vo.dataworks.DataWorksFlowTask;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.springframework.context.ApplicationContext;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.annotation.PostConstruct;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author: XuJiakai
|
|
|
+ * 2021/4/12 13:40
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@AllArgsConstructor
|
|
|
+public class PullDataServiceImpl implements PullDataService {
|
|
|
+ private final ApplicationContext applicationContext;
|
|
|
+
|
|
|
+ private final MaxComputeDetailOutputService maxComputeDetailOutputService;
|
|
|
+ private final PullDataTaskService pullDataTaskService;
|
|
|
+ private final PullDataLogService pullDataLogService;
|
|
|
+ private final DataWorksService dataWorksService;
|
|
|
+ private final Map<String, DataWorksFlowTask> dataWorksFlowTaskMap;
|
|
|
+ private final DingTalkService dingTalkService;
|
|
|
+
|
|
|
+
|
|
|
+ @PostConstruct
|
|
|
+ public void init() {
|
|
|
+ PullDataLog pullDataLog = pullDataLogService.lambdaQuery().eq(PullDataLog::getPullStatus, TaskStatusEnum.导出中).one();
|
|
|
+ MaxComputeDetailOutput maxComputeDetailOutput = maxComputeDetailOutputService.lambdaQuery().eq(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.生成中).one();
|
|
|
+ if (pullDataLog != null && maxComputeDetailOutput != null) {
|
|
|
+ log.info("存在运行中的任务...");
|
|
|
+ applicationContext.getBean(PullDataService.class).queryWait(maxComputeDetailOutput, pullDataLog.getId(), maxComputeDetailOutput.getDagId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized ResponseVo pull() {
|
|
|
+ Integer count = pullDataLogService.lambdaQuery()
|
|
|
+ .eq(PullDataLog::getPullStatus, TaskStatusEnum.导出中)
|
|
|
+ .count();
|
|
|
+ if (count != 0) {
|
|
|
+ return ResponseVo.success(getExportDataStatus(), "已有数据在导出中,请稍后...");
|
|
|
+ }
|
|
|
+
|
|
|
+ PullDataLog we = pullDataLogService.lambdaQuery()
|
|
|
+ .eq(PullDataLog::getPullStatus, TaskStatusEnum.等待中)
|
|
|
+ .one();
|
|
|
+
|
|
|
+ if (we != null) {
|
|
|
+ return export(we.getId());
|
|
|
+ } else {
|
|
|
+ PullDataLog pullDataLog = PullDataLog.builder()
|
|
|
+ .pullBy("xjk")
|
|
|
+ .pullStatus(TaskStatusEnum.等待中)
|
|
|
+ .requestTime(new Date())
|
|
|
+ .build();
|
|
|
+ pullDataLogService.save(pullDataLog);
|
|
|
+ return export(pullDataLog.getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized Object callBack(List<TablePartitionInfoVO> tab) {
|
|
|
+ Assert.isFalse(tab.isEmpty(), "maxcompute回调数据为空!");
|
|
|
+
|
|
|
+ String taskId = tab.get(0).getTaskId();
|
|
|
+ PullDataTask byId = pullDataTaskService.getById(taskId);
|
|
|
+
|
|
|
+ Assert.notNull(byId, "回调的taskId: {} 不存在", taskId);
|
|
|
+
|
|
|
+ Assert.isFalse(TaskStatusEnum.导出完毕 == byId.getTaskStatus() || TaskStatusEnum.生成完毕 == byId.getTaskStatus(), "请勿重复回调");
|
|
|
+
|
|
|
+
|
|
|
+ pullDataTaskService.lambdaUpdate()
|
|
|
+ .eq(PullDataTask::getId, taskId)
|
|
|
+ .set(PullDataTask::getEndTime, new Date())
|
|
|
+ .set(PullDataTask::getTaskStatus, TaskStatusEnum.生成完毕)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ List<MaxComputeDetailOutput> collect = tab.stream().map(e ->
|
|
|
+ MaxComputeDetailOutput.builder()
|
|
|
+ .taskId(e.getTaskId())
|
|
|
+ .maxComputeTableName(e.getMaxComputeTableName())
|
|
|
+ .partitionName(e.getPartitionName())
|
|
|
+ .recordCount(e.getRecordCount())
|
|
|
+ .priorityLevel(e.getPriorityLevel())
|
|
|
+ .partitionTaskStatus(TaskStatusEnum.生成完毕).build()
|
|
|
+ ).collect(Collectors.toList());
|
|
|
+ maxComputeDetailOutputService.saveBatch(collect);
|
|
|
+
|
|
|
+ List<String> ps = tab.stream().map(TablePartitionInfoVO::getPartitionName).collect(Collectors.toList());
|
|
|
+
|
|
|
+ dingTalkService.info("数据生成完毕,共计生成{}个分区,分区列表:{}", tab.size(), JsonUtils.jsonObjToString(ps));
|
|
|
+
|
|
|
+ PullDataLog we = pullDataLogService.lambdaQuery()
|
|
|
+ .eq(PullDataLog::getPullStatus, TaskStatusEnum.等待中)
|
|
|
+ .one();
|
|
|
+ if (we != null) {
|
|
|
+ return export(we.getId());
|
|
|
+ } else {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ private ResponseVo export(Long pullId) {
|
|
|
+ PullDataTask pullDataTask = pullDataTaskService
|
|
|
+ .lambdaQuery()
|
|
|
+ .eq(PullDataTask::getTaskStatus, TaskStatusEnum.生成完毕)
|
|
|
+ .one();
|
|
|
+
|
|
|
+ if (pullDataTask == null) {
|
|
|
+ PullDataTask one = pullDataTaskService
|
|
|
+ .lambdaQuery()
|
|
|
+ .eq(PullDataTask::getTaskStatus, TaskStatusEnum.生成中)
|
|
|
+ .one();
|
|
|
+
|
|
|
+ if (one == null) {
|
|
|
+ //重新生成数据
|
|
|
+ PullDataTask pullDataTask1 = generatingData();
|
|
|
+ return ResponseVo.success(pullDataTask1, "数据已经提交生成...");
|
|
|
+ } else {
|
|
|
+ //已有生成数据任务在队列中
|
|
|
+ return ResponseVo.success(getGeneratingDataStatus(), "数据已经在生成中...");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //直接导出
|
|
|
+ String taskId = pullDataTask.getId();
|
|
|
+
|
|
|
+ MaxComputeDetailOutput detailOutput = maxComputeDetailOutputService
|
|
|
+ .lambdaQuery()
|
|
|
+ .eq(MaxComputeDetailOutput::getTaskId, taskId)
|
|
|
+ .eq(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.生成完毕)
|
|
|
+ .orderByDesc(MaxComputeDetailOutput::getPriorityLevel)
|
|
|
+ .list()
|
|
|
+ .get(0);
|
|
|
+ export(detailOutput, pullId);
|
|
|
+ return ResponseVo.success(getExportDataStatus(), "数据正在导出中....");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 提交导出任务
|
|
|
+ * 提交线程池
|
|
|
+ *
|
|
|
+ * @param detailOutput
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ public void export(MaxComputeDetailOutput detailOutput, Long pullId) {
|
|
|
+ Long dagId = dataWorksService.exportData(detailOutput.getPartitionName());
|
|
|
+ Assert.notNull(dagId, "触发业务流程失败:{}", detailOutput);
|
|
|
+
|
|
|
+ pullDataLogService.lambdaUpdate()
|
|
|
+ .eq(PullDataLog::getId, pullId)
|
|
|
+ .set(PullDataLog::getPullStatus, TaskStatusEnum.导出中)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ maxComputeDetailOutputService.lambdaUpdate()
|
|
|
+ .eq(MaxComputeDetailOutput::getId, detailOutput.getId())
|
|
|
+ .set(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.导出中)
|
|
|
+ .set(MaxComputeDetailOutput::getDagId, dagId)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ log.info("导出数据任务提交成功, {}", detailOutput);
|
|
|
+
|
|
|
+ applicationContext.getBean(PullDataService.class).queryWait(detailOutput, pullId, dagId);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ @Async("export-data-pool")
|
|
|
+ public void queryWait(MaxComputeDetailOutput detailOutput, Long pullId, Long dagId) {
|
|
|
+ while (true) {
|
|
|
+ log.info("{} 运行中...", detailOutput);
|
|
|
+ Map<String, DataWorksTaskFlowEnum> query = dataWorksService.query(ProjectParamInit.project, dagId);
|
|
|
+ long count = query.values()
|
|
|
+ .stream()
|
|
|
+ .filter(e -> !(DataWorksTaskFlowEnum.SUCCESS.equals(e) || DataWorksTaskFlowEnum.FAILURE.equals(e)))
|
|
|
+ .count();
|
|
|
+
|
|
|
+ if (count != 0) {
|
|
|
+ try {
|
|
|
+ Thread.sleep(100000);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.info("end");
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, DataWorksTaskFlowEnum> query = dataWorksService.query(ProjectParamInit.project, dagId);
|
|
|
+ long count = query.values()
|
|
|
+ .stream()
|
|
|
+ .filter(DataWorksTaskFlowEnum.FAILURE::equals)
|
|
|
+ .count();
|
|
|
+ if (count == 0) {
|
|
|
+ exportDataCallBack(detailOutput, pullId);
|
|
|
+ } else {
|
|
|
+ //失败数据回滚
|
|
|
+ pullDataLogService.lambdaUpdate()
|
|
|
+ .eq(PullDataLog::getId, pullId)
|
|
|
+ .set(PullDataLog::getPullStatus, TaskStatusEnum.等待中)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ maxComputeDetailOutputService.lambdaUpdate()
|
|
|
+ .eq(MaxComputeDetailOutput::getId, detailOutput.getId())
|
|
|
+ .set(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.生成完毕)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ log.error("存在任务失败!dagId: {}", dagId);
|
|
|
+ dingTalkService.error("导出任务失败,dagId:{}", dagId);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 导出数据回调
|
|
|
+ * 更新状态
|
|
|
+ *
|
|
|
+ * @param detailOutput
|
|
|
+ * @param pullId
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private boolean exportDataCallBack(MaxComputeDetailOutput detailOutput, Long pullId) {
|
|
|
+ pullDataLogService.lambdaUpdate()
|
|
|
+ .eq(PullDataLog::getId, pullId)
|
|
|
+ .set(PullDataLog::getPullStatus, TaskStatusEnum.导出完毕)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ maxComputeDetailOutputService.lambdaUpdate()
|
|
|
+ .eq(MaxComputeDetailOutput::getId, detailOutput.getId())
|
|
|
+ .set(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.导出完毕)
|
|
|
+ .update();
|
|
|
+
|
|
|
+ Integer count = maxComputeDetailOutputService.lambdaQuery()
|
|
|
+ .eq(MaxComputeDetailOutput::getTaskId, detailOutput.getTaskId())
|
|
|
+ .eq(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.生成完毕)
|
|
|
+ .count();
|
|
|
+ if (count == 0) {
|
|
|
+ pullDataTaskService.lambdaUpdate()
|
|
|
+ .eq(PullDataTask::getId, detailOutput.getTaskId())
|
|
|
+ .set(PullDataTask::getTaskStatus, TaskStatusEnum.导出完毕)
|
|
|
+ .update();
|
|
|
+ }
|
|
|
+ dingTalkService.info("导出任务成功,导出分区号:{} 本批次数量:{}", detailOutput.getPartitionName(), detailOutput.getRecordCount());
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成数据
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private PullDataTask generatingData() {
|
|
|
+ String taskId = GenerateIdUtils.generateTaskId();
|
|
|
+
|
|
|
+ Long dagId = dataWorksService.generateData(taskId);
|
|
|
+
|
|
|
+ PullDataTask build = PullDataTask.builder().id(taskId)
|
|
|
+ .startTime(new Date())
|
|
|
+ .dagId(dagId)
|
|
|
+ .maxComputeFlowName(ProjectParamInit.generateFlowName)
|
|
|
+ .taskStatus(TaskStatusEnum.生成中)
|
|
|
+ .build();
|
|
|
+
|
|
|
+ pullDataTaskService.save(build);
|
|
|
+ return build;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 生成数据进度
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Map<String, DataWorksTaskFlowEnum> getGeneratingDataStatus() {
|
|
|
+ PullDataTask one = pullDataTaskService
|
|
|
+ .lambdaQuery()
|
|
|
+ .eq(PullDataTask::getTaskStatus, TaskStatusEnum.生成中)
|
|
|
+ .one();
|
|
|
+ Long dagId = one.getDagId();
|
|
|
+ Assert.notNull(dagId, "dagId为空, {}", one);
|
|
|
+
|
|
|
+ return dataWorksService.query("winhc_ng", dagId);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 导出数据进度
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private Map<String, DataWorksTaskFlowEnum> getExportDataStatus() {
|
|
|
+ MaxComputeDetailOutput one = maxComputeDetailOutputService
|
|
|
+ .lambdaQuery()
|
|
|
+ .eq(MaxComputeDetailOutput::getPartitionTaskStatus, TaskStatusEnum.导出中)
|
|
|
+ .one();
|
|
|
+ Long dagId = one.getDagId();
|
|
|
+ Assert.notNull(dagId, "dagId为空, {}", one);
|
|
|
+
|
|
|
+ return dataWorksService.query("winhc_ng", dagId);
|
|
|
+ }
|
|
|
+}
|