Browse Source

fix: 调整炸开表

许家凯 4 years ago
parent
commit
760d08b7d1
1 changed files with 24 additions and 21 deletions
  1. 24 21
      src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

+ 24 - 21
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -315,6 +315,7 @@ case class general_handler(s: SparkSession,
     val all_date_tmp_view = s"insert_${tn}"
     val all_date_explode_tmp_view = s"${all_date_tmp_view}_explode"
 
+    val all_explode_tab_name = s"${ads_tab}_explode_all"
     val explode_tab_name = s"${ads_tab}_explode"
     val inc_explode_tab_name = s"${inc_ads_tab}_explode"
 
@@ -346,40 +347,42 @@ case class general_handler(s: SparkSession,
       //表不存在
 
     }
+    val explode_cols = getColumns(explode_tab_name).diff(Seq("ds"))
 
-    sql(
+    /*sql(
       s"""
-         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='0')
-         |SELECT ${getColumns(explode_tab_name).diff(Seq("ds")).mkString(",")}
+         |INSERT OVERWRITE TABLE $all_explode_tab_name PARTITION(ds='0')
+         |SELECT ${explode_cols.mkString(",")}
          |FROM
          |    $all_date_explode_tmp_view
-         |""".stripMargin)
-
-    sql(s"ALTER TABLE ${inc_explode_tab_name} DROP IF EXISTS PARTITION(ds>'0')")
+         |""".stripMargin)*/
 
     if (ds != null) {
-      val inc_date_tmp_view = s"inc_${tn}"
       sql(
         s"""
-           | SELECT  *
-           | FROM    $inc_ads_tab
-           | WHERE   ds = $ds
+           |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='0')
+           |SELECT  ${explode_cols.map("t2." + _).mkString(",")}
+           |FROM    (
+           |            SELECT  DISTINCT rowkey
+           |            FROM    $inc_ads_tab
+           |            WHERE   ds = '$ds'
+           |        ) AS t1
+           |JOIN    (
+           |            SELECT  *
+           |            FROM    $all_date_explode_tmp_view
+           |            WHERE   ds = '0'
+           |        ) AS t2
+           |ON      t1.rowkey = t2.rowkey
            |""".stripMargin)
-        .createTempView(inc_date_tmp_view)
-
-      explode_tab(spark, inc_date_tmp_view, job_args.explode_args)
-        .calc(s"inc_explode_$tn")
-
+    } else {
       sql(
         s"""
-           |INSERT OVERWRITE TABLE $inc_explode_tab_name PARTITION(ds='$ds')
-           |SELECT ${getColumns(inc_explode_tab_name).diff(Seq("ds")).mkString(",")}
-           |FROM
-           |    inc_explode_$tn
+           |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='0')
+           |SELECT  ${explode_cols.mkString(",")}
+           |FROM    $all_date_explode_tmp_view
+           |WHERE   ds = '0'
            |""".stripMargin)
-
     }
-
   }
 }