浏览代码

feat: 动态数据输出折叠字段

许家凯 3 年之前
父节点
当前提交
1cbadf6300

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -91,9 +91,11 @@ case class CompanyDynamicRecord(id: String,
       , agg_detail_rowkey_str
       , old_record.toJson()
       , new_record.toJson()
+      , s"${tn}_${change_time}"
       , change_time
       , update_time
       , create_time
+      , 0L
       , null
     )
   }

+ 19 - 21
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.ng.dynamic
 
-import com.winhc.bigdata.spark.utils.BaseUtil.{getYesterday, isWindows}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
@@ -17,13 +17,16 @@ import scala.collection.mutable
 case class NgCompanyDynamic(s: SparkSession,
                             args: Seq[NgCompanyDynamicArgs]
                             , agg: Seq[NgAcrossTabAggArgs]
+                            , inc: Boolean = true
                            ) extends LoggingUtils with Logging {
   @(transient@getter) val spark: SparkSession = s
 
-  private val target_tab = "winhc_ng.out_company_dynamic"
+  private val target_tab = if (inc) "winhc_ng.out_company_dynamic" else "winhc_ng.out_company_dynamic_all"
 
   private val args_map: Map[String, NgCompanyDynamicArgs] = args.map(r => (r.tn, r)).toMap
 
+  init()
+
   def init(): Unit = {
     sql(
       s"""
@@ -41,9 +44,11 @@ case class NgCompanyDynamic(s: SparkSession,
          |    ,agg_detail_rowkey STRING COMMENT '聚合类型rowkey的多项rowkey字段,结构:tn@@rowkey,tn@@rowkey'
          |    ,old_record STRING COMMENT '上一个版本数据json格式'
          |    ,new_record STRING COMMENT '当前版本数据json格式'
+         |    ,collapse_key STRING COMMENT '用于折叠的字段,concat_ws("_",tn,change_time)'
          |    ,change_time string COMMENT '变更时间(业务展示 yyyy-MM-dd)'
          |    ,update_time STRING  COMMENT  '更新时间'
          |    ,create_time STRING COMMENT '创建时间'
+         |    ,deleted BIGINT  COMMENT '非0删除'
          |)
          |COMMENT '企业动态输出表'
          |PARTITIONED BY
@@ -60,14 +65,16 @@ case class NgCompanyDynamic(s: SparkSession,
   }
 
   def calc(): Unit = {
-    val ds = getYesterday()
+
     val where = args_map.keys.map(r => s""" "$r" """).mkString("(", ",", ")")
 
+    val org_tab = if (inc) "winhc_ng.bds_change_extract" else "winhc_ng.bds_change_extract_all"
+    val ds = getLastPartitionsOrElse(org_tab, "0")
+
     val rdd: RDD[CompanyDynamicRecord] = sql(
       s"""
          |SELECT  *
-         |FROM    winhc_ng.bds_change_extract
-         |--- FROM    winhc_ng.bds_change_extract_test
+         |FROM    $org_tab
          |WHERE   ds = $ds
          |AND    tn in $where
          |""".stripMargin).rdd.map(r => {
@@ -107,10 +114,12 @@ case class NgCompanyDynamic(s: SparkSession,
         tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
       } else {
         if (elem.group_by_pre == null) {
+          println(s"elem.tn = ${elem.tn} : groupBy and flatMap !")
           tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
             .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
             .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
         } else {
+          println(s"elem.tn = ${elem.tn} : flatMap , groupBy and flatMap")
           tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
             .flatMap(r => args_map(elem.tn).group_by_pre.apply(r))
             .filter(_ != null)
@@ -132,8 +141,10 @@ case class NgCompanyDynamic(s: SparkSession,
         rdd_map = rdd_map - tn
       }
       if (elem.group_by_pre == null) {
+        println(s"agg : elem.tabs = ${elem.tabs} : groupBy and flatMap")
         tmp_rdd = tmp_rdd.groupBy(elem.group_by_key).flatMap(r => elem.group_by_flat_map.apply(r._2.toSeq))
       } else {
+        println(s"agg : elem.tabs = ${elem.tabs} : flatMap , groupBy and flatMap")
         tmp_rdd = tmp_rdd
           .flatMap(r => elem.group_by_pre.apply(r))
           .groupBy(elem.group_by_key).flatMap(r => elem.group_by_flat_map.apply(r._2.toSeq))
@@ -153,25 +164,12 @@ case class NgCompanyDynamic(s: SparkSession,
 
     spark.createDataFrame(out_rdd.map(_.to_row()), spark.table(target_tab).schema)
       .createTempView("company_dynamic_out_tab")
+    val cols = getColumns(target_tab).diff(Seq("ds")).mkString(",")
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
-         |SELECT id
-         |       ,association_entity_info
-         |       ,rowkey
-         |       ,tn
-         |       ,update_type
-         |       ,risk_level
-         |       ,risk_level_detail
-         |       ,dynamic_info
-         |       ,agg_detail_text
-         |       ,agg_detail_rowkey
-         |       ,old_record
-         |       ,new_record
-         |       ,change_time
-         |       ,update_time
-         |       ,create_time
+         |SELECT $cols
          |FROM   company_dynamic_out_tab
          |""".stripMargin)
   }
@@ -187,7 +185,7 @@ object NgCompanyDynamic {
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
-    NgCompanyDynamic(spark, NgCompanyDynamicArgs.getStartArgs, NgCompanyDynamicArgs.getAggArgs).calc()
+    NgCompanyDynamic(spark, NgCompanyDynamicArgs.getStartArgs, NgCompanyDynamicArgs.getAggArgs, inc = false).calc()
     spark.stop()
   }
 

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyRiskLevelType.scala

@@ -9,7 +9,7 @@ object NgCompanyRiskLevelType extends Enumeration {
   type RiskLevelType = Value //声明枚举对外暴露的变量类型
   val Positive = Value("0") //利好信息
   val Prompt = Value("1") //提示信息
-  val Caution  = Value("2 ") //警示信息
+  val Caution  = Value("2") //警示信息
 
   def showAll = this.values.foreach(println) // 打印所有的枚举值
 }