Browse Source

fix: information_schema延时问题

- 采用limit 1条数据方式判空
许家凯 4 years ago
parent
commit
c85b7e6b00

+ 8 - 5
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -81,14 +81,17 @@ public class Main {
             if (commandLine.hasOption("q")) {
                 OdpsService bean = context.getBean(OdpsService.class);
                 String ds = DateUtils.getYesterday().replace("-", "");
-                List<String> notEmptyTable = bean.getNotEmptyTable(ds);
-                HashSet<String> set = new HashSet<>(notEmptyTable);
-                if (!set.contains("company")) {
+
+//                List<String> notEmptyTable = bean.getNotEmptyTable(ds);
+//                HashSet<String> set = new HashSet<>(notEmptyTable);
+                Boolean success = false;
+                success = bean.companyTableIsNotEmpty(DateUtils.getYesterday());
+                if (success) {
+                    log.info("company表{}分区数据不为空.", ds);
+                } else {
                     log.error("company表{}分区数据为空,调度程序中止!", ds);
                     dd.send("company表" + ds + "分区数据为空,调度程序中止!");
                     System.exit(-1);
-                } else {
-                    log.info("company表{}分区数据不为空.", ds);
                 }
                 return;
             }

+ 17 - 3
src/main/java/com/winhc/dataworks/flow/touch/service/OdpsService.java

@@ -7,6 +7,7 @@ import com.aliyun.odps.data.Record;
 import com.aliyun.odps.task.SQLTask;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -17,12 +18,28 @@ import java.util.stream.Collectors;
  * @Date: 2020/7/6 17:52
  * @Description:
  */
+@Slf4j
 @Service
 public class OdpsService {
 
     @Autowired
     private Odps odps;
 
+    public Boolean companyTableIsNotEmpty(String ds) {
+        ds = ds.replace("-", "");
+        String sql = "select * from inc_ods_company where ds = '" + ds + "' limit 1 ;";
+
+        try {
+            Instance i = SQLTask.run(odps, sql);
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResult(i);
+            return !records.isEmpty();
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
     public List<String> getNotEmptyTable(String ds) {
         ds = ds.replace("-", "");
         String sql = "SELECT table_name FROM information_schema.PARTITIONS WHERE substring(table_name,1,8) = 'inc_ods_' AND partition_name = 'ds=" + ds + "' and data_length <> 0 ;";
@@ -40,7 +57,4 @@ public class OdpsService {
         }
         return list.stream().map(s -> s.replace("inc_ods_", "")).collect(Collectors.toList());
     }
-
-
-
 }

+ 1 - 1
start.sh

@@ -52,7 +52,7 @@ start_job() {
     java -jar $bashPath/DataWorks-flow-touch.jar -f $bashPath/jobs/$1 -d
     return
   else
-    ehco ""
+    echo ""
     java -jar $bashPath/DataWorks-flow-touch.jar -f $bashPath/jobs/$1
   fi