Browse Source

财产线索v8

xufei 3 years ago
parent
commit
ec55cd8d8f
34 changed files with 1540 additions and 1 deletions
  1. 3 0
      src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractArgs.scala
  2. 11 0
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_bid.scala
  3. 1 1
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license.scala
  4. 12 0
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/increase_registered_capital_info.scala
  5. 12 0
      src/main/scala/com/winhc/bigdata/spark/ng/change/table/zxr_evaluate.scala
  6. 23 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/AbstractDailyHandle.scala
  7. 156 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorHandleV2.scala
  8. 57 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorMappings.scala
  9. 113 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorRecord.scala
  10. 33 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorUtils.scala
  11. 221 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorV2.scala
  12. 22 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/auction_tracking.scala
  13. 14 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/bankruptcy_open_case.scala
  14. 15 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_bid.scala
  15. 14 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_copyright_reg.scala
  16. 14 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_copyright_works.scala
  17. 30 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_court_open_announcement.scala
  18. 74 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_dishonest_info.scala
  19. 78 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_equity_info.scala
  20. 15 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_finance.scala
  21. 104 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_holder.scala
  22. 113 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_judicial_assistance.scala
  23. 23 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_land_announcement.scala
  24. 72 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_land_mortgage.scala
  25. 17 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_license.scala
  26. 20 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_mortgage_info.scala
  27. 14 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_patent.scala
  28. 16 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_tm.scala
  29. 22 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_zxr.scala
  30. 51 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_zxr_restrict.scala
  31. 20 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/increase_registered_capital_info.scala
  32. 98 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/wenshu_detail_v2.scala
  33. 14 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/zxr_evaluate.scala
  34. 38 0
      src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/zxr_evaluate_results.scala

+ 3 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractArgs.scala

@@ -66,6 +66,9 @@ object NgChangeExtractArgs {
       ,NgChangeExtractArgs(tableName = "company_zxr_restrict",primaryFields = "pid")    //限高
       ,NgChangeExtractArgs(tableName = "company_zxr_restrict",primaryFields = "pid")    //限高
       ,NgChangeExtractArgs(tableName = "restrictions_on_exit",primaryFields = "executed_person_keyno,limited_person_pid")    //限制出镜
       ,NgChangeExtractArgs(tableName = "restrictions_on_exit",primaryFields = "executed_person_keyno,limited_person_pid")    //限制出镜
       ,NgChangeExtractArgs(tableName = "zxr_evaluate_results",primaryFields = "keyno")    //询价评估结果
       ,NgChangeExtractArgs(tableName = "zxr_evaluate_results",primaryFields = "keyno")    //询价评估结果
+      ,NgChangeExtractArgs(tableName = "increase_registered_capital_info")    //增资记录
+      ,NgChangeExtractArgs(tableName = "zxr_evaluate")    //询价评估-选定评估机构
+      ,NgChangeExtractArgs(tableName = "company_bid")    //招投标
     )
     )
 
 
 
 

+ 11 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_bid.scala

@@ -0,0 +1,11 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+case class company_bid(equCols: Seq[String], is_inc:Boolean) extends NgCompanyChangeHandle {
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("publish_time"))
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_license.scala

@@ -9,5 +9,5 @@ import com.winhc.bigdata.spark.utils.DateUtils
 case class company_license(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 case class company_license(equCols: Seq[String],is_inc:Boolean) extends NgCompanyChangeHandle {
 
 
 
 
-  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("issue_date"))
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("start_date"))
 }
 }

+ 12 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/increase_registered_capital_info.scala

@@ -0,0 +1,12 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+case class increase_registered_capital_info(equCols: Seq[String], is_inc:Boolean) extends NgCompanyChangeHandle {
+
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("change_time"))
+}

+ 12 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/zxr_evaluate.scala

@@ -0,0 +1,12 @@
+
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.DateUtils
+
+case class zxr_evaluate(equCols: Seq[String], is_inc:Boolean) extends NgCompanyChangeHandle {
+
+
+  override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("insert_time"))
+}

+ 23 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/AbstractDailyHandle.scala

@@ -0,0 +1,23 @@
+package com.winhc.bigdata.spark.ng.monitor
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/29 14:29
+ */
+abstract class AbstractDailyHandle() extends CompanyMonitorHandleV2 {
+  override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => {
+    CompanyMonitorUtils.default_filter(update_type, biz_date, change_fields, old_data, new_data)
+  }
+
+  def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo]
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    val new_data = change_extract.new_data
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+    list = list :+ getCompanyMonitorRecord(change_extract, getEntityInfo(new_data))
+    list
+  }
+
+}

+ 156 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorHandleV2.scala

