Jelajahi Sumber

fix: 去重加入ds和update_time

许家凯 3 tahun lalu
induk
melakukan
af1a52a278

+ 12 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -56,13 +56,18 @@ case class general_handler(s: SparkSession,
     }
     val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
+    val up = inter_cols.contains("update_time") match {
+      case true => ",update_time"
+      case false => ""
+    }
+
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$inc_ods_ds')
          |SELECT  ${getColumns(ads_tab).diff(Seq("ds")).mkString(",")}
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
          |            FROM    (
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
@@ -92,6 +97,11 @@ case class general_handler(s: SparkSession,
 
     val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
+    val up = inter_cols.contains("update_time") match {
+      case true => ",update_time"
+      case false => ""
+    }
+
 
     sql(
       s"""
@@ -99,7 +109,7 @@ case class general_handler(s: SparkSession,
          |SELECT  ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
          |            FROM    (
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}