ソースを参照

fix: 动态dynamic合并

许家凯 3 年 前
コミット
7869e47897

+ 278 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/BusinessTotalDynamic.scala

@@ -0,0 +1,278 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import com.winhc.bigdata.spark.ng.dynamic.utils.BusinessTotalDynamicUtils
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.functions.col
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Derives business-change dynamics from the `company_change` table: each
+ * supported change category is filtered and converted into
+ * [[CompanyDynamicRecord]]s by [[BusinessTotalDynamicUtils]], and [[calc]]
+ * writes the combined result into a `<yesterday>_business` partition of the
+ * output table.
+ *
+ * @param s Spark session used for every query; re-exposed as the transient `spark` member
+ * @author: XuJiakai
+ * @date: 2021/8/5 10:29
+ */
+case class BusinessTotalDynamic(s: SparkSession
+                               ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val target_tab = "winhc_ng.out_company_dynamic_all"
+
+  /**
+   * Loads the candidate change rows.
+   *
+   * The base query comes from the inherited `generateAllTabSql` helper; on top
+   * of it, rows must have a non-null `change_time`, `content_before` and
+   * `content_after`, and the before/after contents must differ. Every column is
+   * cast to string so rows can be read uniformly with `getAs[String]`.
+   */
+  private def getRdd: DataFrame = {
+    val df = sql(
+      s"""
+         |${generateAllTabSql("company_change", "winhc_ng")._1}
+         |AND change_time is not null
+         |AND content_before is not null
+         |AND content_after is not null
+         |AND content_after <> content_before
+         |""".stripMargin)
+    df.select(df.columns.map(column => col(column).cast("string")): _*)
+  }
+
+  /**
+   * Shared pipeline behind every per-category method: keep the rows accepted
+   * by `keep`, then expand each kept row into records with `build`.
+   *
+   * The closures only capture the two function arguments (never `this`), and
+   * read just the columns each function actually needs.
+   *
+   * @param keep  predicate over (category, change_item, change_info, content_before, content_after)
+   * @param build converter over (company_id, company_name, category, change_item, change_info,
+   *              content_before, content_after, change_time)
+   * @return the records produced for all accepted rows
+   */
+  private def changeRecords(keep: (String, String, String, String, String) => Boolean
+                            , build: (String, String, String, String, String, String, String, String) => TraversableOnce[CompanyDynamicRecord]
+                           ): RDD[CompanyDynamicRecord] = {
+    getRdd
+      .rdd
+      .filter(r => keep(
+        r.getAs[String]("category")
+        , r.getAs[String]("change_item")
+        , r.getAs[String]("change_info")
+        , r.getAs[String]("content_before")
+        , r.getAs[String]("content_after")
+      ))
+      .flatMap(r => build(
+        r.getAs[String]("company_id")
+        , r.getAs[String]("company_name")
+        , r.getAs[String]("category")
+        , r.getAs[String]("change_item")
+        , r.getAs[String]("change_info")
+        , r.getAs[String]("content_before")
+        , r.getAs[String]("content_after")
+        , r.getAs[String]("change_time")
+      ))
+  }
+
+  /** Records for company-name changes. */
+  def companyName(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.companyNameFilter, BusinessTotalDynamicUtils.companyNameTransform)
+
+  /** Records for registered-capital changes. */
+  def registeredCapital(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.registeredCapitalFilter, BusinessTotalDynamicUtils.registeredCapitalTransform)
+
+  /** Records for registered-address changes. */
+  def registeredAddress(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.registeredAddressFilter, BusinessTotalDynamicUtils.registeredAddressTransform)
+
+  /** Records for business-scope changes. */
+  def businessScope(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.businessScopeFilter, BusinessTotalDynamicUtils.businessScopeTransform)
+
+  /** Records for legal-representative changes. */
+  def legalRepresentative(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.legalRepresentativeFilter, BusinessTotalDynamicUtils.legalRepresentativeTransform)
+
+  /** Records for shareholder changes. */
+  def holder(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.holderFilter, BusinessTotalDynamicUtils.holderTransform)
+
+  /** Records for staff changes. */
+  def staff(): RDD[CompanyDynamicRecord] =
+    changeRecords(BusinessTotalDynamicUtils.staffFilter, BusinessTotalDynamicUtils.staffTransform)
+
+  /**
+   * Unions the per-category records and writes them to
+   * `PARTITION(ds='<yesterday>_business')` of the target table. The statement
+   * is `INSERT INTO` when running on Windows and `INSERT OVERWRITE` otherwise.
+   */
+  def calc(): Unit = {
+
+    // holder() and staff() are not part of this union; the earlier
+    // alternatives are kept below for reference.
+    val value = companyName()
+      .union(registeredCapital())
+      .union(registeredAddress())
+      .union(businessScope())
+      .union(legalRepresentative())
+    //    val value = legalRepresentative().union(holder()).union(staff())
+    //    val value = holder().union(staff())
+
+    // createOrReplaceTempView: createTempView throws if the view name is
+    // already registered in this session (e.g. calc() invoked twice).
+    spark.createDataFrame(value.map(_.to_row()), spark.table(target_tab).schema)
+      .createOrReplaceTempView("company_dynamic_out_tab")
+    // The partition column is supplied by the PARTITION clause, not the SELECT list.
+    val cols = getColumns(target_tab).diff(Seq("ds")).mkString(",")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}_business')
+         |SELECT $cols
+         |FROM   company_dynamic_out_tab
+         |""".stripMargin)
+  }
+}
+
+object BusinessTotalDynamic {
+
+
+  /**
+   * Job entry point: creates the Spark session for project `winhc_ng`, runs
+   * [[BusinessTotalDynamic.calc]] and always stops the session afterwards.
+   *
+   * @param args command-line arguments (not read)
+   */
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    try {
+      BusinessTotalDynamic(spark).calc()
+    } finally {
+      // Release the session even when calc() fails.
+      spark.stop()
+    }
+  }
+}

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -93,7 +93,7 @@ case class CompanyDynamicRecord(id: String,
       return null
     }
     // todo 留下人名,为后期补id
-    /* val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
+     val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.name))
      if (rec.isEmpty) return null
 
      if (rec.length != association_entity_info.length)
@@ -110,7 +110,7 @@ case class CompanyDynamicRecord(id: String,
          dynamic_time,
          update_time,
          create_time
-       )*/
+       )
     this
   }
 

+ 19 - 4
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -155,6 +155,7 @@ case class NgCompanyDynamic(s: SparkSession,
     var out_rdd: RDD[CompanyDynamicRecord] = null
 
     for ((key, value) <- rdd_map) {
+      println("union key: " + key)
       if (out_rdd == null) {
         out_rdd = value
       } else {
@@ -168,7 +169,7 @@ case class NgCompanyDynamic(s: SparkSession,
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}_test')
          |SELECT $cols
          |FROM   company_dynamic_out_tab
          |""".stripMargin)
