
fix: second batch of company dynamics

许家凯, 3 years ago
Commit fd219e0f8a

+ 1 - 4
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -44,8 +44,6 @@ case class CompanyDynamicRecord(id: String,
                                 dynamic_info: Map[String, Any],
                                 agg_detail_text: String,
                                 agg_detail_rowkey: Seq[RowkeyInfo],
-                                old_record: Map[String, String],
-                                new_record: Map[String, String],
                                 change_time: String,
                                 update_time: String,
                                 create_time: String
@@ -70,8 +68,7 @@ case class CompanyDynamicRecord(id: String,
         dynamic_info,
         agg_detail_text,
         agg_detail_rowkey,
-        old_record,
-        new_record,
+
         change_time,
         update_time,
         create_time
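
Taken together, the two hunks above remove the old_record / new_record
snapshots from the case class and from its field projection, so serialized
dynamic records no longer carry the full before/after maps. A minimal sketch
of the trimmed shape, assuming the field types visible in the diff context
(RowkeyInfo is stubbed here; fields outside the hunks are elided, and the
projection method name is illustrative):

    // Hypothetical stub; the real RowkeyInfo is defined elsewhere in the repo.
    case class RowkeyInfo(tn: String, rowkey: String)

    // Trimmed record: old_record / new_record are gone, per the hunks above.
    case class CompanyDynamicRecord(id: String,
                                    dynamic_info: Map[String, Any],
                                    agg_detail_text: String,
                                    agg_detail_rowkey: Seq[RowkeyInfo],
                                    change_time: String,
                                    update_time: String,
                                    create_time: String) {
      // Field projection matching the second hunk, minus the two removed fields.
      def toSeq: Seq[Any] = Seq(id, dynamic_info, agg_detail_text,
                                agg_detail_rowkey, change_time, update_time,
                                create_time)
    }

The change to NgCompanyDynamicHandle.scala further down is the call-site
counterpart: the two named constructor arguments are dropped so the call
matches this trimmed record.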

+ 16 - 17
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -65,20 +65,18 @@ case class NgCompanyDynamic(s: SparkSession,
     }
   }
 
-  def calc(): Unit = {
-
-    val where = args_map.keys.map(r => s""" "$r" """).mkString("(", ",", ")")
-
-    val org_tab = if (inc) "winhc_ng.bds_change_extract" else "winhc_ng.bds_change_extract_all"
-    val ds = getLastPartitionsOrElse(org_tab, "0")
+  private lazy val org_tab = if (inc) "winhc_ng.bds_change_extract" else "winhc_ng.bds_change_extract_all"
+  private lazy val ds = getLastPartitionsOrElse(org_tab, "0")
 
-    val rdd: RDD[CompanyDynamicRecord] = sql(
+  private def get_rdd(tn: String): RDD[CompanyDynamicRecord] = {
+    val rdd = sql(
       s"""
          |SELECT  *
          |FROM    $org_tab
          |WHERE   ds = $ds
-         |AND    tn in $where
-         |""".stripMargin).rdd.map(r => {
+         |AND     tn = '$tn'
+         |""".stripMargin)
+      .rdd.map(r => {
       val value = r.getAs[String]("change_fields")
       val change_fields: Seq[String] = if (StringUtils.isEmpty(value)) Seq.empty else value.split(",")
       ChangeExtract(rowkey = r.getAs("rowkey")
@@ -102,26 +100,27 @@ case class NgCompanyDynamic(s: SparkSession,
     }).flatMap(r => args_map(r.tn).flat_map.apply(r))
       .map(_.format())
       .filter(_ != null)
-      .cache()
 
-    //todo: the RDD could be written directly to HBase
+    rdd
+  }
 
+  def calc(): Unit = {
+    //todo: the RDD could be written directly to HBase
 
     var rdd_map: mutable.Map[String, RDD[CompanyDynamicRecord]] = mutable.Map.empty
 
     for (elem <- args) {
-      var tmp_rdd: RDD[CompanyDynamicRecord] = null
-      if (elem.group_by_key == null) {
-        tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
-      } else {
+      var tmp_rdd: RDD[CompanyDynamicRecord] = get_rdd(elem.tn)
+
+      if (elem.group_by_key != null) {
         if (elem.group_by_pre == null) {
           println(s"elem.tn = ${elem.tn} : groupBy and flatMap !")
-          tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
+          tmp_rdd = tmp_rdd
             .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
             .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
         } else {
           println(s"elem.tn = ${elem.tn} : flatMap , groupBy and flatMap")
-          tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
+          tmp_rdd = tmp_rdd
             .flatMap(r => args_map(elem.tn).group_by_pre.apply(r))
             .filter(_ != null)
             .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
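
The refactor above replaces a single scan of the source table (filtered with
tn IN (...) and shared via .cache()) with a dedicated query per dimension:
calc() now calls get_rdd(elem.tn) inside the loop over args. A minimal sketch
of the new per-dimension read path, assuming a SparkSession in scope (names
simplified from the diff):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{Row, SparkSession}

    object PerDimensionRead {
      // One scan per dimension `tn`, replacing the old scan-once,
      // filter-per-dimension approach that required a cached RDD.
      def getRdd(spark: SparkSession, orgTab: String, ds: String, tn: String): RDD[Row] =
        spark.sql(
          s"""
             |SELECT  *
             |FROM    $orgTab
             |WHERE   ds = $ds
             |AND     tn = '$tn'
             |""".stripMargin).rdd
    }

The trade-off: the old version read the table once and filtered in memory,
while the new version issues one scan per tn but drops the shared .cache()
and lets each dimension's pipeline run independently.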

+ 0 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -53,8 +53,6 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
       , dynamic_info = dynamic_info
       , agg_detail_text = null
       , agg_detail_rowkey = null
-      , old_record = old_data
-      , new_record = new_data
       , change_time = biz_date
       , update_time = update_time
       , create_time = create_time

+ 11 - 5
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/ComplexDailyDynamic.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.ng.dynamic.utils
 
 import com.winhc.bigdata.spark.ng.dynamic.utils.DynamicFiledUtils.createDynamicInfo
 import org.apache.spark.internal.Logging
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 
 /**
  * @author ZhangJi
@@ -11,11 +12,16 @@ trait ComplexDailyDynamic extends Logging {
   def getDynamicInfo(new_data: Map[String, String]): (String, Map[String, Any]) = {
     try {
       createDynamicInfo(this.getClass.getSimpleName, new_data)
+    } catch {
+      case e: Exception => {
+        logError("xjk:" + e.getMessage, e)
+        println("error data:")
+        println(new_data.toJson())
+        logError("error data:")
+        logError(new_data.toJson())
+        throw new RuntimeException(e)
+      }
     }
-    catch {
-      case e: Exception => logError("xjk:"+e.getMessage, e)
-        (null, Map.empty)
-    }
-  }
 
+  }
 }
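
Behavior change worth noting in this hunk: an exception from createDynamicInfo
was previously swallowed and (null, Map.empty) returned; the new code logs the
offending payload (twice, via println and logError, in the committed version)
and rethrows, so the Spark task fails loudly instead of silently dropping the
record. The same treatment is applied to SimpleDailyDynamic.scala further
down. A minimal self-contained sketch of the fail-fast pattern, using slf4j
directly in place of Spark's Logging trait and toString in place of the
repo's toJson helper:

    import org.slf4j.LoggerFactory

    object FailFastExample {
      private val log = LoggerFactory.getLogger(getClass)

      def getDynamicInfo(newData: Map[String, String]): (String, Map[String, Any]) =
        try {
          // Stands in for createDynamicInfo(this.getClass.getSimpleName, new_data).
          ("demo", Map("fields" -> newData))
        } catch {
          case e: Exception =>
            // Log the failing record once so it can be replayed, then rethrow
            // so the task fails instead of emitting an empty placeholder.
            log.error(s"error data: $newData", e)
            throw new RuntimeException(e)
        }
    }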

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DynamicDimConfiguration.scala

@@ -155,8 +155,8 @@ object DynamicDimConfiguration {
       "name" -> "行政处罚信用中国",
       "list_field" -> Map(
         "punish_number" -> Map(),
-        "content" -> Map("from" -> "reason"),
-        "department_name" -> Map("from" -> "department"),
+        "content" -> Map("from" -> Seq("reason")),
+        "department_name" -> Map("from" -> Seq("department")),
         "decision_date" -> Map()
       )
     )
@@ -224,10 +224,10 @@ object DynamicDimConfiguration {
       "name" -> "行政许可",
       "list_field" -> Map(
         "license_number" -> Map(),
-        "start_date" -> Map("from" -> "decision_date"),
+        "start_date" -> Map("from" -> Seq("decision_date")),
         "end_date" -> Map(),
         "department" -> Map(),
-        "scope" -> Map("from" -> "licence_content")
+        "scope" -> Map("from" -> Seq("licence_content"))
       )
     )
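
In both hunks the "from" mapping changes from a single source-field name to a
Seq of names, which suggests the consumer (DynamicFiledUtils.scala, whose
diff is suppressed below) now resolves a list of candidate source fields. A
sketch of how such a mapping might be read, under that assumption (the
resolver below is hypothetical, not the repo's actual implementation):

    object FromFieldResolution {
      // Config shape after this commit: "from" maps to a Seq of source fields.
      val fieldConf: Map[String, Any] = Map("from" -> Seq("reason"))

      // Hypothetical resolver: take the first candidate that is present
      // and non-empty in the source record.
      def resolve(conf: Map[String, Any], record: Map[String, String]): Option[String] =
        conf.get("from") match {
          case Some(sources: Seq[String @unchecked]) =>
            sources.iterator.flatMap(record.get).find(_.nonEmpty)
          case _ => None
        }

      def main(args: Array[String]): Unit =
        println(resolve(fieldConf, Map("reason" -> "some reason"))) // Some(some reason)
    }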
 

File diff suppressed because it is too large
+ 26 - 45
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DynamicFiledUtils.scala


+ 10 - 3
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/SimpleDailyDynamic.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.ng.dynamic.utils
 
 import com.winhc.bigdata.spark.ng.dynamic.utils.DynamicFiledUtils.createSimpleDynamicInfo
 import org.apache.spark.internal.Logging
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 
 /**
  * @author ZhangJi
@@ -12,9 +13,15 @@ trait SimpleDailyDynamic extends Logging {
     try {
       createSimpleDynamicInfo(this.getClass.getSimpleName, new_data)
     } catch {
-      case e: Exception => logError("xjk:"+e.getMessage, e)
-      (null, Map.empty)
+      case e: Exception => {
+        logError("xjk:" + e.getMessage, e)
+        println("error data:")
+        println(new_data.toJson())
+        logError("error data:")
+        logError(new_data.toJson())
+        throw new RuntimeException(e)
+      }
     }
-  }
 
+  }
 }