Browse Source

fix: 工商增量动态

许家凯 3 years ago
parent
commit
6015ca234b

+ 13 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/BaseHelper.scala

@@ -1,5 +1,9 @@
 package com.winhc.bigdata.spark.implicits
 
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.jackson.Serialization
+import org.json4s.jackson.Serialization.read
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/14 08:45
@@ -27,6 +31,15 @@ object BaseHelper {
         -1
       }
     }
+
+
+    def toAnyMap(): Map[String, Any] = {
+      implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
+      if (str == null) {
+        null
+      } else
+        read[Map[String, Any]](str)
+    }
   }
 
 

+ 11 - 4
src/main/scala/com/winhc/bigdata/spark/implicits/CaseClass2JsonHelper.scala

@@ -3,16 +3,23 @@ package com.winhc.bigdata.spark.implicits
 import org.json4s.jackson.Serialization
 import org.json4s.jackson.Serialization.write
 import org.json4s.{Formats, NoTypeHints}
+
 /**
  * @author: XuJiakai
  * @date: 2020/11/23 10:51
  */
 object CaseClass2JsonHelper {
-  implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
   implicit class CaseClass2JsonEnhancer[A <: AnyRef](that: A) {
-    def toJson(): String ={
-      if(that==null){
-       return null
+    /* def toJson(): String ={
+       implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
+       if(that==null){
+        return null
+       }
+       write(that)
+     }*/
+    def toJson()(implicit formats: Formats = Serialization.formats(NoTypeHints)): String = {
+      if (that == null) {
+        return null
       }
       write(that)
     }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -180,7 +180,7 @@ object NgCompanyDynamic {
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
       "spark.debug.maxToStringFields" -> "200",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "1000000"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val inc = false

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/intellectual.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.ng.dynamic.agg
 
 import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
-import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord, RowkeyInfo}
+import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord}
 import com.winhc.bigdata.spark.utils.BaseUtil
 import org.apache.commons.lang3.StringUtils
 

+ 167 - 61
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company.scala

@@ -1,15 +1,18 @@
 package com.winhc.bigdata.spark.ng.dynamic.handle
 
+import com.winhc.bigdata.spark.implicits.Bool._
+import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic._
+import com.winhc.bigdata.spark.ng.dynamic.utils._
 import com.winhc.bigdata.spark.utils.RegCapitalAmount
-import com.winhc.bigdata.spark.implicits.MapHelper._
+
 import scala.collection.mutable
 
 /**
  * @author: XuJiakai
  * @date: 2021/6/22 16:04
  */
-case class company(is_inc:Boolean) extends NgCompanyDynamicHandle {
+case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
   private def getCompanyDynamicRecord(change_extract: ChangeExtract, dynamic_code: String, dynamic_info: Map[String, String], update_field: String): CompanyDynamicRecord = {
     val company_id = change_extract.company_id
     val company_name = change_extract.company_name
@@ -27,92 +30,196 @@ case class company(is_inc:Boolean) extends NgCompanyDynamicHandle {
     val biz_date = change_extract.biz_date
     val rowkey = change_extract.rowkey
     val update_time = change_extract.update_time
+
+
+    def getRecordPro(dynamicCode: String
+                     , name: String
+                     , keyno: String
+                     , label: String
+                     , risk_level: NgCompanyRiskLevelType
+                     , content: Seq[ChangeContent]
+                    ): CompanyDynamicRecord = {
+      val map = BusinessInfoDynamicInfoMap(label = label, content = content)
+
+      val date = if (is_inc) update_time else biz_date
+      CompanyDynamicRecord(
+        id = CompanyDynamicUtils.generateId(rowkey, date, tn, dynamicCode)
+        , association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null))
+        , rowkey = rowkey
+        , tn = tn
+        , update_type = update_type
+        , dynamic_code = dynamicCode
+        , dynamic_info = map.toMap()
+        , agg_detail_text = null
+        , agg_detail_rowkey = null
+        , biz_time = biz_date
+        , dynamic_time = date
+        , update_time = update_time
+        , create_time = update_time
+      )
+    }
+
+    def getSingleRecordChange(code: String, label: String, keyno: String, name: String, before: String, after: String, riskLevel: NgCompanyRiskLevelType = NgCompanyRiskLevelType.Prompt, flag: String = null): CompanyDynamicRecord = {
+      getRecordPro(
+        dynamicCode = code
+        , keyno = keyno
+        , name = name
+        , label = label
+        , risk_level = riskLevel
+        , content = Seq(ChangeContent(code, before = Entity(info = before), after = Entity(info = after), Entity(name = name, keyno = keyno), flag = flag))
+      )
+    }
+
+
     var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
 
+
+    def companyNameChange(beforeName: String, afterName: String): Seq[CompanyDynamicRecord] = {
+      val record = getSingleRecordChange("101001", "公司更名", company_id, company_name, beforeName, afterName)
+      Seq(record)
+    }
+
+    //注册资本
+    def registeredCapitalChange(before: String, after: String): Seq[CompanyDynamicRecord] = {
+      val bool = RegCapitalAmount.isAdditionalShare(before, after)
+      val flag = bool ? "1" | "0"
+      val r = bool ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
+
+      val record = getSingleRecordChange("101002", "注册资本", company_id, company_name, before, after, riskLevel = r, flag = flag)
+      Seq(record)
+    }
+
+    def companyNew(afterName: String, afterKeyno: String): Seq[CompanyDynamicRecord] = {
+      val cdr = getRecordPro(
+        dynamicCode = "109001"
+        , keyno = afterKeyno
+        , name = afterName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109001", before = null
+          , after = Entity(name = company_name, keyno = company_id)
+          , Entity(afterName, afterKeyno)))
+      )
+      Seq(cdr)
+    }
+
+    //法人变化
+    def legalRepresentativeChange(beforeName: String, beforeKeyno: String, afterName: String, afterKeyno: String): Seq[CompanyDynamicRecord] = {
+
+      val cdr = getRecordPro(
+        dynamicCode = "101003"
+        , keyno = company_id
+        , name = company_name
+        , label = "法定代表人变更"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("101003", before = Entity(beforeName, beforeKeyno)
+          , after = Entity(afterName, afterKeyno)
+          , Entity(name = company_name, keyno = company_id)))
+      )
+
+      val cdr2 = getRecordPro(
+        dynamicCode = "109001"
+        , keyno = afterKeyno
+        , name = afterName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109001", before = null
+          , after = Entity(name = company_name, keyno = company_id)
+          , Entity(afterName, afterKeyno)))
+      )
+
+      val cdr3 = getRecordPro(
+        dynamicCode = "109002"
+        , keyno = beforeKeyno
+        , name = beforeName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109002", before = Entity(name = company_name, keyno = company_id)
+          , after = null
+          , Entity(beforeName, beforeKeyno)))
+      )
+      Seq(cdr, cdr2, cdr3)
+    }
+
+
+    //注册地址变化
+    def registeredAddressChange(before: String, after: String): Seq[CompanyDynamicRecord] = {
+      val record = getSingleRecordChange("101004", "注册地址变化", company_id, company_name, before, after)
+      Seq(record)
+    }
+
+    //经营范围变化
+    def businessScopeChange(before: String, after: String): Seq[CompanyDynamicRecord] = {
+      val record = getSingleRecordChange("101005", "经营范围变化", company_id, company_name, before, after)
+      Seq(record)
+    }
+
+    //todo 公司状态变化
+    @deprecated
+    def companyStateChange(before: String, after: String): Seq[CompanyDynamicRecord] = {
+      val r = (after.contains("销")) ? NgCompanyRiskLevelType.Caution | NgCompanyRiskLevelType.Prompt
+      val record = getSingleRecordChange("101006", "公司状态变化", company_id, company_name, before, after, riskLevel = r)
+      Seq(record)
+    }
+
+
     update_type match {
       case "insert" => {
-        val legal_entity_id = new_data("legal_entity_id")
-        val legal_entity_name = new_data("legal_entity_name")
-        val code = "100100"
-        val dynamic_info = Map(
-          "code" -> code,
-          "description" -> "公司新成立"
-        )
-        list = list :+ getCompanyDynamicRecord(change_extract = change_extract
-          , code
-          , dynamic_info = dynamic_info
-          , update_field = null
-          , association_entity_info = Seq(AssociationEntityInfo(keyno = legal_entity_id, name = legal_entity_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null))
-        )
+        //公司新成立
+        val new_name = new_data("legal_entity_name")
+        val new_keyno = new_data("legal_entity_id")
+        list = list ++ companyNew(new_name, new_keyno)
       }
       case "update" => {
         for (elem <- change_fields.intersect("name,reg_capital,legal_entity_name,reg_location,business_scope,reg_status_std".split(","))) {
           elem match {
             case "name" => {
-              val code = "100101"
-              val dynamic_info = Map(
-                "code" -> code,
-                "description" -> "公司名称发生变化"
-              )
-              list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "name")
+              val old_name = old_data("name")
+              val new_name = new_data("name")
+              //公司名称变化
+              list = list ++ companyNameChange(old_name, new_name)
             }
-            case "reg_capital" => { //注册资本需发生实质性变化
+            case "reg_capital" => {
+              //注册资本变化
               val old_reg_capital: String = RegCapitalAmount.getAmount(old_data("reg_capital"))
               val new_reg_capital: String = RegCapitalAmount.getAmount(new_data("reg_capital"))
               if (old_reg_capital != null && new_reg_capital != null) {
                 if (!old_reg_capital.equals(new_reg_capital)) {
-                  val code = "100102"
-                  val dynamic_info = Map(
-                    "code" -> code,
-                    "description" -> "注册资本发生变化"
-                  )
-                  list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "reg_capital")
+                  //注册资本发生实质性变化
+                  list = list ++ registeredCapitalChange(old_data("reg_capital"), new_data("reg_capital"))
                 }
               }
             }
 
             case "legal_entity_name" => {
-              val code = "100103"
-              val dynamic_info = Map(
-                "code" -> code,
-                "description" -> "法定代表人发生变化"
-              )
-              //todo 法人变化 单条记录两个实体
-              list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "legal_entity_name")
+              //法定代表人变化
+              val old_name = old_data("legal_entity_name")
+              val old_keyno = old_data("legal_entity_id")
+              val new_name = new_data("legal_entity_name")
+              val new_keyno = new_data("legal_entity_id")
+              list = list ++ legalRepresentativeChange(old_name, old_keyno, new_name, new_keyno)
             }
 
             case "reg_location" => {
-              val code = "100104"
-              val dynamic_info = Map(
-                "code" -> code,
-                "description" -> "公司注册地址发生变化"
-              )
-              list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "reg_location")
+              //注册地址变化
+              val old_record = old_data("reg_location")
+              val new_record = new_data("reg_location")
+              list = list ++ registeredAddressChange(old_record, new_record)
             }
 
             case "business_scope" => {
-              val code = "100105"
-              val dynamic_info = Map(
-                "code" -> code,
-                "description" -> "公司经营范围发生变化"
-              )
-              list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "business_scope")
+              //经营范围变化
+              val old_record = old_data("business_scope")
+              val new_record = new_data("business_scope")
+              list = list ++ businessScopeChange(old_record, new_record)
             }
 
             case "reg_status_std" => {
-              val code = "100106"
-              val dynamic_info = Map(
-                "code" -> code,
-                "description" -> "公司状态发生变化"
-              )
-              val reg_status_std = new_data.getOrEmptyStr("reg_status_std")
-              if (reg_status_std.contains("销")) {
-                list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "reg_status_std",
-                  association_entity_info = Seq(AssociationEntityInfo(keyno = company_id, name = company_name, risk_level = NgCompanyRiskLevelType.Caution, rta_info = null))
-                )
-              } else {
-                list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, "reg_status_std")
-              }
+              //todo 公司状态变化
+              val old_record = old_data("reg_status_std")
+              val new_record = new_data("reg_status_std")
+              //              list = list ++ companyStateChange(old_record, new_record)
+
             }
             case _ => null
           }
