Forráskód Böngészése

feat: 第四批动态

许家凯 3 éve
szülő
commit
04c45d949d

+ 45 - 42
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractArgs.scala

@@ -15,55 +15,58 @@ case class NgChangeExtractArgs(project: String = "winhc_ng"
 object NgChangeExtractArgs {
   val startArgs: Seq[NgChangeExtractArgs] =
     Seq(
-    NgChangeExtractArgs(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted", newlyRegister = true)
-    , NgChangeExtractArgs(tableName = "company_staff", primaryFields = "staff_type,deleted", newlyRegister = true)
-    , NgChangeExtractArgs(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,reg_capital,reg_location,business_scope,reg_status_std")
-    , NgChangeExtractArgs(tableName = "company_tm", primaryFields = "status")
-    , NgChangeExtractArgs(tableName = "company_icp", primaryFields = "domain")
-  ) ++ Seq(
-    "auction_tracking"
-    , "company_abnormal_info"
-    , "company_court_announcement"
-    , "company_court_open_announcement"
-    , "company_court_register"
-    , "company_env_punishment"
-    , "company_illegal_info"
-    , "company_own_tax"
-    , "company_punishment_info"
-    , "company_punishment_info_creditchina"
-    , "company_send_announcement"
-    , "company_tax_contravention"
-    , "zxr_evaluate_results"
+      NgChangeExtractArgs(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted", newlyRegister = true)
+      , NgChangeExtractArgs(tableName = "company_staff", primaryFields = "staff_type,deleted", newlyRegister = true)
+      , NgChangeExtractArgs(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,reg_capital,reg_location,business_scope,reg_status_std")
+      , NgChangeExtractArgs(tableName = "company_tm", primaryFields = "status")
+      , NgChangeExtractArgs(tableName = "company_icp", primaryFields = "domain")
+    ) ++ Seq(
+      "auction_tracking"
+      , "company_abnormal_info"
+      , "company_court_announcement"
+      , "company_court_open_announcement"
+      , "company_court_register"
+      , "company_env_punishment"
+      , "company_illegal_info"
+      , "company_own_tax"
+      , "company_punishment_info"
+      , "company_punishment_info_creditchina"
+      , "company_send_announcement"
+      , "company_tax_contravention"
+      , "zxr_evaluate_results"
 
-    , "company_check_info"
-    , "company_double_random_check_info"
-    , "company_finance"
-    , "company_license"
-    , "company_license_creditchina"
-    , "company_license_entpub"
+      , "company_check_info"
+      , "company_double_random_check_info"
+      , "company_finance"
+      , "company_license"
+      , "company_license_creditchina"
+      , "company_license_entpub"
 
 
+      , "bankruptcy_open_case"
+      , "company_brief_cancel_announcement"
+      , "company_dishonest_info"
+      , "company_public_announcement"
+      , "company_zxr"
+      , "company_zxr_final_case"
+      , "company_zxr_restrict"
+      , "restrictions_on_exit"
 
-    , "bankruptcy_open_case"
-    , "company_brief_cancel_announcement"
-    , "company_dishonest_info"
-    , "company_public_announcement"
-    , "company_zxr"
-    , "company_zxr_final_case"
-    , "company_zxr_restrict"
-    , "restrictions_on_exit"
+      , "company_land_announcement"
+      , "company_land_publicity"
+      , "company_land_transfer"
 
-    , "company_land_announcement"
-    , "company_land_publicity"
-    , "company_land_transfer"
 
+      , "company_equity_info"
+      , "company_equity_pledge_holder"
+      , "company_judicial_assistance"
+      , "company_land_mortgage"
 
+      , "company_lawsuit"
 
-    , "company_equity_info"
-//    , "company_equity_pledge"
-    , "company_judicial_assistance"
-    , "company_land_mortgage"
-//    , "company_mortgage_info"
+      , "company_mortgage_info"
+      , "company_mortgage_people"
 
-  ).map(r => NgChangeExtractArgs(tableName = r))
+
+    ).map(r => NgChangeExtractArgs(tableName = r))
 }

+ 13 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_equity_pledge_holder.scala

@@ -0,0 +1,13 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+case class company_equity_pledge_holder(equCols: Seq[String], is_inc:Boolean) extends NgCompanyChangeHandle {
+
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("ndate"))
+}
+

+ 13 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_lawsuit.scala

@@ -0,0 +1,13 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+case class company_lawsuit(equCols: Seq[String], is_inc:Boolean) extends NgCompanyChangeHandle {
+
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("pub_date"))
+}
+

+ 13 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_mortgage_info.scala

@@ -0,0 +1,13 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+//动产抵押-主表
+case class company_mortgage_info(equCols: Seq[String], is_inc: Boolean) extends NgCompanyChangeHandle {
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("reg_date"))
+}
+

+ 13 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_mortgage_people.scala

@@ -0,0 +1,13 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+//动产抵押-抵押人
+case class company_mortgage_people(equCols: Seq[String], is_inc: Boolean) extends NgCompanyChangeHandle {
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("update_time"))
+}
+

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -133,7 +133,7 @@ case class NgCompanyDynamic(s: SparkSession,
       var tmp_rdd: RDD[CompanyDynamicRecord] = null
       for (tn <- elem.tabs) {
         if (tmp_rdd == null) {
-          rdd_map(tn)
+          tmp_rdd = rdd_map(tn)
         } else {
           tmp_rdd = tmp_rdd.union(rdd_map(tn))
         }

+ 64 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/company_mortgage.scala

@@ -0,0 +1,64 @@
+package com.winhc.bigdata.spark.ng.dynamic.agg
+
+import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, AssociationEntityInfo, CompanyDynamicRecord}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/20 10:42
+ */
+case class company_mortgage() extends AcrossTabAggHandle {
+  /**
+   * 需要聚合的维度
+   *
+   * @return
+   */
+  override def getTables: Seq[String] = Seq("company_mortgage_people", "company_mortgage_info")
+
+  /**
+   * 聚合的key,相同的key会聚合到一起
+   *
+   * @return
+   */
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    cdr.tn match {
+      case "company_mortgage_people" => cdr.dynamic_info("main_id").asInstanceOf[String]
+      case "company_mortgage_info" => cdr.rowkey
+      case _ => throw new RuntimeException("123")
+    }
+  }
+
+
+  /**
+   * 聚合处理程序  flat_map
+   *
+   * @return
+   */
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    val main_cdr: CompanyDynamicRecord = seq.find(p => p.tn.equals("company_mortgage_info")).orNull
+    if (main_cdr == null) {
+      Seq.empty[CompanyDynamicRecord]
+    } else {
+      val entityInfo: Seq[AssociationEntityInfo] = seq.filter(r => r.tn.equals("company_mortgage_people")).flatMap(r => r.association_entity_info)
+
+      val association_entity_info: Seq[AssociationEntityInfo] = main_cdr.association_entity_info ++ entityInfo
+
+      val map: Map[String, Any] = main_cdr.dynamic_info ++ Map("pledgee" -> entityInfo)
+
+      val record: CompanyDynamicRecord = CompanyDynamicRecord(id = main_cdr.id
+        , association_entity_info = association_entity_info
+        , rowkey = main_cdr.rowkey
+        , tn = main_cdr.tn
+        , update_type = main_cdr.update_type
+        , dynamic_code = main_cdr.dynamic_code
+        , dynamic_info = map
+        , agg_detail_text = main_cdr.agg_detail_text
+        , agg_detail_rowkey = main_cdr.agg_detail_rowkey
+        , biz_time = main_cdr.biz_time
+        , dynamic_time = main_cdr.dynamic_time
+        , update_time = main_cdr.update_time
+        , create_time = main_cdr.create_time
+      )
+      Seq(record)
+    }
+  }
+}

