Browse Source

feat: 启用守护进程

许家凯 4 years ago
parent
commit
658efac060

+ 27 - 8
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -14,8 +14,7 @@ import com.winhc.dataworks.flow.touch.service.OdpsService;
 import com.winhc.dataworks.flow.touch.service.TouchService;
 import com.winhc.dataworks.flow.touch.utils.DateUtils;
 import com.winhc.dataworks.flow.touch.utils.DingUtils;
-import com.winhc.dataworks.flow.touch.utils.SparkDaemonKill;
-import com.winhc.dataworks.flow.touch.utils.SparkDaemonUtils;
+import com.winhc.dataworks.flow.touch.utils.SparkDaemonThread;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.*;
@@ -165,9 +164,7 @@ public class Main {
         String accessKeySecret = dataWorksAccessProperties.getAccessKeySecret();
         Set<String> ps = jobs.stream().map(DataWorksFlowJob::getProject).collect(Collectors.toSet());
         for (String p : ps) {
-            /*SparkDaemonThread th = new SparkDaemonThread(p, accessKeyId, accessKeySecret, odpsCmdHome, 90L);
-            th.start();*/
-            new SparkDaemonKill(p, accessKeyId, accessKeySecret, odpsCmdHome, SparkDaemonUtils.getQueue(p)).start();
+            new SparkDaemonThread(p, accessKeyId, accessKeySecret, odpsCmdHome, 90L).start();
         }
 
         //运行job,并接收失败参数,最大重试三次
@@ -175,7 +172,29 @@ public class Main {
         int i = 3;
         while (!failureTask.isEmpty() && i-- > 0) {
             Set<String> fSet = failureTask.stream().map(TaskInfo::getKey).collect(Collectors.toSet());
-            List<DataWorksFlowJob> js = jobs.stream().filter(job -> fSet.contains(job.getProject() + ":" + job.getFlow() + ":" + job.getTask())).collect(Collectors.toList());
+
+            List<DataWorksFlowJob> js = jobs.stream().map(job -> {
+                String project = job.getProject();
+                String flow = job.getFlow();
+                List<DataWorksFlowTask> task = job.getTask();
+
+                List<DataWorksFlowTask> collect = task.stream().filter(t -> fSet.contains(project + ":" + flow + ":" + t.getTaskName())
+                ).collect(Collectors.toList());
+
+                if (collect.isEmpty()) {
+                    return null;
+                } else {
+                    return new DataWorksFlowJob(project, flow, collect);
+                }
+            }).filter(Objects::nonNull).collect(Collectors.toList());
+
+            String collect = js.stream().flatMap(job -> {
+                String project = job.getProject();
+                String flow = job.getFlow();
+                List<DataWorksFlowTask> task = job.getTask();
+                return task.stream().map(t -> project + ":" + flow + ":" + t.getTaskName());
+            }).collect(Collectors.joining(","));
+            dingUtils.send("【" + (3 - i) + "】重新启动以下job:" + collect);
             failureTask = run(bizDate, js);
         }
         if (!failureTask.isEmpty()) {
@@ -246,7 +265,7 @@ public class Main {
                 if (failure.size() != 0) {
                     failedTask++;
                     log.error("failure node:{} ", failure);
-                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
+                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), taskInfo.getTaskName(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
                     dingUtils.send(error);
                     failureTask.add(taskInfo);
                 } else {
@@ -270,7 +289,7 @@ public class Main {
                         if (!timedCache.containsKey(taskInfo) && i <= 6) {
                             //超两小时
                             i++;
-                            DingMsg error = new DingMsg("【" + i + "】任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
+                            DingMsg error = new DingMsg("【" + i + "】任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), taskInfo.getTaskName(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
                             dingUtils.send(error);
                             timedCache.put(taskInfo, "1");
                         }

+ 4 - 2
src/main/java/com/winhc/dataworks/flow/touch/bean/DingMsg.java

@@ -28,10 +28,11 @@ public class DingMsg {
                 "\n\n" + "> 系统信息:" + os();
     }
 
-    public DingMsg(String msgLevel, String project, String flow, String nodeName, String status) {
+    public DingMsg(String msgLevel, String project, String flow, String task, String nodeName, String status) {
         this.msgLevel = msgLevel;
         this.project = project;
         this.flow = flow;
+        this.task = task;
         this.nodeName = nodeName;
         this.status = status;
         LocalDateTime date = LocalDateTime.now();
@@ -42,6 +43,7 @@ public class DingMsg {
     private String msgLevel;
     private String project;
     private String flow;
+    private String task;
     private String nodeName;
     private String status;
     private String date;
@@ -50,7 +52,7 @@ public class DingMsg {
     public String toMd() {
         StringBuilder sb = new StringBuilder();
         return sb.append("#### ").append(msgLevel)
-                .append("\n\n").append("位置:").append(project).append(":").append(flow)
+                .append("\n\n").append("位置:").append(project).append(":").append(flow).append(":").append(task)
                 .append("\n\n").append("节点:").append(nodeName)
                 .append("\n\n").append("状态:").append(status)
                 .append("\n\n").append("> 时间:").append(date)

+ 2 - 16
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -12,10 +12,8 @@ import com.winhc.dataworks.flow.touch.bean.TaskFlowEnum;
 import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
 import com.winhc.dataworks.flow.touch.utils.JsonUtils;
-import com.winhc.dataworks.flow.touch.utils.SparkDaemonUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -107,20 +105,8 @@ public class TouchService {
                 .getAcsResponse(searchNodeInstanceListRequest);
         java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
         for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
-            try {
-                String beginRunningTime = nodeInsInfo.getBeginRunningTime();
-                if (StringUtils.isNotEmpty(beginRunningTime)) {
-                    long l = Long.parseLong(beginRunningTime);
-                    long v = (System.currentTimeMillis() - l) / 60000;
-                    if (v > 90) {
-                        //发送到kill对列
-                        SparkDaemonUtils.put(projectName, String.valueOf(nodeInsInfo.getInstanceId()));
-                    }
-                }
-            } catch (Exception e) {
-                log.error(e.getMessage(), e);
-            }
-            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus()));
+            TaskFlowEnum code = TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus());
+            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), code);
         }
         return nodeInsfos.stream()
                 .collect(Collectors.toMap(SearchManualDagNodeInstanceResponse.NodeInsInfo::getNodeName, node -> TaskFlowEnum.getTaskFlowEnumByCode(node.getStatus()), (o1, o2) -> o1));

+ 1 - 1
src/main/java/com/winhc/dataworks/flow/touch/utils/DingUtils.java

@@ -32,7 +32,7 @@ public class DingUtils {
     }
 
     public boolean send(String msg) {
-        return sendByBody(getMdBody(msg));
+        return sendByBody(getMdBody(msg.replace("\\", "\\\\")));
     }
 
     private boolean sendByBody(String body) {

+ 5 - 4
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonThread.java

@@ -26,11 +26,11 @@ public class SparkDaemonThread extends Thread {
      * @param maxMinutes      超时间 分钟
      * @param intervalMinutes 守护间隔时间,默认为10分钟
      */
-    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes, long intervalMinutes) {
+    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes, double intervalMinutes) {
         super.setDaemon(true);
         super.setName("spark-daemon");
         this.maxMinutes = maxMinutes;
-        this.interval = intervalMinutes * 60 * 1000;
+        this.interval = (long) (intervalMinutes * 60 * 1000);
         this.odpsCmd = new OdpsCmd(project_name, access_id, access_key, basePath);
     }
 
@@ -42,7 +42,7 @@ public class SparkDaemonThread extends Thread {
      * @param maxMinutes   超时间 分钟
      */
     public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes) {
-        this(project_name, access_id, access_key, basePath, maxMinutes, 10L);
+        this(project_name, access_id, access_key, basePath, maxMinutes, 5L);
     }
 
 
@@ -51,9 +51,10 @@ public class SparkDaemonThread extends Thread {
         try {
             while (true) {
                 List<SparkJobInfo> running = odpsCmd.query("RUNNING");
-                List<String> timeOut = running.stream().filter(e -> e.runningMinutes() > maxMinutes).map(SparkJobInfo::getInstanceId).collect(Collectors.toList());
+                List<String> timeOut = running.stream().filter(e -> e.runningMinutes() >= maxMinutes).map(SparkJobInfo::getInstanceId).collect(Collectors.toList());
                 if (!timeOut.isEmpty()) {
                     for (String s : timeOut) {
+                        log.warn("kill InstanceId : {}", s);
                         odpsCmd.kill(s);
                     }
                 }

+ 3 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonUtils.java

@@ -12,6 +12,9 @@ import java.util.concurrent.ArrayBlockingQueue;
  */
 public class SparkDaemonUtils {
 
+
+    public static final Long sparkTimeoutMinutes = 90L;
+
     private static final Map<String, Queue<String>> queueMap = new HashMap<>();
 
     private static void createOrIgnore(String project) {

+ 6 - 6
src/main/resources/task-xjk.yaml

@@ -1,11 +1,11 @@
 job:
-  #------<公司基本信息
   - project: winhc_test
-    flow: inc_company_spark
+    flow: test_kill
     task:
-      - taskName: company_inc
+      - taskName: test_kill
         param:
-          - _nodeId: 700003381602
+          - _nodeId: 700003494765
             project: winhc_eci_dev
-            bizdate: 20200618
-  #------>
+            tableName: test_kill
+            dupliCols: new_cid,case_no
+            flag: cid