Browse Source

feat: 企业动态

许家凯 4 năm trước cách đây
mục cha
commit
da9a61d010

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/MapHelper.scala

@@ -22,8 +22,15 @@ object MapHelper {
       }).mkString(",")
       s"{$content}"
     }
+
+    def getOrEmptyStr(key: String): String = {
+      val str = map.getOrElse(key, "")
+      if (str == null) "" else str
+    }
+
   }
 
+
   private def getValueOrNull(value: String): String = {
     if (StringUtils.isNotBlank(value)) {
       value.toJson()

+ 29 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -46,6 +46,34 @@ case class CompanyDynamicRecord(id: String,
                                 update_time: String,
                                 create_time: String
                                ) {
+  def format(): CompanyDynamicRecord = {
+    if (id == null) {
+      return null
+    }
+    if (association_entity_info == null || association_entity_info.isEmpty) {
+      return null
+    }
+    val rec = association_entity_info.filter(_.keyno != null)
+    if (rec.isEmpty) return null
+
+    if (rec.length != association_entity_info.length)
+      return CompanyDynamicRecord(id,
+        rec,
+        rowkey,
+        tn,
+        update_type,
+        dynamic_info,
+        agg_detail_text,
+        agg_detail_rowkey,
+        old_record,
+        new_record,
+        change_time,
+        update_time,
+        create_time
+      )
+    this
+  }
+
   def to_row(): Row = {
     val risk_level_str = association_entity_info.map(_.risk_level).distinct.mkString(",")
     val risk_level_detail = association_entity_info.map(r => s"${r.keyno}@@${r.risk_level}").distinct.mkString(",")
@@ -65,7 +93,7 @@ case class CompanyDynamicRecord(id: String,
       , change_time
       , update_time
       , create_time
-      ,null
+      , null
     )
   }
 }

+ 5 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -59,7 +59,7 @@ case class NgCompanyDynamic(s: SparkSession,
          |SELECT  *
          |--- FROM    winhc_ng.bds_change_extract
          |FROM    winhc_ng.bds_change_extract_test
-         |WHERE   ds > 0
+         |WHERE   ds > 0 and tn in ('company','company_holder')
          |""".stripMargin).rdd.map(r => {
       val value = r.getAs[String]("change_fields")
       val change_fields: Seq[String] = if (StringUtils.isEmpty(value)) Seq.empty else value.split(",")
@@ -82,9 +82,12 @@ case class NgCompanyDynamic(s: SparkSession,
         filter.apply(r.update_type, r.biz_date, r.change_fields, r.old_data, r.new_data)
       }
     }).flatMap(r => args_map(r.tn).flat_map.apply(r))
+      .map(_.format())
       .filter(_ != null)
       .cache()
 
+    //todo  可将rdd直接落hbase库
+
 
     var rdd_map: mutable.Map[String, RDD[CompanyDynamicRecord]] = mutable.Map.empty
 
@@ -156,7 +159,7 @@ object NgCompanyDynamic {
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
       "spark.debug.maxToStringFields" -> "200",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000000"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 

+ 33 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -16,4 +16,37 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
   def group_by_key: (CompanyDynamicRecord) => String = null
 
   def group_by_flat_map: (Seq[CompanyDynamicRecord]) => Seq[CompanyDynamicRecord] = null
+
+
+  protected def getCompanyDynamicRecord(change_extract: ChangeExtract
+                                        , dynamic_info: Map[String, String]
+                                        , update_field: String
+                                        , association_entity_info: Seq[AssociationEntityInfo]
+                                        , create_time: String = null
+                                       ): CompanyDynamicRecord = {
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+
+    CompanyDynamicRecord(
+      id = CompanyDynamicUtils.generateId(rowkey, biz_date, tn, update_field)
+      , association_entity_info = association_entity_info
+      , rowkey = rowkey
+      , tn = tn
+      , update_type = update_type
+      , dynamic_info = dynamic_info
+      , agg_detail_text = null
+      , agg_detail_rowkey = null
+      , old_record = old_data
+      , new_record = new_data
+      , change_time = biz_date
+      , update_time = update_time
+      , create_time = create_time
+    )
+  }
 }

+ 4 - 37
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company.scala

@@ -1,9 +1,8 @@
 package com.winhc.bigdata.spark.ng.dynamic.handle
 
-import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
-import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.ng.dynamic._
 import com.winhc.bigdata.spark.utils.RegCapitalAmount
-
+import com.winhc.bigdata.spark.implicits.MapHelper._
 import scala.collection.mutable
 
 /**
@@ -11,39 +10,6 @@ import scala.collection.mutable
  * @date: 2021/6/22 16:04
  */
 case class company() extends NgCompanyDynamicHandle {
-
-  private def getCompanyDynamicRecord(change_extract: ChangeExtract
-                                      , dynamic_info: Map[String, String]
-                                      , update_field: String
-                                      , association_entity_info: Seq[AssociationEntityInfo]
-                                     ): CompanyDynamicRecord = {
-    val tn = change_extract.tn
-    val update_type = change_extract.update_type
-    val old_data = change_extract.old_data
-    val new_data = change_extract.new_data
-    val biz_date = change_extract.biz_date
-    val rowkey = change_extract.rowkey
-    val update_time = change_extract.update_time
-
-
-    CompanyDynamicRecord(
-      id = CompanyDynamicUtils.generateId(rowkey, biz_date, tn, update_field)
-      , association_entity_info = association_entity_info
-      , rowkey = rowkey
-      , tn = tn
-      , update_type = update_type
-      , dynamic_info = dynamic_info
-      , agg_detail_text = null
-      , agg_detail_rowkey = null
-      , old_record = old_data
-      , new_record = new_data
-      , change_time = biz_date
-      , update_time = update_time
-      , create_time = null
-    )
-  }
-
-
   private def getCompanyDynamicRecord(change_extract: ChangeExtract, dynamic_info: Map[String, String], update_field: String): CompanyDynamicRecord = {
     val company_id = change_extract.company_id
     val company_name = change_extract.company_name
@@ -106,6 +72,7 @@ case class company() extends NgCompanyDynamicHandle {
                 "code" -> "100103",
                 "description" -> "法定代表人发生变化"
               )
+              //todo 法人变化 单条记录两个实体
               list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "legal_entity_name")
             }
 
@@ -130,7 +97,7 @@ case class company() extends NgCompanyDynamicHandle {
                 "code" -> "100106",
                 "description" -> "公司状态发生变化"
               )
-              val reg_status_std = new_data("reg_status_std")
+              val reg_status_std = new_data.getOrEmptyStr("reg_status_std")
               if (reg_status_std.contains("销")) {
                 list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "reg_status_std",
                   association_entity_info = Seq(AssociationEntityInfo(keyno = company_id, name = company_name, risk_level = NgCompanyRiskLevelType.Caution, rta_info = null))

+ 105 - 8
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -1,18 +1,115 @@
 package com.winhc.bigdata.spark.ng.dynamic.handle
 
-import com.winhc.bigdata.spark.ng.dynamic.{ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle}
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+
+import scala.collection.mutable
 
 /**
  * @author: XuJiakai
  * @date: 2021/6/23 10:43
  */
 case class company_holder() extends NgCompanyDynamicHandle {
-  override def flat_map:( ChangeExtract)=> Seq[CompanyDynamicRecord] = (c:ChangeExtract)=>{
-    null
+  override def flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
+    val change_fields = change_extract.change_fields
+    val tn = change_extract.tn
+    val company_id = change_extract.company_id
+    val company_name = change_extract.company_name
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+    update_type match {
+      case "insert" => {
+        val n_company_id = new_data("company_id")
+        val n_company_name = new_data("company_name")
+        val n_holder_name = new_data("holder_name")
+        val n_holder_id = new_data("holder_id")
+
+        val entityInfo: Seq[AssociationEntityInfo] = Seq(
+          AssociationEntityInfo(keyno = n_company_id, name = n_company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+          , AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+        )
+        val code = "100200"
+        val dynamic_info = Map(
+          "code" -> code,
+          "description" -> "新增股东"
+        )
+        list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, null, entityInfo)
+      }
+      case "create" => {
+        val n_holder_name = new_data("holder_name")
+        val n_holder_id = new_data("holder_id")
+
+        val entityInfo: Seq[AssociationEntityInfo] = Seq(
+          AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+        )
+        val code = "100201"
+        val dynamic_info = Map(
+          "code" -> code,
+          "description" -> "公司创始股东"
+        )
+        list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, code, entityInfo)
+      }
+
+      case "update" => {
+        if (change_fields.contains("amount")) {
+          val old_amount = old_data("amount")
+          val new_amount = new_data("amount")
+
+          val n_company_id = new_data("company_id")
+          val n_company_name = new_data("company_name")
+          val n_holder_name = new_data("holder_name")
+          val n_holder_id = new_data("holder_id")
+
+          val entityInfo: Seq[AssociationEntityInfo] = Seq(
+            AssociationEntityInfo(keyno = n_company_id, name = n_company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+            , AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+          )
+          val code = "100202"
+          val dynamic_info = Map(
+            "code" -> code,
+            "description" -> "股东出资额发变化"
+          )
+          list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, code, entityInfo)
+        }
+      }
+
+      case "deleted" => {
+        val n_company_id = new_data("company_id")
+        val n_company_name = new_data("company_name")
+        val n_holder_name = new_data("holder_name")
+        val n_holder_id = new_data("holder_id")
+
+        val entityInfo: Seq[AssociationEntityInfo] = Seq(
+          AssociationEntityInfo(keyno = n_company_id, name = n_company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+          , AssociationEntityInfo(keyno = n_holder_id, name = n_holder_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)
+        )
+        val code = "100203"
+        val dynamic_info = Map(
+          "code" -> code,
+          "description" -> "股东退出"
+        )
+        list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, code, entityInfo)
+      }
+    }
+    list
+  }
+
+
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    if (info == null) {
+      cdr.id
+    } else {
+      info.keyno
+    }
   }
-/*
-  override def group_by_key: (CompanyDynamicRecord) => String = (c: CompanyDynamicRecord) => {
-    println("group_by_key")
-    "xjk"
-  }*/
+
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    seq
+  }
+
 }