Procházet zdrojové kódy

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
许家凯 před 4 roky
rodič
revize
f9921d040a
20 změnil soubory, kde provedl 457 přidání a 13 odebrání
  1. 10 0
      src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala
  2. 2 0
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala
  3. 4 0
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala
  4. 5 0
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCids.scala
  5. 107 0
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyJudicialSaleCombine.scala
  6. 8 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  7. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_check_info.scala
  8. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_announcement_list.scala
  9. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_open_announcement_list.scala
  10. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_register_list.scala
  11. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_double_random_check_info.scala
  12. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_judicial_sale_combine_list.scala
  13. 14 1
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_mortgage_info.scala
  14. 24 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_stock_announcement.scala
  15. 28 0
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_tax_contravention.scala
  16. 12 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
  17. 6 5
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  18. 49 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala
  19. 11 4
      src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala
  20. 9 2
      src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -31,8 +31,18 @@ object CalcIncrTotal {
   //winhc_eci_dev company_patent new_cid,pub_number,app_number cids
 
   //winhc_eci_dev company_court_open_announcement new_cid,case_no,plaintiff,defendant cids
+  //winhc_eci_dev company_court_register new_cid,case_no,plaintiff,defendant cids
+  //winhc_eci_dev company_court_announcement new_cid,plaintiff,litigant,publish_date,case_no cids
 
 
+  //  winhc_eci_dev company_check_info new_cid,check_org,check_date cid
+  //  winhc_eci_dev company_tax_contravention new_cid,taxpayer_number,case_info cid
+
+  //  winhc_eci_dev company_double_random_check_info check_task_num,new_cid cid
+  //  winhc_eci_dev company_double_random_check_result_info main_id,check_item,new_cid cid
+
+  //  winhc_eci_dev company_judicial_sale_combine main_id,new_cid cids
+
   def main(args: Array[String]): Unit = {
 
     val Array(project, tableName, dupliCols, flag) = args

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -136,6 +136,8 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |            WHERE   ds = $adsListDs
          |            AND     announcement_type = '起诉状副本及开庭传票'
          |            AND     LENGTH(plaintiff_name) > 4
+         |            AND     plaintiff_name not like '%银行%'
+         |            AND     plaintiff_name not like '%保险%'
          |        ) x
          |WHERE   num = 1 AND publish_date >= '${atMonthsBefore(3)}'
          |""".stripMargin).cache().createOrReplaceTempView("announcement")

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -20,6 +20,10 @@ object CompanyForCid {
       "ods_company_mortgage_info" -> Seq("reg_date","reg_num","amount", "new_cid") //产品信息
     )
 //  winhc_eci_dev ods_company_own_tax tax_balance,tax_category,tax_num,new_cid
+//  winhc_eci_dev company_check_info check_org,check_date,new_cid
+//  winhc_eci_dev company_tax_contravention taxpayer_number,case_info,new_cid
+//  winhc_eci_dev company_double_random_check_info check_task_num,new_cid
+//  winhc_eci_dev company_double_random_check_result_info main_id,check_item,new_cid
 
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args

+ 5 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCids.scala

@@ -18,7 +18,12 @@ object CompanyForCids {
     )
 
   //  winhc_eci_dev company_court_open_announcement case_no,plaintiff,defendant,new_cid
+  //  winhc_eci_dev company_court_register case_no,plaintiff,defendant,new_cid
   //  winhc_eci_dev company_copyright_reg reg_num,new_cid
+  //  winhc_eci_dev company_court_announcement new_cid,plaintiff,litigant,publish_date,case_no
+  //  winhc_eci_dev company_judicial_sale new_cid,title
+
+  //winhc_eci_dev company_judicial_sale_combine new_cid,main_id
 
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args

+ 107 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyJudicialSaleCombine.scala

@@ -0,0 +1,107 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: 司法拍卖两表合并
+ * @author π
+ * @date 2020/8/1115:35
+ */
+object CompanyJudicialSaleCombine {
+  def main(args: Array[String]): Unit = {
+    //winhc_eci_dev company_judicial_sale
+    val Array(project, tableName) = args
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |""".stripMargin)
+    if (args.length != 2) {
+      println("请输入 project:项目, tableName:表名 !!!")
+      sys.exit(-1)
+    }
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyJudicialSaleCombine(spark,project,tableName).calc
+    spark.stop()
+  }
+
+}
+case class CompanyJudicialSaleCombine(s: SparkSession,
+                                      project: String, //表所在工程名
+                                      tableName: String //表名(不加前后辍)
+                                     ) extends LoggingUtils with CompanyMapping{
+  override protected val spark: SparkSession = s
+
+  def calc ={
+
+    val inc_ods_v0 = s"${project}.inc_ods_${tableName}_item" //ods源表
+    val inc_ods_v1 = s"${project}.inc_ods_${tableName}_item" //ods源表
+    val inc_ods_v2 = s"${project}.inc_ods_${tableName}_combine" //ods合并表
+    //增量ods最新分区
+    val v0 = BaseUtil.getFirstPartion(inc_ods_v0, spark)
+    val v1 = BaseUtil.getPartion(inc_ods_v1, spark)
+    var v2 = BaseUtil.getPartion(inc_ods_v2, spark)
+    if(StringUtils.isBlank(v2)){
+      v2 = BaseUtil.atDaysAfter(-1, v0)
+    }
+
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE winhc_eci_dev.inc_ods_company_judicial_sale_combine PARTITION(ds='$v1')
+        |SELECT  a.id
+        |        ,a.main_id
+        |        ,a.cids
+        |        ,a.title AS name
+        |        ,a.initial_price
+        |        ,a.current_price
+        |        ,a.consult_price
+        |        ,a.start_time
+        |        ,a.end_time
+        |        ,a.pic_source_url
+        |        ,a.pic_oss_url
+        |        ,a.create_time
+        |        ,a.update_time
+        |        ,a.deleted
+        |        ,b.title
+        |        ,b.introduction
+        |        ,b.court
+        |        ,b.pub_time
+        |        ,b.end_date
+        |        ,b.content
+        |        ,b.source_tag
+        |        ,b.source_id
+        |FROM    (
+        |            SELECT  *
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER(PARTITION BY cids,main_id ORDER BY update_time DESC) num1
+        |                        FROM    winhc_eci_dev.inc_ods_company_judicial_sale_item
+        |                        WHERE   ds > $v2 AND ds <= $v1
+        |                        AND     cids IS NOT NULL
+        |                    ) c
+        |            WHERE   num1 = 1
+        |        ) a
+        |LEFT JOIN (
+        |              SELECT  *
+        |              FROM    (
+        |                          SELECT  *
+        |                                  ,ROW_NUMBER() OVER(PARTITION BY cids,title ORDER BY update_time DESC) num2
+        |                          FROM    winhc_eci_dev.inc_ods_company_judicial_sale
+        |                          WHERE   ds > $v2 AND ds <= $v1
+        |                      ) d
+        |              WHERE   num2 = 1
+        |          ) b
+        |ON      a.main_id = b.id
+        |WHERE   b.id IS NOT NULL
+        |""".stripMargin)
+  }
+}

+ 8 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -300,6 +300,14 @@ object ChangeExtract {
     , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy=false) //破产重整
     , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num")//公示催告
     , Args(tableName = "company_mortgage_info", primaryFields = "reg_num")//动产抵押
+    , Args(tableName = "company_stock_announcement", primaryFields = "title")//企业公告
+    , Args(tableName = "company_check_info", primaryFields = "check_result")//抽查检查
+    , Args(tableName = "company_court_announcement_list", primaryFields = "content")//法院公告
+    , Args(tableName = "company_court_open_announcement_list", primaryFields = "case_reason")//开庭公告
+    , Args(tableName = "company_court_register_list", primaryFields = "area")//立案信息
+    , Args(tableName = "company_double_random_check_info", primaryFields = "check_plan_name")//双随机抽查
+    , Args(tableName = "company_judicial_sale_combine_list", primaryFields = "title")//司法拍卖
+    , Args(tableName = "company_tax_contravention", primaryFields = "case_type")//税收违法
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_check_info.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:抽查检查
+ */
+
+case class company_check_info(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_org"), s"抽查检查发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_org"), s"新增抽查检查")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "抽查检查", Array("check_org", "check_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("check_date"))){
+      newMap("update_time")
+    }else{
+      newMap("check_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_announcement_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:法院公告
+ */
+
+case class company_court_announcement_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("bltn_no"), s"${newMap("bltn_no")}法院公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("bltn_no"), s"新增${newMap("bltn_no")}法院公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "法院公告", Array("bltn_no", "publish_date", "case_no"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("publish_date"))){
+      newMap("update_time")
+    }else{
+      newMap("publish_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_open_announcement_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:开庭公告
+ */
+
+case class company_court_open_announcement_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("case_no")}开庭公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("case_no")}开庭公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "开庭公告", Array("case_no", "start_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String ={
+    if(StringUtils.isBlank(newMap("start_date"))){
+      newMap("update_time")
+    }else{
+      newMap("start_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_court_register_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:立案信息
+ */
+
+case class company_court_register_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("case_no")}立案信息发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("case_no")}立案信息")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "立案信息", Array("case_no", "filing_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("filing_date"))){
+      newMap("update_time")
+    }else{
+      newMap("filing_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_double_random_check_info.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:双随机抽查
+ */
+
+case class company_double_random_check_info(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_task_num"), s"${newMap("check_task_num")}双随机抽查发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("check_task_num"), s"新增${newMap("check_task_num")}双随机抽查")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "双随机抽查", Array("check_task_num", "check_date"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("check_date"))){
+      newMap("update_time")
+    }else{
+      newMap("check_date")
+    }
+  }
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_judicial_sale_combine_list.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:司法拍卖
+ */
+
+case class company_judicial_sale_combine_list(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}司法拍卖发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}司法拍卖")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "司法拍卖", Array("title", "pub_time"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("pub_time"))){
+      newMap("update_time")
+    }else{
+      newMap("pub_time")
+    }
+  }
+}

+ 14 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_mortgage_info.scala

@@ -20,7 +20,20 @@ case class company_mortgage_info(equCols: Seq[String]) extends CompanyChangeHand
     str
   }
 
-  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_date")
+  override def getBizTime(newMap: Map[String, String]): String = {
+    //若公示日期为null则取登记日期
+    if(newMap("publish_date")==null) {
+      if(newMap("reg_date")==null){
+        newMap("update_time")
+      }
+      else {
+        newMap("reg_date")
+      }
+    }
+    else {
+      newMap("publish_date")
+    }
+  }
 
   override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("reg_num"), s"${newMap("reg_num")}动产抵押发生变更")
 

+ 24 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_stock_announcement.scala

@@ -0,0 +1,24 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: Yan Yongnian
+ * @Date: 2020/8/11
+ * @Description:
+ */
+
+
+//企业公告(上市公告)
+
+case class company_stock_announcement(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}企业公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}企业公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "企业公告", Array("title", "time", "type"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("time")
+}

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_tax_contravention.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description:税收违法
+ */
+
+case class company_tax_contravention(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("taxpayer_number"), s"${newMap("taxpayer_number")}税收违法发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("taxpayer_number"), s"新增${newMap("taxpayer_number")}税收违法")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "税收违法", Array("taxpayer_number", "publish_time"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("publish_time"))){
+      newMap("update_time")
+    }else{
+      newMap("publish_time")
+    }
+  }
+}

+ 12 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -143,7 +143,7 @@ object CompanyDynamic {
           None
         }
         else {
-          result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(cid, res._4, res._7, res._8), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+          result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(cid, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
         }
       })
 
@@ -183,6 +183,17 @@ object CompanyDynamic {
     , Args(tableName = "company", bName = 0)
     , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 0)
+    , Args(tableName = "company_land_publicity", bName = 1)
+    , Args(tableName = "company_land_announcement", bName = 1)
+    , Args(tableName = "company_bid_list", bName = 1)
+    , Args(tableName = "company_land_transfer", bName = 1)
+    , Args(tableName = "company_env_punishment", bName = 1)
+    , Args(tableName = "company_punishment_info", bName = 1)
+    , Args(tableName = "company_punishment_info_creditchina", bName = 1)
+    , Args(tableName = "bankruptcy_open_case", bName = 1)
+    , Args(tableName = "company_public_announcement2_list", bName = 1)
+    , Args(tableName = "company_mortgage_info", bName = 1)
+    , Args(tableName = "company_stock_announcement", bName = 1)
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 6 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -27,10 +27,10 @@ trait CompanyDynamicHandle {
     , "" -> "" //对外投资
     , "company_punishment_info" -> "punishment_info" //行政处罚
     , "company_punishment_info_creditchina" -> "punishment_info_creditchina" //行政处罚-信用中国
-    , "" -> "eci_chattel" //动产抵押
+    , "company_mortgage_info" -> "eci_chattel" //动产抵押
     , "company_env_punishment" -> "env_punishment" //环保处罚
     , "" -> "judicial_assistance" //股权冻结
-    , "" -> "publish_notice" //公示催告
+    , "company_public_announcement2_list" -> "company_public_announcement2_list" //公示催告
     , "" -> "serious_violation" //严重违法
     , "" -> "simple_cancellation" //简易注销
     , "company_equity_info" -> "stock_pledge" //股权出质
@@ -43,6 +43,7 @@ trait CompanyDynamicHandle {
     , "" -> "actual_controller_change" //实际控制人变更
     , "" -> "court_notice" //开庭公告
     , "bankruptcy_open_case" -> "bankruptcy_open_case" //破产重整
+    , "company_stock_announcement" -> "company_stock_announcement" //企业公告
 
     , "company_staff" -> "company_staff" //主要成员
   )
@@ -57,7 +58,7 @@ trait CompanyDynamicHandle {
     , "" -> "6" // 裁判文书
     , "" -> "7" // 法院公告
     , "" -> "8" // 对外投资
-    , "" -> "9" // 动产抵押
+    , "company_mortgage_info" -> "9" // 动产抵押
     , "" -> "10" // 司法拍卖
     , "company_land_publicity" -> "11-1" // 土地信息-地块公示
     , "company_land_announcement" -> "11-2" // 土地信息-购地信息
@@ -67,7 +68,7 @@ trait CompanyDynamicHandle {
     , "" -> "13" // 招聘信息
     , "company_punishment_info" -> "14-1" // 行政处罚
     , "company_punishment_info_creditchina" -> "14-2" // 行政处罚-信用中国
-    , "" -> "15" // 公示催告
+    , "company_public_announcement2_list" -> "15" // 公示催告
     , "company_env_punishment" -> "16" // 环保处罚
     , "company_equity_info" -> "17" // 股权出质
     , "" -> "18" // 严重违法
@@ -84,7 +85,7 @@ trait CompanyDynamicHandle {
     , "" -> "29" // 最终受益人
     , "company_staff" -> "30" // 主要成员
     , "" -> "31" // 融资动态
-    , "" -> "32" // 企业公告
+    , "company_stock_announcement" -> "32" // 企业公告
     , "" -> "33" // 抽查检查
     , "" -> "34" // 行政许可
     , "" -> "35" // 双随机抽查

+ 49 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala

@@ -0,0 +1,49 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * @Author yyn
+ * @Date 2020/8/11
+ * @Description TODO
+ */
+//企业公告
+case class company_stock_announcement()extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    s"""公告名称:$new_map("title")\n
+       |公告日期:$new_map("time")\n""".stripMargin
+  }
+
+  /**
+   * 变更时间
+   *
+   * @param new_map
+   * @return
+   */
+//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 11 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -15,12 +15,16 @@ import scala.annotation.meta.getter
 case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils  with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
+  val rowKeyMapping =
+    Map("company_double_random_check_result_info" -> "new_cid,main_id" //双随机抽查-结果公示
+    )
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
     prepareFunctions(spark)
-    val odsTable = s"${space}.$sourceTable"
-    val adsTable = s"${space}.ads_${sourceTable.substring(4)}"
-    val companyMapping = s"${space}.company_map"
+    val odsTable = s"${space}.ods_$sourceTable"
+    val adsTable = s"${space}.ads_${sourceTable}"
+    val companyMapping = s"${space}.company_name_mapping_pro_v2"
     val ds = BaseUtil.getPartion(odsTable, spark)
     //table字段
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
@@ -28,6 +32,9 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
 
     val cols_md5 = disCol.filter(!_.equals("new_cid"))
 
+    //rowkey前缀匹配
+    val rowKeyPre = rowKeyMapping.getOrElse(sourceTable,"new_cid")
+
    val ddl =  spark.table(odsTable).schema.filter(s=>{!"ds".equals(s.name)}).map(s=>{
 
       val name = s.name
@@ -57,7 +64,7 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |        SELECT
          |                *
          |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY id DESC ) num
-         |                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                ,CONCAT_WS('_',$rowKeyPre,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
          |                ,cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) AS cols
          |        FROM    (
          |                SELECT

+ 9 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -18,6 +18,10 @@ case class CompanyIncrForCidUtils(s: SparkSession,
                                   dupliCols: Seq[String] // 去重列
                                  ) extends LoggingUtils with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
+ //主键字段
+  val rowKeyMapping =
+    Map("company_double_random_check_result_info" -> "new_cid,main_id" //双随机抽查-结果公示
+    )
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
@@ -73,6 +77,9 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey")
     })
 
+    //rowkey前缀匹配
+    val rowKeyPre = rowKeyMapping.getOrElse(tableName,"new_cid")
+
     sql(
       s"""
          |SELECT  cid,current_cid as new_cid
@@ -93,7 +100,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |        ,cid
          |        ,${columns.mkString(",")}
          |FROM    (
-         |            SELECT  CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |            SELECT  CONCAT_WS('_',$rowKeyPre,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
          |                    ,flag
          |                    ,new_cid
          |                    ,cid
@@ -142,7 +149,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       inc_ads_company_tb,
       tableName,
       lastDsIncOds,
-      s"CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))))"
+      s"CONCAT_WS('_',$rowKeyPre,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))))"
     ).syn()
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)