
fix: aggregate summaries that share the same cid

许家凯, 4 years ago
commit 77f5fc9b4e
1 file changed, 23 insertions(+), 6 deletions(-)

+ 23 - 6
src/main/scala/com/winhc/bigdata/spark/utils/CompanySummaryPro.scala

@@ -132,9 +132,9 @@ case class CompanySummaryPro(s: SparkSession,
 
     val view = groupByInfo == null match {
       case true => s"arr[0] as $tableName"
-      case false => for (i <- 0 to groupByInfo.value_alias.length) {
+      case false => groupByInfo.value_alias.indices.map(i => {
         s"arr[$i] as ${groupByInfo.value_alias(i)._2}"
-      }.mkString(",")
+      }).mkString(",")
     }
 
    //register the function
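
Beyond style, the old branch was broken twice over: a Scala for without yield evaluates to Unit, so it never produced the column list, and 0 to groupByInfo.value_alias.length runs one index past the end. A minimal sketch of the corrected indices.map pattern, with a hypothetical value_alias (and columns col_a, col_b) standing in for groupByInfo.value_alias:

    // hypothetical stand-in for groupByInfo.value_alias
    val value_alias = Seq(("0", "col_a"), ("1", "col_b"))
    val view = value_alias.indices
      .map(i => s"arr[$i] as ${value_alias(i)._2}")
      .mkString(",")
    // view == "arr[0] as col_a,arr[1] as col_b"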
@@ -143,7 +143,7 @@ case class CompanySummaryPro(s: SparkSession,
         (s"${r._1}", r._2)
       })
 
-      def getResArr(cid: Long, group_val: String, num: Long): Seq[Long] = {
+      def getResArr(group_val: String, num: Long): Seq[Long] = {
         val res = scala.collection.mutable.ArrayBuffer[Long]()
         for (i <- fieldSeq) {
           if (i._1.equals(group_val)) {
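
This hunk is a follow-on cleanup: getResArr resolves counts by matching group_val against fieldSeq, and the cid parameter, unused in the body shown, is dropped from the signature.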
@@ -181,6 +181,18 @@ case class CompanySummaryPro(s: SparkSession,
          |""".stripMargin)
       .createOrReplaceTempView("summary_tab")
 
+    if (groupByInfo != null) {
+      sql(
+        s"""
+           |SELECT  cid
+           |        ,${groupByInfo.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")}
+           |FROM    summary_tab
+           |GROUP BY cid
+           |""".stripMargin)
+        .createOrReplaceTempView("summary_tab")
+    }
+
+
     if (StringUtils.isEmpty(target_tab)) {
       /*import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
       val writF = getColumns("summary_tab").diff(Seq("cid"))
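
The added block implements the aggregation named in the commit title: with a groupByInfo set, summary_tab can contain several rows per company (one per group value), so the view is re-registered as a GROUP BY over cid with every alias column summed. A sketch of what the interpolated column list expands to, assuming the value_alias configured in the driver below:

    val value_alias = Seq(
      ("0", "company_equity_info_list_0")
      , ("1", "company_equity_info_list_1")
      , ("2", "company_equity_info_list_2"))
    val sumCols = value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")
    // sumCols ==
    //   "sum(company_equity_info_list_0) as company_equity_info_list_0,"
    //   + "sum(company_equity_info_list_1) as company_equity_info_list_1,"
    //   + "sum(company_equity_info_list_2) as company_equity_info_list_2"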
@@ -233,11 +245,16 @@ object CompanySummaryPro {
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
     CompanySummaryPro(s = spark
       , project = "winhc_eci_dev"
-      , tableName = "company_judicial_assistance_list"
+      , tableName = "company_equity_info_list"
       , cidField = "split(rowkey,'_')[0]"
+      , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
+        ("0", "company_equity_info_list_0")
+        , ("1", "company_equity_info_list_1")
+        , ("2", "company_equity_info_list_2")
+      ))
       , where = "deleted = 0"
-      )
-      .calc(is_inc = true, target_tab = "winhc_eci_dev.xjk_tmp_summary")
+    )
+      .calc(is_inc = false, target_tab = "winhc_eci_dev.xjk_tmp_summary")
 
     spark.stop()
   }
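
The driver change doubles as a usage example: the job now targets company_equity_info_list, splits the summary by the type field into three alias columns via GroupByInfo, and is run with is_inc = false, presumably to rebuild the aggregated summary in full into winhc_eci_dev.xjk_tmp_summary.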