
Merge remote-tracking branch 'origin/master'

lyb committed 4 years ago
parent commit 4dd01bdda2
21 files changed, 397 additions and 74 deletions
  1. + 8 - 6    src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala
  2. + 2 - 1    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  3. + 5 - 31   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala
  4. + 1 - 1    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala
  5. + 1 - 1    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala
  6. + 3 - 3    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala
  7. + 2 - 2    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala
  8. + 2 - 2    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala
  9. + 3 - 3    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala
  10. + 2 - 2   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala
  11. + 4 - 4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala
  12. + 4 - 4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala
  13. + 4 - 4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala
  14. + 1 - 1   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala
  15. + 3 - 3   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala
  16. + 3 - 3   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala
  17. + 166 - 0 src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala
  18. + 0 - 1   src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala
  19. + 2 - 2   src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
  20. + 61 - 0  src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala
  21. + 120 - 0 src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils.scala

+ 8 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

@@ -30,12 +30,12 @@ case class JustiCase(s: SparkSession,
          |SELECT  *,get_justicase_id(case_no) AS case_no_hash
          |FROM    $project.ods_$tableName
          |WHERE   ds=${ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
+         |""".stripMargin)
+/*
          |UNION
          |SELECT  *,get_justicase_id(case_no) AS case_no_hash
          |FROM    $project.inc_ods_$tableName
          |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
-         |""".stripMargin)
-/*
           s"""SELECT *,get_justicase_id(case_no) AS case_no_hash FROM (
              |SELECT '(2016)闽02刑更704号' AS case_no, '(2008)厦刑初字第69号\n(2009)闽刑终字第133号\n(2012)厦刑执字第628号\n(2015)厦刑执字第485号' AS connect_case_no
              |UNION
@@ -88,10 +88,6 @@ case class JustiCase(s: SparkSession,
          |  SELECT  get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
          |  FROM    $project.ods_$tableName
          |  WHERE   ds=${ods_ds}  AND ${toCol} IS NOT NULL
-         |  UNION
-         |  SELECT  get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
-         |  FROM    $project.inc_ods_$tableName
-         |  WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL
          |) A
          |LEFT JOIN
          |(
@@ -99,6 +95,12 @@ case class JustiCase(s: SparkSession,
          |) B
          |ON A.case_no_hash=B.case_no_hash
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
+    /*
+         |  UNION
+         |  SELECT  get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
+         |  FROM    $project.inc_ods_$tableName
+         |  WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL
+    */
   }
 }
 

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -117,7 +117,8 @@ trait CompanyDynamicHandle {
 
  //risk level mapping
  private val info_risk_level_map = Map(
-    "company_dishonest_info" -> "4" //company listed as dishonest judgment debtor
+    "bankruptcy_open_case" -> "4" //bankruptcy announcement
+    , "company_dishonest_info" -> "4" //company listed as dishonest judgment debtor
    , "" -> "4" //shareholder listed as dishonest judgment debtor
    , "" -> "4" //equity freeze
    , "" -> "4" //judicial auction

+ 5 - 31
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala

@@ -16,36 +16,10 @@ case class bankruptcy_open_case() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("case_no")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""案号:${new_map("case_no")}\n
-       |被申请人:${new_map("respondent")}\n
-       |申请人:${new_map("applicant")}\n
-       |公开日期:${new_map("public_date")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""案号:${new_map("case_no")}
+       |被申请人:${new_map("respondent")}
+       |申请人:${new_map("applicant")}
+       |公开日期:${new_map("public_date")}""".stripMargin
   }
-
-  /**
-   * 变更时间
-   *
-   * @param new_map
-   * @return
-   */
-//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
-
-  /**
-   * 风险等级
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险"
 }
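The recurring edit in this and the following table handlers drops the literal \n before each margin pipe. Inside an s-interpolated stripMargin string the escape is processed and every source line already ends with a real newline, so the explicit \n doubled the line breaks. A minimal sketch of the difference:

object StripMarginDemo extends App {
  val doubled =
    s"""案号:123\n
       |申请人:foo""".stripMargin
  val single =
    s"""案号:123
       |申请人:foo""".stripMargin
  println(doubled) // 案号:123, then a blank line, then 申请人:foo
  println(single)  // 案号:123 and 申请人:foo on consecutive lines
}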

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala

@@ -42,5 +42,5 @@ case class company_annual_report_out_guarantee()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala

@@ -27,7 +27,7 @@ case class company_check_info() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     s"""
-       |日期:${new_map("check_date")}\n
+       |日期:${new_map("check_date")}
        |结果:${new_map("check_result")}
        |""".stripMargin
   }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala

@@ -28,9 +28,9 @@ case class company_court_announcement_list() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     s"""
-       |案号:${new_map("case_no")}万元\n
-       |上诉人:${new_map("plaintiff")}\n
-       |被上诉人:${new_map("litigant")}\n
+       |案号:${new_map("case_no")}万元
+       |上诉人:${new_map("plaintiff")}
+       |被上诉人:${new_map("litigant")}
        |刊登日期:${new_map("publish_date")}
        |""".stripMargin
   }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala

@@ -29,8 +29,8 @@ case class company_court_open_announcement_list() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |开庭时间:${new_map("start_date")}\n
-       |案号:${new_map("case_no")}\n
+       |开庭时间:${new_map("start_date")}
+       |案号:${new_map("case_no")}
        |案由:${new_map("case_reason")}
        |""".stripMargin
   }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala

@@ -29,8 +29,8 @@ case class company_court_register_list() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |立案日期:${new_map("filing_date")}\n
-       |上诉人:${new_map("plaintiff")}\n
+       |立案日期:${new_map("filing_date")}
+       |上诉人:${new_map("plaintiff")}
        |被上诉人:${new_map("defendant")}
        |""".stripMargin
   }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala

@@ -26,9 +26,9 @@ case class company_double_random_check_info() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     s"""
-       |任务编号:${new_map("check_task_num")}\n
-       |任务名称:${new_map("check_task_name")}\n
-       |抽查机关:${new_map("check_department")}\n
+       |任务编号:${new_map("check_task_num")}
+       |任务名称:${new_map("check_task_name")}
+       |抽查机关:${new_map("check_department")}
        |完成日期:${new_map("check_date")}
        |""".stripMargin
   }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala

@@ -28,8 +28,8 @@ case class company_judicial_sale_combine_list() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |标题:${new_map("title")}\n
-       |起拍价:${new_map("initial_price")}\n
+       |标题:${new_map("title")}
+       |起拍价:${new_map("initial_price")}
        |拍卖时间:${new_map("start_time")}
        |""".stripMargin
   }

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala

@@ -27,10 +27,10 @@ case class company_license() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |许可证编号:${new_map("license_number")}\n
-       |有效期自:${new_map("start_date")}\n
-       |有效期至:${new_map("end_date")}\n
-       |许可机关:${new_map("department")}\n
+       |许可证编号:${new_map("license_number")}
+       |有效期自:${new_map("start_date")}
+       |有效期至:${new_map("end_date")}
+       |许可机关:${new_map("department")}
        |许可内容:${new_map("scope")}
        |""".stripMargin
   }

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala

@@ -28,10 +28,10 @@ case class company_license_creditchina() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |许可证编号:${new_map("licence_number")}\n
-       |有效期自:${new_map("decision_date")}\n
-       |有效期至:${new_map("end_date")}\n
-       |许可机关:${new_map("department")}\n
+       |许可证编号:${new_map("licence_number")}
+       |有效期自:${new_map("decision_date")}
+       |有效期至:${new_map("end_date")}
+       |许可机关:${new_map("department")}
        |许可内容:${new_map("resume")}
        |""".stripMargin
   }

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala

@@ -27,10 +27,10 @@ case class company_license_entpub() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |许可证编号:${new_map("license_number")}\n
-       |有效期自:${new_map("start_date")}\n
-       |有效期至:${new_map("end_date")}\n
-       |许可机关:${new_map("department")}\n
+       |许可证编号:${new_map("license_number")}
+       |有效期自:${new_map("start_date")}
+       |有效期至:${new_map("end_date")}
+       |许可机关:${new_map("department")}
        |许可内容:${new_map("scope")}
        |""".stripMargin
   }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala

@@ -28,7 +28,7 @@ case class company_tax_contravention() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     s"""
-       |案件性质:${new_map("case_type")}\n
+       |案件性质:${new_map("case_type")}
        |发布日期:${new_map("publish_time")}
        |""".stripMargin
   }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala

@@ -28,9 +28,9 @@ case class company_zxr_final_case() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
     s"""
-       |案号:${new_map("case_no")}\n
-       |执行法院:${new_map("court_name")}\n
-       |立案日期:${new_map("case_create_time")}\n
+       |案号:${new_map("case_no")}
+       |执行法院:${new_map("court_name")}
+       |立案日期:${new_map("case_create_time")}
        |终本日期:${new_map("case_final_time")}
        |""".stripMargin
   }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -36,9 +36,9 @@ case class wenshu_detail_combine() extends CompanyDynamicHandle {
       }
     }
     s"""
-       |案由:${new_map("case_reason_level3")}\n
-       |案号:${new_map("case_no")}\n
-       |诉讼身份:${t2}\n
+       |案由:${new_map("case_reason_level3")}
+       |案号:${new_map("case_no")}
+       |诉讼身份:${t2}
        |发布日期:${new_map("judge_date")}
        |""".stripMargin
   }

+ 166 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala

@@ -0,0 +1,166 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils, case_connect_utils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/28 16:52
+ * @Description:
+ */
+case class JudicialCaseRelation(s: SparkSession,
+                                project: String //project that owns the tables
+                               ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  val table_id_map = Map("justicase" -> "case_id")
+
+
+  def getStrToMap(cols: Seq[String]): String = {
+    val set = cols.toSet
+    val str = set.map(e => {
+      s"concat_ws('\001','$e',cast($e as string))"
+    }).mkString(",")
+    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
+  }
+
+
+  def all(tableName: String): Unit = {
+    val table_id = table_id_map(tableName)
+    val ods_table_name = s"ods_$tableName"
+    val ods_last_ds = getLastPartitionsOrElse(ods_table_name, "0")
+    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    winhc_eci_dev.$ods_table_name lateral view explode(split(connect_case_no,'\n')) t as single_connect_case_no
+         |WHERE   ds = '$ods_last_ds'
+         |""".stripMargin)
+      .createOrReplaceTempView(s"all_case_tmp_$tableName")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ods_last_ds',tn='$tableName')
+         |SELECT  $table_id as id
+         |        , 1 as main_case_no
+         |        ,case_no
+         |        ,${getStrToMap(other_cols)} as case_attribute
+         |FROM    all_case_tmp_$tableName
+         |UNION ALL
+         |SELECT  $table_id as id
+         |        , 0 as main_case_no
+         |        ,single_connect_case_no as case_no
+         |        ,${getStrToMap(other_cols)} as case_attribute
+         |FROM    all_case_tmp_$tableName
+         |WHERE   single_connect_case_no is not null
+         |""".stripMargin)
+  }
+
+
+  def inc(tableName: String, ds: String): Unit = {
+
+
+  }
+
+  private def getVal(map: Map[String, String], key: String): String = {
+    map.getOrElse(key, "")
+  }
+
+  def sort(v1: String, v2: String): String = {
+    val seq = Seq(v1, v2)
+    seq.filter(_ != null).sorted.mkString("")
+  }
+
+  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+
+    try {
+      val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
+      val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
+
+      val current_case_no = getVal(m1, "case_no")
+      val connect_case_no = getVal(m2, "case_no")
+      val current_court_name = getVal(m1, "court_name")
+      val connect_court_name = getVal(m2, "court_name")
+
+      case_connect_utils.isConnect(current_case_party_list_org, connect_case_party_list_org, current_case_no, connect_case_no, current_court_name, connect_court_name)
+    } catch {
+      case ex: Exception => {
+        logError(ex.getMessage)
+        println("error")
+        println(m1)
+        println(m2)
+      }
+        false
+    }
+  }
+
+  def relation(): Unit = {
+    spark.udf.register("case_equ", case_equ _)
+    spark.udf.register("str_sort", sort _)
+    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
+         | SELECT  id_1
+         |         ,id_2
+         |         ,case_no_1
+         |         ,case_no_2
+         |         ,tn_t1
+         |         ,tn_t2
+         | FROM    (
+         |             SELECT  *
+         |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
+         |             FROM    (
+         |                        SELECT  t1.id AS id_1
+         |                                ,t2.id AS id_2
+         |                                ,t1.case_no AS case_no_1
+         |                                ,t2.case_no AS case_no_2
+         |                                ,t1.tn AS tn_t1
+         |                                ,t2.tn AS tn_t2
+         |                                ,concat(concat(t1.id,t1.tn),concat(t2.id,t2.tn)) as xjk_sorted
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    winhc_eci_dev.dwd_judicial_case
+         |                                    WHERE   ds = '$dwd_last_ds'
+         |                                    AND     case_no IS NOT NULL
+         |                                    AND     case_no <> ''
+         |                                    AND     case_no RLIKE '\\d+'
+         |                                ) AS t1
+         |                        FULL JOIN (
+         |                                      SELECT  *
+         |                                      FROM    winhc_eci_dev.dwd_judicial_case
+         |                                      WHERE   ds = '$dwd_last_ds'
+         |                                      AND     case_no IS NOT NULL
+         |                                      AND     case_no <> ''
+         |                                      AND     case_no RLIKE '\\d+'
+         |                                  ) AS t2
+         |                        ON      t1.case_no = t2.case_no
+         |                        AND     t1.id <> t2.id
+         |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
+         |                     ) AS t1
+         |         ) AS t2
+         | WHERE   t2.num = 1
+         |""".stripMargin)
+  }
+}
+
+object JudicialCaseRelation {
+
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
+    //    jcr.all("justicase")
+    jcr.relation()
+    spark.stop()
+  }
+}
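For reference, a self-contained sketch of the expression getStrToMap emits (written with \u0001/\u0002 in place of the octal \001/\002 escapes used above): each column is packed as name<SOH>value, the pairs are joined with <STX>, and Hive's str_to_map turns the concatenated string back into the map<string,string> that case_equ later reads.

object StrToMapSketch {
  // Mirrors getStrToMap above; \u0002 separates entries, \u0001 separates
  // each key from its value in Hive's str_to_map.
  def getStrToMap(cols: Seq[String]): String = {
    val str = cols.toSet
      .map((e: String) => s"concat_ws('\u0001','$e',cast($e as string))")
      .mkString(",")
    s"str_to_map(concat_ws('\u0002',$str),'\u0002','\u0001')"
  }

  def main(args: Array[String]): Unit =
    println(getStrToMap(Seq("case_no", "court_name")))
}

A related observation on relation(): it registers a str_sort UDF but builds xjk_sorted with a plain concat, so the ROW_NUMBER dedup keeps one row per ordered pair; presumably wrapping the two halves in str_sort(...) was intended, which would also collapse the mirrored (t2, t1) duplicates the FULL JOIN produces.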

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -146,5 +146,4 @@ trait BaseFunc {
       BKDRHash(case_nos.split(",").sorted.mkString(","))
     })
   }
-
 }
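The UDF body visible in this hunk sorts the split case numbers before hashing, so the generated case id is insensitive to the order the numbers arrive in. A minimal check, reusing BKDRHash from BaseUtil below:

// Both inputs hash the same canonical string "A,B":
val id1 = BaseUtil.BKDRHash("B,A".split(",").sorted.mkString(","))
val id2 = BaseUtil.BKDRHash("A,B".split(",").sorted.mkString(","))
assert(id1 == id2)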

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -116,11 +116,11 @@ object BaseUtil {
     }
   }
   def BKDRHash(str: String): Long = {
-    val seed: Long = 131 // 31 131 1313 13131 131313 etc..
+    val seed: Long = 1313131313 // 31 131 1313 13131 131313 etc..
     var hash: Long = 0
     for (i <- 0 to str.length - 1) {
       hash = hash * seed + str.charAt(i)
-      hash = hash & 0x7FFFFFFF
+      hash = hash & 0x7FFFFFFFFFFFFFFFL
     }
     return hash
   }
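Both edits widen the hash: the seed grows from 131 to 1313131313 and the running mask from 0x7FFFFFFF (31 bits) to 0x7FFFFFFFFFFFFFFFL (63 bits), so intermediate values are no longer truncated to Int range on every step while the result stays non-negative. Note this changes every previously generated id. A small worked example, in a sketch mirroring the function above:

object BkdrSketch {
  def bkdr(str: String, seed: Long, mask: Long): Long = {
    var hash = 0L
    for (c <- str) {
      hash = hash * seed + c // Char widens to Long
      hash &= mask           // clear the high bit(s), keep it non-negative
    }
    hash
  }

  def main(args: Array[String]): Unit = {
    // 'A' = 65, 'B' = 66: 65 * 1313131313 + 66 = 85353535411, which already
    // overflows the old 31-bit mask but fits comfortably in 63 bits.
    println(bkdr("AB", 1313131313L, 0x7FFFFFFFL))         // truncated (old behaviour)
    println(bkdr("AB", 1313131313L, 0x7FFFFFFFFFFFFFFFL)) // 85353535411 (new behaviour)
  }
}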

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: merge in-house incremental data into the Tianyancha data
+ * @author π
+ * @date 2020/8/31 14:07
+ */
+object CompanyIncrCombineUtils {
+  def main(args: Array[String]): Unit = {
+    val Array(project, source, target) = args
+
+    println(
+      s"""
+         |project:$project
+         |source:$source
+         |target:$target
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    CompanyIncrCombineUtils(spark, source, target).calc()
+    spark.stop()
+  }
+}
+
+case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String) extends LoggingUtils {
+  override protected val spark: SparkSession = s
+
+  def calc(): Unit = {
+
+    val ds2 = BaseUtil.getPartion(s"$target", spark) //latest partition (ds) of the target table
+
+    val cols: Seq[String] = spark.table(target).schema.map(_.name).filter(s => {
+      !s.equals("ds")
+    })
+
+    //check whether the target table has been combined before
+    val list = sql(
+      s"""
+         |select max(ds) max_ds from $target where id = -1 and ds > '0'
+         |""".stripMargin).collect().toList.map(_.getAs[String]("max_ds"))
+
+    println(s"list: $list")
+
+    sql(
+      s"""
+         |INSERT into table $target PARTITION(ds=$ds2)
+         |SELECT ${cols.mkString(",")} from
+         |$source
+         |where ds > '${if (StringUtils.isNotBlank(list.head)) s"${list.head}" else s"0"}'
+         |""".stripMargin)
+  }
+}
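calc() implements a small watermark pattern: a sentinel row (id = -1) in the target records the last combined partition, and only source partitions with a newer ds are inserted. A minimal sketch of the fallback around list.head (the helper name watermark is illustrative, not from the code):

import org.apache.commons.lang3.StringUtils

object WatermarkSketch {
  // maxDs is what the "select max(ds) ... where id = -1" probe returns;
  // it is null when the target has never been combined before.
  def watermark(maxDs: String): String =
    if (StringUtils.isNotBlank(maxDs)) maxDs else "0"

  def main(args: Array[String]): Unit = {
    println(watermark(null))       // "0" -> first run, take every partition
    println(watermark("20200830")) // "20200830" -> only newer partitions
  }
}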

+ 120 - 0
src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils.scala

@@ -0,0 +1,120 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/31 10:42
+ * @Description:
+ */
+object case_connect_utils {
+
+  def isConnect(current_case_party_list_org: Seq[String], connect_case_party_list_org: Seq[String], current_case_no: String = "", connect_case_no: String = "", current_court_name: String = "", connect_court_name: String = ""): Boolean = {
+    val current_case_party_list = current_case_party_list_org.filter(StringUtils.isNotBlank)
+    val connect_case_party_list = connect_case_party_list_org.filter(StringUtils.isNotBlank)
+
+    if (current_case_party_list.isEmpty || connect_case_party_list.isEmpty) {
+      return false
+    }
+
+    is_vague_word(current_case_party_list = current_case_party_list, connect_case_party_list = connect_case_party_list) match {
+      case 1 => {
+        vague_match(position = 0, current_case_party_list
+          , connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name
+        )
+      }
+      case 2 => {
+        vague_match(position = 1, current_case_party_list
+          , connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name
+        )
+      }
+      case _ => {
+        precise_match(current_case_party_list, connect_case_party_list)
+      }
+    }
+  }
+
+  private def vague_match(position: Int = 1,
+                          current_case_party_list: Seq[String]
+                          , connect_case_party_list: Seq[String]
+                          , current_case_no: String, connect_case_no: String
+                          , current_court_name: String, connect_court_name: String
+                         ): Boolean = {
+    var current_party: Seq[String] = null
+    var connect_party: Seq[String] = null
+    position match {
+      case 1 => {
+        current_party = connect_case_party_list
+        connect_party = current_case_party_list
+      }
+      case _ => {
+        current_party = current_case_party_list
+        connect_party = connect_case_party_list
+      }
+    }
+
+    for (char <- current_party.mkString("").toCharArray.map(_.toString).filter(vague_word.contains(_))) {
+      for (userName <- current_party) {
+        for (splitUser <- userName.split(char)) {
+          val all_str = connect_party.mkString("")
+          if (!all_str.contains(splitUser.substring(0, 1))) {
+            return false
+          }
+        }
+      }
+    }
+    case_no_match(current_case_no = current_case_no, connect_case_no = connect_case_no) || court_name_match(connect_court_name = connect_court_name, current_court_name = current_court_name)
+  }
+
+  private def case_no_match(current_case_no: String, connect_case_no: String): Boolean = current_case_no.equals(connect_case_no)
+
+  private def court_name_match(current_court_name: String, connect_court_name: String): Boolean = current_court_name.equals(connect_court_name)
+
+  private def precise_match(current_case_party_list: Seq[String]
+                            , connect_case_party_list: Seq[String]
+                           ): Boolean = {
+    var num = 0
+    for (userName <- connect_case_party_list) {
+      if (current_case_party_list.contains(userName))
+        num = num + 1
+    }
+    num >= connect_case_party_list.size / 2
+  }
+
+  val vague_word: Seq[String] = Seq("某", "*", "*", "x", "ⅹ", "x", "X", "×")
+
+  private def is_vague_word(current_case_party_list: Seq[String], connect_case_party_list: Seq[String]): Int = {
+    val current_vague = current_case_party_list.exists(r => vague_word.exists(r.equals(_)))
+    if (current_vague) {
+      return 1
+    }
+    val connect_vague = connect_case_party_list.exists(r => vague_word.exists(r.contains(_)))
+    if (connect_vague) {
+      return 2
+    }
+    0
+  }
+
+  def sort(v1: String, v2: String): String = {
+    val seq = Seq(v1, v2)
+    seq.filter(_ != null).sorted.mkString("")
+  }
+
+  def main(args: Array[String]): Unit = {
+    for(e<-Seq(("a","b"),("b","a"))){
+      println(sort(e._1,e._2))
+    }
+
+
+
+//    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
+//    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
+//
+//    val current_case_no = ""
+//    val connect_case_no = ""
+//    val current_court_name = ""
+//    val connect_court_name = ""
+//
+//    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
+  }
+}
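Beyond the commented-out fixture in main, a worked example of the precise-match branch (neither party list triggers the vague-character handling): two cases connect when at least half of the connected case's parties, by integer division, also appear in the current case's parties. A sketch, assuming it runs alongside case_connect_utils on the classpath:

object CaseConnectExample {
  def main(args: Array[String]): Unit = {
    val connected = case_connect_utils.isConnect(
      current_case_party_list_org = Seq("张三", "张二", "张一", "张四"),
      connect_case_party_list_org = Seq("张三", "李五")
    )
    // overlap = 1 and connect size 2 / 2 = 1, so 1 >= 1 -> true
    println(connected)
  }
}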