Browse source

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 years ago
Parent commit 8238f04300
33 changed files with 385 additions and 67 deletions
  1. +1 -1 src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala
  2. +6 -0 src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala
  3. +60 -20 src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala
  4. +1 -0 src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala
  5. +161 -0 src/main/scala/com/winhc/bigdata/spark/jobs/CompanyWenshuDetailCombine.scala
  6. +1 -0 src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  7. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/chance/creditor_info_add_other.scala
  8. +28 -0 src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/wenshu_detail_combine.scala
  9. +1 -0 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
  10. +2 -2 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  11. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala
  12. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala
  13. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala
  14. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala
  15. +3 -3 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_announcement.scala
  16. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala
  17. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_publicity.scala
  18. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala
  19. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala
  20. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala
  21. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala
  22. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala
  23. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala
  24. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala
  25. +1 -1 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala
  26. +56 -0 src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala
  27. +4 -2 src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala
  28. +1 -2 src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala
  29. +1 -1 src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala
  30. +13 -10 src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala
  31. +15 -10 src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala
  32. +7 -1 src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala
  33. +9 -0 src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -26,7 +26,7 @@ object DataFrame2HBaseHelper {
         for (f <- fields) {
           val v = row.getAs[String](f.toLowerCase)
           if (v != null) {
-            put.addColumn(f_bytes, Bytes.toBytes(f), Bytes.toBytes(v))
+            put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
           }
         }
         (new ImmutableBytesWritable, put)
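
A note on the one-line fix above: HBase column qualifiers are case-sensitive byte arrays, so a cell written under F:name is invisible to a reader asking for F:NAME. A minimal sketch of the write path, with hypothetical row key, field name, and value (only the uppercasing is taken from the diff):

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

// Hypothetical illustration: uppercase the qualifier on write so readers
// that request F:CASE_NO find the cell regardless of source-column casing.
val f_bytes: Array[Byte] = Bytes.toBytes("F")
val put = new Put(Bytes.toBytes("rowkey-123"))
val field = "case_no"
val value = "some judgment case number"
put.addColumn(f_bytes, Bytes.toBytes(field.toUpperCase()), Bytes.toBytes(value))
val record = (new ImmutableBytesWritable, put)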

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -48,6 +48,12 @@ object CalcIncrTotal {
   //  winhc_eci_dev company_license_creditchina new_cid,licence_number cid
   //  winhc_eci_dev company_license_entpub new_cid,license_number cid
 
+  //  winhc_eci_dev company_liquidating_info new_cid,brief_cancel_result,announcement_apply_date cid
+
+  //  winhc_eci_dev wenshu_detail_combine new_cid,case_no,cname cid
+
+
+
 
   def main(args: Array[String]): Unit = {
 

+ 60 - 20
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{DataTypeUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{CompanyIncSummary, DataTypeUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -146,14 +146,19 @@ object CompanyAnnualReport {
 
       val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_company_annual_report", "0")
       val ads_end_ds = getLastPartitionsOrElse(s"$project.ads_company_annual_report", "0")
+      var inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_company_annual_report", "0")
 
+      if (inc_ods_end_ds.equals(inc_ads_end_ds)) {
+        println("inc_ods_end_ds = inc_ads_end_ds ! ")
+        inc_ads_end_ds = getSecondLastPartitionOrElse(s"$project.inc_ads_company_annual_report", "0")
+      }
       val columns = getColumns(s"$project.ads_company_annual_report").diff(Seq("ds", "rowkey", "new_cid", "cid"))
 
       sql(
         s"""
            |SELECT  cid,current_cid as new_cid
            |FROM    $project.inc_ods_company
-           |WHERE   ds > ${ads_end_ds}
+           |WHERE   ds > ${inc_ads_end_ds}
            |AND     cid IS NOT NULL
            |AND     current_cid IS NOT NULL
            |GROUP BY cid,current_cid
@@ -180,12 +185,12 @@ object CompanyAnnualReport {
            |                FROM    (
            |                        SELECT  *
            |                        FROM    $project.inc_ods_company_annual_report
-           |                        WHERE   ds > $ads_end_ds and cid is not null
+           |                        WHERE   ds > $inc_ads_end_ds and cid is not null
            |                        ) a
            |                LEFT JOIN (
            |                        select *
            |                        from $project.base_company_mapping
-           |                        where ds =${getLastPartitionsOrElse(s"$project.base_company_mapping", "0")}
+           |                        where ds = ${getLastPartitionsOrElse(s"$project.base_company_mapping", "0")}
            |                ) b
            |                ON      a.cid = b.cid
            |                UNION ALL
@@ -205,7 +210,7 @@ object CompanyAnnualReport {
            |                                new_cid AS cid
            |                                ,${columns.mkString(",")}
            |                        FROM    $project.inc_ads_company_annual_report
-           |                        WHERE   ds > ${ads_end_ds}
+           |                        WHERE   ds > ${inc_ads_end_ds}
            |                     ) b
            |                ON      a.cid = b.cid
            |                ) c
@@ -215,6 +220,26 @@ object CompanyAnnualReport {
 
      //todo write to HBase only
 
+      val writCols = getColumns("winhc_eci_dev.inc_ads_company_annual_report").diff(Seq(
+        "ds"
+        , "id"
+        , "new_cid"
+        , "cid"
+      ))
+
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  new_cid as cid,${writCols.mkString(",")}
+           |FROM    winhc_eci_dev.inc_ads_company_annual_report
+           |WHERE   ds = '$inc_ods_end_ds'
+           |""".stripMargin)
+        .save2HBase("COMPANY_ANNUAL_REPORT"
+          , "rowkey"
+          , "cid" +: writCols)
+
+      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("rowkey")).calc
+
     }
 
 
@@ -287,8 +312,13 @@ object CompanyAnnualReport {
       utils.getMainTmpTab("main_table_tmp")
 
 
-      val inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_$project", "0")
-      val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_$project", "0")
+      var inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_$tableName", "0")
+      val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_$tableName", "0")
+
+      if (inc_ads_end_ds.equals(inc_ods_end_ds)) {
+        println("inc_ads_end_ds = inc_ods_end_ds !")
+        inc_ads_end_ds = getSecondLastPartitionOrElse(s"$project.inc_ads_$tableName", "0")
+      }
 
       sql(
         s"""
@@ -325,8 +355,24 @@ object CompanyAnnualReport {
            |WHERE   t4.num = 1
            |""".stripMargin)
 
-      //todo write to HBase only
+      val writeCols = getColumns(s"winhc_eci_dev.inc_ads_$tableName").diff(Seq(
+        "ds"
+        , "id"
+        , "new_cid"
+        , "cid"
+      ))
 
+      //todo write to HBase only
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  new_cid as cid,${writeCols.mkString(",")}
+           |FROM    winhc_eci_dev.inc_ads_$tableName
+           |WHERE   ds = '$inc_ods_end_ds'
+           |""".stripMargin)
+        .save2HBase(tableName.toUpperCase
+          , "rowkey"
+          , "cid" +: writeCols)
     }
   }
 
@@ -352,32 +398,26 @@ object CompanyAnnualReport {
       , "company_annual_report_change" -> "change_item,change_time" //年报-变更
       , "company_annual_report_equity_change" -> "investor_name,change_time" //年报-股权变更
       , "company_annual_report_holder" -> "investor_name" //年报-股东
-      , "company_annual_report_out_guarantee" -> "id" //年报-对外担保
+      //      , "company_annual_report_out_guarantee" -> "id" //年报-对外担保
       , "company_annual_report_webinfo" -> "website" //年报-网站
       , "company_annual_report_social_security" -> "" //年报-社保 (不采取去重,和main_id一对一)
     )
 
-    //    for (elem <- sublist_map) {
-    //      CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
-    //    }
-
-    //    for (e <- sublist_map) {
-    //      CompanyAnnualReportHandle(spark, project).sublist_all(e._1, e._2.split(","))
-    //    }
-
-    val all_flag = true
+    val all_flag = false
 
     if (all_flag) {
      // full (historical) load
-      CompanyAnnualReportHandle(spark, project).main_table_all()
+      //      CompanyAnnualReportHandle(spark, project).main_table_all()
       for (elem <- sublist_map) {
+        println("xjk:" + elem._1)
         CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
       }
     } else {
      // incremental load
       CompanyAnnualReportHandle(spark, project).main_table_inc()
       for (e <- sublist_map) {
-        CompanyAnnualReportHandle(spark, project).sublist_all(e._1, e._2.split(","))
+        println("xjk:" + e._1)
+        CompanyAnnualReportHandle(spark, project).sublist_inc(e._1, e._2.split(","))
       }
     }
     spark.stop()

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -34,6 +34,7 @@ object CompanyForCid {
 //  winhc_eci_dev ods_company_finance new_cid,round,money
 //  winhc_eci_dev ods_company_dishonest_info new_cid,case_no
 
+//  winhc_eci_dev wenshu_detail_combine new_cid,case_no,cname,name_type
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args
 

+ 161 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyWenshuDetailCombine.scala

@@ -0,0 +1,161 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+/**
+ * @Description: attach cid to incremental wenshu (court judgment) records
+ * @author π
+ * @date 2020/8/17
+ */
+object CompanyWenshuDetailCombine {
+  def main(args: Array[String]): Unit = {
+    val project ="winhc_eci_dev"
+    val tableName ="inc_ods_wenshu_detail_combine"
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyWenshuDetailCombine(spark,project,tableName).calc
+    spark.stop()
+  }
+
+}
+case class CompanyWenshuDetailCombine(s: SparkSession,
+                                      project: String, // project the table lives in
+                                      tableName: String // table name (without prefix/suffix)
+                                     ) extends LoggingUtils with CompanyMapping{
+  override protected val spark: SparkSession = s
+
+  def calc ={
+    prepareFunctions(spark)
+    var v1 = BaseUtil.getPartion("winhc_eci_dev.inc_ods_wenshu_detail_combine", spark) // latest partition of wenshu with cid attached
+    val v2 = BaseUtil.getPartion("winhc_eci.ods_wenshu_detail", spark) // latest partition of wenshu still missing cid
+
+    val mapDs = BaseUtil.getPartion("winhc_eci_dev.base_company_mapping", spark) // latest partition of the cid mapping
+
+    if(StringUtils.isBlank(v1)){
+      v1="20200604"
+    }
+
+    println(
+      s"""
+        |v1:$v1
+        |v2:$v2
+        |mapDs:$mapDs
+        |""".stripMargin)
+
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE winhc_eci_dev.inc_ods_wenshu_detail_combine PARTITION(ds='$v2')
+        |SELECT  name_type
+        |        ,e.cname
+        |        ,f.new_cid AS cid
+        |        ,case_id
+        |        ,uuid
+        |        ,docid
+        |        ,case_no
+        |        ,doc_type
+        |        ,case_type
+        |        ,case_reason_level2
+        |        ,case_reason_level3
+        |        ,case_reason_level4
+        |        ,case_reason
+        |        ,case_reason_levelnum
+        |        ,case_stage
+        |        ,case_amt
+        |        ,party_info
+        |        ,court_name
+        |        ,court_province
+        |        ,court_city
+        |        ,court_level
+        |        ,yg_info
+        |        ,yg_type
+        |        ,yg_name
+        |        ,yg_wtdlr
+        |        ,yg_faren
+        |        ,yg_lawyer
+        |        ,bg_info
+        |        ,bg_type
+        |        ,bg_name
+        |        ,bg_wtdlr
+        |        ,bg_faren
+        |        ,bg_lawyer
+        |        ,third_party
+        |        ,danbao
+        |        ,fact
+        |        ,court_view
+        |        ,judge
+        |        ,clerk
+        |        ,judge_date_cn
+        |        ,judge_date
+        |        ,judge_year
+        |        ,judge_result
+        |        ,is_success
+        |        ,url
+        |        ,head
+        |        ,title
+        |        ,legal_basis
+        |        ,keywords
+        |        ,plaintiffs
+        |        ,defendants
+        |        ,crawl_date
+        |        ,update_date
+        |        ,sample_type
+        |        ,judge_main
+        |FROM    (
+        |            SELECT  *
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER (PARTITION BY case_no,cname,name_type ORDER BY update_date DESC) num
+        |                        FROM    (
+        |                                    SELECT  *
+        |                                    FROM    (
+        |                                                SELECT  "y" AS name_type
+        |                                                        ,*
+        |                                                FROM    winhc_eci.ods_wenshu_detail
+        |                                                LATERAL VIEW explode(split(yg_name, '\n')) tmpTable AS cname
+        |                                                WHERE   ds > '$v1'
+        |                                                AND     yg_type = '企业'
+        |                                            ) c
+        |                                    UNION ALL
+        |                                    SELECT  *
+        |                                    FROM    (
+        |                                                SELECT  "b" AS name_type
+        |                                                        ,*
+        |                                                FROM    winhc_eci.ods_wenshu_detail
+        |                                                LATERAL VIEW explode(split(bg_name, '\n')) tmpTable AS cname
+        |                                                WHERE   ds > '$v1'
+        |                                                AND     bg_type = '企业'
+        |                                            ) d
+        |                                ) e
+        |                    ) x
+        |            WHERE   num = 1
+        |        ) e
+        |JOIN    (
+        |            SELECT  *
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY update_time DESC) num
+        |                        FROM    winhc_eci_dev.base_company_mapping
+        |                        WHERE   ds = '$mapDs'
+        |                    ) k
+        |            WHERE   num = 1
+        |            AND     length(CLEANUP(cname)) > 4
+        |        ) f
+        |ON      CLEANUP(e.cname) = CLEANUP(f.cname)
+        |""".stripMargin)
+  }
+}
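
The new job above follows a dedup-then-join shape: explode plaintiff (yg_name) and defendant (bg_name) lists into one row per company name, keep the newest row per (case_no, cname, name_type) with ROW_NUMBER, then join to base_company_mapping on the cleaned name to pick up new_cid. Below is a self-contained sketch of the dedup step with invented data; the real CLEANUP UDF and mapping join are elided:

import org.apache.spark.sql.SparkSession

// Minimal local sketch of the ROW_NUMBER dedup used in the job above.
object WenshuDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("WenshuDedupSketch").getOrCreate()
    import spark.implicits._

    Seq(
      ("(2020)沪01民初1号", "某某有限公司", "y", "2020-08-01"),
      ("(2020)沪01民初1号", "某某有限公司", "y", "2020-08-15") // newer duplicate wins
    ).toDF("case_no", "cname", "name_type", "update_date")
      .createOrReplaceTempView("wenshu")

    spark.sql(
      """
        |SELECT case_no, cname, name_type, update_date
        |FROM (
        |  SELECT *, ROW_NUMBER() OVER (
        |           PARTITION BY case_no, cname, name_type
        |           ORDER BY update_date DESC) num
        |  FROM wenshu
        |) t
        |WHERE num = 1
        |""".stripMargin).show(false)

    spark.stop()
  }
}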

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -318,6 +318,7 @@ object ChangeExtract {
     , Args(tableName = "company_license_creditchina", primaryFields = "licence_content")//行政许可-信用中国
     , Args(tableName = "company_license_entpub", primaryFields = "license_name")//行政许可-企业公示
     , Args(tableName = "company_license", primaryFields = "license_name")//行政许可
+    , Args(tableName = "wenshu_detail_combine", primaryFields = "license_name")//文书
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/creditor_info_add_other.scala

@@ -315,7 +315,7 @@ object creditor_info_add_other {
     //    val ds = "20200721"
     val spark = SparkUtils.InitEnv("add_cols_other", map)
 
-    val table = "xjk_tmp_ads_cre_info_test_v2"
+    val table = "xjk_tmp_ads_cre_info_test_v3"
 
     val v = add_cols_other(spark, project)
     v.prefix(ds, table)

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/wenshu_detail_combine.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/18
+ * @Description: court judgments
+ */
+
+case class wenshu_detail_combine(equCols: Seq[String]) extends CompanyChangeHandle with Serializable  {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("cname"), s"裁判文书")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("cname"), s"裁判文书")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "裁判文书", Array("case_no", "cname"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if(StringUtils.isBlank(newMap("judge_date"))){
+      newMap("update_date")
+    }else{
+      newMap("judge_date")
+    }
+  }
+}

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -213,6 +213,7 @@ object CompanyDynamic {
     , Args(tableName = "company_double_random_check_info", bName = 1)//双随机抽查
     , Args(tableName = "company_judicial_sale_combine_list", bName = 1)//司法拍卖
     , Args(tableName = "company_tax_contravention", bName = 1)//税收违法
+    , Args(tableName = "wenshu_detail_combine", bName = 1)//裁判文书
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -22,7 +22,7 @@ trait CompanyDynamicHandle {
     , "" -> "tender_qichacha" //中标信息企查查
     , "company_abnormal_info" -> "eci_exception" //经营异常
     , "" -> "eci_zscq" //知识产权
-    , "" -> "eci_wenshu" //裁判文书
+    , "wenshu_detail_combine" -> "eci_wenshu" //裁判文书
     , "" -> "court_announcement" //法院公告
     , "" -> "" //对外投资
     , "company_punishment_info" -> "punishment_info" //行政处罚
@@ -66,7 +66,7 @@ trait CompanyDynamicHandle {
     , "" -> "3" // 企业股东失信被执
     , "company_abnormal_info" -> "4" // 经营异常
     , "" -> "5" // 知识产权
-    , "" -> "6" // 裁判文书
+    , "wenshu_detail_combine" -> "6" // 裁判文书
     , "company_court_announcement_list" -> "7" // 法院公告
     , "" -> "8" // 对外投资
     , "company_mortgage_info" -> "9" // 动产抵押

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/5
- * @Description TODO
+ * @Description
  */
 // bankruptcy announcements
 case class bankruptcy_open_case() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/13
- * @Description TODO
+ * @Description
  */
 // annual report - outbound guarantees
 case class company_annual_report_out_guarantee()extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/7/28
- * @Description TODO
+ * @Description
  */
 case class company_bid_list() extends CompanyDynamicHandle {
   /**

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/7/27
- * @Description TODO
+ * @Description
  */
 // environmental penalties
 case class company_env_punishment()extends CompanyDynamicHandle {

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_announcement.scala

@@ -5,10 +5,10 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // land purchase info
-case class company_land_announcement()extends CompanyDynamicHandle {
+case class company_land_announcement() extends CompanyDynamicHandle {
   /**
   * information description
    *
@@ -33,7 +33,7 @@ case class company_land_announcement()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+  //  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
 
   /**
   * risk level

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // land mortgage
 case class company_land_mortgage() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_publicity.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // land parcel publicity
 case class company_land_publicity() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // land transfer
 case class company_land_transfer() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // chattel mortgage
 case class company_mortgage_info() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/6
- * @Description TODO
+ * @Description
  */
 // public summons
 case class company_public_announcement2_list()extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // administrative penalties
 case class company_punishment_info() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/4
- * @Description TODO
+ * @Description
  */
 // administrative penalties - Credit China
 case class company_punishment_info_creditchina() extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/12
- * @Description TODO
+ * @Description
  */
 // service-of-process announcements
 case class company_send_announcement_list()extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/11
- * @Description TODO
+ * @Description
  */
 // company announcements
 case class company_stock_announcement()extends CompanyDynamicHandle {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala

@@ -5,7 +5,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 /**
  * @Author yyn
  * @Date 2020/8/14
- * @Description TODO
+ * @Description
  */
 // consumption restriction orders
 case class company_zxr_restrict()extends CompanyDynamicHandle {

+ 56 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -0,0 +1,56 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/18
+ * @Description: court judgments
+ */
+case class wenshu_detail_combine() extends CompanyDynamicHandle {
+
+
+  /**
+   * change type in the source table, insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * information description
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    val t1 = new_map("name_type")
+    var t2 =""
+    if(StringUtils.isNotBlank(new_map("name_type"))){
+      if("y".equals(t1)){
+        t2 ="原告"
+      }else{
+        t2="被告"
+      }
+    }
+    s"""
+       |案由:${new_map("case_reason_level3")}\n
+       |案号:${new_map("case_no")}\n
+       |诉讼身份:${t2}\n
+       |发布日期:${new_map("judge_date")}
+       |""".stripMargin
+  }
+
+  /**
+   * change content
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+}

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -30,12 +30,12 @@ object inc_company_equity_info {
       cleanup()
       val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
 
-      val startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
+      var startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
       val endPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ods_company_equity_info", BaseUtil.getYesterday())
 
       if (startPart.equals(endPart)) {
         println("start partition = end partition!")
-        sys.exit(-1)
+        startPart = getSecondLastPartitionOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
       }
 
       sql(
@@ -172,6 +172,8 @@ object inc_company_equity_info {
         , "CONCAT_WS('_',cid,main_id)").syn()
       import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
 
+      //todo summary not yet written out
+
       val outFields = getColumns("winhc_eci_dev.inc_ads_company_equity_info").map(_.toUpperCase)
       sql(
         s"""

+ 1 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala

@@ -6,13 +6,12 @@ import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
 
-import scala.annotation.meta.getter
 import scala.collection.mutable
 
 /**
  * @Author yyn
  * @Date 2020/7/15
- * @Description TODO
+ * @Description
  */
 object inc_phx_cid_ads extends LoggingUtils {
   var config = mutable.Map(

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -55,7 +55,7 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |COMMENT 'TABLE COMMENT'
          |PARTITIONED BY (ds STRING COMMENT '分区')
          |""".stripMargin)
-    if (!spark.catalog.tableExists("adsTable")) {
+    if (!spark.catalog.tableExists(adsTable)) {
       return
     }
 

+ 13 - 10
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -24,6 +24,9 @@ case class CompanyIncSummary(s: SparkSession,
   @(transient@getter) val spark: SparkSession = s
   private val f_bytes: Array[Byte] = Bytes.toBytes("F")
   private val name_bytes: Array[Byte] = Bytes.toBytes(tableName.toUpperCase)
+  val updateTimeMapping = Map(
+    "wenshu_detail_combine" -> "update_date" //文书排序时间
+  )
 
   def calc(): Unit = {
    val ads_table = s"${project}.ads_$tableName" // full (historical) ads table
@@ -57,7 +60,7 @@ case class CompanyIncSummary(s: SparkSession,
          |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
          |FROM    (
          |            SELECT  tmp.*
-         |                    ,ROW_NUMBER() OVER(PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) c
+         |                    ,ROW_NUMBER() OVER(PARTITION BY ${dupliCols.mkString(",")} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC ) c
          |            FROM    (
          |                        SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
          |                        FROM    (
@@ -87,19 +90,19 @@ case class CompanyIncSummary(s: SparkSession,
          |SELECT  ${cidField} as cid
          |        ,COUNT(1) as num
          |FROM    inc_tmp_view
+         |where $cidField is not null
          |GROUP BY $cidField
+         |having count(1) >0
          |""".stripMargin)
       .select(Seq("cid", "num").map(column => col(column).cast("string")): _*)
       .rdd
-      .filter(r => {
-        r.get(1) != null && !"0".equals(r.getString(1))
-      }).map(row => {
-      val id = row.getString(0)
-      val num = row.getString(1)
-      val put = new Put(Bytes.toBytes(id))
-      put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
-      (new ImmutableBytesWritable, put)
-    }).filter(_ != null).saveAsHadoopDataset(jobConf)
+      .map(row => {
+        val id = row.getString(0)
+        val num = row.getString(1)
+        val put = new Put(Bytes.toBytes(id))
+        put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
+        (new ImmutableBytesWritable, put)
+      }).filter(_ != null).saveAsHadoopDataset(jobConf)
   }
 
   private def getCastCols(name: String, pre: String): String = {

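The updateTimeMapping introduced above (and again in CompanyIncrForCidUtils below) is a small per-table override: most tables are deduplicated by update_time, while wenshu_detail_combine carries its freshness in update_date. A sketch of the lookup, with names taken from the diff and everything else elided:

// Per-table sort-column override; falls back to update_time.
val updateTimeMapping: Map[String, String] = Map(
  "wenshu_detail_combine" -> "update_date" // wenshu sorts by update_date
)

def sortColumn(tableName: String): String =
  updateTimeMapping.getOrElse(tableName, "update_time")

assert(sortColumn("wenshu_detail_combine") == "update_date")
assert(sortColumn("company_mortgage_info") == "update_time")
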
+ 15 - 10
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -22,6 +22,10 @@ case class CompanyIncrForCidUtils(s: SparkSession,
   val rowKeyMapping =
     Map("company_double_random_check_result_info" -> "new_cid,main_id" //双随机抽查-结果公示
     )
+//排序时间
+  val updateTimeMapping =
+    Map("wenshu_detail_combine" -> "update_date" //文书排序时间
+    )
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
@@ -66,11 +70,6 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       return
     }
 
-
-
-
-
-
    // latest ads partition of the full table
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -96,7 +95,13 @@ case class CompanyIncrForCidUtils(s: SparkSession,
    // last partitions of inc_ods and inc_ads are equal; bail out
     if (lastDsIncOds.equals(lastDsIncAds)) {
       println("inc_ods equals inc_ads ds ,please delete last ds !!!")
-      runDs = lastDsIncOds
+      //runDs = lastDsIncOds
+      val l1 = sql(s"show partitions $inc_ads_company_tb").collect.toList.map(_.getString(0).split("=")(1)).sorted
+      if (l1.size > 1) {
+        runDs = BaseUtil.atDaysAfter(1, l1(l1.size - 2))
+      }else{
+        runDs = firstDsIncOds
+      }
       //sys.exit(-1)
     }
 
@@ -110,12 +115,12 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |firstDsIncOds:$firstDsIncOds
          |""".stripMargin)
 
-
-
-
    // rowkey prefix mapping
     val rowKeyPre = rowKeyMapping.getOrElse(tableName,"new_cid")
 
+    // sort-time column
+    val update_time = updateTimeMapping.getOrElse(tableName,"update_time")
+
     sql(
       s"""
          |SELECT  cid,current_cid as new_cid
@@ -141,7 +146,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                    ,new_cid
          |                    ,cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY $update_time DESC ) num
          |            FROM    (
          |                        SELECT  "0" AS flag
          |                                ,a.new_cid

+ 7 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -93,7 +93,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
    // last partitions of inc_ods and inc_ads are equal; bail out
     if (lastDsIncOds.equals(lastDsIncAds)) {
       println("inc_ods equals inc_ads ds ,please delete last ds !!!")
-      runDs = lastDsIncOds
+      //runDs = lastDsIncOds
+      val l1 = sql(s"show partitions $inc_ads_company_tb").collect.toList.map(_.getString(0).split("=")(1)).sorted
+      if (l1.size > 1) {
+        runDs = BaseUtil.atDaysAfter(1, l1(l1.size - 2))
+      }else{
+        runDs = firstDsIncOds
+      }
       //sys.exit(-1)
     }
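
Both CompanyIncrForCidUtils and CompanyIncrForCidsUtils replace the old runDs = lastDsIncOds short-circuit with the same recovery: when the newest inc_ods and inc_ads partitions coincide, rerun from the day after the second-to-last inc_ads partition. A self-contained sketch of that date arithmetic, assuming ds values are yyyyMMdd strings and that BaseUtil.atDaysAfter behaves like the stand-in below:

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Stand-in for BaseUtil.atDaysAfter, assumed to shift a yyyyMMdd ds by n days.
val fmt = DateTimeFormatter.ofPattern("yyyyMMdd")
def atDaysAfter(n: Int, ds: String): String =
  LocalDate.parse(ds, fmt).plusDays(n).format(fmt)

// Invented partition list; the jobs read it via `show partitions`.
val l1 = List("20200810", "20200817", "20200818").sorted
val firstDsIncOds = "20200604" // hypothetical first inc_ods partition

val runDs =
  if (l1.size > 1) atDaysAfter(1, l1(l1.size - 2)) // "20200818": day after the 2nd-to-last
  else firstDsIncOds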
 

+ 9 - 0
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -121,6 +121,15 @@ trait LoggingUtils extends Logging {
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
   }
 
+  def getSecondLastPartitionOrElse(t: String, default: String): String = {
+    val ps = getPartitions(t)
+    if (ps.length >= 2) {
+      ps(ps.length - 2)
+    } else {
+      default
+    }
+  }
+
   def getLastPartitionsOrElse(t: String, default: String): String = {
     val ps = getPartitions(t)
     if (ps.nonEmpty) {
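
The new getSecondLastPartitionOrElse mirrors the existing getLastPartitionsOrElse: same partition listing, one index back, with a caller-supplied default when fewer than two partitions exist. A behavioural sketch with the partition lookup stubbed out (the real trait resolves partitions via show partitions):

// Sketch only: getPartitions is replaced by a plain Seq argument here.
def getSecondLastPartitionOrElse(partitions: Seq[String], default: String): String =
  if (partitions.length >= 2) partitions(partitions.length - 2) else default

assert(getSecondLastPartitionOrElse(Seq("20200816", "20200817", "20200818"), "0") == "20200817")
assert(getSecondLastPartitionOrElse(Seq("20200818"), "0") == "0")
assert(getSecondLastPartitionOrElse(Seq.empty, "0") == "0")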