+ 0 - 19
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/test.scala

@@ -1,19 +0,0 @@
-package com.winhc.bigdata.spark.ng.dynamic.agg
-
-import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord}
-
-/**
- * @author: XuJiakai
- * @date: 2021/6/23 13:57
- */
-case class test() extends AcrossTabAggHandle {
-  override def getTables: Seq[String] = Seq("", "")
-
-  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    ""
-  }
-
-  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
-    seq
-  }
-}

+ 22 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_lawsuit.scala

@@ -0,0 +1,22 @@
+
+
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.ng.dynamic.utils.{ComplexDailyDynamic, DailyAggHandle}
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, NgCompanyRiskLevelType}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/13 15:07
+ *        裁判文书
+ */
+case class company_lawsuit(is_inc:Boolean) extends DailyAggHandle with ComplexDailyDynamic {
+
+
+  override def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo] = {
+    getEntity(new_data.getOrEmptyStr("plaintiff_info"),"litigant_id","name",NgCompanyRiskLevelType.Positive,"原告/上诉人")++
+      getEntity(new_data.getOrEmptyStr("defendant_info"),"litigant_id","name",NgCompanyRiskLevelType.Caution,"被告/被上诉人")
+  }
+}
+

+ 44 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_mortgage_info.scala

@@ -0,0 +1,44 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/20 09:56
+ */
+case class company_mortgage_info(is_inc: Boolean) extends NgCompanyDynamicHandle {
+
+  override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
+    val change_fields = change_extract.change_fields
+    val tn = change_extract.tn
+    val company_id = change_extract.company_id
+    val company_name = change_extract.company_name
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+
+
+    val code = "304301"
+
+    val entityInfo: Seq[AssociationEntityInfo] = Seq(
+      AssociationEntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), risk_level = NgCompanyRiskLevelType.Caution, rta_info = "抵押人")
+    )
+
+    val dynamic_info = Map(
+      "mortgager" -> entityInfo
+      , "amount" -> new_data.getOrElse("amount", null)
+      , "term" -> new_data.getOrElse("term", null)
+      , "reg_date" -> new_data.getOrElse("reg_date", null)
+    )
+
+    list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, null, entityInfo)
+
+    list
+  }
+}

