Pārlūkot izejas kodu

fix: 重跑数据分区设置

许家凯 3 gadi atpakaļ
vecāks
revīzija
182d568802

+ 25 - 9
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -85,13 +85,29 @@ case class general_handler(s: SparkSession,
   }
 
   def inc(): Unit = {
-    val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
-    var ads_ds = getLastPartitionsOrElse(ads_tab, null)
-    if (ads_ds.equals(inc_ods_ds)) {
-      ads_ds = getSecondLastPartitionOrElse(ads_tab, null)
-      if (ads_ds == null) {
-        all()
-        return
+    var org_ds = ""
+    var target_ds = ""
+
+    val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, null)
+    val inc_ads_ds = getLastPartitionsOrElse(inc_ads_tab, null)
+    val ads_ds = getLastPartitionsOrElse(ads_tab, null)
+
+
+    if (inc_ods_ds == null || inc_ads_ds == null) {
+      //没有inc_ods_tab数据,直接重跑全量ods数据
+      all()
+      return
+    }
+    target_ds = inc_ods_ds
+
+    org_ds = inc_ads_ds
+
+    if (org_ds.equals(target_ds)) {
+      val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
+      if (inc_ads_sec_ds == null) {
+        org_ds = ads_ds
+      } else {
+        org_ds = inc_ads_sec_ds
       }
     }
 
@@ -105,7 +121,7 @@ case class general_handler(s: SparkSession,
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_ds')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$target_ds')
          |SELECT  ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
          |FROM    (
          |            SELECT  *
@@ -114,7 +130,7 @@ case class general_handler(s: SparkSession,
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
-         |                        WHERE   ds > $ads_ds
+         |                        WHERE   ds > $org_ds
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1

+ 28 - 9
src/main/scala/com/winhc/bigdata/spark/ng/jobs/inc_company_ng.scala

@@ -54,19 +54,38 @@ case class inc_company_ng(s: SparkSession,
   }
 
   def inc(): Unit = {
-    val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
-    var ads_ds = getLastPartitionsOrElse(inc_ads_tab, null)
-    if (inc_ods_ds.equals(ads_ds)) {
-      ads_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
-      if (ads_ds == null) {
-        all()
-        return
+
+    var org_ds = ""
+    var target_ds = ""
+
+    val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, null)
+    val inc_ads_ds = getLastPartitionsOrElse(inc_ads_tab, null)
+    val ads_ds = getLastPartitionsOrElse(ads_tab, null)
+
+
+    if (inc_ods_ds == null || inc_ads_ds == null) {
+      //没有inc_ods_tab数据,直接重跑全量ods数据
+      all()
+      return
+    }
+    target_ds = inc_ods_ds
+
+
+    org_ds = inc_ads_ds
+
+    if (org_ds.equals(target_ds)) {
+      val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
+      if (inc_ads_sec_ds == null) {
+        org_ds = ads_ds
+      } else {
+        org_ds = inc_ads_sec_ds
       }
     }
 
+
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_ds')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$target_ds')
          |SELECT  ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
          |FROM    (
          |            SELECT  *
@@ -74,7 +93,7 @@ case class inc_company_ng(s: SparkSession,
          |            FROM    (
          |                        SELECT  *
          |                        FROM    $inc_ods_tab
-         |                        WHERE   ds > $ads_ds
+         |                        WHERE   ds > $org_ds
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1