@@ -177,21 +178,35 @@ case class NgCompanyDynamic(s: SparkSession,
 
 object NgCompanyDynamic {
   def main(args: Array[String]): Unit = {
+    var flag: String = null
+    if (args.size == 1) {
+      flag = args(0)
+    } else {
+      flag = "inc"
+    }
+    var inc = true
+    if (!"inc".equals(flag)) {
+      inc = false
+    }
+
+    println(s"${if (inc) "inc..." else "all..."}")
+
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
       "spark.debug.maxToStringFields" -> "200",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    val inc = false
+
+//    val tn = "company,company_staff,company_holder"
     val tn = "all"
     var dynamicArgs = NgCompanyDynamicArgs.getStartArgs(inc)
     var aggArgs = NgCompanyDynamicArgs.getAggArgs
     if (!"all".equals(tn)) {
       val set = tn.split(",").toSet
       dynamicArgs = dynamicArgs.filter(r => set.contains(r.tn))
-      aggArgs = aggArgs.filter(r=>{
-       r.tabs.forall(s => set.contains(s))
+      aggArgs = aggArgs.filter(r => {
+        r.tabs.forall(s => set.contains(s))
       })
     }
     NgCompanyDynamic(spark, dynamicArgs, aggArgs, inc = inc).calc()

+ 9 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -101,4 +101,13 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
 
   }
 
+
+  /**
+   * Returns the part of `date` before its first space, i.e. the date portion
+   * of a "date time" string.
+   *
+   * @param date a date or date-time string; may be null or empty
+   * @return null for null/empty input, the input itself when it contains no
+   *         space, otherwise the text before the first space
+   */
+  def formatDate(date: String): String = {
+    if (date == null || date.isEmpty) {
+      null
+    } else {
+      // indexOf/substring instead of split(" ")(0): split drops trailing empty
+      // strings, so an input made only of spaces yields an empty array and
+      // indexing it throws ArrayIndexOutOfBoundsException.
+      val firstSpace = date.indexOf(' ')
+      if (firstSpace < 0) date else date.substring(0, firstSpace)
+    }
+  }
+
 }

+ 30 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/BusinessInfo.scala

@@ -4,12 +4,28 @@ import com.winhc.bigdata.spark.implicits.BaseHelper._
 import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.ng.dynamic.utils.{BusinessInfoDynamicInfoMap, ChangeContent}
 import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord}
+import org.apache.commons.lang3.StringUtils
 
 /**
  * @author: XuJiakai
  * @date: 2021/7/28 17:36
  */
 case class BusinessInfo() extends AcrossTabAggHandle {
+
+  // Lookup from each individual dynamic code to the code used for the
+  // aggregated record. A key on the left may list several comma-separated
+  // codes that all map to the same value.
+  private val dynamic_code_map = Seq(
+    "109001,109002,109005" -> "109001",
+    "101001" -> "101001",
+    "101002" -> "101002",
+    "101003" -> "101003",
+    "101004" -> "101004",
+    "101005" -> "101005",
+    "101006" -> "101006",
+    "101301" -> "101301"
+  ).flatMap { case (sourceCodes, mergedCode) =>
+    sourceCodes.split(",").map(sourceCode => sourceCode -> mergedCode)
+  }.toMap
+
+
   /**
    * 需要聚合的维度
    *
@@ -23,7 +39,7 @@ case class BusinessInfo() extends AcrossTabAggHandle {
    * @return
    */
   override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    val info = cdr.association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno)).find(r => r.keyno.length == 32 || r.keyno.length == 33).orNull
     if (info == null) {
       s"${cdr.id}_${cdr.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label}"
     } else {
@@ -42,6 +58,18 @@ case class BusinessInfo() extends AcrossTabAggHandle {
     })
     val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
     val str = BusinessInfoDynamicInfoMap(label = label, content = contents).toJson()
-    Seq(seq(0).customCopy(dynamic_info = str.toAnyMap()))
+
+    Seq(seq(0).customCopy(dynamic_code = get_new_code(seq), dynamic_info = str.toAnyMap()))
+  }
+
+
+  /**
+   * Resolves the single dynamic code to use for an aggregated group.
+   *
+   * Every distinct `dynamic_code` in `seq` is looked up in `dynamic_code_map`;
+   * all of them must be known and must map to the same target code.
+   *
+   * @param seq records being merged into one
+   * @return the common target code
+   * @throws RuntimeException if a code is not in the map, or the codes map to
+   *                          zero or more than one target
+   */
+  private def get_new_code(seq: Seq[CompanyDynamicRecord]): String = {
+    val rawCodes = seq.map(_.dynamic_code).distinct
+    // Option-returning lookup, so an unmapped code reaches the descriptive
+    // error below instead of a bare NoSuchElementException from Map.apply.
+    val lookups = rawCodes.map(dynamic_code_map.get)
+    val targets = lookups.flatten.distinct
+    if (lookups.exists(_.isEmpty) || targets.length != 1) {
+      import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+      throw new RuntimeException("dynamic code error : " + rawCodes.sorted.mkString(",") + "  record:\n " + seq.toJson())
+    }
+    targets.head
   }
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company.scala