@@ -0,0 +1,156 @@
+package com.winhc.bigdata.spark.ng.monitor
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.utils.BaseUtil
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+
+import scala.annotation.meta.{getter, setter}
+import scala.collection.mutable
+
+/**
+ * @Author: π
+ * @Date: 2021/9/9
+ * @Description: 企业财产监控v8版本
+ */
+trait CompanyMonitorHandleV2 extends Logging {
+
+  @getter
+  @setter
+  protected val is_inc: Boolean //false 为存量
+
+  /**
+   * 变更时间
+   *
+   * @param new_map
+   * @return
+   */
+  protected def get_change_time(bizDate: String, new_map: Map[String, String]): String = {
+    var res = bizDate
+    if (bizDate.length == 10) {
+      res = res.concat(" 00:00:00")
+    }
+    res
+  }
+
+  /**
+   * 业务id
+   *
+   * @param rowkey
+   * @return
+   */
+  protected def get_biz_id(rowkey: String, new_map: Map[String, String]): String = rowkey
+
+  /**
+   * 条件过滤
+   *
+   * @return
+   */
+  def get_conditional_filter(): String = ""
+
+
+  protected def get_money(new_map: Map[String, String]): Double = 0D
+
+  protected def processMoney(amt: String): String = {
+    var amtstr = amt
+    if (amtstr == null) {
+      "0"
+    } else {
+      amtstr = amtstr.replaceAll("[^\\d.]", "")
+      if (amtstr.equals("") || amtstr.split("\\.").length > 2 || amtstr.startsWith(".") || amtstr.endsWith(".")) {
+        amtstr = "0"
+      }
+      try {
+        amtstr.toDouble
+      } catch {
+        case ex: Exception => {
+          amtstr = "0"
+        }
+      }
+      amtstr
+    }
+  }
+
+  protected def get_deleted(new_map: Map[String, String], key: String = "deleted"): String = {
+    if (new_map == null) return "-1"
+    val deleted = new_map.getOrElse(key, "-1")
+    deleted
+  }
+
+  def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = CompanyMonitorUtils.default_filter
+
+  def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord]
+
+
+  protected def getCompanyMonitorRecord(change_extract: ChangeExtract
+                                        , entity_info: Seq[EntityInfo]
+                                       ): CompanyMonitorRecord = {
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    //val date = if (is_inc) update_time else biz_date
+    if(entity_info.isEmpty){
+      CompanyMonitorRecord(
+        //todo 待验证
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn)
+        , entity_info = entity_info
+        , dimension_type = ""
+        , flow_type = ""
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = ""
+        , `type` = ""
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }else{
+      val type_id = entity_info.map(_.type_id).head
+      val args = CompanyMonitorMappings.get_args(`type_id`)
+      CompanyMonitorRecord(
+        //todo 待验证
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn)
+        , entity_info = entity_info
+        , dimension_type = args.dimension_type
+        , flow_type = args.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args.info_risk_level
+        , `type` = type_id
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }
+
+  }
+
+
+  protected def getEntity(json: String, id_key: String, name_key: String, type_id: String
+                         ): Seq[EntityInfo] = {
+    if (StringUtils.isEmpty(json)) {
+      Seq.empty
+    } else {
+      val array = JSON.parseArray(json)
+      var list: mutable.Seq[EntityInfo] = mutable.Seq.empty
+      for (i <- 0 until array.size()) {
+        val jSONObject = array.getJSONObject(i)
+        val keyno = jSONObject.getString(id_key)
+        val name = jSONObject.getString(name_key)
+        list = list :+ EntityInfo(keyno = keyno, name = name, type_id = type_id)
+      }
+      list
+    }
+  }
+
+
+
+}

+ 57 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorMappings.scala

