Prechádzať zdrojové kódy

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
#	src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
yandawei 4 rokov pred
rodič
commit
19f756b5d3
25 zmenil súbory, kde vykonal 858 pridanie a 37 odobranie
  1. 32 21
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala
  2. 10 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  3. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_check_info.scala
  4. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_announcement_list.scala
  5. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_open_announcement_list.scala
  6. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_register_list.scala
  7. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_double_random_check_info.scala
  8. 19 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_illegal_info.scala
  9. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_judicial_sale_combine_list.scala
  10. 24 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_send_announcement_list.scala
  11. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_tax_contravention.scala
  12. 2 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
  13. 22 13
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  14. 45 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala
  15. 61 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala
  16. 58 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala
  17. 61 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala
  18. 46 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala
  19. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala
  20. 54 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_illegal_info.scala
  21. 49 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala
  22. 78 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala
  23. 75 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala
  24. 1 0
      src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala
  25. 24 2
      src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

+ 32 - 21
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala

@@ -39,26 +39,37 @@ object CompanySummaryBySingle extends Logging {
       }
 
       val cols = Seq("rowkey", "ds")
-      val df = sql(
-        s"""
-           |SELECT  split(rowkey,'_')[0] AS cid
-           |        ,COUNT(1) as ${tableName}
-           |FROM    (
-           |            SELECT  t1.*
-           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
-           |            FROM    (
-           |                        SELECT  ${cols.mkString(",")}
-           |                        FROM    winhc_eci_dev.ads_company_change
-           |                        WHERE   ds = ${lastDs}
-           |                        UNION ALL
-           |                        SELECT  ${cols.mkString(",")}
-           |                        FROM    winhc_eci_dev.inc_ads_company_change
-           |                        WHERE   ds > $lastDs
-           |                    ) AS t1
-           |        ) AS t2
-           |WHERE   t2.num = 1
-           |GROUP BY split(rowkey,'_')[0]
-           |""".stripMargin)
+      val df =
+
+        sql(
+          s"""
+             |SELECT  split(rowkey,'_')[0] AS cid
+             |        ,COUNT(1) as ${tableName}
+             |FROM    (
+             |            SELECT  t1.*
+             |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+             |            FROM    (
+             |                        SELECT  ${cols.mkString(",")}
+             |                        FROM    winhc_eci_dev.ads_${tableName}
+             |                        WHERE   ds = ${lastDs}
+             |                        ${
+            spark.catalog.tableExists(s"winhc_eci_dev.inc_ads_${tableName}") match {
+              case true => {
+                s"""
+                   |                        UNION ALL
+                   |                        SELECT  ${cols.mkString(",")}
+                   |                        FROM    winhc_eci_dev.inc_ads_${tableName}
+                   |                        WHERE   ds > $lastDs
+                   |""".stripMargin
+              }
+              case _ => ""
+            }
+          }
+             |                    ) AS t1
+             |        ) AS t2
+             |WHERE   t2.num = 1
+             |GROUP BY split(rowkey,'_')[0]
+             |""".stripMargin)
 
       if (out != null) {
         df.createTempView("xjk_tmp_summary_test")
@@ -95,7 +106,7 @@ object CompanySummaryBySingle extends Logging {
 
     val spark = SparkUtils.InitEnv("CompanySummaryBySingle", config)
 
-    CompanySummaryBySingleUtil(spark, "winhc_eci_dev").all(tableName,out = "xjk_tmp_summ")
+    CompanySummaryBySingleUtil(spark, "winhc_eci_dev").all(tableName, out = "xjk_tmp_summ")
 
     spark.stop()
   }

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -301,6 +301,15 @@ object ChangeExtract {
     , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy=false) //破产重整
     , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num")//公示催告
     , Args(tableName = "company_mortgage_info", primaryFields = "reg_num")//动产抵押
+    , Args(tableName = "company_stock_announcement", primaryFields = "title")//企业公告
+    , Args(tableName = "company_check_info", primaryFields = "check_result")//抽查检查
+    , Args(tableName = "company_court_announcement_list", primaryFields = "content")//法院公告
+    , Args(tableName = "company_court_open_announcement_list", primaryFields = "case_reason")//开庭公告
+    , Args(tableName = "company_court_register_list", primaryFields = "area")//立案信息
+    , Args(tableName = "company_double_random_check_info", primaryFields = "check_plan_name")//双随机抽查
+    , Args(tableName = "company_judicial_sale_combine_list", primaryFields = "title")//司法拍卖
+    , Args(tableName = "company_tax_contravention", primaryFields = "case_type")//税收违法
+    , Args(tableName = "company_send_announcement_list", primaryFields = "title")//送达公告
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")
@@ -311,6 +320,7 @@ object ChangeExtract {
     , Args(tableName = "company_staff", primaryFields = "staff_type")
     //公司名称,法人ID:人标识或公司标识,公司类型,注册地址,营业期限终止日期,经营范围,登记机关,企业状态                 ,注册资本,实收资本金额(单位:分),注销日期,注销原因
     , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
+    , Args(tableName = "company_illegal_info",  primaryFields = "remove_reason")
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_check_info.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:抽查检查
+ */
+
+case class company_check_info(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_org"), s"抽查检查发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_org"), s"新增抽查检查")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "抽查检查", Array("check_org", "check_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("check_date"))){
+      newMap("update_time")
+    }else{
+      newMap("check_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_announcement_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:法院公告
+ */
+
+case class company_court_announcement_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("bltn_no"), s"${newMap("bltn_no")}法院公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("bltn_no"), s"新增${newMap("bltn_no")}法院公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "法院公告", Array("bltn_no", "publish_date", "case_no"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("publish_date"))){
+      newMap("update_time")
+    }else{
+      newMap("publish_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_open_announcement_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:开庭公告
+ */
+
+case class company_court_open_announcement_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("case_no")}开庭公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("case_no")}开庭公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "开庭公告", Array("case_no", "start_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String ={
+    if(StringUtils.isBlank(newMap("start_date"))){
+      newMap("update_time")
+    }else{
+      newMap("start_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_register_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:立案信息
+ */
+
+case class company_court_register_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("case_no")}立案信息发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("case_no")}立案信息")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "立案信息", Array("case_no", "filing_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("filing_date"))){
+      newMap("update_time")
+    }else{
+      newMap("filing_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_double_random_check_info.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:双随机抽查
+ */
+
+case class company_double_random_check_info(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_task_num"), s"${newMap("check_task_num")}双随机抽查发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_task_num"), s"新增${newMap("check_task_num")}双随机抽查")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "双随机抽查", Array("check_task_num", "check_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("check_date"))){
+      newMap("update_time")
+    }else{
+      newMap("check_date")
+    }
+  }
+}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_illegal_info.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/11 17:26
+ * @Description: 严重违法
+ */
+case class company_illegal_info(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "严重违法发生变化"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = "新增一条严重违法"
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String =  ChangeExtractUtils.getTags(newMap, "严重违法", Array("put_reason", "put_date", "put_department", "remove_reason", "remove_date", "remove_department", "type"))
+
+  override def getBizTime(newMap: Map[String, String]): String = DateUtils.getMaxDate(newMap.getOrElse("put_date", ""), newMap.getOrElse("remove_date", ""))
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_judicial_sale_combine_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:司法拍卖
+ */
+
+case class company_judicial_sale_combine_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}司法拍卖发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}司法拍卖")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "司法拍卖", Array("title", "pub_time"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("pub_time"))){
+      newMap("update_time")
+    }else{
+      newMap("pub_time")
+    }
+  }
+}