@@ -55,8 +55,8 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
         , dynamic_info = map.toMap()
         , agg_detail_text = null
         , agg_detail_rowkey = null
-        , biz_time = biz_date
-        , dynamic_time = date
+        , biz_time = formatDate(biz_date)
+        , dynamic_time = formatDate(date)
         , update_time = update_time
         , create_time = update_time
       )

+ 31 - 6
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -20,6 +20,16 @@ import scala.collection.mutable
  * @date: 2021/6/23 10:43
  */
 case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
+
+
+  // Lookup from each individual dynamic code to the code used for the
+  // aggregated record. A key on the left may list several comma-separated
+  // codes that all map to the same value.
+  private val dynamic_code_map = Seq(
+    "109003,109004" -> "109003",
+    "101101,101102" -> "101101"
+  ).flatMap { case (sourceCodes, mergedCode) =>
+    sourceCodes.split(",").map(sourceCode => sourceCode -> mergedCode)
+  }.toMap
+
+
   override def flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
     val change_fields = change_extract.change_fields
     val tn = change_extract.tn
@@ -54,8 +64,8 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
         , dynamic_info = map.toMap()
         , agg_detail_text = null
         , agg_detail_rowkey = null
-        , biz_time = biz_date
-        , dynamic_time = date
+        , biz_time = formatDate(biz_date)
+        , dynamic_time = formatDate(date)
         , update_time = update_time
         , create_time = update_time
       )
@@ -247,11 +257,12 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
   }
 
   override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    val info = cdr.association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno)).find(r => r.keyno.length == 32 || r.keyno.length == 33).orNull
+
     if (info == null) {
-      cdr.id
+      s"${cdr.id}_${cdr.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label}"
     } else {
-      info.keyno
+      s"${info.keyno}_${cdr.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label}"
     }
   }
 
@@ -262,10 +273,24 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     })
     val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
     val str = BusinessInfoDynamicInfoMap(label = label, content = contents).toJson()
-    Seq(seq(0).customCopy(dynamic_info = str.toAnyMap()))
+
+    Seq(seq(0).customCopy(dynamic_code = get_new_code(seq), dynamic_info = str.toAnyMap()))
   }
 
 
+  /**
+   * Resolves the single dynamic code to use for an aggregated group.
+   *
+   * Every distinct `dynamic_code` in `seq` is looked up in `dynamic_code_map`;
+   * all of them must be known and must map to the same target code.
+   *
+   * @param seq records being merged into one
+   * @return the common target code
+   * @throws RuntimeException if a code is not in the map, or the codes map to
+   *                          zero or more than one target
+   */
+  private def get_new_code(seq: Seq[CompanyDynamicRecord]): String = {
+    val rawCodes = seq.map(_.dynamic_code).distinct
+    // Option-returning lookup, so an unmapped code reaches the descriptive
+    // error below instead of a bare NoSuchElementException from Map.apply.
+    val lookups = rawCodes.map(dynamic_code_map.get)
+    val targets = lookups.flatten.distinct
+    if (lookups.exists(_.isEmpty) || targets.length != 1) {
+      import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+      throw new RuntimeException("dynamic code error : " + rawCodes.sorted.mkString(",") + "  record:\n " + seq.toJson())
+    }
+    targets.head
+  }
+
+
+
   def getRealCapital(str: String): String = {
     if (StringUtils.isEmpty(str))
       return null

+ 32 - 12
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_staff.scala

@@ -13,6 +13,13 @@ import scala.collection.mutable
  */
 case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
 
+  // Lookup from each individual dynamic code to the code used for the
+  // aggregated record. A key on the left may list several comma-separated
+  // codes that all map to the same value.
+  private val dynamic_code_map = Seq(
+    "109005" -> "109005",
+    "101301" -> "101301"
+  ).flatMap { case (sourceCodes, mergedCode) =>
+    sourceCodes.split(",").map(sourceCode => sourceCode -> mergedCode)
+  }.toMap
+
 
   private val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z ]".r
 