@@ -0,0 +1,57 @@
+package com.winhc.bigdata.spark.ng.monitor
+
+/**
+ * @author: π
+ * @date: 2021/9/9 15:46
+ */
+
+case class NgCompanyMonitoArgs(type_id: String //线索id
+                               , type_name: String //线索名称
+                               , dimension_table: String //维度id
+                               , dimension_type: String //维度表名
+                               , flow_type: String //流向id 0 -> 流入 1 -> 流出  2->流向未知
+                               , info_risk_level: String //线索等级id 1->低级 2->中级 3->高级
+                              )
+
+object CompanyMonitorMappings {
+  val args: Seq[NgCompanyMonitoArgs] =
+    Seq(
+      NgCompanyMonitoArgs(type_id = "20", type_name = "新增胜诉案件", dimension_table = "wenshu_detail2", dimension_type = "18", flow_type = "0", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "19", type_name = "有恢复执行案件", dimension_table = "wenshu_detail2", dimension_type = "18", flow_type = "1", info_risk_level = "1")
+      , NgCompanyMonitoArgs(type_id = "27", type_name = "新增财产保全案件", dimension_table = "wenshu_detail2", dimension_type = "18", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "7", type_name = "失信信息移除", dimension_table = "NG_COMPANY_DISHONEST_INFO", dimension_type = "7", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "32", type_name = "新增一条失信", dimension_table = "NG_COMPANY_DISHONEST_INFO", dimension_type = "7", flow_type = "2", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "18", type_name = "限制高消费移除", dimension_table = "NG_COMPANY_ZXR_RESTRICT", dimension_type = "17", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "6", type_name = "有恢复执行案件即将开庭", dimension_table = "NG_COMPANY_COURT_OPEN_ANNOUNCEMENT", dimension_type = "6", flow_type = "1", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "22", type_name = "有资产即将被司法拍卖", dimension_table = "NG_AUCTION_TRACKING", dimension_type = "19", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "8", type_name = "出质了持有股权", dimension_table = "NG_COMPANY_EQUITY_INFO", dimension_type = "8", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "33", type_name = "出质的股权变更为无效", dimension_table = "NG_COMPANY_EQUITY_INFO", dimension_type = "8", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "13", type_name = "抵押了公司土地", dimension_table = "NG_COMPANY_LAND_MORTGAGE", dimension_type = "12", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "21", type_name = "接受了他人土地抵押", dimension_table = "NG_COMPANY_LAND_MORTGAGE", dimension_type = "12", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "12", type_name = "购买了新的土地", dimension_table = "NG_COMPANY_LAND_ANNOUNCEMENT", dimension_type = "11", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "10", type_name = "新增了股东", dimension_table = "NG_COMPANY_HOLDER", dimension_type = "10", flow_type = "0", info_risk_level = "2")//无全量
+      , NgCompanyMonitoArgs(type_id = "11", type_name = "新增对外投资", dimension_table = "NG_COMPANY_HOLDER", dimension_type = "10", flow_type = "1", info_risk_level = "3")//无全量
+      , NgCompanyMonitoArgs(type_id = "34", type_name = "投资的公司,所占股份下降", dimension_table = "NG_COMPANY_HOLDER", dimension_type = "10", flow_type = "0", info_risk_level = "2")//无全量
+      , NgCompanyMonitoArgs(type_id = "2", type_name = "注册资本上升", dimension_table = "NG_INCREASE_REGISTERED_CAPITAL_INFO", dimension_type = "2", flow_type = "0", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "9", type_name = "有新的融资", dimension_table = "NG_COMPANY_FINANCE", dimension_type = "9", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "1", type_name = "新增破产公告", dimension_table = "NG_BANKRUPTCY_OPEN_CASE", dimension_type = "1", flow_type = "1", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "3", type_name = "新增招标信息", dimension_table = "NG_COMPANY_BID", dimension_type = "3", flow_type = "0", info_risk_level = "3")//待处理
+      , NgCompanyMonitoArgs(type_id = "14", type_name = "资产被抵押", dimension_table = "NG_COMPANY_MORTGAGE_INFO", dimension_type = "13", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "16", type_name = "新增商标信息", dimension_table = "NG_COMPANY_TM", dimension_type = "15", flow_type = "0", info_risk_level = "1")
+      , NgCompanyMonitoArgs(type_id = "15", type_name = "新增专利信息", dimension_table = "NG_COMPANY_PATENT", dimension_type = "14", flow_type = "0", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "4", type_name = "新增著作权信息", dimension_table = "NG_COMPANY_COPYRIGHT_REG", dimension_type = "4", flow_type = "0", info_risk_level = "1")
+      , NgCompanyMonitoArgs(type_id = "5", type_name = "新增作品著作权", dimension_table = "NG_COMPANY_COPYRIGHT_WORKS", dimension_type = "5", flow_type = "0", info_risk_level = "1")
+      , NgCompanyMonitoArgs(type_id = "23", type_name = "有资产选定询价评估机构", dimension_table = "NG_ZXR_EVALUATE", dimension_type = "20", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "24", type_name = "有资产完成询价评估", dimension_table = "NG_ZXR_EVALUATE_RESULTS", dimension_type = "21", flow_type = "0", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "26", type_name = "新增行政许可", dimension_table = "NG_COMPANY_LICENSE", dimension_type = "22", flow_type = "0", info_risk_level = "2")
+      , NgCompanyMonitoArgs(type_id = "28", type_name = "在外持有股权被冻结", dimension_table = "NG_COMPANY_JUDICIAL_ASSISTANCE", dimension_type = "23", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "29", type_name = "有股东持有的股权被冻结", dimension_table = "NG_COMPANY_JUDICIAL_ASSISTANCE", dimension_type = "23", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "30", type_name = "有股权解除冻结", dimension_table = "NG_COMPANY_JUDICIAL_ASSISTANCE", dimension_type = "23", flow_type = "1", info_risk_level = "3")
+      , NgCompanyMonitoArgs(type_id = "31", type_name = "有股东持有的股权解除冻结", dimension_table = "NG_COMPANY_JUDICIAL_ASSISTANCE", dimension_type = "23", flow_type = "1", info_risk_level = "3")
+    )
+
+
+  def get_args(type_id: String): NgCompanyMonitoArgs = {
+    args.find(p => type_id.equals(p.type_id)).getOrElse(throw new NullPointerException("type_id is not found !!!"))
+  }
+}

+ 113 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorRecord.scala

@@ -0,0 +1,113 @@
+package com.winhc.bigdata.spark.ng.monitor
+
+import cn.hutool.crypto.SecureUtil
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+
+/**
+ * @author: π
+ * @date: 2021/9/10 13:49
+ */
+case class ChangeExtract(rowkey: String
+                         , company_id: String
+                         , company_name: String
+                         , tn: String
+                         , update_type: String
+                         , old_data: Map[String, String]
+                         , new_data: Map[String, String]
+                         , change_fields: Seq[String]
+                         , biz_date: String
+                         , update_time: String
+                        )
+
+
+case class EntityInfo(keyno: String
+                      , name: String
+                      , type_id: String
+                     ) extends Comparable[EntityInfo] {
+
+
+  override def hashCode(): Int = s"$name".hashCode()
+
+  override def equals(obj: Any): Boolean =
+    obj match {
+      case d: EntityInfo =>
+        s"$name".equals(s"${d.name}")
+      case _ =>
+        false
+    }
+
+  override def compareTo(o: EntityInfo): Int = s"$name".compareTo(s"${o.name}")
+}
+
+case class RowkeyInfo(rowkey: String, tn: String) {
+  def toStr(): String = {
+    s"$tn@@$rowkey"
+  }
+}
+
+
+case class CompanyMonitorRecord(id: String,
+                                entity_info: Seq[EntityInfo],
+                                dimension_type: String,
+                                flow_type: String,
+                                rta_desc: String,
+                                change_time: String,
+                                biz_id: String,
+                                info_risk_level: String,
+                                `type`: String,
+                                create_time: String,
+                                amt: Double,
+                                update_type: String,
+                                deleted: String
+                               ) {
+
+  def format(): CompanyMonitorRecord = {
+    if (id == null) {
+      return null
+    }
+    if (entity_info == null || entity_info.isEmpty) {
+      return null
+    }
+    //剔除id为空的数据
+    val rec = entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
+    if (rec.isEmpty) return null
+
+    if (rec.length != entity_info.length)
+      return CompanyMonitorRecord(id,
+        rec,
+        dimension_type,
+        flow_type,
+        rta_desc,
+        change_time,
+        biz_id,
+        info_risk_level,
+        `type`,
+        create_time,
+        amt,
+        update_type,
+        deleted
+      )
+    this
+  }
+
+  def to_row(): Row = {
+    Row(SecureUtil.md5(id)
+      , entity_info.toJson()
+      , dimension_type
+      , flow_type
+      , rta_desc
+      , change_time
+      , biz_id
+      , info_risk_level
+      , `type`
+      , create_time
+      , amt
+      , update_type
+      , deleted
+    )
+  }
+}
+
+