+ 40 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_mortgage_people.scala

@@ -0,0 +1,40 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/20 09:56
+ */
+case class company_mortgage_people(is_inc:Boolean)extends NgCompanyDynamicHandle {
+
+  override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
+    val change_fields = change_extract.change_fields
+    val tn = change_extract.tn
+    val company_id = change_extract.company_id
+    val company_name = change_extract.company_name
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+
+
+    val code = "304301"
+
+    val entityInfo: Seq[AssociationEntityInfo] = Seq(
+      AssociationEntityInfo(keyno = new_data.getOrEmptyStr("application_name_id"), name = new_data.getOrEmptyStr("application_name"), risk_level = NgCompanyRiskLevelType.Positive, rta_info = "抵押权人")
+    )
+
+    val dynamic_info = Map(
+      "pledgee" -> entityInfo
+      ,"main_id" -> new_data("main_id")
+    )
+    list = list :+ getCompanyDynamicRecord(change_extract, code, dynamic_info, null, entityInfo)
+    list
+  }
+}

+ 38 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DynamicDimConfiguration.scala

@@ -441,6 +441,28 @@ object DynamicDimConfiguration {
       )
     )
 
+
+    , "company_equity_pledge_holder" -> Map(
+      "code" -> Map(
+        "insert" -> "304001"
+      ),
+      "sample" -> true,
+      "name" -> "股权质押",
+      "list_field" -> Map(
+        "holder_id" -> Map()
+        , "holder_name" -> Map()
+        , "company_name" -> Map()
+        , "company_id" -> Map()
+        , "sharefrozennum" -> Map()
+        , "frozenintotal" -> Map()
+        , "ndate" -> Map()
+        , "pledgee" -> Map()
+        , "pledgee_id" -> Map()
+      )
+    )
+
+
+
     , "company_judicial_assistance" -> Map(
       "code" -> Map(
         "insert" -> "304101"
@@ -475,5 +497,21 @@ object DynamicDimConfiguration {
     )
 
 
+    , "company_lawsuit" -> Map(
+      "code" -> Map(
+        "insert" -> "304401"
+      ),
+      "sample" -> true,
+      "name" -> "裁判文书",
+      "list_field" -> Map(
+        "case_reason" -> Map()
+        , "case_no" -> Map()
+        , "pub_date" -> Map()
+      )
+    )
+
+
+
+
   )
 }