@@ -50,8 +57,8 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
         , dynamic_info = map.toMap()
         , agg_detail_text = null
         , agg_detail_rowkey = null
-        , biz_time = biz_date
-        , dynamic_time = date
+        , biz_time = formatDate(biz_date)
+        , dynamic_time = formatDate(date)
         , update_time = update_time
         , create_time = update_time
       )
@@ -63,7 +70,7 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     //新增主要成员
     def staffNew(staffName: String, staffId: String, staffType: String): Seq[CompanyDynamicRecord] = {
       val cdr = getRecordPro(
-        dynamicCode = "109005"
+        dynamicCode = "101301"
         , keyno = company_id
         , name = company_name
         , label = "主要成员"
@@ -143,7 +150,7 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     //主要成员移除
     def staffDeleted(staffName: String, staffId: String, staffType: String): Seq[CompanyDynamicRecord] = {
       val cdr = getRecordPro(
-        dynamicCode = "109005"
+        dynamicCode = "101301"
         , keyno = company_id
         , name = company_name
         , label = "主要成员"
@@ -187,12 +194,14 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
 
       case "update" => {
         //主要成员职位变化
-        val n_staff_name = new_data("staff_name")
-        val n_hid = new_data("hid")
-        val n_staff_type = new_data("staff_type")
-        val o_staff_type = old_data("staff_type")
-
-        list = list ++ staffTypeChange(n_staff_name, n_hid, o_staff_type, n_staff_type)
+        if (change_fields.contains("staff_type")) {
+          val n_staff_name = new_data("staff_name")
+          val n_hid = new_data("hid")
+          val n_staff_type = new_data("staff_type")
+          val o_staff_type = old_data("staff_type")
+
+          list = list ++ staffTypeChange(n_staff_name, n_hid, o_staff_type, n_staff_type)
+        }
       }
 
       case "deleted" => {
@@ -233,7 +242,7 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
   }
 
   override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    val info = cdr.association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno)).find(r => r.keyno.length == 32 || r.keyno.length == 33).orNull
     if (info == null) {
       cdr.id
     } else {
@@ -248,7 +257,18 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     })
     val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
     val str = BusinessInfoDynamicInfoMap(label = label, content = contents)
-    Seq(seq(0).customCopy(dynamic_info = str.toMap()))
+    Seq(seq(0).customCopy(dynamic_code = get_new_code(seq), dynamic_info = str.toMap()))
+  }
+
+
+  /**
+   * Resolves the single dynamic code to use for an aggregated group.
+   *
+   * Every distinct `dynamic_code` in `seq` is looked up in `dynamic_code_map`;
+   * all of them must be known and must map to the same target code.
+   *
+   * @param seq records being merged into one
+   * @return the common target code
+   * @throws RuntimeException if a code is not in the map, or the codes map to
+   *                          zero or more than one target
+   */
+  private def get_new_code(seq: Seq[CompanyDynamicRecord]): String = {
+    val rawCodes = seq.map(_.dynamic_code).distinct
+    // Option-returning lookup, so an unmapped code reaches the descriptive
+    // error below instead of a bare NoSuchElementException from Map.apply.
+    val lookups = rawCodes.map(dynamic_code_map.get)
+    val targets = lookups.flatten.distinct
+    if (lookups.exists(_.isEmpty) || targets.length != 1) {
+      import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+      throw new RuntimeException("dynamic code error : " + rawCodes.sorted.mkString(",") + "  record:\n " + seq.toJson())
+    }
+    targets.head
   }
 }
 

+ 477 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/BusinessTotalDynamicUtils.scala