+ 24 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_send_announcement_list.scala

@@ -0,0 +1,24 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: Yan Yongnian
+ * @Date: 2020/8/12
+ * @Description:
+ */
+
+
+//送达公告
+
+case class company_send_announcement_list(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}送达公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}送达公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "送达公告", Array("court", "case_no", "area", "case_reason", "plaintiff", "defendant"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("start_date")
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_tax_contravention.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:税收违法
+ */
+
+case class company_tax_contravention(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("taxpayer_number"), s"${newMap("taxpayer_number")}税收违法发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("taxpayer_number"), s"新增${newMap("taxpayer_number")}税收违法")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "税收违法", Array("taxpayer_number", "publish_time"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("publish_time"))){
+      newMap("update_time")
+    }else{
+      newMap("publish_time")
+    }
+  }
+}

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -182,6 +182,7 @@ object CompanyDynamic {
     , Args(tableName = "company_staff", bName = 0)
     , Args(tableName = "company", bName = 0)
     , Args(tableName = "bankruptcy_open_case", bName = 1)
+    , Args(tableName = "company_illegal_info", bName = 0)
     , Args(tableName = "company_land_publicity", bName = 1)
     , Args(tableName = "company_employment", bName = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
@@ -194,6 +195,7 @@ object CompanyDynamic {
     , Args(tableName = "company_public_announcement2_list", bName = 1)
     , Args(tableName = "company_mortgage_info", bName = 1)
     , Args(tableName = "company_stock_announcement", bName = 1)
+    , Args(tableName = "company_send_announcement_list", bName = 1)
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 22 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -34,18 +34,22 @@ trait CompanyDynamicHandle {
     , "" -> "serious_violation" //严重违法
     , "" -> "simple_cancellation" //简易注销
     , "company_equity_info" -> "stock_pledge" //股权出质
-    , "" -> "tax_illegal" //税收违法
-    , "" -> "tax_owenotice" //欠税公告
-    , "" -> "judicial" //司法拍卖
+    , "company_tax_contravention" -> "tax_illegal" //税收违法
+    , "company_own_tax" -> "tax_owenotice" //欠税公告
+    , "company_judicial_sale_combine_list" -> "judicial" //司法拍卖
     , "company_employment" -> "recruit" //招聘信息
     , "" -> "liquidation_information" //清算信息
     , "" -> "investor_equity_change" //大股东变更
     , "" -> "actual_controller_change" //实际控制人变更
-    , "" -> "court_notice" //开庭公告
+    , "company_court_open_announcement_list" -> "court_notice" //开庭公告
     , "bankruptcy_open_case" -> "bankruptcy_open_case" //破产重整
     , "company_stock_announcement" -> "company_stock_announcement" //企业公告
+    , "company_send_announcement_list" -> "company_send_announcement_list" //送达公告
 
     , "company_staff" -> "company_staff" //主要成员
+    , "company_check_info" -> "spot_check" //抽查检查
+    , "company_double_random_check_info" -> "company_double_random_check_info" //双随机抽查
+    , "company_court_register" -> "company_court_register" //立案信息
   )
 
   private val table_2_info_type = Map(
@@ -56,10 +60,10 @@ trait CompanyDynamicHandle {
     , "company_abnormal_info" -> "4" // 经营异常
     , "" -> "5" // 知识产权
     , "" -> "6" // 裁判文书
-    , "" -> "7" // 法院公告
+    , "company_court_announcement_list" -> "7" // 法院公告
     , "" -> "8" // 对外投资
     , "company_mortgage_info" -> "9" // 动产抵押
-    , "" -> "10" // 司法拍卖
+    , "company_judicial_sale_combine_list" -> "10" // 司法拍卖
     , "company_land_publicity" -> "11-1" // 土地信息-地块公示
     , "company_land_announcement" -> "11-2" // 土地信息-购地信息
     , "company_land_transfer" -> "11-3" // 土地信息-土地转让
@@ -73,26 +77,27 @@ trait CompanyDynamicHandle {
     , "company_equity_info" -> "17" // 股权出质
     , "" -> "18" // 严重违法
     , "" -> "19" // 简易注销
-    , "" -> "20" // 欠税公告
-    , "" -> "21" // 税收违法
+    , "company_own_tax" -> "20" // 欠税公告
+    , "company_tax_contravention" -> "21" // 税收违法
     , "" -> "22" // 股权冻结
     , "" -> "23" // 清算信息
     , "" -> "24" // 大股东变更
     , "" -> "25" // 实际控制人变更
-    , "" -> "26" // 开庭公告
+    , "company_court_open_announcement_list" -> "26" // 开庭公告
     , "" -> "27" // 新闻信息
     , "" -> "28" // 股东信息
     , "" -> "29" // 最终受益人
     , "company_staff" -> "30" // 主要成员
     , "" -> "31" // 融资动态
     , "company_stock_announcement" -> "32" // 企业公告
-    , "" -> "33" // 抽查检查
+    , "company_check_info" -> "33" // 抽查检查
     , "" -> "34" // 行政许可
-    , "" -> "35" // 双随机抽查
+    , "company_double_random_check_info" -> "35" // 双随机抽查
     , "" -> "36" // 限制高消费
     , "" -> "37" // 被执行人
-    , "" -> "38" // 送达报告
+    , "company_send_announcement_list" -> "38" // 送达报告
     , "bankruptcy_open_case" -> "39" // 破产重整
+    , "company_court_register" -> "40" // 立案信息
   )
 
 
@@ -172,10 +177,14 @@ trait CompanyDynamicHandle {
    *         winhc_suggest
    */
   def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String = null, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
+    val rta_desc = get_rta_desc(old_map, new_map)
+    if (rta_desc == null) {
+      return Seq.empty
+    }
     Seq((cid
       , cname
       , get_info_type()
-      , get_rta_desc(old_map, new_map)
+      , rta_desc
       , get_change_content(old_map, new_map)
       , get_change_time(bizDate, new_map)
       , get_biz_id(rowkey)

+ 45 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala

@@ -0,0 +1,45 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 抽查检查
+ */
+case class company_check_info() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""
+       |日期:${new_map("check_date")}\n
+       |结果:${new_map("check_result")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 法院公告
+ */
+case class company_court_announcement_list() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""
+       |案号:${new_map("case_no")}万元\n
+       |上诉人:${new_map("plaintiff")}\n
+       |被上诉人:${new_map("litigant")}\n
+       |刊登日期:${new_map("publish_date")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "announcement_type_code->公告类型"
+    , "plaintiff->原告"
+    , "bltn_no->公告号"
+    , "court_name->法院名"
+    , "deal_grade->处理等级名称"
+    , "litigant->当事人"
+    , "judge->法官"
+    , "province->省份"
+    , "judge_phone->法官电话"
+    , "case_no->案件号"
+    , "content->案件内容"
+    , "publish_page->刊登版面"
+    , "publish_date->刊登日期"
+  ))
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 58 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala

@@ -0,0 +1,58 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 开庭公告
+ */
+case class company_court_open_announcement_list() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+
+    s"""
+       |开庭时间:${new_map("start_date")}\n
+       |案号:${new_map("case_no")}\n
+       |案由:${new_map("case_reason")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "case_reason->案由"
+    ,"case_no->案号"
+    ,"start_date->开庭时间"
+    ,"area->地区"
+    ,"plan_date->排期日期"
+    ,"judge->审判长/主审人"
+    ,"litigant->当事人"
+    ,"court->法院"
+    ,"court_room->法庭"
+    ,"content->公告内容"
+  ))
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 立案信息
+ */
+case class company_court_register_list() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+
+    s"""
+       |立案日期:${new_map("filing_date")}\n
+       |上诉人:${new_map("plaintiff")}\n
+       |被上诉人:${new_map("defendant")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "case_reason->案由"
+    ,"case_no->案号"
+    ,"filing_date->立案日期"
+    ,"start_time->开庭时间"
+    ,"department->承办部门"
+    ,"court->法院"
+    ,"judge->承办法官"
+    ,"assistant->法官助理"
+    ,"case_type->案件类型"
+    ,"case_status->案件状态"
+    ,"plaintiff->上诉人"
+    ,"defendant->被上诉人"
+    ,"third->第三人"
+  ))
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 46 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala

@@ -0,0 +1,46 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 双随机抽查
+ */
+case class company_double_random_check_info() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""
+       |任务编号:${new_map("check_task_num")}\n
+       |任务名称:${new_map("check_task_name")}\n
+       |抽查机关:${new_map("check_department")}\n
+       |完成日期:${new_map("check_date")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala

@@ -84,5 +84,5 @@ case class company_equity_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "1"
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = null
 }

+ 54 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_illegal_info.scala

@@ -0,0 +1,54 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/11 17:39
+ * @Description: 严重违法
+ */
+case class company_illegal_info() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert", "update")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    if (old_map == null)
+      return s"新增严重违法记录:${new_map.getOrElse("put_reason", "")}"
+
+    if (old_map.get("remove_date") == null && new_map("remove_date") != null)
+      return s"移除严重违法记录:${new_map.getOrElse("remove_reason", "")}"
+
+    null
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "put_reason->列入原因"
+    , "put_date->列入时间"
+    , "remove_date->移除时间"
+    , "put_department->列入决定机关"
+    , "remove_reason->移除原因"
+    , "remove_department->移除决定机关"
+    , "type->类型"
+  ))
+}

+ 49 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala

@@ -0,0 +1,49 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 司法拍卖
+ */
+case class company_judicial_sale_combine_list() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+
+    s"""
+       |标题:${new_map("title")}\n
+       |起拍价:${new_map("initial_price")}\n
+       |拍卖时间:${new_map("start_time")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "source_id->address"
+  ))
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 78 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala

@@ -0,0 +1,78 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author yyn
+ * @Date 2020/8/12
+ * @Description TODO
+ */
+//送达公告
+case class company_send_announcement_list()extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    s"""发布日期:$new_map("start_date")\n
+       |法院名称:$new_map("court")\n""".stripMargin
+  }
+
+  /**
+   * 变更时间
+   *
+   * @param new_map
+   * @return
+   */
+//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    if(new_map("defendant_cids").contains("cid"))//原告
+      {
+        "警示信息"
+      }
+    else {
+        "提示信息"
+      }
+  }
+  /**
+   *
+   * @param rowkey
+   * @param cid
+   * @param change_fields
+   * @param old_map
+   * @param new_map
+   * @return cid
+   *         cname
+   *         info_type
+   *         rta_desc
+   *         change_content
+   *         change_time
+   *         biz_id
+   *         sub_info_type
+   *         info_risk_level
+   *         winhc_suggest
+   */
+  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String): Seq[(String, String, String, String, String, String, String, String, String, String)] = {
+     super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map, cname, "该企业发布送达公告信息")
+  }
+}

+ 75 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala

@@ -0,0 +1,75 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11 17:39
+ * @Description: 税收违法
+ */
+case class company_tax_contravention() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""
+       |案件性质:${new_map("case_type")}\n
+       |发布日期:${new_map("publish_time")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = {
+      new_map.toJson(Seq(
+        "taxpayer_name->纳税人名称"
+        , "taxpayer_number->纳税人识别号"
+        , "taxpayer_code->组织机构代码"
+        , "address->注册地址"
+        , "publish_time->发布日期"
+        , "case_type->案件性质"
+        , "department->所属税务机关"
+        , "case_info->主要违法事实"
+        , "legal_person_info->法定代表人或负责人"
+//        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
+//        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
+//        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
+        , "responsible_person_info->负有责任的财务负责人"
+//        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
+//        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
+//        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
+        , "responsible_department_info->负有直接责任的中介机构"
+      ))
+  }
+
+  def splitInfo(s: String) = {
+    if (StringUtils.isNotBlank(s) && s.replaceAll(":", ",").split(",").size == 4) {
+      val Array(name, sex, card, number) = s.replaceAll(":", ",").split(",")
+      (name, sex, card, number)
+    } else {
+      ("", "", "", "")
+    }
+  }
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+}

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -23,6 +23,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
   val tabMapping =
     Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
+      ,"company_send_announcement" -> ("litigant_cids",",")//送达公告
     )
 
   val funMap =

+ 24 - 2
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -11,11 +11,33 @@ object DateUtils {
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
     val fm = new SimpleDateFormat(pattern)
-    fm.parse(date).getTime/1000
+    fm.parse(date).getTime / 1000
+  }
+
+
+  /**
+   * 获取最大的一个日期,如果有异常情况则反回第一个日期
+   *
+   * @param date1
+   * @param date2
+   */
+  def getMaxDate(date1: String, date2: String): String = {
+    try {
+      if (date1.length != date2.length) {
+        return date1
+      }
+      date1.compareTo(date2) match {
+        case -1 => date2
+        case 1 => date1
+        case _ => date1
+      }
+    } catch {
+      case e: Exception => date1
+    }
   }
 
   def main(args: Array[String]): Unit = {
-    println(toUnixTimestamp(date = "2020-08-10 11:22:08"))
+    println(getMaxDate("2003-10-12 10:00:00", "2003-11-12 00:00:01"))
   }
 
 }