瀏覽代碼

完善重跑功能

xufei 4 年之前
父節點
當前提交
413a1daa8f

+ 7 - 9
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -70,11 +70,6 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       return
     }
 
-
-
-
-
-
     //存量表ads最新分区
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -100,7 +95,13 @@ case class CompanyIncrForCidUtils(s: SparkSession,
     //增量ods和增量ads最后一个分区相等,跳出
     if (lastDsIncOds.equals(lastDsIncAds)) {
       println("inc_ods equals inc_ads ds ,please delete last ds !!!")
-      runDs = lastDsIncOds
+      //runDs = lastDsIncOds
+      val l1 = sql(s"show partitions $inc_ads_company_tb").collect.toList.map(_.getString(0).split("=")(1)).sorted
+      if (l1.size > 1) {
+        runDs = BaseUtil.atDaysAfter(1, l1(l1.size - 2))
+      }else{
+        runDs = firstDsIncOds
+      }
       //sys.exit(-1)
     }
 
@@ -114,9 +115,6 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |firstDsIncOds:$firstDsIncOds
          |""".stripMargin)
 
-
-
-
     //rowkey前缀匹配
     val rowKeyPre = rowKeyMapping.getOrElse(tableName,"new_cid")
 

+ 7 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -93,7 +93,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     //增量ods和增量ads最后一个分区相等,跳出
     if (lastDsIncOds.equals(lastDsIncAds)) {
       println("inc_ods equals inc_ads ds ,please delete last ds !!!")
-      runDs = lastDsIncOds
+      //runDs = lastDsIncOds
+      val l1 = sql(s"show partitions $inc_ads_company_tb").collect.toList.map(_.getString(0).split("=")(1)).sorted
+      if (l1.size > 1) {
+        runDs = BaseUtil.atDaysAfter(1, l1(l1.size - 2))
+      }else{
+        runDs = firstDsIncOds
+      }
       //sys.exit(-1)
     }