+ 33 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorUtils.scala

@@ -0,0 +1,33 @@
+package com.winhc.bigdata.spark.ng.monitor
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @author: π
+ * @date: 2021/9/9 15:46
+ */
+object CompanyMonitorUtils {
+
+  def default_filter(update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]): Boolean = {
+    if (biz_date == null) return false
+    //if (update_type.equals("remove") || update_type.equals("other")) return false
+    if (update_type.equals("update") && change_fields.isEmpty) return false
+    true
+  }
+
+  def generateId(rowkey: String, biz_date: String, tn: String, random_num: String = null): String = {
+    if (StringUtils.isEmpty(random_num)) {
+      s"$rowkey@$biz_date@$tn"
+    } else {
+      s"$rowkey@$biz_date@$tn@$random_num"
+    }
+  }
+
+  def formatDate(date: String): String = {
+    if (StringUtils.isEmpty(date)) {
+      null
+    } else {
+      if (date.contains(" ")) date.split(" ")(0) else date
+    }
+  }
+}

+ 221 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/CompanyMonitorV2.scala

@@ -0,0 +1,221 @@
+package com.winhc.bigdata.spark.ng.monitor
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandleUtils
+import com.winhc.bigdata.spark.jobs.monitor.CompanyMonitorHandle
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.{DoubleType, StringType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
+import scala.collection.mutable
+
+/**
+ * @Author: π
+ * @Date: 2021/9/9
+ * @Description: 企业财产监控v8版本
+ */
+object CompanyMonitorV2 {
+  val targetTab = "winhc_ng.ads_company_monitor_dev"
+
+  case class CompanyMonitorUtil(s: SparkSession,
+                                project: String, //表所在工程名
+                                incr: Boolean //是否增量
+                               ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+    private lazy val org_tab = if (incr) "winhc_ng.bds_change_extract" else "winhc_ng.bds_change_extract_all_v2"
+    private val ds = getLastPartitionsOrElse(org_tab, "0")
+    private val ads_judicial_case_relation_case_amt = "winhc_ng.ads_judicial_case_relation_case_amt"
+
+    def calc(tableName: String, needMoney: String): Unit = {
+      val clazz = getClazz[CompanyMonitorHandleV2](s"com.winhc.bigdata.spark.ng.monitor.table.$tableName", incr)
+      val conditional = clazz.get_conditional_filter()
+      val filter = clazz.filter
+      val flat_map = clazz.flat_map
+      val tn = tableName
+
+      val rdd = sql(
+        s"""
+           |SELECT  *
+           |FROM    $org_tab
+           |WHERE   ds = '$ds'
+           |AND     tn = '$tn'
+           |$conditional
+           |""".stripMargin)
+        .rdd.map(r => {
+        val value = r.getAs[String]("change_fields")
+        val change_fields: Seq[String] = if (StringUtils.isEmpty(value)) Seq.empty else value.split(",")
+        ChangeExtract(rowkey = r.getAs("rowkey")
+          , company_id = r.getAs("company_id")
+          , company_name = null
+          , tn = r.getAs("table_name")
+          , update_type = r.getAs("update_type")
+          , old_data = r.getAs("old_data")
+          , new_data = r.getAs("new_data")
+          , change_fields = change_fields
+          , biz_date = r.getAs("biz_date")
+          , update_time = r.getAs("update_time")
+        )
+      }).filter(r => {
+        if (filter == null) {
+          true
+        } else {
+          filter(r.update_type, r.biz_date, r.change_fields, r.old_data, r.new_data)
+        }
+      }).flatMap(flat_map)
+        .map(_.format())
+        .filter(_ != null)
+
+      val schema = getSchema(ListMap(
+        "id" -> StringType
+        , "entity_info" -> StringType
+        , "dimension_type" -> StringType
+        , "flow_type" -> StringType
+        , "rta_desc" -> StringType
+        , "change_time" -> StringType
+        , "biz_id" -> StringType
+        , "info_risk_level" -> StringType
+        , "type" -> StringType
+        , "create_time" -> StringType
+        , "amt" -> DoubleType
+        , "update_type" -> StringType
+        , "deleted" -> StringType
+      ))
+
+      spark.createDataFrame(rdd.map(_.to_row()), schema)
+        .createOrReplaceTempView(s"company_monitor_tmp1_$tableName")
+
+      needMoney match {
+        case "0" => {
+          sql(
+            s"""
+               |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $targetTab PARTITION(ds='$ds',tn='$tableName')
+               |SELECT  id
+               |        , entity_info
+               |        , dimension_type
+               |        , flow_type
+               |        , rta_desc
+               |        , change_time
+               |        , biz_id
+               |        , info_risk_level
+               |        , type
+               |        , create_time
+               |        , amt
+               |        , update_type
+               |        , deleted
+               |FROM
+               |    company_monitor_tmp1_$tableName
+               |WHERE id IS NOT NULL
+               |""".stripMargin)
+        }
+        case "1" => {
+          sql(
+            s"""
+               |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $targetTab PARTITION(ds='$ds',tn='$tableName')
+               |SELECT  id
+               |        , entity_info
+               |        , dimension_type
+               |        , flow_type
+               |        , rta_desc
+               |        , change_time
+               |        , biz_id
+               |        , info_risk_level
+               |        , type
+               |        , create_time
+               |        , case_amt
+               |        , update_type
+               |        , deleted
+               |FROM
+               |    (
+               |        SELECT a.*, b.case_amt
+               |        FROM  company_monitor_tmp1_$tableName a
+               |        LEFT JOIN $ads_judicial_case_relation_case_amt b  ON a.biz_id = b.detail_id and b.tn = '$tableName'
+               |    )
+               |WHERE id IS NOT NULL
+               |""".stripMargin)
+        }
+      }
+    }
+  }
+
+  private val startArgs = Seq(
+    Args(tableName = "wenshu_detail_v2")
+    , Args(tableName = "company_dishonest_info", needMoney = "1") //todo 金额提取
+    //, Args(tableName = "company_zxr")
+    , Args(tableName = "company_zxr_restrict", needMoney = "1") //todo 金额提取
+    , Args(tableName = "company_court_open_announcement")
+    , Args(tableName = "company_equity_info")
+    , Args(tableName = "company_land_mortgage")
+    , Args(tableName = "company_land_announcement")
+    , Args(tableName = "company_finance")
+    , Args(tableName = "bankruptcy_open_case")
+    , Args(tableName = "company_bid") //todo 待处理
+    , Args(tableName = "company_mortgage_info")
+    , Args(tableName = "company_tm")
+    , Args(tableName = "company_patent")
+    , Args(tableName = "company_copyright_reg")
+    , Args(tableName = "company_copyright_works")
+    , Args(tableName = "company_holder")
+    //, Args(tableName = "company", bName = 1)
+    , Args(tableName = "auction_tracking")
+    , Args(tableName = "increase_registered_capital_info")
+    , Args(tableName = "zxr_evaluate")
+    , Args(tableName = "zxr_evaluate_results")
+    , Args(tableName = "company_license")
+    , Args(tableName = "company_judicial_assistance")
+
+  )
+
+  private case class Args(project: String = "winhc_ng"
+                          , tableName: String
+                          , needMoney: String = "0")
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length != 3) {
+      println(
+        s"""
+           |Please enter the legal parameters !
+           |<project> <tableNames> <incr>
+           |""".stripMargin)
+      sys.exit(-99)
+    }
+
+    val Array(project, tableNames, incr) = args
+
+    println(
+      s"""
+         |project: $project
+         |tableNames: $tableNames
+         |incr: $incr
+         |""".stripMargin)
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv("CompanyMonitorV2", config)
+    val cd = CompanyMonitorUtil(spark, project, if ("incr".equals(incr)) true else false)
+
+    var start = startArgs
+    if (!tableNames.equals("all")) {
+      val set = tableNames.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
+    val a = start.map(e => (e.tableName, () => {
+      cd.calc(e.tableName, e.needMoney) //通用处理
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+    spark.stop()
+  }
+}

+ 22 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/auction_tracking.scala

@@ -0,0 +1,22 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.utils.BaseUtil
+
+/**
+ * @Date: 2021/9/9
+ * @Description: 司法拍卖
+ */
+case class auction_tracking(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("query_price")), "10000")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    getEntity(new_data.getOrEmptyStr("company_info"), "company_id", "company_name", "22")
+
+  }
+
+}

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/bankruptcy_open_case.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 新增破产公告
+ */
+case class bankruptcy_open_case(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    getEntity(new_data("respondent_info"), id_key = "litigant_id", name_key = "name", "1")
+  }
+}