@@ -0,0 +1,477 @@
+package com.winhc.bigdata.spark.ng.dynamic.utils
+
+import com.winhc.bigdata.spark.implicits.BaseHelper._
+import com.winhc.bigdata.spark.implicits.Bool._
+import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, CompanyDynamicRecord, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.utils.{BaseUtil, RegCapitalAmount}
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/5 15:55
+ */
+object BusinessTotalDynamicUtils {
+
+  /**
+   * Builds a `company_change` dynamic record whose `dynamic_info` is a plain
+   * map holding the code, label, before/after text and the entity list.
+   *
+   * `change_time` is reduced to the text before its first space (when it has
+   * one) and that value is used for the record id, `biz_time` and
+   * `dynamic_time`. `update_time` and `create_time` are both set to
+   * `BaseUtil.nowDate()`.
+   */
+  private def getRecordPro(dynamicCode: String,
+                           rowkey: String,
+                           name: String,
+                           keyno: String,
+                           label: String,
+                           risk_level: NgCompanyRiskLevelType,
+                           before: String,
+                           after: String,
+                           entity: List[Map[String, String]],
+                           change_time: String
+                          ): CompanyDynamicRecord = {
+    val sourceTable = "company_change"
+    val now = BaseUtil.nowDate()
+
+    val info = Map[String, Any](
+      "code" -> dynamicCode,
+      "label" -> label,
+      "before" -> before,
+      "after" -> after,
+      "entity" -> entity
+    )
+
+    // Keep only the date portion of a "date time" value.
+    val day = if (!change_time.contains(" ")) change_time else change_time.split(" ")(0)
+
+    CompanyDynamicRecord(
+      id = CompanyDynamicUtils.generateId(rowkey, day, sourceTable, dynamicCode),
+      association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null)),
+      rowkey = rowkey,
+      tn = sourceTable,
+      update_type = "update",
+      dynamic_code = dynamicCode,
+      dynamic_info = info,
+      agg_detail_text = null,
+      agg_detail_rowkey = null,
+      biz_time = day,
+      dynamic_time = day,
+      update_time = now,
+      create_time = now
+    )
+  }
+
+
+  /**
+   * Builds a `company_change` dynamic record whose `dynamic_info` is a
+   * `BusinessInfoDynamicInfoMap` (label plus change contents) converted with
+   * `toMap()`.
+   *
+   * `change_time` is reduced to the text before its first space (when it has
+   * one) and that value is used for the record id, `biz_time` and
+   * `dynamic_time`. `update_time` and `create_time` are both set to
+   * `BaseUtil.nowDate()`.
+   */
+  private def getRecordPro(dynamicCode: String,
+                           rowkey: String,
+                           name: String,
+                           keyno: String,
+                           label: String,
+                           risk_level: NgCompanyRiskLevelType,
+                           content: Seq[ChangeContent],
+                           change_time: String
+                          ): CompanyDynamicRecord = {
+    val info = BusinessInfoDynamicInfoMap(label = label, content = content)
+    val sourceTable = "company_change"
+    val now = BaseUtil.nowDate()
+
+    // Keep only the date portion of a "date time" value.
+    val day = if (!change_time.contains(" ")) change_time else change_time.split(" ")(0)
+
+    CompanyDynamicRecord(
+      id = CompanyDynamicUtils.generateId(rowkey, day, sourceTable, dynamicCode),
+      association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null)),
+      rowkey = rowkey,
+      tn = sourceTable,
+      update_type = "update",
+      dynamic_code = dynamicCode,
+      dynamic_info = info.toMap(),
+      agg_detail_text = null,
+      agg_detail_rowkey = null,
+      biz_time = day,
+      dynamic_time = day,
+      update_time = now,
+      create_time = now
+    )
+  }
+
+  /**
+   * Builds a record describing one before/after change of a single entity.
+   *
+   * The change is wrapped in one `ChangeContent` (before and after carried as
+   * `Entity.info`, the entity identified by `name`/`keyno`) and handed to
+   * `getRecordPro`.
+   *
+   * @param code      dynamic code, also used as the change content's code
+   * @param rowkey    row key of the record
+   * @param label     label stored in the dynamic info
+   * @param keyno     id of the associated entity
+   * @param name      name of the associated entity
+   * @param before    content before the change
+   * @param after     content after the change
+   * @param date      change time, passed on as `change_time`
+   * @param riskLevel risk level of the associated entity (defaults to `Prompt`)
+   * @param flag      optional flag stored on the change content
+   */
+  def getSingleRecordChange(code: String
+                            , rowkey: String
+                            , label: String
+                            , keyno: String
+                            , name: String
+                            , before: String
+                            , after: String
+                            , date: String
+                            , riskLevel: NgCompanyRiskLevelType = NgCompanyRiskLevelType.Prompt, flag: String = null): CompanyDynamicRecord = {
+    val change = ChangeContent(code, before = Entity(info = before), after = Entity(info = after), Entity(name = name, keyno = keyno), flag = flag)
+    getRecordPro(
+      dynamicCode = code,
+      rowkey = rowkey,
+      keyno = keyno,
+      name = name,
+      label = label,
+      risk_level = riskLevel,
+      content = Seq(change),
+      change_time = date
+    )
+  }
+
+
+  /**
+   * Accepts only rows whose category is exactly "公司名称变更"; a null or
+   * empty category is rejected. The other arguments are not inspected.
+   */
+  def companyNameFilter(category: String,
+                        change_item: String,
+                        change_info: String,
+                        content_before: String,
+                        content_after: String): Boolean =
+    "公司名称变更" == category
+
+
+  /**
+   * Turns a company-name change row into a single record with code "101001"
+   * and label "公司更名"; the company id serves as both row key and entity id.
+   */
+  def companyNameTransform(company_id: String,
+                           company_name: String,
+                           category: String,
+                           change_item: String,
+                           change_info: String,
+                           content_before: String,
+                           content_after: String,
+                           change_time: String
+                          ): Seq[CompanyDynamicRecord] =
+    Seq(
+      getSingleRecordChange(
+        code = "101001",
+        rowkey = company_id,
+        label = "公司更名",
+        keyno = company_id,
+        name = company_name,
+        before = content_before,
+        after = content_after,
+        date = change_time
+      )
+    )
+
+
+  /**
+   * Accepts rows whose category is exactly "注册资本变更", except those whose
+   * change item contains "实".
+   *
+   * A null `change_item` cannot contain the excluded character, so such rows
+   * are accepted instead of failing with a NullPointerException.
+   *
+   * @param category    change category; null or empty is rejected
+   * @param change_item change item text; may be null
+   * @return true when the row should be converted into a record
+   */
+  def registeredCapitalFilter(category: String,
+                              change_item: String,
+                              change_info: String,
+                              content_before: String,
+                              content_after: String): Boolean =
+    "注册资本变更" == category && (change_item == null || !change_item.contains("实"))
+
+
+  /**
+   * Turns a registered-capital change row into at most one record (code
+   * "101002", label "注册资本").
+   *
+   * No record is produced when `RegCapitalAmount.isAdditionalShare` reports 0
+   * for the before/after pair, or when either parsed amount equals 0. For a
+   * positive result the record gets flag "1" and risk level `Positive`;
+   * otherwise flag "0" and risk level `Caution`.
+   */
+  def registeredCapitalTransform(company_id: String,
+                                 company_name: String,
+                                 category: String,
+                                 change_item: String,
+                                 change_info: String,
+                                 content_before: String,
+                                 content_after: String,
+                                 change_time: String
+                                ): Seq[CompanyDynamicRecord] = {
+    val direction = RegCapitalAmount.isAdditionalShare(content_before, content_after)
+
+    // Evaluated only when direction is non-zero, as the || below short-circuits.
+    def eitherAmountIsZero: Boolean =
+      0 == RegCapitalAmount.getAmount(content_before).toDouble || RegCapitalAmount.getAmount(content_after).toDouble == 0
+
+    if (direction == 0 || eitherAmountIsZero) {
+      Seq.empty
+    } else {
+      val increased = direction > 0
+      val flag = increased ? "1" | "0"
+      val level = increased ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
+
+      Seq(
+        getSingleRecordChange(
+          code = "101002",
+          rowkey = company_id,
+          label = "注册资本",
+          keyno = company_id,
+          name = company_name,
+          before = content_before,
+          after = content_after,
+          date = change_time,
+          riskLevel = level,
+          flag = flag
+        )
+      )
+    }
+  }
+
+
+  /**
+   * Accepts only records whose `category` is exactly "住所变更".
+   *
+   * @param category       change category; null or empty is rejected
+   * @param change_item    not inspected by this filter
+   * @param change_info    not inspected by this filter
+   * @param content_before not inspected by this filter
+   * @param content_after  not inspected by this filter
+   * @return true when the category matches
+   */
+  def registeredAddressFilter(category: String,
+                              change_item: String,
+                              change_info: String,
+                              content_before: String,
+                              content_after: String): Boolean =
+    !StringUtils.isEmpty(category) && category.equals("住所变更")
+
+
+  /**
+   * Wraps a registered-address change into a single dynamic record (code "101004").
+   *
+   * @param company_id     used as both rowkey and keyno of the record
+   * @param company_name   name attached to the record
+   * @param category       not used by this transform
+   * @param change_item    not used by this transform
+   * @param change_info    not used by this transform
+   * @param content_before address before the change
+   * @param content_after  address after the change
+   * @param change_time    passed as the last positional argument of the record builder
+   * @return a one-element sequence holding the record
+   */
+  def registeredAddressTransform(company_id: String,
+                                 company_name: String,
+                                 category: String,
+                                 change_item: String,
+                                 change_info: String,
+                                 content_before: String,
+                                 content_after: String,
+                                 change_time: String
+                                ): Seq[CompanyDynamicRecord] =
+    Seq(
+      getSingleRecordChange(
+        "101004",
+        company_id,
+        "注册地址变化",
+        company_id,
+        company_name,
+        content_before,
+        content_after,
+        change_time
+      )
+    )
+
+
+  /**
+   * Accepts only records whose `category` is exactly "经营范围变更".
+   *
+   * @param category       change category; null or empty is rejected
+   * @param change_item    not inspected by this filter
+   * @param change_info    not inspected by this filter
+   * @param content_before not inspected by this filter
+   * @param content_after  not inspected by this filter
+   * @return true when the category matches
+   */
+  def businessScopeFilter(category: String,
+                          change_item: String,
+                          change_info: String,
+                          content_before: String,
+                          content_after: String): Boolean =
+    !StringUtils.isEmpty(category) && category.equals("经营范围变更")
+
+
+  /**
+   * Wraps a business-scope change into a single dynamic record (code "101005").
+   *
+   * Records with an empty `content_before` or `content_after` are dropped.
+   *
+   * @param company_id     used as both rowkey and keyno of the record
+   * @param company_name   name attached to the record
+   * @param category       not used by this transform
+   * @param change_item    not used by this transform
+   * @param change_info    not used by this transform
+   * @param content_before scope text before the change
+   * @param content_after  scope text after the change
+   * @param change_time    passed as the last positional argument of the record builder
+   * @return a one-element sequence, or an empty one when either side is empty
+   */
+  def businessScopeTransform(company_id: String,
+                             company_name: String,
+                             category: String,
+                             change_item: String,
+                             change_info: String,
+                             content_before: String,
+                             content_after: String,
+                             change_time: String
+                            ): Seq[CompanyDynamicRecord] = {
+    val missingContent = StringUtils.isEmpty(content_before) || StringUtils.isEmpty(content_after)
+    if (missingContent) {
+      Seq.empty
+    } else {
+      Seq(
+        getSingleRecordChange(
+          "101005",
+          company_id,
+          "经营范围变化",
+          company_id,
+          company_name,
+          content_before,
+          content_after,
+          change_time
+        )
+      )
+    }
+  }
+
+
+  /**
+   * Accepts only records whose `category` is exactly "法定代表人、负责人变更".
+   *
+   * @param category       change category; null or empty is rejected
+   * @param change_item    not inspected by this filter
+   * @param change_info    not inspected by this filter
+   * @param content_before not inspected by this filter
+   * @param content_after  not inspected by this filter
+   * @return true when the category matches
+   */
+  def legalRepresentativeFilter(category: String,
+                                change_item: String,
+                                change_info: String,
+                                content_before: String,
+                                content_after: String): Boolean =
+    !StringUtils.isEmpty(category) && category.equals("法定代表人、负责人变更")
+
+
+  /**
+   * Builds the dynamic record (code "110101") for a legal-representative change.
+   *
+   * Records with an empty `content_before` or `content_after` are dropped. The `entity`
+   * list is read from the `change_info` JSON on a best-effort basis: a null `change_info`,
+   * a parse failure, a missing key, an explicit JSON null or a non-list value all fall
+   * back to an empty list, so the record builder never receives a null list.
+   *
+   * @param company_id     used as both rowkey and keyno of the record
+   * @param company_name   name attached to the record
+   * @param category       not used by this transform
+   * @param change_item    not used by this transform
+   * @param change_info    JSON text expected to carry an "entity" array; may be null
+   * @param content_before representative before the change
+   * @param content_after  representative after the change
+   * @param change_time    passed on as the record's change time
+   * @return a one-element sequence, or an empty one when either side is empty
+   */
+  def legalRepresentativeTransform(company_id: String,
+                                   company_name: String,
+                                   category: String,
+                                   change_item: String,
+                                   change_info: String,
+                                   content_before: String,
+                                   content_after: String,
+                                   change_time: String
+                                  ): Seq[CompanyDynamicRecord] = {
+    import scala.util.control.NonFatal
+
+    if (StringUtils.isEmpty(content_before) || StringUtils.isEmpty(content_after)) {
+      Seq.empty
+    } else {
+      val entity: List[Map[String, String]] =
+        if (change_info == null) {
+          List.empty
+        } else {
+          try {
+            val parsed: Map[String, Any] = change_info.toAnyMap()
+            parsed.get("entity") match {
+              // A typed pattern never matches null, so a JSON null lands in the default case.
+              case Some(items: List[_]) => items.asInstanceOf[List[Map[String, String]]]
+              case _ => List.empty
+            }
+          } catch {
+            // Entity extraction is optional: malformed change_info must not drop the record.
+            case NonFatal(_) => List.empty
+          }
+        }
+
+      val cdr = getRecordPro(
+        dynamicCode = "110101"
+        , rowkey = company_id
+        , keyno = company_id
+        , name = company_name
+        , label = "法定代表人变更"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , before = content_before
+        , after = content_after
+        , entity = entity
+        , change_time = change_time
+      )
+      Seq(cdr)
+    }
+  }
+
+
+  /**
+   * Accepts only records whose `category` is exactly "股东、发起人变更".
+   *
+   * @param category       change category; null or empty is rejected
+   * @param change_item    not inspected by this filter
+   * @param change_info    not inspected by this filter
+   * @param content_before not inspected by this filter
+   * @param content_after  not inspected by this filter
+   * @return true when the category matches
+   */
+  def holderFilter(category: String,
+                   change_item: String,
+                   change_info: String,
+                   content_before: String,
+                   content_after: String): Boolean =
+    !StringUtils.isEmpty(category) && category.equals("股东、发起人变更")
+
+
+  /**
+   * Builds the dynamic record (code "110102") for a shareholder / promoter change.
+   *
+   * Records with an empty `content_before` or `content_after` are dropped. The `entity`
+   * list is read from the `change_info` JSON on a best-effort basis: a null `change_info`,
+   * a parse failure, a missing key, an explicit JSON null or a non-list value all fall
+   * back to an empty list, so the record builder never receives a null list.
+   *
+   * @param company_id     used as both rowkey and keyno of the record
+   * @param company_name   name attached to the record
+   * @param category       not used by this transform
+   * @param change_item    not used by this transform
+   * @param change_info    JSON text expected to carry an "entity" array; may be null
+   * @param content_before holder text before the change
+   * @param content_after  holder text after the change
+   * @param change_time    passed on as the record's change time
+   * @return a one-element sequence, or an empty one when either side is empty
+   */
+  def holderTransform(company_id: String,
+                      company_name: String,
+                      category: String,
+                      change_item: String,
+                      change_info: String,
+                      content_before: String,
+                      content_after: String,
+                      change_time: String
+                     ): Seq[CompanyDynamicRecord] = {
+    import scala.util.control.NonFatal
+
+    if (StringUtils.isEmpty(content_before) || StringUtils.isEmpty(content_after)) {
+      Seq.empty
+    } else {
+      val entity: List[Map[String, String]] =
+        if (change_info == null) {
+          List.empty
+        } else {
+          try {
+            val parsed: Map[String, Any] = change_info.toAnyMap()
+            parsed.get("entity") match {
+              // A typed pattern never matches null, so a JSON null lands in the default case.
+              case Some(items: List[_]) => items.asInstanceOf[List[Map[String, String]]]
+              case _ => List.empty
+            }
+          } catch {
+            // Entity extraction is optional: malformed change_info must not drop the record.
+            case NonFatal(_) => List.empty
+          }
+        }
+
+      val cdr = getRecordPro(
+        dynamicCode = "110102"
+        , rowkey = company_id
+        , keyno = company_id
+        , name = company_name
+        , label = "股东、发起人变更"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , before = content_before
+        , after = content_after
+        , entity = entity
+        , change_time = change_time
+      )
+      Seq(cdr)
+    }
+  }
+
+
+  /**
+   * Accepts only records whose `category` is exactly "高级管理人员备案".
+   *
+   * @param category       change category; null or empty is rejected
+   * @param change_item    not inspected by this filter
+   * @param change_info    not inspected by this filter
+   * @param content_before not inspected by this filter
+   * @param content_after  not inspected by this filter
+   * @return true when the category matches
+   */
+  def staffFilter(category: String,
+                  change_item: String,
+                  change_info: String,
+                  content_before: String,
+                  content_after: String): Boolean =
+    !StringUtils.isEmpty(category) && category.equals("高级管理人员备案")
+
+
+  /**
+   * Builds the dynamic record (code "110103") for a key-personnel change.
+   *
+   * Records with an empty `content_before` or `content_after` are dropped. The `entity`
+   * list is read from the `change_info` JSON on a best-effort basis: a null `change_info`,
+   * a parse failure, a missing key, an explicit JSON null or a non-list value all fall
+   * back to an empty list, so the record builder never receives a null list.
+   *
+   * @param company_id     used as both rowkey and keyno of the record
+   * @param company_name   name attached to the record
+   * @param category       not used by this transform
+   * @param change_item    not used by this transform
+   * @param change_info    JSON text expected to carry an "entity" array; may be null
+   * @param content_before member text before the change
+   * @param content_after  member text after the change
+   * @param change_time    passed on as the record's change time
+   * @return a one-element sequence, or an empty one when either side is empty
+   */
+  def staffTransform(company_id: String,
+                     company_name: String,
+                     category: String,
+                     change_item: String,
+                     change_info: String,
+                     content_before: String,
+                     content_after: String,
+                     change_time: String
+                    ): Seq[CompanyDynamicRecord] = {
+    import scala.util.control.NonFatal
+
+    if (StringUtils.isEmpty(content_before) || StringUtils.isEmpty(content_after)) {
+      Seq.empty
+    } else {
+      val entity: List[Map[String, String]] =
+        if (change_info == null) {
+          List.empty
+        } else {
+          try {
+            val parsed: Map[String, Any] = change_info.toAnyMap()
+            parsed.get("entity") match {
+              // A typed pattern never matches null, so a JSON null lands in the default case.
+              case Some(items: List[_]) => items.asInstanceOf[List[Map[String, String]]]
+              case _ => List.empty
+            }
+          } catch {
+            // Entity extraction is optional: malformed change_info must not drop the record.
+            case NonFatal(_) => List.empty
+          }
+        }
+
+      val cdr = getRecordPro(
+        dynamicCode = "110103"
+        , rowkey = company_id
+        , keyno = company_id
+        , name = company_name
+        , label = "主要成员变更"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , before = content_before
+        , after = content_after
+        , entity = entity
+        , change_time = change_time
+      )
+      Seq(cdr)
+    }
+  }
+
+
+  /**
+   * Manual smoke check: runs `legalRepresentativeTransform` on one hard-coded sample
+   * and prints the first record it returns.
+   *
+   * @param args ignored
+   */
+  def main(args: Array[String]): Unit = {
+    val sampleChangeInfo = "{\"entity\":[{\"name\":\"达胡巴雅尔\",\"keyno\":\"5d2f687cd40ed30cd8b9e1cb29f1bfa0\"}]}"
+    val records = legalRepresentativeTransform(
+      company_id = "170ab62a5934f62c69f68a916a2b92dc",
+      company_name = "科右前旗爱日克奶食品专业合作社",
+      category = "",
+      change_item = "",
+      change_info = sampleChangeInfo,
+      content_before = "翡翠",
+      content_after = "达胡巴雅尔",
+      change_time = "2019-07-18 00:00:00"
+    )
+    println(records(0))
+  }
+
+
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DailyAggHandle.scala

