浏览代码

启用通知

许家凯 4 年之前
父节点
当前提交
88bd370eae

+ 0 - 1
pom.xml

@@ -17,7 +17,6 @@
     </properties>
 
     <dependencies>
-
         <dependency>
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>okhttp</artifactId>

+ 25 - 22
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -8,12 +8,11 @@ import com.helospark.lightdi.annotation.Service;
 import com.winhc.dataworks.flow.touch.bean.*;
 import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
 import com.winhc.dataworks.flow.touch.service.TouchService;
+import com.winhc.dataworks.flow.touch.utils.DingUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -27,6 +26,9 @@ public class Main {
     @Autowired
     private TouchService touchService;
 
+    @Autowired
+    private DingUtils dingUtils;
+
 
     public static void main(String[] args) {
         if (args.length != 1) {
@@ -65,19 +67,19 @@ public class Main {
 
         int totalTask = collect.size();
 
+        Set<TaskInfo> end = new HashSet<>();
         while (true) {
             int awaitTask = 0;
             int successTask = 0;
             int failedTask = 0;
 
-            int awaitNode = 0;
-            int successNode = 0;
-            int failedNode = 0;
-
             boolean flag = true;
             List<String> empty = new ArrayList<>();
 
             for (TaskInfo taskInfo : collect) {
+                if (end.contains(taskInfo)) {
+                    continue;
+                }
                 Map<String, TaskFlowEnum> query = touchService.query(taskInfo.getProject(), taskInfo.getDagId());
                 Map<String, List<String>> status = query.entrySet().stream()
                         .map(e -> {
@@ -87,6 +89,7 @@ public class Main {
                             } else {
                                 entry.setKey("运行中");
                             }
+                            entry.setValue(e.getKey());
                             return entry;
                         }).collect(Collectors.groupingBy(Entry::getKey, Collectors.mapping(Entry::getValue, Collectors.toList())));
                 List<String> success = status.getOrDefault(TaskFlowEnum.SUCCESS.getMsg(), empty);
@@ -94,21 +97,27 @@ public class Main {
                 List<String> await = status.getOrDefault("运行中", empty);
 
 
-                if (await.size() != 0) {
-                    awaitTask++;
-                    awaitNode += await.size();
+                if (failure.size() != 0) {
+                    failedTask++;
+                    log.error("failure node:{} ", failure);
+                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
+                    dingUtils.send(error);
                 } else {
-                    if (failure.size() != 0) {
-                        failedTask++;
-                        failedNode += failure.size();
-                        log.error("failure node:{}  {}", failedTask, failedNode);
+                    if (await.size() != 0) {
+                        awaitTask++;
                     } else {
-                        successNode += success.size();
                         successTask++;
                     }
                 }
-                flag = flag && await.size() == 0;
+
+
+                flag = flag && (await.size() == 0 || failure.size() != 0);
+
+                if (await.size() == 0 || failure.size() != 0) {
+                    end.add(taskInfo);
+                }
             }
+
             log.info("\nawait task:{}\ntotal task:{}\nsuccess task:{}\nfailure task:{}\n"
                     , awaitTask
                     , totalTask
@@ -116,12 +125,6 @@ public class Main {
                     , failedTask
             );
 
-            log.info("\nawait node:{}\ntotal node:{}\nsuccess node:{}\nfailure node:{}\n"
-                    , awaitNode
-                    , awaitNode + successNode + failedNode
-                    , successNode
-                    , failedNode
-            );
             if (flag) {
                 break;
             }

+ 70 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/DingMsg.java

@@ -0,0 +1,70 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.SystemUtils;
+
+import java.net.InetAddress;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/30 13:45
+ * @Description:
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DingMsg {
+    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    public DingMsg(String msgLevel, String project, String flow, String nodeName, String status) {
+        this.msgLevel = msgLevel;
+        this.project = project;
+        this.flow = flow;
+        this.nodeName = nodeName;
+        this.status = status;
+        LocalDateTime date = LocalDateTime.now();
+        this.date = date.format(dtf);
+        this.sysInfo = os();
+    }
+
+    private String msgLevel;
+    private String project;
+    private String flow;
+    private String nodeName;
+    private String status;
+    private String date;
+    private String sysInfo;
+
+    public String toMd() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("#### ").append(msgLevel)
+                .append("\n\n").append("位置:").append(project).append(":").append(flow)
+                .append("\n\n").append("节点:").append(nodeName)
+                .append("\n\n").append("状态:").append(status)
+                .append("\n\n").append("> 时间:").append(date)
+                .append("\n\n").append("> 系统信息:").append(sysInfo)
+                .toString();
+    }
+
+
+    private static String os() {
+        String osInfo = String.format(" %s , %s , %s", SystemUtils.OS_NAME, SystemUtils.OS_ARCH, SystemUtils.OS_VERSION);
+        try {
+            InetAddress address = InetAddress.getLocalHost();
+            String addressInfo = String.format("\n\n> address: %s \n\n> hostname: %s", address.getHostAddress(), address.getHostName());
+            osInfo += addressInfo;
+        } catch (Exception e) {
+        }
+        return osInfo;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(os());
+    }
+}

+ 28 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/TaskInfo.java

@@ -1,6 +1,8 @@
 package com.winhc.dataworks.flow.touch.bean;
 
 import lombok.*;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 /**
  * @Author: XuJiakai
@@ -21,4 +23,30 @@ public class TaskInfo {
     public String getKey() {
         return project + ":" + flow + ":" + taskName;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+
+        if (o == null || getClass() != o.getClass()) return false;
+
+        TaskInfo taskInfo = (TaskInfo) o;
+
+        return new EqualsBuilder()
+                .append(project, taskInfo.project)
+                .append(flow, taskInfo.flow)
+                .append(taskName, taskInfo.taskName)
+                .append(dagId, taskInfo.dagId)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37)
+                .append(project)
+                .append(flow)
+                .append(taskName)
+                .append(dagId)
+                .toHashCode();
+    }
 }

+ 15 - 3
src/main/java/com/winhc/dataworks/flow/touch/utils/DingUtils.java

@@ -5,6 +5,7 @@ import cn.hutool.crypto.digest.HmacAlgorithm;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Component;
 import com.helospark.lightdi.annotation.Value;
+import com.winhc.dataworks.flow.touch.bean.DingMsg;
 import com.winhc.dataworks.flow.touch.bean.Entry;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -30,12 +31,19 @@ public class DingUtils {
     @Value("${ding-secret}")
     private String dingSecret;
 
+    public boolean send(DingMsg msg) {
+        return sendByBody(getMdBody(msg));
+    }
 
     public boolean send(String msg) {
+        return sendByBody(getMsgBody(msg));
+    }
+
+    private boolean sendByBody(String body) {
         Entry<Long, String> sign = getSign();
         String query = "&timestamp=" + sign.getKey() + "&sign=" + sign.getValue();
         try {
-            String post = post(URL + query, getPostBody(msg));
+            String post = post(URL + query, body);
             log.info(post);
             return post.contains("ok");
         } catch (Exception e) {
@@ -65,8 +73,12 @@ public class DingUtils {
         return response == null ? "" : response.isSuccessful() ? Objects.requireNonNull(response.body()).string() : "";
     }
 
-    private static String getPostBody(String msg) {
-        return "{\"msgtype\": \"text\",\"text\": {\"content\": \"" + msg + "\"}}";
+    private static String getMdBody(DingMsg msg) {
+        return String.format("{\"msgtype\":\"markdown\",\"markdown\":{\"title\":\"%s\",\"text\":\"%s\"}}", msg.getMsgLevel(), msg.toMd());
+    }
+
+    private static String getMsgBody(String msg) {
+        return "{\"msgtype\": \"markdown\",\"text\": {\"content\": \"" + msg + "\"}}";
     }
 
     @SneakyThrows