+ 15 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_bid.scala

@@ -0,0 +1,15 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 招投标
+ */
+case class company_bid(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    getEntity(new_data.getOrEmptyStr("purchaser_info"), "keyno", "company_name", "3")
+  }
+}

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_copyright_reg.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 软件著作权
+ */
+case class company_copyright_reg(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    getEntity(new_data.getOrEmptyStr("author_nationality_info"), "keyno", "name","4")
+  }
+}

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_copyright_works.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 著作权
+ */
+case class company_copyright_works(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    getEntity(new_data.getOrEmptyStr("author_info"), "keyno", "name","5")
+  }
+}

+ 30 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_court_open_announcement.scala

@@ -0,0 +1,30 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import scala.collection.mutable
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 有恢复执行案件即将开庭(被告)
+ */
+case class company_court_open_announcement(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def get_conditional_filter(): String = {
+    "AND  new_data['case_no'] like concat('%','恢','%')"
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    try {
+      var list: mutable.Seq[EntityInfo] = mutable.Seq.empty
+      val company_info = new_data.getOrEmptyStr("defendant_info")
+      list = list ++ getEntity(company_info, "litigant_id", "name","6")
+      list
+    } catch {
+      case e: Exception => {
+        logError(e.getMessage, e)
+        Seq.empty
+      }
+    }
+  }
+
+}

+ 74 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_dishonest_info.scala

@@ -0,0 +1,74 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, ChangeExtract, CompanyMonitorMappings, CompanyMonitorRecord, CompanyMonitorUtils, EntityInfo, NgCompanyMonitoArgs}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 失信信息移除
+ *               新增一条失信
+ */
+case class company_dishonest_info(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+    if (new_data.getOrEmptyStr("deleted").equals("1")) {
+      //失信信息移除
+      val `type` = "7"
+      val args = CompanyMonitorMappings.get_args(`type`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("keyno"), name = new_data.getOrEmptyStr("name"), type_id = `type`))
+        , dimension_type = args.dimension_type
+        , flow_type = args.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args.info_risk_level
+        , `type` = `type`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    } else if (new_data.getOrEmptyStr("deleted").equals("0")
+      && (new_data.getOrEmptyStr("action_content").contains("有履行能力")
+      || new_data.getOrEmptyStr("action_content").contains("转移财产")
+      || new_data.getOrEmptyStr("action_content").contains("违反财产报告制度")
+      )) {
+      val `type` = "32"
+      val args = CompanyMonitorMappings.get_args(`type`)
+      //新增一条失信
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("keyno"), name = new_data.getOrEmptyStr("name"), type_id = `type`))
+        , dimension_type = args.dimension_type
+        , flow_type = args.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args.info_risk_level
+        , `type` = `type`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }
+    list
+  }
+}