@@ -121,7 +228,6 @@ case class company(is_inc:Boolean) extends NgCompanyDynamicHandle {
 
       case _ => null
     }
-
     list
   }
 

+ 153 - 43
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -1,6 +1,10 @@
 package com.winhc.bigdata.spark.ng.dynamic.handle
 
-import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.implicits.Bool._
+import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
+import com.winhc.bigdata.spark.ng.dynamic._
+import com.winhc.bigdata.spark.ng.dynamic.utils._
+import com.winhc.bigdata.spark.utils.RegCapitalAmount
 
 import scala.collection.mutable
 
@@ -8,7 +12,7 @@ import scala.collection.mutable
  * @author: XuJiakai
  * @date: 2021/6/23 10:43
  */
-case class company_holder(is_inc:Boolean) extends NgCompanyDynamicHandle {
+case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
   override def flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
     val change_fields = change_extract.change_fields
     val tn = change_extract.tn
@@ -20,41 +24,163 @@ case class company_holder(is_inc:Boolean) extends NgCompanyDynamicHandle {
     val biz_date = change_extract.biz_date
     val rowkey = change_extract.rowkey
     val update_time = change_extract.update_time
+
+
+    def getRecordPro(dynamicCode: String
+                     , name: String
+                     , keyno: String
+                     , label: String
+                     , risk_level: NgCompanyRiskLevelType
+                     , content: Seq[ChangeContent]
+                    ): CompanyDynamicRecord = {
+      val map = BusinessInfoDynamicInfoMap(label = label, content = content)
+
+      val date = if (is_inc) update_time else biz_date
+      CompanyDynamicRecord(
+        id = CompanyDynamicUtils.generateId(rowkey, date, tn, dynamicCode)
+        , association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null))
+        , rowkey = rowkey
+        , tn = tn
+        , update_type = update_type
+        , dynamic_code = dynamicCode
+        , dynamic_info = map.toMap()
+        , agg_detail_text = null
+        , agg_detail_rowkey = null
+        , biz_time = biz_date
+        , dynamic_time = date
+        , update_time = update_time
+        , create_time = update_time
+      )
+    }
+
+
     var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+
