Prechádzať zdrojové kódy

Merge remote-tracking branch 'origin/master'

xufei 4 rokov pred
rodič
commit
4eb66848f5

+ 27 - 8
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -14,8 +14,7 @@ import com.winhc.dataworks.flow.touch.service.OdpsService;
 import com.winhc.dataworks.flow.touch.service.TouchService;
 import com.winhc.dataworks.flow.touch.utils.DateUtils;
 import com.winhc.dataworks.flow.touch.utils.DingUtils;
-import com.winhc.dataworks.flow.touch.utils.SparkDaemonKill;
-import com.winhc.dataworks.flow.touch.utils.SparkDaemonUtils;
+import com.winhc.dataworks.flow.touch.utils.SparkDaemonThread;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.*;
@@ -165,9 +164,7 @@ public class Main {
         String accessKeySecret = dataWorksAccessProperties.getAccessKeySecret();
         Set<String> ps = jobs.stream().map(DataWorksFlowJob::getProject).collect(Collectors.toSet());
         for (String p : ps) {
-            /*SparkDaemonThread th = new SparkDaemonThread(p, accessKeyId, accessKeySecret, odpsCmdHome, 90L);
-            th.start();*/
-            new SparkDaemonKill(p, accessKeyId, accessKeySecret, odpsCmdHome, SparkDaemonUtils.getQueue(p)).start();
+            new SparkDaemonThread(p, accessKeyId, accessKeySecret, odpsCmdHome, 90L).start();
         }
 
         //运行job,并接收失败参数,最大重试三次
@@ -175,7 +172,29 @@ public class Main {
         int i = 3;
         while (!failureTask.isEmpty() && i-- > 0) {
             Set<String> fSet = failureTask.stream().map(TaskInfo::getKey).collect(Collectors.toSet());
-            List<DataWorksFlowJob> js = jobs.stream().filter(job -> fSet.contains(job.getProject() + ":" + job.getFlow() + ":" + job.getTask())).collect(Collectors.toList());
+
+            List<DataWorksFlowJob> js = jobs.stream().map(job -> {
+                String project = job.getProject();
+                String flow = job.getFlow();
+                List<DataWorksFlowTask> task = job.getTask();
+
+                List<DataWorksFlowTask> collect = task.stream().filter(t -> fSet.contains(project + ":" + flow + ":" + t.getTaskName())
+                ).collect(Collectors.toList());
+
+                if (collect.isEmpty()) {
+                    return null;
+                } else {
+                    return new DataWorksFlowJob(project, flow, collect);
+                }
+            }).filter(Objects::nonNull).collect(Collectors.toList());
+
+            String collect = js.stream().flatMap(job -> {
+                String project = job.getProject();
+                String flow = job.getFlow();
+                List<DataWorksFlowTask> task = job.getTask();
+                return task.stream().map(t -> project + ":" + flow + ":" + t.getTaskName());
+            }).collect(Collectors.joining(","));
+            dingUtils.send("【" + (3 - i) + "】重新启动以下job:" + collect);
             failureTask = run(bizDate, js);
         }
         if (!failureTask.isEmpty()) {
@@ -246,7 +265,7 @@ public class Main {
                 if (failure.size() != 0) {
                     failedTask++;
                     log.error("failure node:{} ", failure);
-                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
+                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), taskInfo.getTaskName(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
                     dingUtils.send(error);
                     failureTask.add(taskInfo);
                 } else {
@@ -270,7 +289,7 @@ public class Main {
                         if (!timedCache.containsKey(taskInfo) && i <= 6) {
                             //超两小时
                             i++;
-                            DingMsg error = new DingMsg("【" + i + "】任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
+                            DingMsg error = new DingMsg("【" + i + "】任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), taskInfo.getTaskName(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
                             dingUtils.send(error);
                             timedCache.put(taskInfo, "1");
                         }

+ 4 - 2
src/main/java/com/winhc/dataworks/flow/touch/bean/DingMsg.java

@@ -28,10 +28,11 @@ public class DingMsg {
                 "\n\n" + "> 系统信息:" + os();
     }
 
-    public DingMsg(String msgLevel, String project, String flow, String nodeName, String status) {
+    public DingMsg(String msgLevel, String project, String flow, String task, String nodeName, String status) {
         this.msgLevel = msgLevel;
         this.project = project;
         this.flow = flow;
+        this.task = task;
         this.nodeName = nodeName;
         this.status = status;
         LocalDateTime date = LocalDateTime.now();
@@ -42,6 +43,7 @@ public class DingMsg {
     private String msgLevel;
     private String project;
     private String flow;
+    private String task;
     private String nodeName;
     private String status;
     private String date;
@@ -50,7 +52,7 @@ public class DingMsg {
     public String toMd() {
         StringBuilder sb = new StringBuilder();
         return sb.append("#### ").append(msgLevel)
-                .append("\n\n").append("位置:").append(project).append(":").append(flow)
+                .append("\n\n").append("位置:").append(project).append(":").append(flow).append(":").append(task)
                 .append("\n\n").append("节点:").append(nodeName)
                 .append("\n\n").append("状态:").append(status)
                 .append("\n\n").append("> 时间:").append(date)

+ 29 - 6
src/main/java/com/winhc/dataworks/flow/touch/cmd/OdpsCmd.java

@@ -7,12 +7,14 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.SystemUtils;
 
 import java.io.File;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
@@ -72,27 +74,48 @@ public class OdpsCmd {
     }
 
 
-    private List<String> execute(String cmd) {
-        return RuntimeUtil.execForLines(baseCmd + " --project "+project_name+" -e \"" + cmd + "\";");
+    private synchronized List<String> execute(String cmd) {
+        String c = baseCmd + " --project winhc_eci_dev " + " -e \"" + cmd + "\";";
+        log.info(c);
+        List<String> execute = new ArrayList<>();
+        if (SystemUtils.IS_OS_WINDOWS) {
+            execute = RuntimeUtil.execForLines(c);
+        } else {
+            String fs = basePath + "/xjk_start.sh";
+            try {
+                File file = new File(fs);
+                if (!file.exists()) {
+                    file.createNewFile();
+                }
+                file.setExecutable(true);
+                List<String> list = new ArrayList<String>();
+                list.add("#!/bin/bash");
+                list.add(c);
+                FileUtils.writeLines(file, list, false);
+                execute = RuntimeUtil.execForLines(fs);
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+        log.info("out:\n{}", String.join("\n", execute));
+        return execute;
     }
 
     private static final Pattern pattern = Pattern.compile("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) +(\\w+?) +(\\w+?) +(\\w+?) +([0-9a-zA-Z\\._]+?) *");
 
     public Boolean kill(String instanceId) {
-        List<String> execute = execute("spark kill -i " + instanceId);
-        execute.forEach(System.out::println);
+        execute("spark kill -i " + instanceId);
         return true;
     }
 
     public List<SparkJobInfo> query(String state) {
         List<String> execute = execute("spark list -s " + state.toUpperCase());
-        execute.forEach(System.out::println);
         return execute.stream().filter(str -> !str.startsWith("StartTime")).map(str -> {
             Matcher m = pattern.matcher(str);
             if (m.find()) {
                 return new SparkJobInfo(parse(m.group(1)), m.group(2), m.group(3), m.group(4), m.group(5));
             } else {
-                log.error("execute result is error!");
+                log.error("error str : {}", str);
                 return null;
             }
         }).filter(Objects::nonNull).collect(Collectors.toList());

+ 2 - 16
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -12,10 +12,8 @@ import com.winhc.dataworks.flow.touch.bean.TaskFlowEnum;
 import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
 import com.winhc.dataworks.flow.touch.utils.JsonUtils;
-import com.winhc.dataworks.flow.touch.utils.SparkDaemonUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -107,20 +105,8 @@ public class TouchService {
                 .getAcsResponse(searchNodeInstanceListRequest);
         java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
         for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
-            try {
-                String beginRunningTime = nodeInsInfo.getBeginRunningTime();
-                if (StringUtils.isNotEmpty(beginRunningTime)) {
-                    long l = Long.parseLong(beginRunningTime);
-                    long v = (System.currentTimeMillis() - l) / 60000;
-                    if (v > 90) {
-                        //发送到kill对列
-                        SparkDaemonUtils.put(projectName, String.valueOf(nodeInsInfo.getInstanceId()));
-                    }
-                }
-            } catch (Exception e) {
-                log.error(e.getMessage(), e);
-            }
-            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus()));
+            TaskFlowEnum code = TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus());
+            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), code);
         }
         return nodeInsfos.stream()
                 .collect(Collectors.toMap(SearchManualDagNodeInstanceResponse.NodeInsInfo::getNodeName, node -> TaskFlowEnum.getTaskFlowEnumByCode(node.getStatus()), (o1, o2) -> o1));

+ 1 - 1
src/main/java/com/winhc/dataworks/flow/touch/utils/DingUtils.java

@@ -32,7 +32,7 @@ public class DingUtils {
     }
 
     public boolean send(String msg) {
-        return sendByBody(getMdBody(msg));
+        return sendByBody(getMdBody(msg.replace("\\", "\\\\")));
     }
 
     private boolean sendByBody(String body) {

+ 5 - 4
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonThread.java

@@ -26,11 +26,11 @@ public class SparkDaemonThread extends Thread {
      * @param maxMinutes      超时间 分钟
      * @param intervalMinutes 守护间隔时间,默认为10分钟
      */
-    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes, long intervalMinutes) {
+    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes, double intervalMinutes) {
         super.setDaemon(true);
         super.setName("spark-daemon");
         this.maxMinutes = maxMinutes;
-        this.interval = intervalMinutes * 60 * 1000;
+        this.interval = (long) (intervalMinutes * 60 * 1000);
         this.odpsCmd = new OdpsCmd(project_name, access_id, access_key, basePath);
     }
 
@@ -42,7 +42,7 @@ public class SparkDaemonThread extends Thread {
      * @param maxMinutes   超时间 分钟
      */
     public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes) {
-        this(project_name, access_id, access_key, basePath, maxMinutes, 10L);
+        this(project_name, access_id, access_key, basePath, maxMinutes, 5L);
     }
 
 
@@ -51,9 +51,10 @@ public class SparkDaemonThread extends Thread {
         try {
             while (true) {
                 List<SparkJobInfo> running = odpsCmd.query("RUNNING");
-                List<String> timeOut = running.stream().filter(e -> e.runningMinutes() > maxMinutes).map(SparkJobInfo::getInstanceId).collect(Collectors.toList());
+                List<String> timeOut = running.stream().filter(e -> e.runningMinutes() >= maxMinutes).map(SparkJobInfo::getInstanceId).collect(Collectors.toList());
                 if (!timeOut.isEmpty()) {
                     for (String s : timeOut) {
+                        log.warn("kill InstanceId : {}", s);
                         odpsCmd.kill(s);
                     }
                 }

+ 3 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonUtils.java

@@ -12,6 +12,9 @@ import java.util.concurrent.ArrayBlockingQueue;
  */
 public class SparkDaemonUtils {
 
+
+    public static final Long sparkTimeoutMinutes = 90L;
+
     private static final Map<String, Queue<String>> queueMap = new HashMap<>();
 
     private static void createOrIgnore(String project) {

+ 6 - 6
src/main/resources/task-xjk.yaml

@@ -1,11 +1,11 @@
 job:
-  #------<公司基本信息
   - project: winhc_test
-    flow: inc_company_spark
+    flow: test_kill
     task:
-      - taskName: company_inc
+      - taskName: test_kill
         param:
-          - _nodeId: 700003381602
+          - _nodeId: 700003494765
             project: winhc_eci_dev
-            bizdate: 20200618
-  #------>
+            tableName: test_kill
+            dupliCols: new_cid,case_no
+            flag: cid

+ 2 - 1
start.sh

@@ -5,6 +5,7 @@ export PATH=$JAVA_HOME/bin:$PATH
 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
 
 bashPath=/root/maxcompute_job/flow
+odps_cmd=/root/maxcompute_job/flow/odpscmd
 debug=0
 
 if [ $# -eq 0 ]; then
@@ -53,7 +54,7 @@ start_job() {
     return
   else
     echo ""
-    java -jar $bashPath/DataWorks-flow-touch.jar -f $bashPath/jobs/$1
+    java -jar $bashPath/DataWorks-flow-touch.jar -f $bashPath/jobs/$1 -odps $odps_cmd
   fi
 
   if [ $? -eq 0 ]; then