|
@@ -1,17 +1,22 @@
|
|
package com.winhc.bigdata.task.framework.odps.service.impl;
|
|
package com.winhc.bigdata.task.framework.odps.service.impl;
|
|
|
|
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
|
|
|
+import com.winhc.bigdata.task.enums.DataWorksTaskFlowEnum;
|
|
import com.winhc.bigdata.task.enums.TaskStatusEnum;
|
|
import com.winhc.bigdata.task.enums.TaskStatusEnum;
|
|
import com.winhc.bigdata.task.framework.odps.entity.PullRequestRecord;
|
|
import com.winhc.bigdata.task.framework.odps.entity.PullRequestRecord;
|
|
import com.winhc.bigdata.task.framework.odps.mapper.PullRequestRecordMapper;
|
|
import com.winhc.bigdata.task.framework.odps.mapper.PullRequestRecordMapper;
|
|
import com.winhc.bigdata.task.framework.odps.service.OdpsSqlInfoService;
|
|
import com.winhc.bigdata.task.framework.odps.service.OdpsSqlInfoService;
|
|
import com.winhc.bigdata.task.framework.odps.service.PullRequestRecordService;
|
|
import com.winhc.bigdata.task.framework.odps.service.PullRequestRecordService;
|
|
|
|
+import com.winhc.bigdata.task.service.DataWorksService;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.AllArgsConstructor;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.util.Assert;
|
|
import org.springframework.util.Assert;
|
|
|
|
|
|
|
|
+import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
|
|
+import java.util.Map;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -22,9 +27,8 @@ import java.util.stream.Collectors;
|
|
@Service
|
|
@Service
|
|
@AllArgsConstructor
|
|
@AllArgsConstructor
|
|
public class PullRequestRecordServiceImpl extends ServiceImpl<PullRequestRecordMapper, PullRequestRecord> implements PullRequestRecordService {
|
|
public class PullRequestRecordServiceImpl extends ServiceImpl<PullRequestRecordMapper, PullRequestRecord> implements PullRequestRecordService {
|
|
-
|
|
|
|
private final OdpsSqlInfoService odpsSqlInfoService;
|
|
private final OdpsSqlInfoService odpsSqlInfoService;
|
|
-
|
|
|
|
|
|
+ private final DataWorksService dataWorksService;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public synchronized Object pull(String sqlId, String toMongoDbName, String pullBy) {
|
|
public synchronized Object pull(String sqlId, String toMongoDbName, String pullBy) {
|
|
@@ -43,6 +47,24 @@ public class PullRequestRecordServiceImpl extends ServiceImpl<PullRequestRecordM
|
|
.partitionTaskStatus(TaskStatusEnum.等待中)
|
|
.partitionTaskStatus(TaskStatusEnum.等待中)
|
|
.build();
|
|
.build();
|
|
save(build);
|
|
save(build);
|
|
- return true;
|
|
|
|
|
|
+ return build.getId();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static final Map<String, DataWorksTaskFlowEnum> map = new HashMap<String, DataWorksTaskFlowEnum>() {
|
|
|
|
+ {
|
|
|
|
+ put("to_mongo", DataWorksTaskFlowEnum.NOT_RUN);
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public Object status(String pullId) {
|
|
|
|
+ PullRequestRecord byId = getById(pullId);
|
|
|
|
+ Assert.notNull(byId, "拉取任务的id不存在!");
|
|
|
|
+ String pullInstanceId = byId.getPullInstanceId();
|
|
|
|
+ if (StringUtils.isEmpty(pullInstanceId)) {
|
|
|
|
+ return map;
|
|
|
|
+ }
|
|
|
|
+ return dataWorksService.query("winhc_ng", Long.parseLong(pullInstanceId));
|
|
|
|
+
|
|
}
|
|
}
|
|
}
|
|
}
|