Przeglądaj źródła

feat: 动态跨表聚合加一个前置处理

许家凯 4 lat temu
rodzic
commit
6dfcfd3cd2

+ 22 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/AcrossTabAggHandle.scala

@@ -8,9 +8,31 @@ import org.apache.spark.internal.Logging
  */
 trait AcrossTabAggHandle extends Serializable with Logging {
 
+  /**
+   * 需要聚合的维度
+   *
+   * @return
+   */
   def getTables: Seq[String]
 
+  /**
+   * group_by前置处理程序。flat_map
+   *
+   * @return
+   */
+  def group_by_pre: (CompanyDynamicRecord) => Seq[CompanyDynamicRecord] = null
+
+  /**
+   * 聚合的key,相同的key会聚合到一起
+   *
+   * @return
+   */
   def group_by_key: (CompanyDynamicRecord) => String
 
+  /**
+   * 聚合处理程序  flat_map
+   *
+   * @return
+   */
   def group_by_flat_map: (Seq[CompanyDynamicRecord]) => Seq[CompanyDynamicRecord]
 }

+ 14 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -51,6 +51,12 @@ case class NgCompanyDynamic(s: SparkSession,
          |    ds STRING COMMENT '分区'
          |)
          |""".stripMargin)
+
+    val i = agg.flatMap(_.tabs).map((_, 1)).groupBy(_._1)
+      .mapValues(_.foldLeft(0)(_ + _._2)).count(_._2 >= 2)
+    if (i > 0) {
+      throw new RuntimeException("agg.tn 有交叉")
+    }
   }
 
   def calc(): Unit = {
@@ -96,6 +102,7 @@ case class NgCompanyDynamic(s: SparkSession,
       if (elem.group_by_key == null) {
         tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
       } else {
+
         tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
           .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
           .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
@@ -113,7 +120,13 @@ case class NgCompanyDynamic(s: SparkSession,
         }
         rdd_map = rdd_map - tn
       }
-      tmp_rdd = tmp_rdd.groupBy(elem.group_by_key).flatMap(r => elem.group_by_flat_map.apply(r._2.toSeq))
+      if (elem.group_by_pre == null) {
+        tmp_rdd = tmp_rdd.groupBy(elem.group_by_key).flatMap(r => elem.group_by_flat_map.apply(r._2.toSeq))
+      } else {
+        tmp_rdd = tmp_rdd
+          .flatMap(r => elem.group_by_pre.apply(r))
+          .groupBy(elem.group_by_key).flatMap(r => elem.group_by_flat_map.apply(r._2.toSeq))
+      }
       rdd_map = rdd_map + (elem.tabs.mkString("_") -> tmp_rdd)
     }
 

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicArgs.scala

@@ -20,6 +20,7 @@ case class NgCompanyDynamicArgs(
 }
 
 case class NgAcrossTabAggArgs(tabs: Seq[String]
+                              , group_by_pre: (CompanyDynamicRecord) => Seq[CompanyDynamicRecord]
                               , group_by_key: (CompanyDynamicRecord) => String
                               , group_by_flat_map: (Seq[CompanyDynamicRecord]) => Seq[CompanyDynamicRecord])
 
@@ -36,7 +37,7 @@ object NgCompanyDynamicArgs {
   def getAggArgs: Seq[NgAcrossTabAggArgs] = {
     val handles = ReflectUtils.subObject[AcrossTabAggHandle](classOf[AcrossTabAggHandle], this.getClass.getPackage.getName)
     handles.map(ch => {
-      NgAcrossTabAggArgs(tabs = ch.getTables, group_by_key = ch.group_by_key, group_by_flat_map = ch.group_by_flat_map)
+      NgAcrossTabAggArgs(tabs = ch.getTables, group_by_pre = ch.group_by_pre, group_by_key = ch.group_by_key, group_by_flat_map = ch.group_by_flat_map)
     })
   }