Browse Source

feat: 加入上游数据同步通知

许家凯 4 years ago
parent
commit
3835b4f0a4

+ 1 - 1
src/main/java/com/winhc/dataworks/flow/touch/EmptyTable.java

@@ -26,7 +26,7 @@ public class EmptyTable {
     private OdpsService odpsService;
 
     private void start() {
-        List<String> emptyTable = odpsService.getEmptyTable(DateUtils.getYesterday().replace("-", ""));
+        List<String> emptyTable = odpsService.getNotEmptyTable(DateUtils.getYesterday().replace("-", ""));
         System.out.println(emptyTable.size());
         emptyTable.forEach(System.out::println);
     }

+ 14 - 3
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -42,6 +42,9 @@ public class Main {
     static {
         options.addOption("q", "query", false, "选填,查询昨天company增量表是否有数据");
         options.addOption("d", "debug", false, "选填,启用debug模式,只输出任务参数不进行调度");
+        options.addOption("w", "wait", false, "选填,等待上游数据同步完成");
+
+
         options.addOption("s", "singleJob", false, "选填,是否单独触发一个业务流程");
         options.addOption("b", "bizDate", true, "选填,业务时间[2020-07-07],默认为昨天");
         options.addOption("f", "fileName", true, "必填,yaml文件");
@@ -74,12 +77,13 @@ public class Main {
         CommandLineParser parser = new DefaultParser();
         try {
             CommandLine commandLine = parser.parse(options, args);
+
             if (commandLine.hasOption("q")) {
                 OdpsService bean = context.getBean(OdpsService.class);
                 String ds = DateUtils.getYesterday().replace("-", "");
-                List<String> emptyTable = bean.getEmptyTable(ds);
-                HashSet<String> set = new HashSet<>(emptyTable);
-                if (set.contains("company")) {
+                List<String> notEmptyTable = bean.getNotEmptyTable(ds);
+                HashSet<String> set = new HashSet<>(notEmptyTable);
+                if (!set.contains("company")) {
                     log.error("company表{}分区数据为空,调度程序中止!", ds);
                     dd.send("company表" + ds + "分区数据为空,调度程序中止!");
                     System.exit(-1);
@@ -88,6 +92,13 @@ public class Main {
                 }
                 return;
             }
+
+            if (commandLine.hasOption("w")) {
+                QueryStatus bean = context.getBean(QueryStatus.class);
+                bean.waitTaskDone();
+                return;
+            }
+
             verify(commandLine);
             String bizDate = commandLine.getOptionValue("b", DateUtils.getYesterday());
             String fileName = commandLine.getOptionValue("f");

+ 67 - 0
src/main/java/com/winhc/dataworks/flow/touch/QueryStatus.java

@@ -0,0 +1,67 @@
+package com.winhc.dataworks.flow.touch;
+
+import com.helospark.lightdi.LightDi;
+import com.helospark.lightdi.LightDiContext;
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.service.QueryService;
+import com.winhc.dataworks.flow.touch.utils.DingUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalTime;
+import java.time.temporal.ChronoUnit;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/7 13:41
+ * @Description:
+ */
+@Slf4j
+@Service
+public class QueryStatus {
+
+    public static void main(String[] args) {
+        LightDiContext context = LightDi.initContextByPackage(Main.class.getPackage().getName());
+        QueryStatus bean = context.getBean(QueryStatus.class);
+        bean.waitTaskDone();
+    }
+
+
+    @Autowired
+    private QueryService queryService;
+
+    @Autowired
+    private DingUtils dingUtils;
+
+    @SneakyThrows
+    public void waitTaskDone() {
+        boolean flag = false;
+        LocalTime startTime = LocalTime.now();
+
+        LocalTime sendTime = LocalTime.of(8, 0, 0);
+        LocalTime exitTime = LocalTime.of(10, 0, 0);
+
+        while (!queryService.isSuccess()) {
+            log.warn("等待上游数据同步...");
+
+            if (LocalTime.now().isAfter(sendTime) && !flag) {
+                flag = true;
+                log.error("上游数据同步任务长时间阻塞,请及时排查!");
+                dingUtils.send("上游数据同步任务长时间阻塞,请及时排查!");
+            }
+
+            if (LocalTime.now().isAfter(exitTime)) {
+                log.error("数据同步任务长时间未完成,程序退出!");
+                dingUtils.send("数据同步任务长时间未完成,程序退出!");
+                System.exit(-99);
+            }
+            Thread.sleep(120000);
+        }
+        long between = ChronoUnit.MINUTES.between(startTime, LocalTime.now());
+        String msg = "上游数据同步任务已经完成!累计阻塞" + between + "分钟";
+        dingUtils.send(msg);
+        log.info(msg);
+    }
+
+}

+ 4 - 2
src/main/java/com/winhc/dataworks/flow/touch/service/OdpsService.java

@@ -23,9 +23,9 @@ public class OdpsService {
     @Autowired
     private Odps odps;
 
-    public List<String> getEmptyTable(String ds) {
+    public List<String> getNotEmptyTable(String ds) {
         ds = ds.replace("-", "");
-        String sql = "SELECT table_name FROM information_schema.PARTITIONS WHERE regexp_extract(table_name,'^inc_ods_(.+)$') IS NOT NULL AND partition_name = 'ds=" + ds + "' and data_length = 0 ;";
+        String sql = "SELECT table_name FROM information_schema.PARTITIONS WHERE substring(table_name,1,8) = 'inc_ods_' AND partition_name = 'ds=" + ds + "' and data_length <> 0 ;";
         Instance i;
         List<String> list = new ArrayList<>();
         try {
@@ -41,4 +41,6 @@ public class OdpsService {
         return list.stream().map(s -> s.replace("inc_ods_", "")).collect(Collectors.toList());
     }
 
+
+
 }

+ 38 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/QueryService.java

@@ -0,0 +1,38 @@
+package com.winhc.dataworks.flow.touch.service;
+
+import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.utils.DateUtils;
+import com.winhc.dataworks.flow.touch.utils.DingUtils;
+import com.winhc.dataworks.flow.touch.utils.OkHttpUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/7 11:52
+ * @Description:
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class QueryService {
+    private static final String baseUrl = "http://47.101.221.131:8085";
+
+    private OkHttpUtils client;
+    private DingUtils dingUtils;
+    private static Boolean send = false;
+
+    public Boolean isSuccess() {
+        try {
+            return "4".equals(client.get(baseUrl + "/query/" + DateUtils.getMinusDay(0)));
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            if (!send) {
+                send = true;
+                dingUtils.send(e.getMessage());
+            }
+        }
+        return false;
+    }
+
+}

+ 5 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/DateUtils.java

@@ -20,4 +20,9 @@ public class DateUtils {
         Instant instant = Instant.now().minus(1, ChronoUnit.DAYS);
         return df.format(instant);
     }
+
+    public static String getMinusDay(Integer i) {
+        Instant instant = Instant.now().minus(i, ChronoUnit.DAYS);
+        return df.format(instant);
+    }
 }

+ 2 - 26
src/main/java/com/winhc/dataworks/flow/touch/utils/DingUtils.java

@@ -9,12 +9,9 @@ import com.winhc.dataworks.flow.touch.bean.DingMsg;
 import com.winhc.dataworks.flow.touch.bean.Entry;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
-import okhttp3.*;
 import org.apache.commons.codec.binary.Base64;
 
-import java.io.IOException;
 import java.net.URLEncoder;
-import java.util.Objects;
 
 /**
  * @Author: XuJiakai
@@ -25,8 +22,7 @@ import java.util.Objects;
 @Component
 public class DingUtils {
     @Autowired
-    private OkHttpClient client;
-    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
+    private OkHttpUtils client;
     private static final String URL = "https://oapi.dingtalk.com/robot/send?access_token=2773b742b74d84599c4f05f7b42cacd6714b10c33cd4c74402649019fa7e56c8";
     @Value("${ding-secret}")
     private String dingSecret;
@@ -43,7 +39,7 @@ public class DingUtils {
         Entry<Long, String> sign = getSign();
         String query = "&timestamp=" + sign.getKey() + "&sign=" + sign.getValue();
         try {
-            String post = post(URL + query, body);
+            String post = client.post(URL + query, body);
             log.info(post);
             return post.contains("ok");
         } catch (Exception e) {
@@ -53,26 +49,6 @@ public class DingUtils {
     }
 
 
-    private String post(String url, String json) throws IOException {
-        RequestBody body = RequestBody.create(JSON, json);
-        try (Response response = client.newCall((new Request.Builder()).url(url).post(body).build()).execute()) {
-            return parseBody(response);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return "";
-        }
-    }
-
-    private String get(String url) throws IOException {
-        try (Response response = client.newCall(new Request.Builder().url(url).get().build()).execute()) {
-            return parseBody(response);
-        }
-    }
-
-    private static String parseBody(Response response) throws IOException {
-        return response == null ? "" : response.isSuccessful() ? Objects.requireNonNull(response.body()).string() : "";
-    }
-
     private static String getMdBody(String msg) {
         return String.format("{\"msgtype\":\"markdown\",\"markdown\":{\"title\":\"%s\",\"text\":\"%s\"}}", "任务通知", DingMsg.msg2Md(msg));
     }

+ 42 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/OkHttpUtils.java

@@ -0,0 +1,42 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import com.helospark.lightdi.annotation.Component;
+import lombok.AllArgsConstructor;
+import okhttp3.*;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/7 11:54
+ * @Description:
+ */
+@Component
+@AllArgsConstructor
+public class OkHttpUtils {
+    private OkHttpClient client;
+    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
+
+    public String post(String url, String json) throws IOException {
+        RequestBody body = RequestBody.create(JSON, json);
+        try (Response response = client.newCall((new Request.Builder()).url(url).post(body).build()).execute()) {
+            return parseBody(response);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return "";
+        }
+    }
+
+    public String get(String url) throws IOException {
+        try (Response response = client.newCall(new Request.Builder().url(url).get().build()).execute()) {
+            return parseBody(response);
+        }
+    }
+
+    private static String parseBody(Response response) throws IOException {
+        return response == null ? "" : response.isSuccessful() ? Objects.requireNonNull(response.body()).string() : "";
+    }
+
+
+}