Bladeren bron

feat: 工商增量动态细化处理

许家凯 3 jaren geleden
bovenliggende
commit
3d0f6004a1

+ 11 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.ng.dynamic
 import com.alibaba.fastjson.JSON
 import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
+import com.winhc.bigdata.spark.utils.BaseUtil
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 
@@ -52,7 +53,7 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
     val biz_date = change_extract.biz_date
     val rowkey = change_extract.rowkey
     val update_time = change_extract.update_time
-    val date = if(is_inc) update_time else biz_date
+    val date = if (is_inc) update_time else biz_date
 
 
     CompanyDynamicRecord(
@@ -91,4 +92,13 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
     }
   }
 
+  protected def changeEquals(v1: String, v2: String): Boolean = {
+    if (v1 == null || v2 == null) {
+      true
+    } else {
+      BaseUtil.toDBC(v1).equals(BaseUtil.toDBC(v2))
+    }
+
+  }
+
 }

+ 26 - 9
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company.scala

@@ -5,6 +5,7 @@ import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLe
 import com.winhc.bigdata.spark.ng.dynamic._
 import com.winhc.bigdata.spark.ng.dynamic.utils._
 import com.winhc.bigdata.spark.utils.RegCapitalAmount
+import org.apache.commons.lang3.StringUtils
 
 import scala.collection.mutable
 
@@ -23,7 +24,7 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
     val change_fields = change_extract.change_fields
     val tn = change_extract.tn
     val company_id = change_extract.company_id
-    val company_name = change_extract.company_name
+    var company_name = change_extract.company_name
     val update_type = change_extract.update_type
     val old_data = change_extract.old_data
     val new_data = change_extract.new_data
@@ -31,6 +32,8 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
     val rowkey = change_extract.rowkey
     val update_time = change_extract.update_time
 