+ 78 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_equity_info.scala

@@ -0,0 +1,78 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, ChangeExtract, CompanyMonitorMappings, CompanyMonitorRecord, CompanyMonitorUtils, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.utils.BaseUtil
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 出质了持有股权(非历史忽略)
+ *               出质的股权变更为无效
+ */
+case class company_equity_info(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("equity_amount")),"1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    val change_fields = change_extract.change_fields
+
+    //出质了持有股权(非历史忽略)
+    val `type1` = "8"
+    val args1 = CompanyMonitorMappings.get_args(`type1`)
+    list = list :+ CompanyMonitorRecord(
+      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type1`)
+      , entity_info = getEntity(new_data.getOrEmptyStr("pledgor_info"), "pledgor_id", "pledgor",type_id=`type1`)
+      , dimension_type = args1.dimension_type
+      , flow_type = args1.flow_type
+      , rta_desc = ""
+      , change_time = biz_date
+      , biz_id = rowkey
+      , info_risk_level = args1.info_risk_level
+      , `type` = `type1`
+      , create_time = update_time
+      , amt = get_money(new_data)
+      , update_type = update_type
+      , deleted = get_deleted(new_data)
+    )
+
+    //出质的股权变更为无效
+    if (new_data.getOrEmptyStr("state").equals("无效")) {
+      val `type2` = "33"
+      val args2 = CompanyMonitorMappings.get_args(`type2`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type2`)
+        , entity_info = getEntity(new_data.getOrEmptyStr("pledgor_info"), "pledgor_id", "pledgor",type_id=`type2`)
+        , dimension_type = args2.dimension_type
+        , flow_type = args2.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args2.info_risk_level
+        , `type` = `type2`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }
+
+
+    list
+  }
+}

+ 15 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_finance.scala

@@ -0,0 +1,15 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 融资信息
+ */
+case class company_finance(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = "9"))
+  }
+}

+ 104 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_holder.scala

