xufei 4 years ago
parent
commit
f279c59140

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -48,6 +48,12 @@ object CalcIncrTotal {
   //  winhc_eci_dev company_license_creditchina new_cid,licence_number cid
   //  winhc_eci_dev company_license_entpub new_cid,license_number cid
 
+  //  winhc_eci_dev company_liquidating_info new_cid,brief_cancel_result,announcement_apply_date cid
+
+  //  winhc_eci_dev wenshu_detail_combine new_cid,case_no,cname cid
+
+
+
 
   def main(args: Array[String]): Unit = {
 

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -34,6 +34,7 @@ object CompanyForCid {
 //  winhc_eci_dev ods_company_finance new_cid,round,money
 //  winhc_eci_dev ods_company_dishonest_info new_cid,case_no
 
+//  winhc_eci_dev wenshu_detail_combine new_cid,case_no,cname,name_type
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args
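
Note: each commented line above is a sample argument set for main, matching val Array(space, sourceTable, cols) = args. A hypothetical invocation for the new table (jar path elided, not part of this commit):

    spark-submit --class com.winhc.bigdata.spark.jobs.CompanyForCid <jar> \
      winhc_eci_dev wenshu_detail_combine new_cid,case_no,cname,name_type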
 

+ 161 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyWenshuDetailCombine.scala

@@ -0,0 +1,161 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+/**
+ * @Description: incrementally attach cid to wenshu (court judgment) records
+ * @author π
+ * @date 2020/8/17
+ */
+object CompanyWenshuDetailCombine {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val tableName = "inc_ods_wenshu_detail_combine"
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyWenshuDetailCombine(spark, project, tableName).calc
+    spark.stop()
+  }
+
+}
+case class CompanyWenshuDetailCombine(s: SparkSession,
+                                      project: String, // project the table lives in
+                                      tableName: String // table name (without prefix/suffix)
+                                     ) extends LoggingUtils with CompanyMapping{
+  override protected val spark: SparkSession = s
+
+  def calc = {
+    prepareFunctions(spark)
+    var v1 = BaseUtil.getPartion("winhc_eci_dev.inc_ods_wenshu_detail_combine", spark) // latest partition of wenshu with cid attached
+    val v2 = BaseUtil.getPartion("winhc_eci.ods_wenshu_detail", spark) // latest partition of wenshu still missing cid
+
+    val mapDs = BaseUtil.getPartion("winhc_eci_dev.base_company_mapping", spark) // latest partition of the cid mapping table
+
+    if (StringUtils.isBlank(v1)) {
+      v1 = "20200604"
+    }
+
+    println(
+      s"""
+        |v1:$v1
+        |v2:$v2
+        |mapDs:$mapDs
+        |""".stripMargin)
+
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE winhc_eci_dev.inc_ods_wenshu_detail_combine PARTITION(ds='$v2')
+        |SELECT  name_type
+        |        ,e.cname
+        |        ,f.new_cid AS cid
+        |        ,case_id
+        |        ,uuid
+        |        ,docid
+        |        ,case_no
+        |        ,doc_type
+        |        ,case_type
+        |        ,case_reason_level2
+        |        ,case_reason_level3
+        |        ,case_reason_level4
+        |        ,case_reason
+        |        ,case_reason_levelnum
+        |        ,case_stage
+        |        ,case_amt
+        |        ,party_info
+        |        ,court_name
+        |        ,court_province
+        |        ,court_city
+        |        ,court_level
+        |        ,yg_info
+        |        ,yg_type
+        |        ,yg_name
+        |        ,yg_wtdlr
+        |        ,yg_faren
+        |        ,yg_lawyer
+        |        ,bg_info
+        |        ,bg_type
+        |        ,bg_name
+        |        ,bg_wtdlr
+        |        ,bg_faren
+        |        ,bg_lawyer
+        |        ,third_party
+        |        ,danbao
+        |        ,fact
+        |        ,court_view
+        |        ,judge
+        |        ,clerk
+        |        ,judge_date_cn
+        |        ,judge_date
+        |        ,judge_year
+        |        ,judge_result
+        |        ,is_success
+        |        ,url
+        |        ,head
+        |        ,title
+        |        ,legal_basis
+        |        ,keywords
+        |        ,plaintiffs
+        |        ,defendants
+        |        ,crawl_date
+        |        ,update_date
+        |        ,sample_type
+        |        ,judge_main
+        |FROM    (
+        |            SELECT  *
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER (PARTITION BY case_no,cname,name_type ORDER BY update_date DESC) num
+        |                        FROM    (
+        |                                    SELECT  *
+        |                                    FROM    (
+        |                                                SELECT  "y" AS name_type
+        |                                                        ,*
+        |                                                FROM    winhc_eci.ods_wenshu_detail
+        |                                                LATERAL VIEW explode(split(yg_name, '\n')) tmpTable AS cname
+        |                                                WHERE   ds > '$v1'
+        |                                                AND     yg_type = '企业'
+        |                                            ) c
+        |                                    UNION ALL
+        |                                    SELECT  *
+        |                                    FROM    (
+        |                                                SELECT  "b" AS name_type
+        |                                                        ,*
+        |                                                FROM    winhc_eci.ods_wenshu_detail
+        |                                                LATERAL VIEW explode(split(bg_name, '\n')) tmpTable AS cname
+        |                                                WHERE   ds > '$v1'
+        |                                                AND     bg_type = '企业'
+        |                                            ) d
+        |                                ) e
+        |                    ) x
+        |            WHERE   num = 1
+        |        ) e
+        |JOIN    (
+        |            SELECT  *
+        |            FROM    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY update_time DESC) num
+        |                        FROM    winhc_eci_dev.base_company_mapping
+        |                        WHERE   ds = '$mapDs'
+        |                    ) k
+        |            WHERE   num = 1
+        |            AND     length(CLEANUP(cname)) > 4
+        |        ) f
+        |ON      CLEANUP(e.cname) = CLEANUP(f.cname)
+        |""".stripMargin)
+  }
+}
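
Note: the query above keeps one row per (case_no, cname, name_type), preferring the latest update_date. A behavior-equivalent DataFrame-API sketch, assuming a DataFrame `df` carrying those columns (not part of this commit):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Rank rows within each (case_no, cname, name_type) group, newest first,
    // then keep only the top-ranked row - the same effect as the num = 1 filter.
    val w = Window.partitionBy("case_no", "cname", "name_type")
      .orderBy(col("update_date").desc)
    val deduped = df
      .withColumn("num", row_number().over(w))
      .filter(col("num") === 1)
      .drop("num")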

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -317,6 +317,7 @@ object ChangeExtract {
     , Args(tableName = "company_license_creditchina", primaryFields = "licence_content")//行政许可-信用中国
     , Args(tableName = "company_license_entpub", primaryFields = "license_name")//行政许可-企业公示
     , Args(tableName = "company_license", primaryFields = "license_name")//行政许可
+    , Args(tableName = "wenshu_detail_combine", primaryFields = "license_name")//文书
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/wenshu_detail_combine.scala

@@ -0,0 +1,28 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/18
+ * @Description: court judgment documents (wenshu)
+ */
+
+case class wenshu_detail_combine(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("cname"), "裁判文书") // default title: court judgment
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("cname"), "裁判文书")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "裁判文书", Array("case_no", "cname"))
+
+  override def getBizTime(newMap: Map[String, String]): String = {
+    if (StringUtils.isBlank(newMap("judge_date"))) {
+      newMap("update_date")
+    } else {
+      newMap("judge_date")
+    }
+  }
+}
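
Note: getBizTime prefers judge_date and falls back to update_date when it is blank. An equivalent, slightly more idiomatic sketch (not part of this commit):

    // Use judge_date when present, otherwise fall back to update_date.
    Option(newMap("judge_date"))
      .filter(StringUtils.isNotBlank)
      .getOrElse(newMap("update_date"))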

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -212,6 +212,7 @@ object CompanyDynamic {
     , Args(tableName = "company_double_random_check_info", bName = 1)//双随机抽查
     , Args(tableName = "company_judicial_sale_combine_list", bName = 1)//司法拍卖
     , Args(tableName = "company_tax_contravention", bName = 1)//税收违法
+    , Args(tableName = "wenshu_detail_combine", bName = 1)//裁判文书
   )
 
   private case class Args(project: String = "winhc_eci_dev"

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -22,7 +22,7 @@ trait CompanyDynamicHandle {
     , "" -> "tender_qichacha" //中标信息企查查
     , "company_abnormal_info" -> "eci_exception" //经营异常
     , "" -> "eci_zscq" //知识产权
-    , "" -> "eci_wenshu" //裁判文书
+    , "wenshu_detail_combine" -> "eci_wenshu" //裁判文书
     , "" -> "court_announcement" //法院公告
     , "" -> "" //对外投资
     , "company_punishment_info" -> "punishment_info" //行政处罚
@@ -66,7 +66,7 @@ trait CompanyDynamicHandle {
     , "" -> "3" // 企业股东失信被执
     , "company_abnormal_info" -> "4" // 经营异常
     , "" -> "5" // 知识产权
-    , "" -> "6" // 裁判文书
+    , "wenshu_detail_combine" -> "6" // 裁判文书
     , "company_court_announcement_list" -> "7" // 法院公告
     , "" -> "8" // 对外投资
     , "company_mortgage_info" -> "9" // 动产抵押

+ 56 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -0,0 +1,56 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/8/18
+ * @Description: court judgment documents (wenshu)
+ */
+case class wenshu_detail_combine() extends CompanyDynamicHandle {
+
+
+  /**
+   * Change type of the source table: insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    val t1 = new_map("name_type")
+    var t2 = ""
+    if (StringUtils.isNotBlank(t1)) {
+      if ("y".equals(t1)) {
+        t2 = "原告" // plaintiff
+      } else {
+        t2 = "被告" // defendant
+      }
+    }
+    s"""
+       |案由:${new_map("case_reason_level3")}\n
+       |案号:${new_map("case_no")}\n
+       |诉讼身份:${t2}\n
+       |发布日期:${new_map("judge_date")}
+       |""".stripMargin
+  }
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息" // informational notice
+}
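
Note: the name_type branching in get_rta_desc can be collapsed into one expression. A behavior-equivalent sketch (not part of this commit): "y" marks the plaintiff, any other non-blank value is treated as the defendant, and blank yields an empty role.

    val role = new_map("name_type") match {
      case "y"                            => "原告" // plaintiff
      case s if StringUtils.isNotBlank(s) => "被告" // defendant
      case _                              => ""
    }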

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -55,7 +55,7 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |COMMENT 'TABLE COMMENT'
          |PARTITIONED BY (ds STRING COMMENT '分区')
          |""".stripMargin)
-    if (!spark.catalog.tableExists("adsTable")) {
+    if (!spark.catalog.tableExists(adsTable)) {
       return
     }
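
Note: the one-line fix above swaps a string literal for the variable it was meant to reference. Previously the guard always looked for a table literally named "adsTable":

    // Before: checks for a table literally named "adsTable"
    spark.catalog.tableExists("adsTable")
    // After: checks the table whose name the adsTable variable holds
    spark.catalog.tableExists(adsTable)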
 

+ 8 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -22,6 +22,10 @@ case class CompanyIncrForCidUtils(s: SparkSession,
   val rowKeyMapping =
     Map("company_double_random_check_result_info" -> "new_cid,main_id" //双随机抽查-结果公示
     )
+  // sort column per source table
+  val updateTimeMapping =
+    Map("wenshu_detail_combine" -> "update_date" // sort column for wenshu
+    )
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
@@ -116,6 +120,9 @@ case class CompanyIncrForCidUtils(s: SparkSession,
    // rowkey prefix matching
     val rowKeyPre = rowKeyMapping.getOrElse(tableName,"new_cid")
 
+    // sort column for this table
+    val update_time = updateTimeMapping.getOrElse(tableName, "update_time")
+
     sql(
       s"""
          |SELECT  cid,current_cid as new_cid
@@ -141,7 +148,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                    ,new_cid
          |                    ,cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY $update_time DESC ) num
          |            FROM    (
          |                        SELECT  "0" AS flag
          |                                ,a.new_cid
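
Note: the sort-column override added above is a plain Map lookup with a default. A self-contained sketch of the pattern (not part of this commit):

    // Per-table overrides; any table not listed falls back to update_time.
    val updateTimeMapping = Map("wenshu_detail_combine" -> "update_date")

    def sortColumnFor(tableName: String): String =
      updateTimeMapping.getOrElse(tableName, "update_time")

    sortColumnFor("wenshu_detail_combine") // "update_date"
    sortColumnFor("company_finance")       // "update_time" (default)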