+
+    //股东出资变化
+    def shareholderCapitalChange(name: String, keyno: String, before: String, after: String): Seq[CompanyDynamicRecord] = {
+      val bool = RegCapitalAmount.isAdditionalShare(before, after)
+      val flag = bool ? "1" | "0"
+      //      val r = bool ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
+      val cdr = getRecordPro(
+        dynamicCode = "101101"
+        , keyno = company_id
+        , name = company_name
+        , label = "股东变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("101101"
+          , before = Entity(name, keyno, info = before)
+          , after = Entity(name, keyno, info = after)
+          , Entity(name = company_name, keyno = company_id)
+          , flag = flag
+        ))
+      )
+
+      val cdr2 = getRecordPro(
+        dynamicCode = "109003"
+        , keyno = keyno
+        , name = name
+        , label = "对外投资"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109003"
+          , before = Entity(name = company_name, keyno = company_id, info = before)
+          , after = Entity(name = company_name, keyno = company_id, info = after)
+          , Entity(name = name, keyno = keyno)
+          , flag = flag
+        ))
+      )
+
+
+      Seq(cdr2, cdr)
+    }
+
+    //股东人员变化
+    def shareholder(beforeName: String, beforeKeyno: String, afterName: String, afterKeyno: String): Seq[CompanyDynamicRecord] = {
+      if (beforeName == null) {
+        val cdr = getRecordPro(
+          dynamicCode = "101102"
+          , keyno = company_id
+          , name = company_name
+          , label = "股东变化"
+          , risk_level = NgCompanyRiskLevelType.Prompt
+          , content = Seq(ChangeContent("101102", before = null
+            , after = Entity(afterName, afterKeyno)
+            , Entity(name = company_name, keyno = company_id)))
+        )
+
+        val cdr2 = getRecordPro(
+          dynamicCode = "109004"
+          , keyno = afterKeyno
+          , name = afterName
+          , label = "对外投资"
+          , risk_level = NgCompanyRiskLevelType.Prompt
+          , content = Seq(ChangeContent("109004", before = null
+            , after = Entity(name = company_name, keyno = company_id)
+            , Entity(afterName, afterKeyno)))
+        )
+        Seq(cdr, cdr2)
+      } else {
+        val cdr = getRecordPro(
+          dynamicCode = "101102"
+          , keyno = company_id
+          , name = company_name
+          , label = "股东变化"
+          , risk_level = NgCompanyRiskLevelType.Prompt
+          , content = Seq(ChangeContent("101102", before = Entity(beforeName, beforeKeyno)
+            , after = null
+            , Entity(name = company_name, keyno = company_id)))
+        )
+
+        val cdr2 = getRecordPro(
+          dynamicCode = "109004"
+          , keyno = beforeKeyno
+          , name = beforeName
+          , label = "对外投资"
+          , risk_level = NgCompanyRiskLevelType.Prompt
+          , content = Seq(ChangeContent("109004", before = Entity(name = company_name, keyno = company_id)
+            , after = null
+            , Entity(beforeName, beforeKeyno)))
+        )
+        Seq(cdr, cdr2)
+      }
+
+    }
+
+    //股东新增对外投资
+    def holderNew(name: String, keyno: String, companyName: String, companyId: String): Seq[CompanyDynamicRecord] = {
+      val cdr = getRecordPro(
+        dynamicCode = "109004"
+        , keyno = keyno
+        , name = name
+        , label = "对外投资"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109004", before = null
+          , after = Entity(companyName, companyId)
+          , Entity(name = name, keyno = keyno)))
+      )
+      Seq(cdr)
+    }
+
+
     update_type match {
       case "insert" => {
+        //新增股东
         val n_company_id = new_data("company_id")
         val n_company_name = new_data("company_name")
         val n_holder_name = new_data("holder_name")
         val n_holder_id = new_data("holder_id")
 
-        val entityInfo: Seq[AssociationEntityInfo] = Seq(
-          AssociationEntityInfo(keyno = n_company_id, name = n_company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-          , AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-        )
-        val code = "100200"
-        val dynamic_info = Map(
-          "code" -> code,
-          "description" -> "新增股东"
-        )
-        list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, null, entityInfo)
+        list = list ++ shareholder(null, null, n_holder_name, n_holder_id)
       }
       case "create" => {
+        //公司新成立
         val n_holder_name = new_data("holder_name")
         val n_holder_id = new_data("holder_id")
-
-        val entityInfo: Seq[AssociationEntityInfo] = Seq(
-          AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-        )
-        val code = "100201"
-        val dynamic_info = Map(
-          "code" -> code,
-          "description" -> "公司创始股东"
-        )
-        list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, code, entityInfo)
+        list = list ++ holderNew(n_holder_name, n_holder_id, company_name, company_id)
       }
 
       case "update" => {
+        //股东出资额发变化
         if (change_fields.contains("amount")) {
           val old_amount = old_data("amount")
           val new_amount = new_data("amount")
@@ -64,40 +190,23 @@ case class company_holder(is_inc:Boolean) extends NgCompanyDynamicHandle {
           val n_holder_name = new_data("holder_name")
           val n_holder_id = new_data("holder_id")
 
-          val entityInfo: Seq[AssociationEntityInfo] = Seq(
-            AssociationEntityInfo(keyno = n_company_id, name = n_company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-            , AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-          )
-          val code = "100202"
-          val dynamic_info = Map(
-            "code" -> code,
-            "description" -> "股东出资额发变化"
-          )
-          list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, code, entityInfo)
+          list = list ++ shareholderCapitalChange(n_holder_name, n_holder_id, old_amount, new_amount)
         }
       }
 
       case "deleted" => {
+        //新增股东
         val n_company_id = new_data("company_id")
         val n_company_name = new_data("company_name")
         val n_holder_name = new_data("holder_name")
         val n_holder_id = new_data("holder_id")
-
-        val entityInfo: Seq[AssociationEntityInfo] = Seq(
-          AssociationEntityInfo(keyno = n_company_id, name = n_company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-          , AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
-        )
-        val code = "100203"
-        val dynamic_info = Map(
-          "code" -> code,
-          "description" -> "股东退出"
-        )
-        list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, code, entityInfo)
+        list = list ++ shareholder(n_holder_name, n_holder_id, null, null)
       }
     }
     list
   }
 
