许家凯 3 yıl önce
ebeveyn
işleme
645e51bd29

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -216,7 +216,7 @@ trait LoggingUtils extends Logging {
   }
 
 
-  def generateAllTabSql(tableName: String, project: String): (String, Seq[String], String) = {
+  def generateAllTabSql(tableName: String, project: String, maxDs: String = null): (String, Seq[String], String) = {
     val inc_ads_tab = s"$project.inc_ads_$tableName"
     val ads_tab = s"$project.ads_$tableName"
     val cols = getColumns(inc_ads_tab).intersect(getColumns(ads_tab))
@@ -227,6 +227,8 @@ trait LoggingUtils extends Logging {
     }
     val p_b = if (cols.contains("rowkey")) "rowkey" else if (cols.contains("company_id")) "company_id" else throw new RuntimeException(s"$ads_tab partition key is null !")
 
+    val max = if(maxDs==null) "" else s" and ds <= $maxDs"
+
     (
       s"""
          |SELECT  ${cols.mkString(",")}
@@ -240,7 +242,7 @@ trait LoggingUtils extends Logging {
          |                        UNION ALL
          |                        SELECT  ${cols.mkString(",")}
          |                        FROM    $inc_ads_tab
-         |                        WHERE   ds > $ads_last_ds
+         |                        WHERE   ds > $ads_last_ds $max
          |                    ) AS all_t1
          |        ) AS all_t2
          |WHERE   all_t2.xjk_num = 1