Explorar o código

feat: 优化摘要

许家凯 %!s(int64=4) %!d(string=hai) anos
pai
achega
eed71c0a2e

+ 6 - 7
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -469,9 +469,8 @@ object CompanySummaryNg_new {
       ))
     )
 
-    /**
-     * ====================================================
-     */
+    // ====================================================
+
     , company_summary_args(table_name = "bankruptcy_open_case_explode"
       , companyIdField = "applicant_info_id_explode"
       , distinctField = "rowkey,applicant_info_id_explode"
@@ -492,10 +491,10 @@ object CompanySummaryNg_new {
       ))
     )
 
-    /**
-     * ====================================================
-     */
-    , get_default_summary_args("company_zxr")
+
+      //====================================================
+
+    , get_default_summary_args("company_zxr","keyno")
     , get_default_summary_args("company_punishment_info","company_id")
     , get_default_summary_args("company_tax_contravention","company_id")
     , get_default_summary_args("company_own_tax","company_id")

+ 44 - 5
src/main/scala/com/winhc/bigdata/spark/ng/utils/merge_table.scala

@@ -19,8 +19,8 @@ case class merge_table(s: SparkSession,
                        join_key: String
                       ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
+  private val tmp_tab_prefix = "winhc_ng.tmp_"
 
-  init()
 
   private def init(): Unit = {
     if (tables.length <= 1) {
@@ -38,22 +38,61 @@ case class merge_table(s: SparkSession,
     if (int_cols.size != 1 || !int_cols.head.equals(join_key)) {
       throw new RuntimeException("Unable to merge the table !")
     }
+
+    for (elem <- tables) {
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS $tmp_tab_prefix$elem
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS $tmp_tab_prefix$elem
+           |(
+           |    company_id  STRING COMMENT 'company id'
+           |    ,${getColumns(elem).diff(Seq("company_id")).map(s => s"$s bigint COMMENT 'v'").mkString(",")}
+           |)
+           |COMMENT '$elem'
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $tmp_tab_prefix$elem
+           |SELECT ${getColumns(s"$tmp_tab_prefix$elem").mkString(",")}
+           |FROM
+           |    $elem
+           |""".stripMargin)
+
+    }
   }
 
   def calc(tempView: String): Unit = {
+    init()
+
     var df = sql(
       s"""
-         |select * from ${tables.head}
-         |""".stripMargin)
+         |select * from $tmp_tab_prefix${tables.head}
+         |""".stripMargin).cache()
 
     for (i <- 0 until tables.length - 1) {
       val df_2 = sql(
         s"""
-           |select * from ${tables(i + 1)}
+           |select * from $tmp_tab_prefix${tables(i + 1)}
            |""".stripMargin)
-      df = df.join(df_2, Seq(join_key), "full")
+      df = df.join(df_2, Seq(join_key), "full").cache()
     }
     df.createTempView(tempView)
+    drop()
+  }
+
+
+  private def drop(): Unit = {
+    for (elem <- tables) {
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS $tmp_tab_prefix$elem
+           |""".stripMargin)
+    }
   }
 }