+/*
 
   override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
     val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
@@ -111,5 +220,6 @@ case class company_holder(is_inc:Boolean) extends NgCompanyDynamicHandle {
   override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
     seq
   }
+*/
 
 }

+ 236 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_staff.scala

@@ -0,0 +1,236 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
+import com.winhc.bigdata.spark.ng.dynamic._
+import com.winhc.bigdata.spark.ng.dynamic.utils.{BusinessInfoDynamicInfoMap, ChangeContent, CompanyDynamicUtils, Entity}
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/26 16:15
+ */
+case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
+  override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
+    val change_fields = change_extract.change_fields
+    val tn = change_extract.tn
+    val company_id = change_extract.company_id
+    val company_name = change_extract.company_name
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+
+    def getRecord(dynamic_code: String
+                  , name: String
+                  , keyno: String
+                  , description: String
+                  , risk_level: NgCompanyRiskLevelType
+                  , dynamic_info: Map[String, Any]
+                 ): CompanyDynamicRecord = {
+      val date = if (is_inc) update_time else biz_date
+      CompanyDynamicRecord(
+        id = CompanyDynamicUtils.generateId(rowkey, date, tn, dynamic_code)
+        , association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null))
+        , rowkey = rowkey
+        , tn = tn
+        , update_type = update_type
+        , dynamic_code = dynamic_code
+        , dynamic_info = dynamic_info
+        , agg_detail_text = null
+        , agg_detail_rowkey = null
+        , biz_time = biz_date
+        , dynamic_time = date
+        , update_time = update_time
+        , create_time = update_time
+      )
+    }
+
+
+    def getRecordPro(dynamicCode: String
+                     , name: String
+                     , keyno: String
+                     , label: String
+                     , risk_level: NgCompanyRiskLevelType
+                     , content: Seq[ChangeContent]
+                    ): CompanyDynamicRecord = {
+      val map = BusinessInfoDynamicInfoMap(label = label, content = content)
+
+      val date = if (is_inc) update_time else biz_date
+      CompanyDynamicRecord(
+        id = CompanyDynamicUtils.generateId(rowkey, date, tn, dynamicCode)
+        , association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null))
+        , rowkey = rowkey
+        , tn = tn
+        , update_type = update_type
+        , dynamic_code = dynamicCode
+        , dynamic_info = map.toMap()
+        , agg_detail_text = null
+        , agg_detail_rowkey = null
+        , biz_time = biz_date
+        , dynamic_time = date
+        , update_time = update_time
+        , create_time = update_time
+      )
+    }
+
+
+    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+
+    //新增主要成员
+    def staffNew(staffName: String, staffId: String, staffType: String): Seq[CompanyDynamicRecord] = {
+      val cdr = getRecordPro(
+        dynamicCode = "109005"
+        , keyno = company_id
+        , name = company_name
+        , label = "主要成员"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109005", before = null
+          , after = Entity(name = staffName, keyno = staffId, info = staffType)
+          , Entity(company_name, company_id)))
+      )
+
+      val cdr2 = getRecordPro(
+        dynamicCode = "109005"
+        , keyno = staffId
+        , name = staffName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109005", before = null
+          , after = Entity(name = company_name, keyno = company_id, info = staffType)
+          , Entity(staffName, staffId)))
+      )
+      Seq(cdr, cdr2)
+    }
+
+    //公司新成立
+    def staffCreate(staffName: String, staffId: String, staffType: String): Seq[CompanyDynamicRecord] = {
+      val cdr = getRecordPro(
+        dynamicCode = "109005"
+        , keyno = staffId
+        , name = staffName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109005", before = null
+          , after = Entity(name = company_name, keyno = company_id, info = staffType)
+          , Entity(staffName, staffId)))
+      )
+      Seq(cdr)
+    }
+
+    //职位变化
+    def staffTypeChange(staffName: String, staffId: String, beforeType: String, afterType: String): Seq[CompanyDynamicRecord] = {
+
+      val cdr = getRecordPro(
+        dynamicCode = "101301"
+        , keyno = company_id
+        , name = company_name
+        , label = "主要成员"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("101301"
+          , before = Entity(staffName, staffId, info = beforeType)
+          , after = Entity(staffName, staffId, info = afterType)
+          , Entity(name = company_name, keyno = company_id)
+        ))
+      )
+
+      val cdr2 = getRecordPro(
+        dynamicCode = "109005"
+        , keyno = staffId
+        , name = staffName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109005"
+          , before = Entity(name = company_name, keyno = company_id, info = beforeType)
+          , after = Entity(name = company_name, keyno = company_id, info = afterType)
+          , Entity(name = staffName, keyno = staffId)
+        ))
+      )
+      Seq(cdr, cdr2)
+    }
+
+
+    //主要成员移除
+    def staffDeleted(staffName: String, staffId: String, staffType: String): Seq[CompanyDynamicRecord] = {
+      val cdr = getRecordPro(
+        dynamicCode = "109005"
+        , keyno = company_id
+        , name = company_name
+        , label = "主要成员"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109005", before = Entity(name = staffName, keyno = staffId, info = staffType)
+          , after = null
+          , Entity(company_name, company_id)))
+      )
+
+      val cdr2 = getRecordPro(
+        dynamicCode = "109005"
+        , keyno = staffId
+        , name = staffName
+        , label = "任职变化"
+        , risk_level = NgCompanyRiskLevelType.Prompt
+        , content = Seq(ChangeContent("109005", before = Entity(name = company_name, keyno = company_id, info = staffType)
+          , after = null
+          , Entity(staffName, staffId)))
+      )
+      Seq(cdr, cdr2)
+    }
+
+
+    update_type match {
+      case "insert" => {
+        //新增主要成员
+        val n_staff_name = new_data("staff_name")
+        val n_hid = new_data("hid")
+        val n_staff_type = new_data("staff_type")
+
+        list = list ++ staffNew(n_staff_name, n_hid, n_staff_type)
+      }
+      case "create" => {
+        //公司新成立
+        val n_staff_name = new_data("staff_name")
+        val n_hid = new_data("hid")
+        val n_staff_type = new_data("staff_type")
+
+        list = list ++ staffCreate(n_staff_name, n_hid, n_staff_type)
+      }
+
+      case "update" => {
+        //主要成员职位变化
+        val n_staff_name = new_data("staff_name")
+        val n_hid = new_data("hid")
+        val n_staff_type = new_data("staff_type")
+        val o_staff_type = old_data("staff_type")
+
+        list = list ++ staffTypeChange(n_staff_name, n_hid, o_staff_type, n_staff_type)
+      }
+
+      case "deleted" => {
+        //移除主要成员
+        val n_staff_name = new_data("staff_name")
+        val n_hid = new_data("hid")
+        val n_staff_type = new_data("staff_type")
+
+        list = list ++ staffDeleted(n_staff_name, n_hid, n_staff_type)
+      }
+    }
+    list
+  }
+/*
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    if (info == null) {
+      cdr.id
+    } else {
+      info.keyno
+    }
+  }
+
+
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    seq
+  }*/
+}

