Преглед изворни кода

feat: 企业动态股权出质融入框架

- 股权出质动态输出
- 股权出质变更修改
许家凯 пре 4 година
родитељ
комит
45e3b9ce96

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_equity_info.scala

@@ -3,9 +3,10 @@ package com.winhc.bigdata.spark.jobs.chance.table
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
 import com.winhc.bigdata.spark.utils.ChangeExtractUtils
 
+//股权出质
 case class company_equity_info(equCols: Seq[String]) extends CompanyChangeHandle {
 
-  override def getCid(rowkey: String, newMap: Map[String, String]): String = null
+  override def getCid(rowkey: String, newMap: Map[String, String]): String = newMap("cid")
 
   override def getUpdateTitle(newMap: Map[String, String]): String = s"股权出质信息发生变更"
 

+ 5 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -84,7 +84,7 @@ object CompanyDynamic {
                |) AS A
                |LEFT JOIN (
                |    SELECT cid,cname FROM  $project.base_company_mapping
-               |    WHERE ds = '${getLastPartitionsOrElse(project + "base_company_mapping", "0")}'
+               |    WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
                |) AS B
                |ON A.cid = B.cid
                |""".stripMargin
@@ -102,7 +102,7 @@ object CompanyDynamic {
           None
         }
         else {
-          result.map(res => Row(cid, if (cname == null) null else cname, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+          result.map(res => Row(res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
         }
       })
 
@@ -164,7 +164,9 @@ object CompanyDynamic {
     cd.init()
 
     for (e <- tableName.split(",")) {
-      cd.calc(e)
+      if (e.length > 2) {
+        cd.calc(e)
+      }
     }
 
     for (e <- bName_table.split(",")) {

+ 11 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -32,7 +32,7 @@ trait CompanyDynamicHandle {
     , "" -> "publish_notice" //公示催告
     , "" -> "serious_violation" //严重违法
     , "" -> "simple_cancellation" //简易注销
-    , "" -> "stock_pledge" //股权出质
+    , "company_equity_info" -> "stock_pledge" //股权出质
     , "" -> "tax_illegal" //税收违法
     , "" -> "tax_owenotice" //欠税公告
     , "" -> "judicial" //司法拍卖
@@ -61,7 +61,7 @@ trait CompanyDynamicHandle {
     , "" -> "14" // 行政处罚
     , "" -> "15" // 公示催告
     , "company_env_punishment" -> "16" // 环保处罚
-    , "" -> "17" // 股权出质
+    , "company_equity_info" -> "17" // 股权出质
     , "" -> "18" // 严重违法
     , "" -> "19" // 简易注销
     , "" -> "20" // 欠税公告
@@ -92,7 +92,9 @@ trait CompanyDynamicHandle {
    * @param change_fields
    * @param old_map
    * @param new_map
-   * @return info_type
+   * @return cid
+   *         cname
+   *         info_type
    *         rta_desc
    *         change_content
    *         change_time
@@ -101,15 +103,17 @@ trait CompanyDynamicHandle {
    *         info_risk_level
    *         winhc_suggest
    */
-  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String = null, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String)] = {
-    Seq((get_info_type()
+  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String = null, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
+    Seq((cid
+      , cname
+      , get_info_type()
       , get_rta_desc(old_map, new_map)
       , get_change_content(old_map, new_map)
       , get_change_time(bizDate, new_map)
       , get_biz_id(rowkey)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
-      , if(suggestion == null) "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。" else suggestion
+      , if (suggestion == null) "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。" else suggestion
     ))
   }
 
@@ -145,7 +149,7 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String
+  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String
 
   /**
    * 变更时间

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_abnormal_info.scala

@@ -20,7 +20,7 @@ case class company_abnormal_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.toJson(Seq("put_department->做出决定机关","remove_department->移出决定机关","put_reason->列入经营异常目录原因","put_date->列入日期","remove_date->移出日期","remove_reason->移出经营异常目录原因"))
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("put_department->做出决定机关","remove_department->移出决定机关","put_reason->列入经营异常目录原因","put_date->列入日期","remove_date->移出日期","remove_reason->移出经营异常目录原因"))
 
 
   /**

+ 11 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -7,7 +7,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
  * @Date 2020/7/28
  * @Description TODO
  */
-case class company_bid_list() extends CompanyDynamicHandle{
+case class company_bid_list() extends CompanyDynamicHandle {
   /**
    * 信息描述
    *
@@ -24,7 +24,7 @@ case class company_bid_list() extends CompanyDynamicHandle{
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("abs")
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("abs")
 
   /**
    * 业务id
@@ -50,7 +50,9 @@ case class company_bid_list() extends CompanyDynamicHandle{
    * @param change_fields
    * @param old_map
    * @param new_map
-   * @return info_type
+   * @return cid
+   *         cname
+   *         info_type
    *         rta_desc
    *         change_content
    *         change_time
@@ -59,13 +61,13 @@ case class company_bid_list() extends CompanyDynamicHandle{
    *         info_risk_level
    *         winhc_suggest
    */
-  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String)] = {
-    val proxyName=new_map("proxy")
-    if (proxyName!=null && !proxyName.isEmpty && proxyName.equals(cname)){
-      return null
+  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
+    val proxyName = new_map("proxy")
+    if (proxyName != null && !proxyName.isEmpty && proxyName.equals(cname)) {
+      null
     }
-    else{
-      super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map,cname,"该企业发布或参与招投标行为")
+    else {
+      super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map, cname, "该企业发布或参与招投标行为")
     }
   }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -25,7 +25,7 @@ case class company_env_punishment()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("content")
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("content")
 
   /**
    * 变更时间

+ 89 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala

@@ -0,0 +1,89 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/31 14:59
+ * @Description: 股权出质
+ */
+case class company_equity_info() extends CompanyDynamicHandle {
+
+
+  /**
+   *
+   * @param rowkey
+   * @param cid
+   * @param change_fields
+   * @param old_map
+   * @param new_map
+   * @return cid
+   *         cname
+   *         info_type
+   *         rta_desc
+   *         change_content
+   *         change_time
+   *         biz_id
+   *         sub_info_type
+   *         info_risk_level
+   *         winhc_suggest
+   */
+  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
+    if ("1".equals(new_map("deleted"))) {
+      return Seq.empty
+    }
+    var list: Seq[(String, String)] = Seq((new_map("cid"), cname))
+    if ("2".equals(new_map.getOrElse("pledgor_type", "0"))) {
+      list = list :+ (new_map("pledgor_id"), new_map("pledgor"))
+    }
+    if ("2".equals(new_map.getOrElse("pledgee_type", "0"))) {
+      list = list :+ (new_map("pledgee_id"), new_map("pledgee"))
+    }
+
+    list.map(t => {
+      (t._1
+        , t._2
+        , get_info_type()
+        , get_rta_desc(old_map, new_map)
+        , get_change_content(old_map, new_map, cname)
+        , get_change_time(bizDate, new_map)
+        , get_biz_id(rowkey)
+        , get_sub_info_type()
+        , get_info_risk_level(old_map, new_map)
+        , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
+      )
+    }).seq
+  }
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "新增1条股权出质信息"
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    val json = new_map.toJson(Seq("equity_amount->出质股权数额", "state->出质状态", "reg_date->股权出质设立登记日期", "reg_number->质权登记编号"))
+    json.substring(0, json.length - 1) +
+      s""","出质股权标的企业": {"企业名称": "$cname","企业KeyNo": "${new_map("cid")}"},"出质人信息": {"证件号": "${new_map("certif_number_l")}","出质人": "${new_map("pledgor")}"},"质权人信息": {"质权人": "${new_map("pledgee")}","证件号": "${new_map("certif_number_r")}"}}"""
+  }
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "1"
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala

@@ -23,7 +23,7 @@ case class company_own_tax() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
 
 
   /**