+    if (StringUtils.isEmpty(company_name)) company_name = new_data("name")
+
 
     def getRecordPro(dynamicCode: String
                      , name: String
@@ -82,8 +85,14 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
     //注册资本
     def registeredCapitalChange(before: String, after: String): Seq[CompanyDynamicRecord] = {
       val bool = RegCapitalAmount.isAdditionalShare(before, after)
-      val flag = bool ? "1" | "0"
-      val r = bool ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
+      if (bool == 0)
+        return Seq.empty
+
+      if (0 == RegCapitalAmount.getAmount(before).toDouble || RegCapitalAmount.getAmount(after).toDouble == 0)
+        return Seq.empty
+
+      val flag = (bool > 0) ? "1" | "0"
+      val r = (bool > 0) ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
 
       val record = getSingleRecordChange("101002", "注册资本", company_id, company_name, before, after, riskLevel = r, flag = flag)
       Seq(record)
@@ -150,6 +159,9 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
 
     //经营范围变化
     def businessScopeChange(before: String, after: String): Seq[CompanyDynamicRecord] = {
+      if (StringUtils.isEmpty(before) || StringUtils.isEmpty(after))
+        return Seq.empty
+
       val record = getSingleRecordChange("101005", "经营范围变化", company_id, company_name, before, after)
       Seq(record)
     }
@@ -174,10 +186,11 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
         for (elem <- change_fields.intersect("name,reg_capital,legal_entity_name,reg_location,business_scope,reg_status_std".split(","))) {
           elem match {
             case "name" => {
-              val old_name = old_data("name")
-              val new_name = new_data("name")
+              val old_name = old_data("name").replace("(", "(").replace(")", ")")
+              val new_name = new_data("name").replace("(", "(").replace(")", ")")
               //公司名称变化
-              list = list ++ companyNameChange(old_name, new_name)
+              if (!old_name.equals(new_name))
+                list = list ++ companyNameChange(old_name, new_name)
             }
             case "reg_capital" => {
               //注册资本变化
@@ -197,14 +210,17 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
               val old_keyno = old_data("legal_entity_id")
               val new_name = new_data("legal_entity_name")
               val new_keyno = new_data("legal_entity_id")
-              list = list ++ legalRepresentativeChange(old_name, old_keyno, new_name, new_keyno)
+              if (!changeEquals(old_name, new_name))
+                list = list ++ legalRepresentativeChange(old_name, old_keyno, new_name, new_keyno)
             }
 
             case "reg_location" => {
               //注册地址变化
               val old_record = old_data("reg_location")
               val new_record = new_data("reg_location")
-              list = list ++ registeredAddressChange(old_record, new_record)
+
+              if (!changeEquals(old_record, new_record))
+                list = list ++ registeredAddressChange(old_record, new_record)
             }
 
             case "business_scope" => {
@@ -218,7 +234,8 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
               //todo 公司状态变化
               val old_record = old_data("reg_status_std")
               val new_record = new_data("reg_status_std")
-              //              list = list ++ companyStateChange(old_record, new_record)
+              if (!changeEquals(old_record, new_record))
+                list = list ++ companyStateChange(old_record, new_record)
 
             }
             case _ => null

+ 59 - 20
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -5,6 +5,11 @@ import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLe
 import com.winhc.bigdata.spark.ng.dynamic._
 import com.winhc.bigdata.spark.ng.dynamic.utils._
 import com.winhc.bigdata.spark.utils.RegCapitalAmount
+import org.apache.commons.lang3.StringUtils
+import org.json4s.JsonAST.{JField, JObject, JString}
+import org.json4s.jackson.Serialization
+import org.json4s.jackson.Serialization.read
+import org.json4s.{Formats, JArray, NoTypeHints}
 
 import scala.collection.mutable
 
@@ -17,7 +22,7 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     val change_fields = change_extract.change_fields
     val tn = change_extract.tn
     val company_id = change_extract.company_id
-    val company_name = change_extract.company_name
+    var company_name = change_extract.company_name
     val update_type = change_extract.update_type
     val old_data = change_extract.old_data
     val new_data = change_extract.new_data
@@ -25,6 +30,7 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     val rowkey = change_extract.rowkey
     val update_time = change_extract.update_time
 
+    if (StringUtils.isEmpty(company_name)) company_name = new_data("company_name")
 
     def getRecordPro(dynamicCode: String
                      , name: String
@@ -60,14 +66,20 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     //股东出资变化
     def shareholderCapitalChange(name: String, keyno: String, before: String, after: String): Seq[CompanyDynamicRecord] = {
       val bool = RegCapitalAmount.isAdditionalShare(before, after)
-      val flag = bool ? "1" | "0"
-      //      val r = bool ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
+      if (bool == 0)
+        return Seq.empty
+
+      if (0 == RegCapitalAmount.getAmount(before).toDouble || RegCapitalAmount.getAmount(after).toDouble == 0)
+        return Seq.empty
+
+      val flag = (bool > 0) ? "1" | "0"
+      val r = (bool > 0) ? NgCompanyRiskLevelType.Positive | NgCompanyRiskLevelType.Caution
       val cdr = getRecordPro(
         dynamicCode = "101101"
         , keyno = company_id
         , name = company_name
         , label = "股东变化"
-        , risk_level = NgCompanyRiskLevelType.Prompt
+        , risk_level = r
         , content = Seq(ChangeContent("101101"
           , before = Entity(name, keyno, info = before)
           , after = Entity(name, keyno, info = after)
@@ -81,7 +93,7 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
         , keyno = keyno
         , name = name
         , label = "对外投资"
-        , risk_level = NgCompanyRiskLevelType.Prompt
+        , risk_level = r
         , content = Seq(ChangeContent("109003"
           , before = Entity(name = company_name, keyno = company_id, info = before)
           , after = Entity(name = company_name, keyno = company_id, info = after)
@@ -90,7 +102,6 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
         ))
       )
 
-
       Seq(cdr2, cdr)
     }
 
@@ -182,8 +193,11 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
       case "update" => {
         //股东出资额发变化
         if (change_fields.contains("amount")) {
-          val old_amount = old_data("amount")
-          val new_amount = new_data("amount")
+          val o_capital = getRealCapital(old_data("capital"))
+          val n_capital = getRealCapital(new_data("capital"))
+
+          val old_amount = (o_capital == null) ? old_data("amount") | o_capital
+          val new_amount = (n_capital == null) ? new_data("amount") | n_capital
 
           val n_company_id = new_data("company_id")
           val n_company_name = new_data("company_name")
@@ -206,20 +220,45 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     list
   }
 
-/*
+  /*
+
+    override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+      val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+      if (info == null) {
+        cdr.id
+      } else {
+        info.keyno
+      }
+    }
 
-  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
-    if (info == null) {
-      cdr.id
-    } else {
-      info.keyno
+    override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+      seq
     }
-  }
+  */
 
-  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
-    seq
-  }
-*/
 
+  def getRealCapital(str: String): String = {
+    if (StringUtils.isEmpty(str))
+      return null
+    try {
+      implicit val formats: AnyRef with Formats = Serialization.formats(NoTypeHints)
+      val json = read[JArray](str)
+
+      val as = for {
+        JObject(child) <- json
+        JField("amomon", JString(amount)) <- child
+      } yield amount
+      if (as.isEmpty)
+        return null
+      val head = as.head
+      val v = RegCapitalAmount.getAmount(head)
+      if (v == null) {
+        return null
+      }
+      head
+    } catch {
+      case ex: Exception => {}
+        null
+    }
+  }
 }

+ 46 - 38
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_staff.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.ng.dynamic.handle
 import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic._
 import com.winhc.bigdata.spark.ng.dynamic.utils.{BusinessInfoDynamicInfoMap, ChangeContent, CompanyDynamicUtils, Entity}
+import org.apache.commons.lang3.StringUtils
 
 import scala.collection.mutable
 
@@ -11,11 +12,15 @@ import scala.collection.mutable
  * @date: 2021/7/26 16:15
  */
 case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
+
+
+  private val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z ]".r
+
   override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
     val change_fields = change_extract.change_fields
     val tn = change_extract.tn
     val company_id = change_extract.company_id
-    val company_name = change_extract.company_name
+    var company_name = change_extract.company_name
     val update_type = change_extract.update_type
     val old_data = change_extract.old_data
     val new_data = change_extract.new_data
@@ -23,32 +28,7 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     val rowkey = change_extract.rowkey
     val update_time = change_extract.update_time
 
-
-    def getRecord(dynamic_code: String
-                  , name: String
-                  , keyno: String
-                  , description: String
-                  , risk_level: NgCompanyRiskLevelType
-                  , dynamic_info: Map[String, Any]
-                 ): CompanyDynamicRecord = {
-      val date = if (is_inc) update_time else biz_date
-      CompanyDynamicRecord(
-        id = CompanyDynamicUtils.generateId(rowkey, date, tn, dynamic_code)
-        , association_entity_info = Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = null))
-        , rowkey = rowkey
-        , tn = tn
-        , update_type = update_type
-        , dynamic_code = dynamic_code
-        , dynamic_info = dynamic_info
-        , agg_detail_text = null
-        , agg_detail_rowkey = null
-        , biz_time = biz_date
-        , dynamic_time = date
-        , update_time = update_time
-        , create_time = update_time
-      )
-    }
-
+    if (StringUtils.isEmpty(company_name)) company_name = new_data("company_name")
 
     def getRecordPro(dynamicCode: String
                      , name: String
@@ -123,6 +103,13 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
 
     //职位变化
     def staffTypeChange(staffName: String, staffId: String, beforeType: String, afterType: String): Seq[CompanyDynamicRecord] = {
+      if (beforeType == null || afterType == null)
+        return Seq.empty
+
+      val v1 = pattern.split(beforeType).distinct.sorted.mkString(",")
+      val v2 = pattern.split(afterType).distinct.sorted.mkString(",")
+      if (v1.equals(v2))
+        return Seq.empty
 
       val cdr = getRecordPro(
         dynamicCode = "101301"
@@ -219,18 +206,39 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     }
     list
   }
-/*
-  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
-    if (info == null) {
-      cdr.id
-    } else {
-      info.keyno
+  /*
+    override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+      val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+      if (info == null) {
+        cdr.id
+      } else {
+        info.keyno
+      }
     }
-  }
 
 
-  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
-    seq
-  }*/
+    override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+      seq
+    }*/
 }
+
+/*
+object company_staff {
+
+  private val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z ]".r
+
+  def main(args: Array[String]): Unit = {
+    val beforeType = "总经理,执行董事"
+    val afterType = "执行董事,总经理"
+
+    val v1 = pattern.split(beforeType).sorted.mkString(",")
+    val v2 = pattern.split(afterType).sorted.mkString(",")
+
+    println(v1)
+    println(v2)
+    if (v1.equals(v2))
+      println("aa")
+
+
+  }
+}*/

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/utils/RegCapitalAmount.scala

@@ -71,17 +71,17 @@ object RegCapitalAmount {
    * @param before
    * @param after
    */
-  def isAdditionalShare(before: String, after: String): Boolean = {
+  def isAdditionalShare(before: String, after: String): Int = {
     val old_record = getAmount(before)
     val new_record = getAmount(after)
 
     if (old_record != null && new_record != null)
-      new_record.toDouble > old_record.toDouble
+      new_record.toDouble.compare(old_record.toDouble)
     else
-      false
+      0
   }
 
   def main(args: Array[String]): Unit = {
-    println(isAdditionalShare("110万","10亿"))
+    println(isAdditionalShare("110万", "11000000"))
   }
 }