许家凯 пре 4 година
родитељ
комит
e005dcce74
1 измењених фајлова са 9 додато и 54 уклоњено
  1. 9 54
      src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

+ 9 - 54
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -232,7 +232,7 @@ case class general_handler(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-    explode_calc(null)
+    explode_calc(ads_tab, inc_ods_ds)
   }
 
   def inc(): Unit = {
@@ -294,7 +294,7 @@ case class general_handler(s: SparkSession,
          |""".stripMargin)
 
     addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
-    explode_calc(target_ds)
+    explode_calc(inc_ads_tab, target_ds)
   }
 
 
@@ -307,7 +307,7 @@ case class general_handler(s: SparkSession,
   }
 
 
-  def explode_calc(ds: String): Unit = {
+  private def explode_calc(org_tab: String, ds: String): Unit = {
     if (job_args.explode_args == null) {
       return
     }
@@ -315,31 +315,14 @@ case class general_handler(s: SparkSession,
     val all_date_tmp_view = s"insert_${tn}"
     val all_date_explode_tmp_view = s"${all_date_tmp_view}_explode"
 
-    val all_explode_tab_name = s"${ads_tab}_explode_all"
-    val explode_tab_name = s"${ads_tab}_explode"
-    val inc_explode_tab_name = s"${inc_ads_tab}_explode"
-
+    val explode_tab_name = s"${org_tab}_explode"
 
     sql(
       s"""
-         |SELECT  ${getColumns(ads_tab)}
-         |FROM    (
-         |        SELECT  * ,ROW_NUMBER()OVER (PARTITION BY rowkey ORDER BY ${up} ) AS num
-         |        FROM    (
-         |                SELECT  *
-         |                FROM    $ads_tab
-         |                WHERE   ds >0
-         |                UNION ALL
-         |                SELECT  *
-         |                FROM    $inc_ads_tab
-         |                WHERE   ds >0
-         |                )
-         |        )
-         |WHERE   num =1
+         |select * from $org_tab where ds = '$ds'
          |""".stripMargin)
       .createTempView(all_date_tmp_view)
 
-
     explode_tab(spark, all_date_tmp_view, job_args.explode_args)
       .calc(all_date_explode_tmp_view)
 
@@ -347,42 +330,14 @@ case class general_handler(s: SparkSession,
       //表不存在
 
     }
-    val explode_cols = getColumns(explode_tab_name).diff(Seq("ds"))
 
-    /*sql(
+    sql(
       s"""
-         |INSERT OVERWRITE TABLE $all_explode_tab_name PARTITION(ds='0')
-         |SELECT ${explode_cols.mkString(",")}
+         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='$ds')
+         |SELECT ${getColumns(explode_tab_name).diff(Seq("ds")).mkString(",")}
          |FROM
          |    $all_date_explode_tmp_view
-         |""".stripMargin)*/
-
-    if (ds != null) {
-      sql(
-        s"""
-           |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='0')
-           |SELECT  ${explode_cols.map("t2." + _).mkString(",")}
-           |FROM    (
-           |            SELECT  DISTINCT rowkey
-           |            FROM    $inc_ads_tab
-           |            WHERE   ds = '$ds'
-           |        ) AS t1
-           |JOIN    (
-           |            SELECT  *
-           |            FROM    $all_date_explode_tmp_view
-           |            WHERE   ds = '0'
-           |        ) AS t2
-           |ON      t1.rowkey = t2.rowkey
-           |""".stripMargin)
-    } else {
-      sql(
-        s"""
-           |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='0')
-           |SELECT  ${explode_cols.mkString(",")}
-           |FROM    $all_date_explode_tmp_view
-           |WHERE   ds = '0'
-           |""".stripMargin)
-    }
+         |""".stripMargin)
   }
 }