Quellcode durchsuchen

任务长时间未结束发送通知

许家凯 vor 4 Jahren
Ursprung
Commit
8e541caabe
1 geänderte Dateien mit 18 neuen und 0 gelöschten Zeilen
  1. 18 0
      src/main/java/com/winhc/dataworks/flow/touch/Main.java

+ 18 - 0
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -1,5 +1,7 @@
 package com.winhc.dataworks.flow.touch;
 
+import cn.hutool.cache.CacheUtil;
+import cn.hutool.cache.impl.TimedCache;
 import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
 import com.helospark.lightdi.LightDi;
 import com.helospark.lightdi.LightDiContext;
@@ -12,6 +14,8 @@ import com.winhc.dataworks.flow.touch.utils.DingUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
+import java.time.Duration;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -43,6 +47,7 @@ public class Main {
     @SneakyThrows
     private void start(String bizDate) {
         log.info("start");
+        LocalDateTime start = LocalDateTime.now();
         List<DataWorksFlowJob> jobs = SchemaInit.LIST;
         List<TaskInfo> collect = jobs.stream()
                 .flatMap(dataWorksFlowJob -> {
@@ -68,6 +73,7 @@ public class Main {
         int totalTask = collect.size();
 
         Set<TaskInfo> end = new HashSet<>();
+        TimedCache<TaskInfo, String> timedCache = CacheUtil.newTimedCache(60000);
         while (true) {
             int awaitTask = 0;
             int successTask = 0;
@@ -115,6 +121,18 @@ public class Main {
 
                 if (await.size() == 0 || failure.size() != 0) {
                     end.add(taskInfo);
+                } else {
+                    LocalDateTime now = LocalDateTime.now();
+                    Duration duration = Duration.between(start, now);
+                    long l = duration.toHours();
+                    if (l >= 2) {
+                        if (!timedCache.containsKey(taskInfo)) {
+                            //超两小时
+                            DingMsg error = new DingMsg("任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
+                            dingUtils.send(error);
+                            timedCache.put(taskInfo, "1");
+                        }
+                    }
                 }
             }