@@ -8,7 +8,7 @@ import scala.collection.mutable
  * @author: XuJiakai
  * @date: 2021/6/29 14:29
  */
-abstract class DailyAggHandle() extends NgCompanyDynamicHandle {
+abstract class DailyAggHandle() extends NgCompanyDynamicHandle with DynamicAssociationEntity {
   override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => {
     CompanyDynamicUtils.default_filter(update_type, biz_date, change_fields, old_data, new_data) && update_type.equals("insert")
   }

+ 11 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DynamicAssociationEntity.scala

@@ -0,0 +1,11 @@
+package com.winhc.bigdata.spark.ng.dynamic.utils
+
+import com.winhc.bigdata.spark.ng.dynamic.AssociationEntityInfo
+
+/**
+ * Contract for types that can produce [[AssociationEntityInfo]] values from a
+ * string-to-string field map.
+ *
+ * @author: XuJiakai
+ * @date: 2021/8/7 15:30
+ */
+trait DynamicAssociationEntity {
+  /**
+   * Returns the association entities derived from `new_data`.
+   *
+   * @param new_data field map to read from; presumably the post-change row keyed by
+   *                 column name - TODO confirm against implementers
+   * @return the association entities found; whether it may be empty is up to the implementer
+   */
+  def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo]
+}