@@ -0,0 +1,104 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, ChangeExtract, CompanyMonitorMappings, CompanyMonitorRecord, CompanyMonitorUtils, EntityInfo}
+import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 新增了股东
+ *               新增对外投资
+ *               投资的公司,所占股份下降
+ */
+case class company_holder(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("amount")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val old_data = change_extract.old_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    val change_fields = change_extract.change_fields
+
+    //新增了股东
+    val `type1` = "10"
+    val args1 = CompanyMonitorMappings.get_args(`type1`)
+    list = list :+ CompanyMonitorRecord(
+      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type1`)
+      , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = `type1`))
+      , dimension_type = args1.dimension_type
+      , flow_type = args1.flow_type
+      , rta_desc = ""
+      , change_time = biz_date
+      , biz_id = rowkey
+      , info_risk_level = args1.info_risk_level
+      , `type` = `type1`
+      , create_time = update_time
+      , amt = get_money(new_data)
+      , update_type = update_type
+      , deleted = get_deleted(new_data)
+    )
+
+    //新增对外投资
+    val `type2` = "11"
+    val args2 = CompanyMonitorMappings.get_args(`type2`)
+    list = list :+ CompanyMonitorRecord(
+      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type2`)
+      , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("holder_id"), name = new_data.getOrEmptyStr("holder_name"), type_id = `type2`))
+      , dimension_type = args2.dimension_type
+      , flow_type = args2.flow_type
+      , rta_desc = ""
+      , change_time = CompanyMonitorUtils.formatDate(biz_date)
+      , biz_id = rowkey
+      , info_risk_level = args2.info_risk_level
+      , `type` = `type2`
+      , create_time = CompanyMonitorUtils.formatDate(update_time)
+      , amt = get_money(new_data)
+      , update_type = update_type
+      , deleted = get_deleted(new_data)
+    )
+
+    //投资的公司,所占股份下降
+    if (new_data != null && old_data != null) {
+      val new_percent = new_data.getOrEmptyStr("percent")
+      val old_percent = old_data.getOrEmptyStr("percent")
+      if (change_fields != null && change_fields.contains("percent") && StringUtils.isNotBlank(new_percent) && StringUtils.isNotBlank(old_percent) && new_percent.toDouble < old_percent.toDouble) {
+        val `type3` = "34"
+        val args3 = CompanyMonitorMappings.get_args(`type3`)
+        list = list :+ CompanyMonitorRecord(
+          id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type3`)
+          , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("holder_id"), name = new_data.getOrEmptyStr("holder_name"), type_id = `type3`))
+          , dimension_type = args3.dimension_type
+          , flow_type = args3.flow_type
+          , rta_desc = ""
+          , change_time = CompanyMonitorUtils.formatDate(biz_date)
+          , biz_id = rowkey
+          , info_risk_level = args3.info_risk_level
+          , `type` = `type3`
+          , create_time = CompanyMonitorUtils.formatDate(update_time)
+          , amt = get_money(new_data)
+          , update_type = update_type
+          , deleted = get_deleted(new_data)
+        )
+      }
+    }
+    list
+  }
+}
+
+

+ 113 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_judicial_assistance.scala

@@ -0,0 +1,113 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.ng.monitor._
+import com.winhc.bigdata.spark.utils.BaseUtil
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/9/13 11:44
+ * @Description: 在外持有股权被冻结
+ *               有股东持有的股权被冻结
+ *               有股权解除冻结
+ *               有股东持有的股权解除冻结
+ */
+case class company_judicial_assistance(is_inc: Boolean) extends AbstractDailyHandle {
+
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("equity_amount")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+    if (StringUtils.isNotBlank(new_data.getOrEmptyStr("fz_execute_order_num"))) {
+      //冻结信息
+      val `type1` = "28"
+      val args1 = CompanyMonitorMappings.get_args(`type1`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type1`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("executed_person_id"), name = new_data.getOrEmptyStr("executed_person"), type_id = `type1`))
+        , dimension_type = args1.dimension_type
+        , flow_type = args1.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args1.info_risk_level
+        , `type` = `type1`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+      val `type2` = "29"
+      val args2 = CompanyMonitorMappings.get_args(`type2`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type2`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = `type2`))
+        , dimension_type = args2.dimension_type
+        , flow_type = args2.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args2.info_risk_level
+        , `type` = `type2`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    } else if (StringUtils.isNotBlank(new_data.getOrEmptyStr("lf_execute_order_num"))) {
+      //解冻信息
+      val `type3` = "30"
+      val args3 = CompanyMonitorMappings.get_args(`type3`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type3`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("executed_person_id"), name = new_data.getOrEmptyStr("executed_person"), type_id = `type3`))
+        , dimension_type = args3.dimension_type
+        , flow_type = args3.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args3.info_risk_level
+        , `type` = `type3`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+      val `type4` = "31"
+      val args4 = CompanyMonitorMappings.get_args(`type4`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type4`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = `type4`))
+        , dimension_type = args4.dimension_type
+        , flow_type = args4.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args4.info_risk_level
+        , `type` = `type4`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }
+    list
+  }
+}

+ 23 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_land_announcement.scala

@@ -0,0 +1,23 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 购买了新的土地-购地信息
+ */
+case class company_land_announcement(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("tran_price")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+
+    Seq(
+      EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("application_name"), type_id = "12")
+    )
+  }
+}

+ 72 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_land_mortgage.scala

@@ -0,0 +1,72 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, ChangeExtract, CompanyMonitorMappings, CompanyMonitorRecord, CompanyMonitorUtils, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.utils.BaseUtil
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 抵押了公司土地-抵押人
+ *               接受了他人土地抵押-抵押权人
+ */
+case class company_land_mortgage(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("mortgage_amount")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+    //抵押了公司土地
+    val `type1` = "13"
+    val args1 = CompanyMonitorMappings.get_args(`type1`)
+    list = list :+ CompanyMonitorRecord(
+      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type1`)
+      , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("mortgagor_company_id"), name = new_data.getOrEmptyStr("mortgagor"), type_id = `type1`))
+      , dimension_type = args1.dimension_type
+      , flow_type = args1.flow_type
+      , rta_desc = ""
+      , change_time = biz_date
+      , biz_id = rowkey
+      , info_risk_level = args1.info_risk_level
+      , `type` = `type1`
+      , create_time = update_time
+      , amt = get_money(new_data)
+      , update_type = update_type
+      , deleted = get_deleted(new_data)
+    )
+    val `type2` = "21"
+    val args2 = CompanyMonitorMappings.get_args(`type2`)
+    //接受了他人土地抵押
+    list = list :+ CompanyMonitorRecord(
+      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type2`)
+      , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("mortgagee_company_id"), name = new_data.getOrEmptyStr("mortgagee"), type_id = `type2`))
+      , dimension_type = args2.dimension_type
+      , flow_type = args2.flow_type
+      , rta_desc = ""
+      , change_time = biz_date
+      , biz_id = rowkey
+      , info_risk_level = args2.info_risk_level
+      , `type` = `type2`
+      , create_time = update_time
+      , amt = get_money(new_data)
+      , update_type = update_type
+      , deleted = get_deleted(new_data)
+    )
+    list
+  }
+}

+ 17 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_license.scala

@@ -0,0 +1,17 @@
+
+
+package com.winhc.bigdata.spark.ng.monitor.table
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+
+/**
+ * @author: π
+ * @date: 2021/9/15 16:44
+ */
+case class company_license(is_inc:Boolean) extends AbstractDailyHandle {
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"),type_id = "26"))
+  }
+
+}
+

+ 20 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_mortgage_info.scala

@@ -0,0 +1,20 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 资产被抵押-动产抵押
+ */
+case class company_mortgage_info(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("amount")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = "14"))
+  }
+}

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_patent.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 新增专利信息
+ */
+case class company_patent(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    getEntity(new_data.getOrEmptyStr("applicant_name_info"), "keyno", "name","15")
+  }
+}

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_tm.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 新增商标信息
+ */
+case class company_tm(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("keyno"), name = new_data.getOrEmptyStr("applicant_cn"), type_id = "16"))
+  }
+
+}

+ 22 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_zxr.scala

@@ -0,0 +1,22 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 企业被执
+ */
+case class company_zxr(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+
+    var amtstr = new_map("exec_money")
+    amtstr = processMoney(amtstr)
+    amtstr = (amtstr.toDouble/10000 ).formatted("%.2f")
+    amtstr.toDouble
+
+
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = ???
+}

+ 51 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_zxr_restrict.scala

@@ -0,0 +1,51 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, ChangeExtract, CompanyMonitorMappings, CompanyMonitorRecord, CompanyMonitorUtils, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/8/12 18:50
+ * @Description: 限制高消费移除
+ */
+case class company_zxr_restrict(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+    if (new_data.getOrEmptyStr("deleted").equals("1")) {
+      //限制高消费移除
+      val `type` = "18"
+      val args = CompanyMonitorMappings.get_args(`type`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type`)
+        , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), `type_id` = `type`)
+          , EntityInfo(keyno = new_data.getOrEmptyStr("pid"), name = new_data.getOrEmptyStr("person_name"), `type_id` = `type`))
+        , dimension_type = args.dimension_type
+        , flow_type = args.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args.info_risk_level
+        , `type` = `type`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }
+    list
+  }
+}

