浏览代码

feat: 炸开表搞要计算调整

许家凯 4 年之前
父节点
当前提交
2019d45252
共有 1 个文件被更改,包括 48 次插入6 次删除
  1. 48 6
      src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

+ 48 - 6
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -232,7 +232,7 @@ case class general_handler(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-    explode_calc(ads_tab, inc_ods_ds)
+    explode_calc(null)
   }
 
   def inc(): Unit = {
@@ -294,7 +294,7 @@ case class general_handler(s: SparkSession,
          |""".stripMargin)
 
     addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
-    explode_calc(inc_ads_tab, target_ds)
+    explode_calc(target_ds)
   }
 
 
@@ -307,7 +307,7 @@ case class general_handler(s: SparkSession,
   }
 
 
-  private def explode_calc(org_tab: String, ds: String): Unit = {
+  def explode_calc(ds: String): Unit = {
     if (job_args.explode_args == null) {
       return
     }
@@ -315,14 +315,30 @@ case class general_handler(s: SparkSession,
     val all_date_tmp_view = s"insert_${tn}"
     val all_date_explode_tmp_view = s"${all_date_tmp_view}_explode"
 
-    val explode_tab_name = s"${org_tab}_explode"
+    val explode_tab_name = s"${ads_tab}_explode"
+    val inc_explode_tab_name = s"${inc_ads_tab}_explode"
+
 
     sql(
       s"""
-         |select * from $org_tab where ds = '$ds'
+         |SELECT  ${getColumns(ads_tab)}
+         |FROM    (
+         |        SELECT  * ,ROW_NUMBER()OVER (PARTITION BY rowkey ORDER BY ${up} ) AS num
+         |        FROM    (
+         |                SELECT  *
+         |                FROM    $ads_tab
+         |                WHERE   ds >0
+         |                UNION ALL
+         |                SELECT  *
+         |                FROM    $inc_ads_tab
+         |                WHERE   ds >0
+         |                )
+         |        )
+         |WHERE   num =1
          |""".stripMargin)
       .createTempView(all_date_tmp_view)
 
+
     explode_tab(spark, all_date_tmp_view, job_args.explode_args)
       .calc(all_date_explode_tmp_view)
 
@@ -333,11 +349,37 @@ case class general_handler(s: SparkSession,
 
     sql(
       s"""
-         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='$ds')
+         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='0')
          |SELECT ${getColumns(explode_tab_name).diff(Seq("ds")).mkString(",")}
          |FROM
          |    $all_date_explode_tmp_view
          |""".stripMargin)
+
+    sql(s"ALTER TABLE ${inc_explode_tab_name} DROP IF EXISTS PARTITION(ds>'0')")
+
+    if (ds != null) {
+      val inc_date_tmp_view = s"inc_${tn}"
+      sql(
+        s"""
+           | SELECT  *
+           | FROM    $inc_ads_tab
+           | WHERE   ds = $ds
+           |""".stripMargin)
+        .createTempView(inc_date_tmp_view)
+
+      explode_tab(spark, inc_date_tmp_view, job_args.explode_args)
+        .calc(s"inc_explode_$tn")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $inc_explode_tab_name PARTITION(ds='$ds')
+           |SELECT ${getColumns(inc_explode_tab_name).diff(Seq("ds")).mkString(",")}
+           |FROM
+           |    inc_explode_$tn
+           |""".stripMargin)
+
+    }
+
   }
 }