+ 32 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/BusinessInfoDynamicInfoMap.scala

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.spark.ng.dynamic.utils
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/27 10:45
+ */
+case class Entity(name: String = null, keyno: String = null, info: String = null) {
+  override def toString: String = {
+    if (name == null) {
+      return info
+    }
+    if (info == null) {
+      s"""{"name":"$name","keyno":"$keyno"}"""
+    } else {
+      s"""{"name":"$name","keyno":"$keyno","info":"$info"}"""
+    }
+  }
+}
+
+case class ChangeContent(code: String, before: Entity, after: Entity, entity: Entity, flag: String = null)
+
+case class BusinessInfoDynamicInfoMap(label: String, content: Seq[ChangeContent]) {
+  def toMap(): Map[String, Any] = {
+    Map("base" -> this)
+  }
+}
+
+
+object BusinessInfoDynamicInfoMap {
+  def main(args: Array[String]): Unit = {
+  }
+}

+ 19 - 1
src/main/scala/com/winhc/bigdata/spark/utils/RegCapitalAmount.scala

@@ -7,6 +7,7 @@ import java.text.DecimalFormat
 /**
  * @author: XuJiakai
  * @date: 2020/12/5 11:53
+ *        注册资本utils
  */
 object RegCapitalAmount {
   private val df = new DecimalFormat("00.00")
@@ -63,7 +64,24 @@ object RegCapitalAmount {
     }
   }
 
+
+  /**
+   * 是否增资
+   *
+   * @param before
+   * @param after
+   */
+  def isAdditionalShare(before: String, after: String): Boolean = {
+    val old_record = getAmount(before)
+    val new_record = getAmount(after)
+
+    if (old_record != null && new_record != null)
+      new_record.toDouble > old_record.toDouble
+    else
+      false
+  }
+
   def main(args: Array[String]): Unit = {
-    print(getAmount("1万元"))
+    println(isAdditionalShare("110万","10亿"))
   }
 }