Browse Source

feat: 全任务job触发

- 加入spark程序主动kill
- 失败job自动重启三次
许家凯 4 years ago
parent
commit
be566fff56

+ 43 - 5
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -8,11 +8,14 @@ import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
 import com.winhc.dataworks.flow.touch.bean.*;
+import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
 import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
 import com.winhc.dataworks.flow.touch.service.OdpsService;
 import com.winhc.dataworks.flow.touch.service.TouchService;
 import com.winhc.dataworks.flow.touch.utils.DateUtils;
 import com.winhc.dataworks.flow.touch.utils.DingUtils;
+import com.winhc.dataworks.flow.touch.utils.SparkDaemonKill;
+import com.winhc.dataworks.flow.touch.utils.SparkDaemonUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.*;
@@ -33,6 +36,10 @@ import java.util.stream.Collectors;
 public class Main {
     @Autowired
     private TouchService touchService;
+
+    @Autowired
+    private DataWorksAccessProperties dataWorksAccessProperties;
+
     private static final Options options = new Options();
 
 
@@ -51,6 +58,8 @@ public class Main {
 
         options.addOption("flow", "flow", true, "单任务必填,业务流程名");
         options.addOption("task", "taskName", true, "单任务必填,任务名");
+        options.addOption("odps", "odps_home", true, "odps cmd 根目录");
+
     }
 
     private static void verify(CommandLine commandLine) {
@@ -58,6 +67,9 @@ public class Main {
             if (!commandLine.hasOption("f")) {
                 throw new RuntimeException();
             }
+            if (!commandLine.hasOption("odps")) {
+                throw new RuntimeException();
+            }
             if (commandLine.hasOption("s")) {
                 if (!commandLine.hasOption("flow") || !commandLine.hasOption("task")) {
                     throw new RuntimeException();
@@ -127,8 +139,9 @@ public class Main {
                 if (commandLine.hasOption("d")) {
                     log.info("debug模式:{}", jobs);
                 } else {
+                    String odpsCmdPath = commandLine.getOptionValue("odps");
                     dd.send(msg);
-                    bean.start(bizDate, jobs);
+                    bean.start(bizDate, jobs, odpsCmdPath);
                 }
             }
         } catch (ParseException e) {
@@ -146,7 +159,32 @@ public class Main {
     }
 
     @SneakyThrows
-    private void start(String bizDate, List<DataWorksFlowJob> jobs) {
+    private void start(String bizDate, List<DataWorksFlowJob> jobs, String odpsCmdHome) {
+        // Start one SparkDaemonKill thread per distinct project referenced by the jobs.
+        // Each thread is handed the queue that SparkDaemonUtils keeps for that project.
+        String keyId = dataWorksAccessProperties.getAccessKeyId();
+        String keySecret = dataWorksAccessProperties.getAccessKeySecret();
+        Set<String> projects = jobs.stream().map(DataWorksFlowJob::getProject).collect(Collectors.toSet());
+        projects.forEach(project ->
+                new SparkDaemonKill(project, keyId, keySecret, odpsCmdHome, SparkDaemonUtils.getQueue(project)).start());
+
+        // Run every job once, then re-run only the failed ones, at most three more times.
+        Set<TaskInfo> failed = run(bizDate, jobs);
+        for (int retry = 0; retry < 3 && !failed.isEmpty(); retry++) {
+            Set<String> failedKeys = failed.stream().map(TaskInfo::getKey).collect(Collectors.toSet());
+            // A job is retried when its "project:flow:task" string is among the failed keys.
+            List<DataWorksFlowJob> retryJobs = jobs.stream()
+                    .filter(job -> failedKeys.contains(job.getProject() + ":" + job.getFlow() + ":" + job.getTask()))
+                    .collect(Collectors.toList());
+            failed = run(bizDate, retryJobs);
+        }
+
+        if (failed.isEmpty()) {
+            return;
+        }
+        // Something is still failing after the retries: terminate with a non-zero exit code.
+        System.exit(-1);
+    }
+
+    @SneakyThrows
+    private Set<TaskInfo> run(String bizDate, List<DataWorksFlowJob> jobs) {
         log.info("start!");
         LocalDateTime start = LocalDateTime.now();
         List<TaskInfo> collect = jobs.stream()
@@ -173,6 +211,7 @@ public class Main {
         int totalTask = collect.size();
 
         Set<TaskInfo> end = new HashSet<>();
+        Set<TaskInfo> failureTask = new HashSet<>();
         TimedCache<TaskInfo, String> timedCache = CacheUtil.newTimedCache(300 * 1000);
         int i = 0;
         int successTask = 0;
@@ -209,6 +248,7 @@ public class Main {
                     log.error("failure node:{} ", failure);
                     DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
                     dingUtils.send(error);
+                    failureTask.add(taskInfo);
                 } else {
                     if (await.size() != 0) {
                         awaitTask++;
@@ -250,9 +290,7 @@ public class Main {
             }
             Thread.sleep(10000);
         }
-        if (failedTask != 0) {
-            System.exit(-1);
-        }
         log.info("end");
+        return failureTask;
     }
 }

+ 28 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/SparkJobInfo.java

@@ -0,0 +1,28 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+
+/**
+ * Value holder for one Spark job entry: start time, instance id, state, running mode
+ * and application name.
+ * <p>
+ * Getters, setters and the all-arguments constructor are generated by Lombok.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/5 15:44
+ */
+@AllArgsConstructor
+@Getter
+@Setter
+public class SparkJobInfo {
+    // Field order defines the parameter order of the generated constructor; do not reorder.
+    private LocalDateTime startTime;
+    private String instanceId;
+    private String state;
+    private String runningMode;
+    private String applicationName;
+
+    /**
+     * Whole minutes elapsed between {@code startTime} and the current local date-time.
+     *
+     * @return elapsed minutes (negative when {@code startTime} lies in the future)
+     */
+    public Long runningMinutes() {
+        LocalDateTime now = LocalDateTime.now();
+        long elapsedMinutes = startTime.until(now, ChronoUnit.MINUTES);
+        return elapsedMinutes;
+    }
+}

+ 109 - 0
src/main/java/com/winhc/dataworks/flow/touch/cmd/OdpsCmd.java

@@ -0,0 +1,109 @@
+package com.winhc.dataworks.flow.touch.cmd;
+
+import cn.hutool.core.io.file.FileWriter;
+import cn.hutool.core.util.RuntimeUtil;
+import com.winhc.dataworks.flow.touch.bean.SparkJobInfo;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.SystemUtils;
+
+import java.io.File;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * Wrapper around the {@code odpscmd} command-line client, used to list and kill Spark jobs.
+ * <p>
+ * Constructing an instance writes {@code <basePath>/conf/odps_config.ini} with the given
+ * project name and credentials.
+ * NOTE(review): every instance created with the same {@code basePath} overwrites that one
+ * file, so the values on disk are those of the most recently constructed instance - confirm
+ * this is acceptable when several projects are handled by the same process.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/5 15:26
+ */
+@Slf4j
+public class OdpsCmd {
+    /**
+     * Matches one data row of the {@code spark list} output: a {@code yyyy-MM-dd HH:mm:ss}
+     * timestamp followed by four space-separated columns. The five groups are passed, in
+     * order, to the {@link SparkJobInfo} constructor.
+     * <p>
+     * The last group must be greedy: the pattern is applied with {@code find()} and nothing
+     * mandatory follows the group, so a reluctant quantifier would capture one character only.
+     */
+    private static final Pattern pattern = Pattern.compile("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) +(\\w+?) +(\\w+?) +(\\w+?) +([0-9a-zA-Z\\._]+) *");
+
+    /** Formatter for the timestamp column; immutable, hence shared by all instances. */
+    static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+            .withLocale(Locale.CHINA)
+            .withZone(ZoneId.systemDefault());
+
+    private final String project_name;
+    private final String access_id;
+    private final String access_key;
+    private final String basePath;
+    private final String baseCmd;
+
+    /**
+     * Builds the path of the client executable and writes the client configuration file.
+     *
+     * @param project_name project written to the configuration file
+     * @param access_id    access id written to the configuration file
+     * @param access_key   access key written to the configuration file
+     * @param basePath     directory containing the {@code bin} and {@code conf} sub-directories
+     */
+    public OdpsCmd(String project_name, String access_id, String access_key, String basePath) {
+        this.project_name = project_name;
+        this.access_id = access_id;
+        this.access_key = access_key;
+        this.basePath = basePath;
+        // The launcher is "odpscmd.bat" on Windows and "odpscmd" everywhere else.
+        String launcher = SystemUtils.IS_OS_WINDOWS ? "odpscmd.bat" : "odpscmd";
+        this.baseCmd = basePath + File.separator + "bin" + File.separator + launcher;
+        this.writeConfig(new OdpsConfg(project_name, access_id, access_key).toString());
+    }
+
+    /** Content of {@code odps_config.ini}; {@link #toString()} renders the file body. */
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    private static class OdpsConfg {
+        private String project_name;
+        private String access_id;
+        private String access_key;
+
+        @Override
+        public String toString() {
+            return "project_name=" + project_name + "\n" +
+                    "access_id=" + access_id + "\n" +
+                    "access_key=" + access_key + "\n" +
+                    "end_point=http://service.odps.aliyun.com/api\n" +
+                    "log_view_host=http://logview.odps.aliyun.com\n" +
+                    "https_check=true\n" +
+                    "data_size_confirm=100.0\n" +
+                    "update_url=http://repo.aliyun.com/odpscmd\n" +
+                    "use_instance_tunnel=true\n" +
+                    "instance_tunnel_max_record=10000";
+        }
+    }
+
+    /** Writes {@code str} to {@code <basePath>/conf/odps_config.ini}. */
+    private void writeConfig(String str) {
+        FileWriter writer = new FileWriter(basePath + File.separator + "conf" + File.separator + "odps_config.ini");
+        writer.write(str);
+    }
+
+    /**
+     * Runs {@code <baseCmd> -e "<cmd>";} and returns the lines it produced.
+     * NOTE(review): {@code cmd} is concatenated into the command string without escaping;
+     * callers must only pass trusted values.
+     */
+    private List<String> execute(String cmd) {
+        return RuntimeUtil.execForLines(baseCmd + " -e \"" + cmd + "\";");
+    }
+
+    /**
+     * Issues {@code spark kill -i <instanceId>} and echoes the client output to stdout.
+     *
+     * @param instanceId instance to kill
+     * @return always {@code true}; the client output is not inspected
+     */
+    public Boolean kill(String instanceId) {
+        List<String> output = execute("spark kill -i " + instanceId);
+        output.forEach(System.out::println);
+        return true;
+    }
+
+    /**
+     * Issues {@code spark list -s <STATE>} and parses every output line that matches the
+     * row pattern. Lines starting with {@code StartTime} are skipped; any other line that
+     * does not match is logged as an error and dropped.
+     *
+     * @param state job state to list; upper-cased before being sent
+     * @return the parsed rows, possibly empty
+     */
+    public List<SparkJobInfo> query(String state) {
+        // Locale.ROOT keeps the upper-casing independent of the JVM default locale.
+        List<String> output = execute("spark list -s " + state.toUpperCase(Locale.ROOT));
+        output.forEach(System.out::println);
+        return output.stream().filter(line -> !line.startsWith("StartTime")).map(line -> {
+            Matcher m = pattern.matcher(line);
+            if (m.find()) {
+                return new SparkJobInfo(parse(m.group(1)), m.group(2), m.group(3), m.group(4), m.group(5));
+            } else {
+                log.error("execute result is error!");
+                return null;
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+
+    /** Parses a {@code yyyy-MM-dd HH:mm:ss} timestamp. */
+    private LocalDateTime parse(String str) {
+        return LocalDateTime.parse(str, dtf);
+    }
+}

+ 45 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/OdpsCmdService.java

@@ -0,0 +1,45 @@
+package com.winhc.dataworks.flow.touch.service;
+
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.bean.SparkJobInfo;
+import com.winhc.dataworks.flow.touch.cmd.OdpsCmd;
+import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Service facade over {@link OdpsCmd}: finds long-running Spark jobs and kills instances.
+ * <p>
+ * NOTE(review): the wrapped {@link OdpsCmd} is created with an empty project name and an
+ * empty base path, and the {@code basePath} constant below is never read - confirm which
+ * values are actually intended before relying on this service.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/5 14:43
+ */
+@Service
+public class OdpsCmdService {
+    // Currently unused; looks like a developer-machine path - TODO confirm and remove or wire in.
+    private static final String basePath = "C:\\Users\\x\\Downloads\\Compressed\\odpscmd_public";
+
+    private DataWorksAccessProperties dataWorksAccessProperties;
+    private OdpsCmd odpsCmd;
+
+    /**
+     * @param dataWorksAccessProperties source of the access key id and secret handed to {@link OdpsCmd}
+     */
+    @Autowired
+    public OdpsCmdService(DataWorksAccessProperties dataWorksAccessProperties) {
+        this.dataWorksAccessProperties = dataWorksAccessProperties;
+        String keyId = dataWorksAccessProperties.getAccessKeyId();
+        String keySecret = dataWorksAccessProperties.getAccessKeySecret();
+        this.odpsCmd = new OdpsCmd("", keyId, keySecret, "");
+    }
+
+    /**
+     * Lists jobs in state {@code RUNNING} and keeps those running for at least {@code minutes}.
+     *
+     * @param minutes inclusive lower bound on the running time, in minutes
+     * @return instance ids of the matching jobs
+     */
+    public List<String> getTimeOutJobs(Long minutes) {
+        return odpsCmd.query("RUNNING")
+                .stream()
+                .filter(job -> job.runningMinutes() >= minutes)
+                .map(job -> job.getInstanceId())
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Delegates to {@link OdpsCmd#kill(String)}.
+     *
+     * @param instanceId instance to kill
+     * @return whatever the delegate returns
+     */
+    public Boolean kill(String instanceId) {
+        Boolean killed = odpsCmd.kill(instanceId);
+        return killed;
+    }
+}

+ 15 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -12,8 +12,10 @@ import com.winhc.dataworks.flow.touch.bean.TaskFlowEnum;
 import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
 import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import com.winhc.dataworks.flow.touch.utils.SparkDaemonUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -105,6 +107,19 @@ public class TouchService {
                 .getAcsResponse(searchNodeInstanceListRequest);
         java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
         for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
+            try {
+                String beginRunningTime = nodeInsInfo.getBeginRunningTime();
+                if (StringUtils.isNotEmpty(beginRunningTime)) {
+                    long l = Long.parseLong(beginRunningTime);
+                    long v = (System.currentTimeMillis() - l) / 60000;
+                    if (v > 90) {
+                        //发送到kill对列
+                        SparkDaemonUtils.put(projectName, String.valueOf(nodeInsInfo.getInstanceId()));
+                    }
+                }
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
             log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus()));
         }
         return nodeInsfos.stream()

+ 50 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonKill.java

@@ -0,0 +1,50 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import com.winhc.dataworks.flow.touch.cmd.OdpsCmd;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Queue;
+
+/**
+ * Daemon thread that drains a queue of instance ids and kills each one through {@link OdpsCmd}.
+ * <p>
+ * A failed kill is logged and the loop carries on with the next id; the thread only stops
+ * when it is interrupted.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/17 11:12
+ */
+@Slf4j
+public class SparkDaemonKill extends Thread {
+    /** Pause while the queue is empty, in milliseconds. */
+    private static final long IDLE_SLEEP_MILLIS = 10000L;
+
+    private final Queue<String> queue;
+    private final OdpsCmd odpsCmd;
+
+    /**
+     * @param project_name project name
+     * @param access_id    access key id
+     * @param access_key   access key secret
+     * @param basePath     odps client root directory
+     * @param queue        queue of instance ids to kill
+     */
+    public SparkDaemonKill(String project_name, String access_id, String access_key, String basePath, Queue<String> queue) {
+        super.setDaemon(true);
+        super.setName("spark-kill-daemon");
+        this.queue = queue;
+        this.odpsCmd = new OdpsCmd(project_name, access_id, access_key, basePath);
+    }
+
+
+    /** Polls the queue until interrupted, sleeping whenever it is empty. */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            String instanceId = queue.poll();
+            if (instanceId != null) {
+                try {
+                    odpsCmd.kill(instanceId);
+                } catch (RuntimeException e) {
+                    // One failing kill must not end the daemon; log it and keep draining.
+                    log.error(e.getMessage(), e);
+                }
+            }
+            if (queue.isEmpty()) {
+                try {
+                    Thread.sleep(IDLE_SLEEP_MILLIS);
+                } catch (InterruptedException e) {
+                    // Restore the flag so the interruption stays observable, then stop.
+                    Thread.currentThread().interrupt();
+                    log.error(e.getMessage(), e);
+                    return;
+                }
+            }
+        }
+    }
+}

+ 66 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonThread.java

@@ -0,0 +1,66 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import com.winhc.dataworks.flow.touch.bean.SparkJobInfo;
+import com.winhc.dataworks.flow.touch.cmd.OdpsCmd;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Daemon thread that periodically lists running Spark jobs through {@link OdpsCmd} and kills
+ * those that have been running longer than a configured number of minutes.
+ * <p>
+ * A failed query or kill is logged and the next cycle still runs; the thread only stops when
+ * it is interrupted.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/17 10:26
+ */
+@Slf4j
+public class SparkDaemonThread extends Thread {
+    private final OdpsCmd odpsCmd;
+    private final long maxMinutes;
+    /** Pause between two cycles, in milliseconds. */
+    private final long interval;
+
+    /**
+     * @param project_name    project name
+     * @param access_id       access key id
+     * @param access_key      access key secret
+     * @param basePath        odps client root directory
+     * @param maxMinutes      running time, in minutes, above which a job is killed
+     * @param intervalMinutes pause between two cycles, in minutes
+     */
+    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes, long intervalMinutes) {
+        super.setDaemon(true);
+        super.setName("spark-daemon");
+        this.maxMinutes = maxMinutes;
+        this.interval = intervalMinutes * 60 * 1000;
+        this.odpsCmd = new OdpsCmd(project_name, access_id, access_key, basePath);
+    }
+
+    /**
+     * Same as the six-argument constructor with a 10 minute pause between cycles.
+     *
+     * @param project_name project name
+     * @param access_id    access key id
+     * @param access_key   access key secret
+     * @param basePath     odps client root directory
+     * @param maxMinutes   running time, in minutes, above which a job is killed
+     */
+    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes) {
+        this(project_name, access_id, access_key, basePath, maxMinutes, 10L);
+    }
+
+
+    /** Runs one kill cycle, sleeps for the configured interval, and repeats until interrupted. */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted()) {
+            try {
+                killTimedOutJobs();
+            } catch (RuntimeException e) {
+                // A failed listing must not end the daemon; try again after the pause.
+                log.error(e.getMessage(), e);
+            }
+            try {
+                Thread.sleep(interval);
+            } catch (InterruptedException e) {
+                // Restore the flag so the interruption stays observable, then stop.
+                Thread.currentThread().interrupt();
+                log.error(e.getMessage(), e);
+                return;
+            }
+        }
+    }
+
+    /** Kills every job in state RUNNING whose running time exceeds {@code maxMinutes}. */
+    private void killTimedOutJobs() {
+        List<SparkJobInfo> running = odpsCmd.query("RUNNING");
+        List<String> timeOut = running.stream()
+                .filter(job -> job.runningMinutes() > maxMinutes)
+                .map(SparkJobInfo::getInstanceId)
+                .collect(Collectors.toList());
+        for (String instanceId : timeOut) {
+            try {
+                odpsCmd.kill(instanceId);
+            } catch (RuntimeException e) {
+                // Keep going so one failure does not spare the remaining timed-out jobs.
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

+ 39 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonUtils.java

@@ -0,0 +1,39 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * Static registry holding one bounded queue of instance ids per project.
+ * <p>
+ * All access to the underlying map is guarded by the class monitor, so queues may be
+ * created and looked up from different threads.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/8/17 11:20
+ */
+public class SparkDaemonUtils {
+
+    /** Capacity of every per-project queue. */
+    private static final int QUEUE_CAPACITY = 100;
+
+    /** project -> queue; a plain HashMap, only touched while holding the class monitor. */
+    private static final Map<String, Queue<String>> queueMap = new HashMap<>();
+
+    private SparkDaemonUtils() {
+        // static utility, not meant to be instantiated
+    }
+
+    /**
+     * Returns the queue registered for {@code project}, creating it on first use.
+     *
+     * @param project project name used as the registry key
+     * @return the queue for that project, never {@code null}
+     */
+    public static Queue<String> getQueue(String project) {
+        synchronized (SparkDaemonUtils.class) {
+            return queueMap.computeIfAbsent(project, key -> new ArrayBlockingQueue<>(QUEUE_CAPACITY));
+        }
+    }
+
+    /**
+     * Appends {@code val} to the queue of {@code project}.
+     *
+     * @param project project whose queue receives the value
+     * @param val     value to enqueue
+     * @return always {@code true}
+     * @throws IllegalStateException if no queue was created for {@code project} yet, or if
+     *                               the queue is full
+     */
+    public static boolean put(String project, String val) {
+        Queue<String> queue;
+        synchronized (SparkDaemonUtils.class) {
+            queue = queueMap.get(project);
+        }
+        if (queue == null) {
+            throw new IllegalStateException("对列没有进行初始化!");
+        }
+        // The queue is itself thread-safe, so the add happens outside the registry lock.
+        queue.add(val);
+        return true;
+    }
+}