+ 20 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/increase_registered_capital_info.scala

@@ -0,0 +1,20 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.utils.BaseUtil
+
+/**
+ * @Date: 2020/12/14
+ * @Description: 注册资本上升-增资记录
+ */
+case class increase_registered_capital_info(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("content_after")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"),type_id = "2"))
+  }
+}

+ 98 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/wenshu_detail_v2.scala

@@ -0,0 +1,98 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, ChangeExtract, CompanyMonitorMappings, CompanyMonitorRecord, CompanyMonitorUtils, EntityInfo}
+import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+import scala.collection.mutable
+
+/**
+ * @Date: 2020/9/13 11:44
+ * @Description: 新增胜诉案件(原告) | AND data['is_success'] = '胜'  AND data['case_stage']= '一审'  AND  data['case_type'] = '民事案件' AND  data['name_type'] = 'y'
+ *               有恢复执行案件(被告) | AND  data['case_no'] like concat('%','恢','%')
+ *               新增财产保全案件(被告) | AND  data['case_no'] like concat('%','恢','%')
+ */
+case class wenshu_detail_v2(is_inc: Boolean) extends AbstractDailyHandle {
+
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    BaseUtil.amt_div(processMoney(new_map("judge_amt")), "1")
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq.empty[EntityInfo]
+  }
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyMonitorRecord] = (change_extract: ChangeExtract) => {
+    var list: mutable.Seq[CompanyMonitorRecord] = mutable.Seq.empty
+
+    val tn = change_extract.tn
+    val update_type = change_extract.update_type
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+
+    if (new_data.getOrEmptyStr("is_success").equals("胜")
+      && new_data.getOrEmptyStr("case_stage").equals("一审")
+      && new_data.getOrEmptyStr("case_type").equals("民事案件")) {
+      //新增胜诉案件
+      val `type` = "20"
+      val args1 = CompanyMonitorMappings.get_args(`type`)
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type`)
+        , entity_info = getEntity(new_data.getOrEmptyStr("plaintiff_info"), "litigant_id", "name",type_id = `type`)
+        , dimension_type = args1.dimension_type
+        , flow_type = args1.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args1.info_risk_level
+        , `type` = `type`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    } else if (new_data.getOrEmptyStr("case_no").contains("恢")) {
+      val `type` = "19"
+      val args2 = CompanyMonitorMappings.get_args(`type`)
+      //有恢复执行案件
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type`)
+        , entity_info = getEntity(new_data.getOrEmptyStr("defendant_info"), "litigant_id", "name",type_id = `type`)
+        , dimension_type = args2.dimension_type
+        , flow_type = args2.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args2.info_risk_level
+        , `type` = `type`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    } else if (new_data.getOrEmptyStr("case_no").contains("保")) {
+      val `type` = "27"
+      val args3 = CompanyMonitorMappings.get_args(`type`)
+      //新增财产保全案件
+      list = list :+ CompanyMonitorRecord(
+        id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type`)
+        , entity_info = getEntity(new_data.getOrEmptyStr("defendant_info"), "litigant_id", "name",type_id = `type`)
+        , dimension_type = args3.dimension_type
+        , flow_type = args3.flow_type
+        , rta_desc = ""
+        , change_time = biz_date
+        , biz_id = rowkey
+        , info_risk_level = args3.info_risk_level
+        , `type` = `type`
+        , create_time = update_time
+        , amt = get_money(new_data)
+        , update_type = update_type
+        , deleted = get_deleted(new_data)
+      )
+    }
+    list
+  }
+}

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/zxr_evaluate.scala

@@ -0,0 +1,14 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+/**
+ * @Date: 2021/1/07 18:50
+ * @Description: 询价评估
+ */
+case class zxr_evaluate(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("keyno"), name = new_data.getOrEmptyStr("name"),type_id = "23"))
+  }
+}

+ 38 - 0
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/zxr_evaluate_results.scala

@@ -0,0 +1,38 @@
+package com.winhc.bigdata.spark.ng.monitor.table
+
+import com.alibaba.fastjson.JSONArray
+import com.winhc.bigdata.spark.ng.monitor.{AbstractDailyHandle, EntityInfo}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.utils.BaseUtil._
+import org.apache.commons.lang3.StringUtils
+import com.alibaba.fastjson.JSON
+import scala.collection.mutable.ListBuffer
+
+/**
+ * @Date: 2021/1/07 18:50
+ * @Description: 询价评估结果
+ */
+case class zxr_evaluate_results(is_inc: Boolean) extends AbstractDailyHandle {
+
+  override protected def get_money(new_map: Map[String, String]): Double = {
+    val money = new_map("money")
+    if (StringUtils.isBlank(money)) {
+      0D
+    } else {
+      val array: JSONArray = JSON.parseArray(money)
+      if (array.size() == 0) {
+        0D
+      } else {
+        val list = ListBuffer[Double]()
+        for (i <- 0 until array.size()) {
+          list.append(array.getString(i).toDouble)
+        }
+        amt_div(amt_div(list.sum.toString, list.size.toString).toString, "10000")
+      }
+    }
+  }
+
+  override def getEntityInfo(new_data: Map[String, String]): Seq[EntityInfo] = {
+    Seq(EntityInfo(keyno = new_data.getOrEmptyStr("keyno"), name = new_data.getOrEmptyStr("name"),type_id = "24"))
+  }
+}