Kaynağa Gözat

fix: 摘要调整

许家凯 3 yıl önce
ebeveyn
işleme
14ca488311

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -324,7 +324,8 @@ object CompanySummaryNg_new {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    WinhcNgSummary_new(s = spark, project = "winhc_ng", target_tab = "winhc_ng.out_es_summary", args = start_args).calc()
+    WinhcNgSummary_new(s = spark, project = "winhc_ng", target_tab = "winhc_ng.out_es_summary", args = start_args)
+      .calc()
     spark.stop()
   }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/utils/SwitchSummaryIndex.scala

@@ -26,7 +26,7 @@ case class SwitchSummaryIndex(s: SparkSession
     val person_summary = "out_es_summary_person_v" + ds
     val allIndices = es.catIndices()
 
-    val size = allIndices.filter(a => a.equals(company_summary) || a.equals(person_summary)).size
+    val size = allIndices.count(a => a.equals(company_summary) || a.equals(person_summary))
     if (size != 2) {
       throw new RuntimeException("target indices is not found !")
     }

+ 43 - 19
src/main/scala/com/winhc/bigdata/spark/ng/utils/WinhcNgSummary_new.scala

@@ -18,6 +18,8 @@ case class WinhcNgSummary_new(s: SparkSession,
                              ) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
   init()
+  private lazy val ds = getLastPartitionsOrElse(target_tab, null)
+
 
   private def init() {
     sql(
@@ -29,14 +31,16 @@ case class WinhcNgSummary_new(s: SparkSession,
          |    ,detail STRING COMMENT '个别维度详细的摘要信息'
          |)
          |COMMENT 'out es summary,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
-         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |PARTITIONED BY (
+         |  `ds` STRING COMMENT '时间分区',
+         |  `tn` STRING COMMENT 'tn分区'
+         |  )
          |LIFECYCLE 15
          |""".stripMargin)
   }
 
 
   private def get_table_data(arg: SummaryArgs): String = {
-    val ds = getLastPartitionsOrElse(target_tab, null)
 
     val tab = arg.table_name
     val companyIdField = arg.companyIdField
@@ -204,26 +208,46 @@ case class WinhcNgSummary_new(s: SparkSession,
 
 
   def calc(): Unit = {
-    val summary_tab = "summary_tab_xjk"
     val summary_tabs = args.map(get_tab_summary).seq
-    val merge = merge_table(spark, summary_tabs, "company_id")
-    merge.calc(summary_tab)
-    val cols = getColumns(summary_tab).diff(Seq("company_id"))
 
-    sql(
-      s"""
-         |select * from $summary_tab
-         |""".stripMargin)
-      .withColumn("summary", to_json(struct(cols.map(col): _*))).createTempView("xjk_tmp_summary_tab")
+    for (elem <- summary_tabs) {
+      val cols = getColumns(elem).diff(Seq("company_id"))
+      sql(
+        s"""
+           |select * from $elem
+           |""".stripMargin)
+        .withColumn("summary", to_json(struct(cols.map(col): _*)))
+        .createTempView(s"xjk_tmp_summary_tab_$elem")
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}',tn='$elem')
+           |SELECT company_id,summary,null as detail
+           |FROM
+           |    xjk_tmp_summary_tab_$elem
+           |""".stripMargin)
+    }
 
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
-         |SELECT company_id,summary,null as detail
-         |FROM
-         |    xjk_tmp_summary_tab
-         |""".stripMargin)
-    merge.drop()
+
+    /* val merge = merge_table(spark, summary_tabs, "company_id")
+
+     merge.calc(summary_tab)
+     val cols = getColumns(summary_tab).diff(Seq("company_id"))
+
+     sql(
+       s"""
+          |select * from $summary_tab
+          |""".stripMargin)
+       .withColumn("summary", to_json(struct(cols.map(col): _*)))
+       .createTempView("xjk_tmp_summary_tab")
+
+     sql(
+       s"""
+          |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+          |SELECT company_id,summary,null as detail
+          |FROM
+          |    xjk_tmp_summary_tab
+          |""".stripMargin)
+     merge.drop()*/
   }
 
   private def getCastCols(name: String, pre: String): String = {