Browse Source

fix: dynamic

许家凯 3 years ago
parent
commit
1bfcad6128

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.ng.dynamic
 
+import cn.hutool.crypto.SecureUtil
 import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic.utils.CollapseKeyArgs
@@ -119,7 +120,7 @@ case class CompanyDynamicRecord(id: String,
     val risk_level_str = association_entity_info.map(_.risk_level).distinct.mkString(",")
     val risk_level_detail = association_entity_info.filter(r => StringUtils.isNotBlank(r.keyno)).map(r => s"${r.keyno}@@${r.risk_level}").distinct.mkStringOrNull(",")
     val agg_detail_rowkey_str: String = if (agg_detail_rowkey == null || agg_detail_rowkey.isEmpty) null else agg_detail_rowkey.map(_.toStr).mkString(",")
-    Row(id
+    Row(SecureUtil.md5(id)
       , association_entity_info.toJson()
       , rowkey
       , tn

+ 1 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -93,6 +93,7 @@ case class NgCompanyDynamic(s: SparkSession,
       val filter = args_map(r.tn).filter
       if (filter == null) {
         true
+//        CompanyDynamicUtils.default_filter(r.update_type, r.biz_date, r.change_fields, r.old_data, r.new_data)
       } else {
         filter.apply(r.update_type, r.biz_date, r.change_fields, r.old_data, r.new_data)
       }
@@ -175,7 +176,6 @@ case class NgCompanyDynamic(s: SparkSession,
 }
 
 object NgCompanyDynamic {
-
   def main(args: Array[String]): Unit = {
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
@@ -187,5 +187,4 @@ object NgCompanyDynamic {
     NgCompanyDynamic(spark, NgCompanyDynamicArgs.getStartArgs(inc), NgCompanyDynamicArgs.getAggArgs, inc = inc).calc()
     spark.stop()
   }
-
 }

+ 3 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DailyAggHandle.scala

@@ -9,7 +9,9 @@ import scala.collection.mutable
  * @date: 2021/6/29 14:29
  */
 abstract class DailyAggHandle() extends NgCompanyDynamicHandle {
-  override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => update_type.equals("insert")
+  override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => {
+    CompanyDynamicUtils.default_filter(update_type, biz_date, change_fields, old_data, new_data) && update_type.equals("insert")
+  }
 
   def getDynamicInfo(new_data: Map[String, String]): (String, Map[String, Any])