Ver Fonte

v9司法案件

xufei há 2 anos atrás
pai
commit
6d1d9ba9bb

+ 454 - 0
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggsV3.scala

@@ -0,0 +1,454 @@
+package com.winhc.bigdata.spark.ng.judicial
+
+import com.winhc.bigdata.spark.udf._
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description:司法案件新版本聚合(迭代)
+ * @author π
+ * @date 2022/7/6 16:46
+ */
+
+case class args_case_v3(tableName: String = ""
+                        , rowkey: String = "rowkey"
+                        , cols_map: Map[String, String] = Map.empty
+                       )
+
+object args_case_v3 {
+  val tn_mapping = Map[String, String](
+    "wenshu_detail_v2" -> "0"
+    , "company_court_open_announcement" -> "1"
+    , "company_court_announcement" -> "2"
+    , "company_dishonest_info" -> "3"
+    , "company_send_announcement" -> "4"
+    , "company_zxr_restrict" -> "5"
+    , "company_zxr_final_case" -> "6"
+    , "company_zxr" -> "7"
+    , "company_court_register" -> "8"
+  )
+  val tab_args = Seq(
+    //文书(金额万元)
+    args_case_v3(tableName = "wenshu_detail_v2"
+      , cols_map = Map[String, String]("flag" -> "0", "case_stage" -> "case_stage(case_no)"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "judge_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "case_amt", "judge_amt" -> "judge_amt", "exec_amt" -> "null"
+        , "data" -> "map('date',judge_date,'party_title',party_title,'case_end',case_end(judge_result) )"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '0', 'date', cast(judge_date as string), 'detail_id', rowkey, 'doc_type', doc_type, 'judge_result', judge_result))"
+      ))
+    //开庭公告
+    , args_case_v3(tableName = "company_court_open_announcement"
+      , cols_map = Map[String, String]("flag" -> "1", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "start_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',start_date)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '1', 'date', cast(start_date as string), 'detail_id', rowkey, 'court', court, 'court_room',court_room))"
+      ))
+    //法院公告
+    , args_case_v3(tableName = "company_court_announcement"
+      , cols_map = Map[String, String]("flag" -> "2", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "litigant_info", "date" -> "concat_ws(' ',publish_date,'00:00:00')"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',concat_ws(' ',publish_date,'00:00:00'))"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '2', 'date',concat_ws(' ',publish_date,'00:00:00'), 'detail_id', rowkey, 'announcement_type', announcement_type, 'court_name', court_name))"
+      ))
+    //失信人
+    , args_case_v3(tableName = "company_dishonest_info"
+      , cols_map = Map[String, String]("flag" -> "3", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> " to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "pub_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',reg_time,'case_end',if(deleted = 1,'1',null) )"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '3', 'date', cast(pub_date as string), 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'performance',  performance, 'action_content', action_content ))"
+      ))
+    //送达公告
+    , args_case_v3(tableName = "company_send_announcement"
+      , cols_map = Map[String, String]("flag" -> "4", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "start_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',start_date)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '4', 'date', cast(start_date as string), 'detail_id', rowkey, 'defendant_info', json_array(defendant_info), 'plaintiff_info', json_array(plaintiff_info)))"
+      ))
+    //限高 //todo 有公司取公司,其次取人
+    , args_case_v3(tableName = "company_zxr_restrict"
+      , cols_map = Map[String, String]("flag" -> "5", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null", "yg_name" -> "null"
+        , "bg_name" -> "to_json(array(named_struct('litigant_id',if(length(company_name)  = 0 or company_name is NULL ,pid,company_id) ,'name', if(length(company_name)  = 0 or company_name is NULL ,person_name,company_name) )  ))"
+        , "date" -> "case_create_time", "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',case_create_time,'case_end',if(deleted = 1,'1',null) )"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '5', 'date', cast(case_create_time as string), 'detail_id', rowkey, 'person', array(named_struct('litigant_id',COALESCE(pid,''),'person_name',person_name)), 'company', array(named_struct('litigant_id', company_id, 'company_name',company_name)),'applicant_info', json_array(applicant_info) ))"
+      ))
+    //终本
+    , args_case_v3(tableName = "company_zxr_final_case"
+      , cols_map = Map[String, String]("flag" -> "6", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court_name", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))"
+        , "date" -> "case_create_time"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',case_create_time,'case_end',if(deleted = 0,'1',null) )"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '6', 'date', cast(case_create_time as string), 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''), 'name',name)), 'exec_amount', amt_div(exec_amount, 10000), 'no_exec_amount', amt_div(no_exec_amount, 10000) ))"
+      ))
+    //被执
+    , args_case_v3(tableName = "company_zxr"
+      , cols_map = Map[String, String]("flag" -> "7", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "null"
+        , "yg_name" -> "null", "bg_name" -> "to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)))", "date" -> "case_create_time"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "amt_div(exec_money,10000)"
+        , "data" -> "map('date', case_create_time, 'exec_info', to_json(array(named_struct('litigant_id',COALESCE(keyno,''),'name',name,'exec_money',amt_div(exec_money,10000),'date',case_create_time  ))) )"
+        , "all_name" -> "null"
+        , "detail_info" -> "to_json(named_struct('flag', '7', 'date', cast(case_create_time as string), 'detail_id', rowkey, 'name', array(named_struct('litigant_id',COALESCE(keyno,''),'name',name)), 'exec_money', amt_div(exec_money,10000) ))"
+      ))
+    //立案信息
+    , args_case_v3(tableName = "company_court_register"
+      , cols_map = Map[String, String]("flag" -> "8", "title" -> "null", "case_type" -> "case_type(case_no)"
+        , "case_stage" -> "case_stage(case_no)", "court_name" -> "court", "case_reason" -> "case_reason"
+        , "yg_name" -> "plaintiff_info", "bg_name" -> "defendant_info", "date" -> "filing_date"
+        , "detail_id" -> "rowkey", "case_amt" -> "null", "judge_amt" -> "null", "exec_amt" -> "null"
+        , "data" -> "map('date',filing_date)"
+        , "all_name" -> "litigant_info"
+        , "detail_info" -> "to_json(named_struct('flag', '8', 'date', cast(filing_date as string), 'detail_id', rowkey, 'court', court, 'judge', judge))"
+      ))
+  )
+
+  def get_job_args(tn: String): args_case_v3 = {
+    tab_args.find(p => tn.equals(p.tableName)).getOrElse(throw new NullPointerException("tn is not fount"))
+  }
+
+  def get_job_args(): args_case_v3 = {
+    args_case_v3()
+  }
+}
+
+
+object JudicialCaseRelationAggsV3 {
+  def main(args: Array[String]): Unit = {
+    var project = ""
+    var tn = ""
+    var c = ""
+    if (args.length == 3) {
+      val Array(p1, p2, p3) = args
+      project = p1
+      tn = p2
+      c = p3
+    } else if (args.length == 2) {
+      val Array(p1, p2) = args
+      project = p1
+      c = p2
+    } else {
+      println("please check project tn c!")
+      sys.exit(-1)
+    }
+    println(
+      s"""
+         |project: $project
+         |tn: $tn
+         |c: $c
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    if (StringUtils.isBlank(tn)) {
+      tn = "wenshu_detail_v2"
+    }
+    if ("all".equals(tn)) {
+      val seq = args_case_v3.tab_args.map(x => {
+        (x.tableName, () => {
+          run(project, x.tableName, c, spark)
+          true
+        })
+      })
+      AsyncExtract.startAndWait(spark, seq)
+    } else if (tn.contains(",")) {
+      val arr = tn.split(",", -1)
+      val seq = args_case_v3.tab_args.map(_.tableName).filter(arr.contains(_)).map(x => {
+        (x, () => {
+          run(project, x, c, spark)
+          true
+        })
+      })
+      AsyncExtract.startAndWait(spark, seq)
+    } else {
+      run(project, tn, c, spark)
+    }
+    spark.stop()
+  }
+
+  private def run(project: String, tn: String, c: String, spark: SparkSession) = {
+    val r = JudicialCaseRelationAggsV3(spark, project, args_case_v3.get_job_args(tn))
+    c match {
+      case "pre_calc" => r.pre_calc()
+      case "calc" => r.calc()
+      case _ => {
+        println("not fun to run !")
+        sys.exit(-1)
+      }
+    }
+  }
+}
+
+case class JudicialCaseRelationAggsV3(s: SparkSession, project: String, args_case_v3: args_case_v3
+                                     ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+  //预处理表
+  val ads_judicial_case_relation_pre = s" $project.ads_judicial_case_relation_pre_v9"
+  //替换id表
+  val ads_judicial_case_relation_id = s" $project.ads_judicial_case_relation_id_v9"
+  //主表
+  val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1_v9"
+  //明细表(增强)
+  val ads_judicial_case_relation_r3 = s" $project.ads_judicial_case_relation_r3_v9"
+
+
+  val update = s"update"
+  val incr = s"incr"
+
+  private val cols_map: Map[String, String] = args_case_v3.cols_map
+  private val rowkey: String = args_case_v3.rowkey
+  private val tableName: String = args_case_v3.tableName
+
+  val ads_table = s" $project.ads_$tableName" + "_v9"
+  val inc_ads_table = s" $project.inc_ads_$tableName" + "_v9"
+
+
+  val pre_cols = getColumns(ads_judicial_case_relation_pre).diff(Seq("ds", "tn"))
+  var last_ds = BaseUtil.getPartion(ads_judicial_case_relation_pre, tableName, spark)
+  val calc_ds = BaseUtil.getYesterday()
+
+  if (calc_ds.equals(last_ds)) {
+    last_ds = BaseUtil.getSecondPartion(ads_judicial_case_relation_pre, tableName, spark)
+  }
+  val is_incr = if (StringUtils.isBlank(last_ds)) false else true
+
+  val cols = pre_cols.map(c => {
+    if (cols_map.contains(c)) {
+      s"${cols_map(c)} as $c"
+    } else c
+  })
+
+  case_no_trim_udf_v2()
+  prepareFunctions(spark)
+
+  val sort = get_partition_order_by()
+
+  def pre_calc(): Unit = {
+    var all_sql = ""
+    if (!is_incr) {
+      all_sql =
+        s"""
+           |SELECT  *
+           |FROM    $ads_table
+           |WHERE   ${if (is_incr) "ds = -1" else "ds > 0"}
+           |UNION ALL
+           |""".stripMargin
+    }
+
+    //裁判文书
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_pre PARTITION(ds='$calc_ds',tn='$tableName')
+         |SELECT
+         |${pre_cols.mkString(",")}
+         |from    (
+         |            SELECT  ${cols.mkString(",")}
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $rowkey ORDER BY $sort) AS num
+         |            from    (
+         |                       $all_sql
+         |                        SELECT  *
+         |                        FROM    $inc_ads_table
+         |                        WHERE    ${if (is_incr) s"ds > $last_ds" else "ds > 0"}
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |${if (isWindows) "LIMIT 1000" else ""}
+         |""".stripMargin).show(100, false)
+
+    //分区不存在,插入空分区
+    addEmptyPartitionOrSkipPlus(ads_judicial_case_relation_pre, calc_ds, tableName)
+  }
+
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf_v2()
+    registerCourtRank()
+    spark.udf.register("name_aggs", new NameAggsPlusV2(1000))
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
+    spark.udf.register("all_name_plus_v2", new AllNamePlusV2(1000))
+    spark.udf.register("case_amt_plus_v2", new CaseAmtAggsPlusV2(1000))
+
+    //明细表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r3 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    id,
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    detail      ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    last_date   ,
+         |    0 deleted   ,
+         |    all_name    ,
+         |    court_level,
+         |    case_amt,
+         |    judge_amt,
+         |    exec_info,
+         |    case_end
+         |FROM
+         |(
+         |SELECT  md5(concat_ws('',concat_ws('',judicase_id),CLEANUP(case_no))) id
+         |        ,judicase_id
+         |        ,max(title) title
+         |        ,case_type(max(case_no)) as case_type
+         |        ,case_reason(case_reason,date,flag) case_reason
+         |        ,case_no
+         |        ,concat_ws(',',collect_set(court_name)) court_name
+         |        ,case_stage(max(case_no)) as case_stage
+         |        ,trim_black(concat_ws(',', collect_set(lable))) lable
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
+         |        ,max(date) last_date
+         |        ,name_aggs(yg_name,bg_name,flag,data['date'],detail_id) name_aggs
+         |        ,all_name_plus_v2(all_name) all_name
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,max(case_amt) as case_amt
+         |        ,max(judge_amt) as judge_amt
+         |        ,case_amt_plus_v2(data['exec_info']) as exec_info
+         |        ,max(case_end) as case_end
+         |FROM    (
+         |        SELECT  a.*,court_level(court_name) court_level
+         |        FROM    (
+         |                   SELECT   judicase_id
+         |                           ,flag
+         |                           ,title
+         |                           ,case_type(case_no) case_type
+         |                           ,adjust_reason(case_reason) case_reason
+         |                           ,case_no_trim(case_no) as case_no
+         |                           ,court_name
+         |                           ,case_stage(case_no) case_stage
+         |                           ,case_label(flag) lable
+         |                           ,detail
+         |                           ,yg_name
+         |                           ,bg_name
+         |                           ,all_name
+         |                           ,date
+         |                           ,detail_id
+         |                           ,case_amt
+         |                           ,judge_amt
+         |                           ,tn
+         |                           ,data
+         |                           ,case_end
+         |                   FROM    $ads_judicial_case_relation_id
+         |                   WHERE   ds = '$calc_ds' AND length(case_label(flag)) > 0 AND  case_no_trim(case_no) is not null AND  date is not null
+         |                )a
+         |)
+         |GROUP BY judicase_id
+         |         ,case_no
+         |) x
+         |""".stripMargin).show(10, false)
+
+    //司法案件主表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    all_name,
+         |    case_info    ,
+         |    judge_info   ,
+         |    exec_info    ,
+         |    date         ,
+         |    court_level  ,
+         |    0 deleted    ,
+         |    case_end
+         |FROM
+         |(
+         |SELECT  judicase_id
+         |        ,max(title) title
+         |        ,concat_ws(',',collect_set(case_type)) case_type
+         |        ,case_reason(case_reason,date,'0') case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,trim_black(concat_ws(',',collect_set(court_name))) court_name
+         |        ,max(last_stage) case_stage
+         |        ,trim_black(concat_ws(',', collect_set(lable)) ) lable
+         |        -- ,max(first_case_amt) case_amt
+         |        ,max(date) AS date
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,name_aggs(yg_name,bg_name,'0',date,'0') name_aggs
+         |        ,all_name_plus_v2(all_name) all_name
+         |        ,amt_merge(concat_ws('&',collect_set(case_info))) case_info
+         |        ,amt_merge(concat_ws('&',collect_set(judge_info))) judge_info
+         |        ,case_amt_plus_v2(exec_info) as exec_info
+         |        ,max(case_end) as case_end
+         |FROM    (
+         |        SELECT  a.*
+         |        FROM    (
+         |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt,judge_amt,exec_info,case_end
+         |                   ,court_level(court_name) court_level
+         |                   ,concat_ws('|',case_stage,coalesce(case_amt,0))  as case_info
+         |                   ,concat_ws('|',case_stage,coalesce(judge_amt,0)) as judge_info
+         |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY date DESC ) AS last_stage
+         |                   FROM    $ads_judicial_case_relation_r3
+         |                   WHERE   ds = '$calc_ds'
+         |                ) a
+         |        )
+         |GROUP BY judicase_id
+         |)x
+         |""".stripMargin).show(20, false)
+
+    //分区不存在,插入空分区
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r1, calc_ds)
+    addEmptyPartitionOrSkip(ads_judicial_case_relation_r3, calc_ds)
+  }
+
+  private def get_partition_order_by(): String = {
+    if (pre_cols.contains("update_time") || pre_cols.contains("update_date")) {
+      " ds DESC,update_time DESC "
+    } else {
+      " ds DESC "
+    }
+  }
+
+  def calc_last_ds(tabName: String, default: String = "0"): String = {
+    var d1 = getLastPartitionsOrElse(tabName, default)
+    val d2 = BaseUtil.getYesterday()
+    if (d1.equals(d2)) {
+      d1 = getSecondLastPartitionOrElse(tabName, default)
+    }
+    d1
+  }
+}
+

+ 820 - 0
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationRowkeyRelation_v3.scala

@@ -0,0 +1,820 @@
+package com.winhc.bigdata.spark.ng.judicial
+
+import com.winhc.bigdata.spark.implicits.BaseHelper._
+import com.winhc.bigdata.spark.implicits.RegexUtils.RichRegex
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils._
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/28 16:52
+ * @Description:
+ */
+case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
+                                                 project: String //表所在工程名
+                                                ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  private val pat = ".*\\d+.*".r
+
+  private val separation = "@@"
+
+  import spark.implicits._
+
+  private val data_extraction_tab = "winhc_ng.dwd_judicial_case_v9"
+  private val out_tab = "winhc_ng.bds_judicial_case_relation_v9"
+
+  private val lawsuit_tab = "wenshu_detail_v2"
+
+  init()
+
+  private def init(): Unit = {
+    case_no_trim_udf_v2()
+    is_id_card_udf()
+    json_parse_udf()
+    spark.udf.register("case_equ", case_equ _)
+    spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
+    spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
+
+
+    import com.winhc.bigdata.spark.implicits.MysqlHelper._
+    val map = spark.read("all_court_info_ali").collect().map(r => {
+      (r.getAs[String]("court_name"), r.getAs[String]("standard_court_name"))
+    }).flatMap(r => {
+      r._1.split(',').map(s => (s, r._2)).toMap
+    }).toMap
+    val broad_map = spark.sparkContext.broadcast(map)
+
+    def get_standard_court_name(name: String): String = {
+      if (StringUtils.isEmpty(name)) null else broad_map.value.getOrElse(name, name)
+    }
+
+
+    spark.udf.register("get_standard_court_name", get_standard_court_name _)
+
+
+    def parse_litigant(name: String): String =
+      if (StringUtils.isEmpty(name))
+        null
+      else
+        name.split("[\n,,;;]").filter(StringUtils.isNotEmpty).distinct.toSeq.mkStringOrNull("\001")
+
+
+    spark.udf.register("parse_litigant", parse_litigant _)
+
+
+    def company_lawsuit_case_no_equ(case_no: String, connect_case_no: String): Boolean = {
+      val case_no_std = if (case_no == null) "" else case_no
+      val connect_case_no_std = if (connect_case_no == null) "" else connect_case_no
+
+      if (case_no_std.contains("执恢") && connect_case_no_std.contains("执")) return true
+
+      if (case_no_std.contains("执") && connect_case_no_std.contains("执")) return false
+      true
+    }
+
+    spark.udf.register("company_lawsuit_case_no_equ", company_lawsuit_case_no_equ _)
+  }
+
+
+  private def step_01_DataExtraction(ds: String, inc: Boolean = false): Unit = {
+    if (!inc) {
+      dropAllPartitions(data_extraction_tab)
+    }
+
+    val company_dishonest_info_view =
+      s"""
+         | rowkey
+         | ,court as court_name
+         | ,case_no
+         | ,gist_unit as connect_court_name
+         | ,gist_dd as connect_case_no
+         | ,parse_litigant(name) as litigant
+         | ,ds
+         |""".stripMargin
+    val company_zxr_view =
+      s"""
+         | rowkey
+         | ,court as court_name
+         | ,case_no
+         | ,court as connect_court_name
+         | ,gist_id as connect_case_no
+         | ,parse_litigant(name) as litigant
+         | ,ds
+         |""".stripMargin
+
+    val company_zxr_restrict_view =
+      """
+        | rowkey
+        | ,court_name as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',parse_litigant(person_name),parse_litigant(company_name))) as litigant
+        | ,ds
+        |""".stripMargin
+
+    val company_zxr_final_case_view =
+      """
+        | rowkey
+        | ,court_name as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(name) as litigant
+        | ,ds
+        |""".stripMargin
+
+    val company_court_announcement_view =
+      """
+        | rowkey
+        | ,court_name as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name')))) as litigant
+        | ,ds
+        |""".stripMargin
+    val company_court_open_announcement_view =
+      """
+        | rowkey
+        | ,court as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant
+        | ,ds
+        |""".stripMargin
+
+    val company_send_announcement_view =
+      """
+        | rowkey
+        | ,court as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant
+        | ,ds
+        |""".stripMargin
+
+    val company_court_register_view =
+      """
+        | rowkey
+        | ,court as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant
+        | ,ds
+        |""".stripMargin
+
+
+    AsyncExtract.startAndWait(spark, Seq(
+      ("dishonest etl...", () => {
+        detail_etl(ds, "company_dishonest_info", company_dishonest_info_view, inc)
+        true
+      }),
+      ("company_zxr etl...", () => {
+        detail_etl(ds, "company_zxr", company_zxr_view, inc)
+        true
+      })
+      ,
+      ("company_zxr_restrict etl...", () => {
+        detail_etl(ds, "company_zxr_restrict", company_zxr_restrict_view, inc, is_contain_connect_case_no = false)
+        true
+      })
+      ,
+      ("company_zxr_final_case etl...", () => {
+        detail_etl(ds, "company_zxr_final_case", company_zxr_final_case_view, inc, is_contain_connect_case_no = false)
+        true
+      })
+      ,
+      ("company_court_announcement etl...", () => {
+        detail_etl(ds, "company_court_announcement", company_court_announcement_view, inc, is_contain_connect_case_no = false)
+        true
+      })
+      ,
+      ("company_court_open_announcement etl...", () => {
+        detail_etl(ds, "company_court_open_announcement", company_court_open_announcement_view, inc, is_contain_connect_case_no = false)
+        true
+      })
+      ,
+      ("company_send_announcement etl...", () => {
+        detail_etl(ds, "company_send_announcement", company_send_announcement_view, inc, is_contain_connect_case_no = false)
+        true
+      })
+      ,
+      ("company_court_register etl...", () => {
+        detail_etl(ds, "company_court_register", company_court_register_view, inc, is_contain_connect_case_no = false)
+        true
+      })
+      ,
+      ("company_lawsuit etl...", () => {
+        etl_lawsuit(ds, inc)
+        true
+      })
+    ))
+  }
+
+
+  def etl(ds: String): Unit = {
+    val out_tab_last_ds = getLastPartitionsOrElse(out_tab, null)
+
+    val inc = if (out_tab_last_ds == null) false else true
+
+    step_01_DataExtraction(ds, inc)
+    if (inc) {
+      inc_func(ds)
+    } else {
+      relationByGroup(ds)
+    }
+  }
+
+
+  private def etl_lawsuit(ds: String, inc: Boolean = false): Unit = {
+    val tableName = lawsuit_tab
+
+    val org_tab = s"winhc_ng.ads_$tableName" + "_v9"
+    val inc_org_tab = s"winhc_ng.inc_ads_$tableName" + "_v9"
+
+    val table_id = "rowkey"
+    val other_cols = Seq("plaintiff_info", "court_name", "case_no", "litigant_info", "defendant_info") ++ Seq(table_id, "ds", "connect_case_no")
+
+    val ods_end_ds = getLastPartitionsOrElse(org_tab, "0")
+    val tmp_tab = s"all_${tableName}_tmp_$ods_end_ds"
+
+    if (inc) {
+      val last_ds = getLastPartitionsOrElse(data_extraction_tab, "0")
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${other_cols.mkString(",")}
+           |                        FROM    $inc_org_tab
+           |                        WHERE   ds > $last_ds
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    } else {
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${other_cols.mkString(",")}
+           |                        FROM    $org_tab
+           |                        WHERE   ds > 0
+           |                        UNION ALL
+           |                        SELECT  ${other_cols.mkString(",")}
+           |                        FROM    $inc_org_tab
+           |                        WHERE   ds > 0
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    }
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    $tmp_tab lateral view OUTER explode(split(connect_case_no,'\\n')) t as single_connect_case_no
+         |""".stripMargin)
+      .createTempView(s"explode_$tmp_tab")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $data_extraction_tab  PARTITION(ds='$ds',tn='$tableName')
+         |SELECT  1 as main_case_no
+         |        ,case_no_trim(case_no) as case_no
+         |        ${",parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant"}
+         |        ,court_name
+         |        ,rowkey
+         |FROM    explode_$tmp_tab
+         |UNION ALL
+         |SELECT   0 as main_case_no
+         |        ,case_no_trim(single_connect_case_no) as case_no
+         |        ${",parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant"}
+         |        ,null as court_name
+         |        ,rowkey
+         |FROM    explode_$tmp_tab
+         |where   case_no_trim(single_connect_case_no) is not null
+         |and     company_lawsuit_case_no_equ(case_no,single_connect_case_no)
+         |""".stripMargin)
+  }
+
+
+  private def detail_etl(ds: String, tn: String, view: String, inc: Boolean = false, is_contain_connect_case_no: Boolean = true): Unit = {
+    val tmp_tab = s"all_etl_${tn}_tmp"
+
+    val org_ads_tab = s"$project.ads_$tn" + "_v9"
+    val org_inc_ads_tab = s"$project.inc_ads_$tn" + "_v9"
+
+    if (inc) {
+      val last_ds = getLastPartitionsOrElse(data_extraction_tab, "0")
+      sql(
+        s"""
+           |SELECT $view
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                    SELECT  *
+           |                    FROM    $org_inc_ads_tab
+           |                    WHERE   ds > '$last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    } else {
+      val ads_last_ds = getLastPartitionsOrElse(org_ads_tab, "0")
+      val intersect_cols = getColumns(org_ads_tab).intersect(getColumns(org_inc_ads_tab))
+      sql(
+        s"""
+           |SELECT $view
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${intersect_cols.mkString(",")}
+           |                        FROM    $org_ads_tab
+           |                        WHERE   ds = '$ads_last_ds'
+           |                        UNION ALL
+           |                        SELECT  ${intersect_cols.mkString(",")}
+           |                        FROM    $org_inc_ads_tab
+           |                        WHERE   ds > '$ads_last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    }
+    if (is_contain_connect_case_no) {
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $data_extraction_tab  PARTITION(ds='$ds',tn='$tn')
+           |SELECT   1 as main_case_no
+           |        ,case_no_trim(case_no) as case_no
+           |        ,litigant
+           |        ,court_name
+           |        ,rowkey
+           |FROM    $tmp_tab
+           |UNION ALL
+           |SELECT   0 as main_case_no
+           |        ,case_no_trim(connect_case_no) as case_no
+           |        ,litigant
+           |        ,connect_court_name as court_name
+           |        ,rowkey
+           |FROM    $tmp_tab
+           |""".stripMargin)
+    } else {
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $data_extraction_tab  PARTITION(ds='$ds',tn='$tn')
+           |SELECT   1 as main_case_no
+           |        ,case_no_trim(case_no) as case_no
+           |        ,litigant
+           |        ,court_name
+           |        ,rowkey
+           |FROM    $tmp_tab
+           |""".stripMargin)
+
+    }
+
+
+  }
+
+
+  private def relationByGroup(ds: String): Unit = {
+    val org_tab = data_extraction_tab
+
+    val dwd_last_ds = getLastPartitionsOrElse(org_tab, "0")
+
+    sql(
+      s"""
+         | SELECT  main_case_no
+         |         ,case_no_trim(case_no) as case_no
+         |         ,litigant
+         |         ,get_standard_court_name(court_name) as court_name
+         |         ,rowkey
+         |         ,ds
+         |         ,tn
+         | FROM    $org_tab
+         | WHERE   ds = '$dwd_last_ds'
+         | AND     case_no_trim(case_no) IS NOT NULL
+         |""".stripMargin)
+      .repartition(500)
+      //      .cache()
+      .createTempView("dwd_judicial_case_tmp")
+
+    //需要区分group by ,只用一个
+    agg_test_2
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $out_tab PARTITION(ds='$ds')
+         |SELECT  rowkey_1
+         |        ,rowkey_2
+         |        ,case_no_1
+         |        ,case_no_2
+         |        ,null as litigant_1
+         |        ,null as litigant_2
+         |        ,null as court_name_1
+         |        ,null as court_name_2
+         |        ,tn_1
+         |        ,tn_2
+         |        ,connect_type
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY connect_type) AS num
+         |            FROM    (
+         |                        SELECT  rowkey_1
+         |                                ,rowkey_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,str_sort(concat_ws('',rowkey_1,tn_1),concat_ws('',rowkey_2,tn_2)) AS xjk_sorted
+         |                        FROM    connect_tmp_1
+         |                        UNION ALL
+         |                        SELECT  rowkey_1
+         |                                ,rowkey_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,xjk_sorted
+         |                        FROM    connect_tmp_2
+         |                        UNION ALL
+         |                        SELECT  rowkey as rowkey_1
+         |                                ,null as rowkey_2
+         |                                ,case_no_trim(case_no) as case_no_1
+         |                                ,null as case_no_2
+         |                                ,tn as tn_1
+         |                                ,null as tn_2
+         |                                ,-1 as connect_type
+         |                                ,concat_ws('',rowkey,tn) as xjk_sorted
+         |                        FROM    $org_tab
+         |                        WHERE   ds = '$dwd_last_ds'
+         |                        AND     case_no_trim(case_no) IS NULL
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin)
+
+  }
+
+
+  private def agg_test_2 = {
+    sql(
+      s"""
+         |SELECT  case_no,party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',rowkey,tn) as id
+         |                    ,case_no
+         |                    ,court_name
+         |                    ,tn
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW OUTER explode(split(litigant ,'\\001')) t AS party
+         |        )
+         |WHERE   length(party) > 4
+         |GROUP BY case_no,court_name,party
+         |UNION ALL
+         |SELECT  case_no,null as party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',rowkey,tn) as id
+         |                    ,case_no
+         |                    ,court_name
+         |                    ,tn
+         |---                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |---             LATERAL VIEW OUTER explode(split(litigant ,'\\001')) t AS party
+         |            WHERE   tn <> '$lawsuit_tab'
+         |        )
+         |GROUP BY case_no,court_name
+         |""".stripMargin)
+
+      .rdd
+      .flatMap(r => {
+        val case_no = r.getAs[String]("case_no")
+        val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
+        val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
+        if (connect_case_id.length < 2) {
+          val e_1 = connect_case_id.head.split(separation)
+          list.append((e_1(0), null, case_no, null, e_1(1), null, 2))
+        }
+        for (i <- 0 to connect_case_id.length - 2) {
+          val e_1 = connect_case_id(i).split(separation)
+          val e_2 = connect_case_id(i + 1).split(separation)
+          list.append((e_1(0), e_2(0), case_no, case_no, e_1(1), e_2(1), 2))
+        }
+        list
+      })
+      .toDF("rowkey_1", "rowkey_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
+      .createTempView("connect_tmp_1")
+
+
+    sql(
+      s"""
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |        ,str_sort(
+         |            concat_ws('',t1.rowkey,t1.tn)
+         |            ,concat_ws('',t2.rowkey,t2.tn)
+         |        ) AS xjk_sorted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    dwd_judicial_case_tmp
+         |            WHERE   main_case_no = 1
+         |            AND     tn = '$lawsuit_tab'
+         |        ) AS t1
+         |FULL JOIN (
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 0
+         |              UNION ALL
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 1
+         |              AND     tn <> '$lawsuit_tab'
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     not (t1.tn=t2.tn and t1.rowkey = t2.rowkey)
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |
+         |UNION ALL
+         |
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |        ,str_sort(
+         |            concat_ws('',t1.rowkey,t1.tn)
+         |            ,concat_ws('',t2.rowkey,t2.tn)
+         |        ) AS xjk_sorted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    dwd_judicial_case_tmp
+         |            WHERE   main_case_no = 0
+         |            AND     tn = '$lawsuit_tab'
+         |        ) AS t1
+         |FULL JOIN (
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 1
+         |              UNION ALL
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 0
+         |              AND     tn <> '$lawsuit_tab'
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     not (t1.tn=t2.tn and t1.rowkey = t2.rowkey)
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |
+         |""".stripMargin)
+      .createTempView("connect_tmp_2")
+  }
+
+  def add_other_info(): Unit = {
+    val org_tab = data_extraction_tab
+    val dwd_last_ds = getLastPartitionsOrElse(org_tab, "0")
+    sql(
+      s"""
+         |
+         |INSERT OVERWRITE TABLE $out_tab
+         |  SELECT t1.rowkey_1
+         |  ,      t1.rowkey_2
+         |  ,      t1.case_no_1
+         |  ,      t1.case_no_2
+         |  ,      t2.litigant AS litigant_1
+         |  ,      t1.litigant_2
+         |  ,      t2.court_name AS court_name_1
+         |  ,      t1.court_name_2
+         |  ,      t1.tn_1
+         |  ,      t1.tn_2
+         |  ,      t1.connect_type
+         |  FROM (
+         |    SELECT *
+         |    FROM $out_tab
+         |    WHERE rowkey_1 IS NOT NULL
+         |  ) AS t1
+         |  LEFT JOIN (
+         |    SELECT  *
+         |    FROM    (
+         |      SELECT  *
+         |      ,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY rowkey) AS num
+         |      FROM    (
+         |        SELECT  *
+         |        FROM    $data_extraction_tab
+         |        WHERE   ds = '$dwd_last_ds'
+         |      )
+         |    )
+         |    WHERE   num = 1
+         |  ) AS t2
+         |  ON t1.rowkey_1 = t2.rowkey AND t1.tn_1 = t2.tn
+         |  UNION ALL
+         |  SELECT rowkey_1
+         |  ,      rowkey_2
+         |  ,      case_no_1
+         |  ,      case_no_2
+         |  ,      litigant_1
+         |  ,      litigant_2
+         |  ,      court_name_1
+         |  ,      court_name_2
+         |  ,      tn_1
+         |  ,      tn_2
+         |  ,      connect_type
+         |  FROM $out_tab
+         |  WHERE rowkey_1 IS NULL
+         |""".stripMargin)
+
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $out_tab
+         |  SELECT t1.rowkey_1
+         |  ,      t1.rowkey_2
+         |  ,      t1.case_no_1
+         |  ,      t1.case_no_2
+         |  ,      t1.litigant_1
+         |  ,      t2.litigant AS litigant_2
+         |  ,      t1.court_name_1
+         |  ,      t2.court_name AS court_name_2
+         |  ,      t1.tn_1
+         |  ,      t1.tn_2
+         |  ,      t1.connect_type
+         |  FROM (
+         |    SELECT *
+         |    FROM $out_tab
+         |    WHERE rowkey_2 IS NOT NULL
+         |  ) AS t1
+         |  LEFT JOIN (
+         |    SELECT  *
+         |    FROM    (
+         |      SELECT  *
+         |      ,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY rowkey) AS num
+         |      FROM    (
+         |        SELECT  *
+         |        FROM    $data_extraction_tab
+         |        WHERE   ds = '$dwd_last_ds'
+         |      )
+         |    )
+         |    WHERE   num = 1
+         |  ) AS t2
+         |  ON t1.rowkey_2 = t2.rowkey AND t1.tn_2 = t2.tn
+         |  UNION ALL
+         |  SELECT rowkey_1
+         |  ,      rowkey_2
+         |  ,      case_no_1
+         |  ,      case_no_2
+         |  ,      litigant_1
+         |  ,      litigant_2
+         |  ,      court_name_1
+         |  ,      court_name_2
+         |  ,      tn_1
+         |  ,      tn_2
+         |  ,      connect_type
+         |  FROM $out_tab
+         |  WHERE rowkey_2 IS NULL
+         |""".stripMargin)
+  }
+
+  def inc_func(ds: String): Unit = {
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $out_tab  PARTITION(ds='$ds')
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,null as litigant_1
+         |        ,null as litigant_2
+         |        ,null as court_name_1
+         |        ,null as court_name_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |FROM    (
+         |            SELECT  *
+         |            FROM    $data_extraction_tab
+         |            WHERE   ds = '$ds'
+         |            AND case_no is not null
+         |        ) AS t1
+         |LEFT JOIN (
+         |              select * from (
+         |                       SELECT  *
+         |                               ,ROW_NUMBER()OVER (PARTITION BY rowkey, tn, main_case_no ORDER BY ds DESC) AS num
+         |                       FROM    (
+         |                                   SELECT  *
+         |                                   FROM    $data_extraction_tab
+         |                                   WHERE   ds > 0
+         |                                   AND case_no is not null
+         |                               )
+         |              ) where num = 1
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     NOT (t1.tn = t2.tn AND t1.rowkey = t2.rowkey)
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |UNION all
+         |SELECT  rowkey AS rowkey_1
+         |        ,null AS rowkey_2
+         |        ,case_no AS case_no_1
+         |        ,null AS case_no_2
+         |        ,null as litigant_1
+         |        ,null as litigant_2
+         |        ,null as court_name_1
+         |        ,null as court_name_2
+         |        ,tn AS tn_1
+         |        ,NULL  AS tn_2
+         |        ,3 AS connect_type
+         |FROM    $data_extraction_tab
+         |WHERE   ds = '$ds'
+         |AND case_no is null
+         |""".stripMargin)
+
+
+  }
+
+  private def case_equ(litigant_1: String, litigant_2: String, case_no_1: String, case_no_2: String, court_name_1: String, court_name_2: String, tn1: String, tn2: String): Boolean = {
+    try {
+      val current_case_party_list_org: Seq[String] = if (litigant_1 == null) Seq.empty else litigant_1.split("\001")
+      val connect_case_party_list_org: Seq[String] = if (litigant_2 == null) Seq.empty else litigant_2.split("\001")
+
+      val current_case_no = case_no_1
+      val connect_case_no = case_no_2
+      val current_court_name = court_name_1
+      val connect_court_name = court_name_2
+
+      case_connect_utils_v2.isConnect(current_case_party_list_org, connect_case_party_list_org, current_case_no, connect_case_no, current_court_name, connect_court_name, tn1, tn2)
+    } catch {
+      case ex: Exception => {
+        logError(ex.getMessage)
+        println("error")
+        println(litigant_1)
+        println(litigant_2)
+        println(case_no_1)
+        println(case_no_2)
+        throw new RuntimeException(ex)
+      }
+        false
+    }
+  }
+
+}
+
+object JudicialCaseRelationRowkeyRelation_v3 {
+  def main(args: Array[String]): Unit = {
+
+    //    val Array(ds) = args
+    var ds = BaseUtil.getYesterday()
+
+    //ds = "20210606"
+    //    ds = "20210610"
+    //ds = "20210524"
+    println(
+      s"""
+         |ds: $ds
+         |""".stripMargin)
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val jcr = JudicialCaseRelationRowkeyRelation_v3(spark, project = "winhc_ng")
+    jcr.etl(ds)
+    //    jcr.etl_lawsuit("20210524", false)
+    //    jcr.relationByGroup(ds)
+    //        jcr.inc_func(ds)
+    spark.stop()
+  }
+}
+
+
+

+ 12 - 3
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -9,11 +9,10 @@ import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Json
-
 import java.security.MessageDigest
 import java.text.SimpleDateFormat
 import java.util
-import java.util.regex.Pattern
+import java.util.regex.{Matcher, Pattern}
 import java.util.{Calendar, Date, Locale}
 import scala.collection.{immutable, mutable}
 import scala.collection.mutable.ListBuffer
@@ -541,6 +540,16 @@ object BaseUtil {
     } else null
   }
 
+  private val case_no_pat = "^[((](\\d{4})[))][黑吉辽冀甘青陕豫鲁晋皖鄂湘苏川黔云浙赣粤闽台琼新蒙宁桂藏京沪津渝港澳内军兵最].*[\\u4e00-\\u9fa5]{1,3}.*\\d{0,7}.*号.*$".r
+
+  def verify_case_no(str: String): Boolean = {
+    if (StringUtils.isBlank(str)) return false
+    if (case_no_pat.matches(str)) {
+      return true
+    }
+    false
+  }
+
   def sortString(s: String, split: String = "\\001"): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
@@ -924,7 +933,7 @@ object BaseUtil {
     //    println(nameCleanup("小米科技.;有,@限公  司  雷军"))
     //    println(title("xx", null, "reason"))
     //    println(parseAddress("大石桥市人民法院"))
-    println(case_no_trim_v2("(2020)豫1728执541号之一"))
+    println(case_no_trim_v2("(2021)京0108民初44491号"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
     println(caseStage("(2019)鄂民申7号"))

+ 133 - 0
src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils_v2.scala

@@ -0,0 +1,133 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/31 10:42
+ * @Description:
+ */
+object case_connect_utils_v2 {
+  private val lawsuit_tab = Seq("wenshu_detail_v2")
+
+  private def equ(current_case_no: String = "", connect_case_no: String = "", current_court_name: String = "", connect_court_name: String = ""): Boolean = {
+    if (StringUtils.isEmpty(current_case_no) || StringUtils.isEmpty(connect_case_no) || StringUtils.isEmpty(current_court_name) || StringUtils.isEmpty(connect_court_name))
+      false
+    else
+      current_case_no.equalsIgnoreCase(connect_case_no) && current_court_name.equalsIgnoreCase(connect_court_name)
+  }
+
+
+  def isConnect(current_case_party_list_org: Seq[String], connect_case_party_list_org: Seq[String], current_case_no: String = "", connect_case_no: String = "", current_court_name: String = "", connect_court_name: String = "", current_tn: String, connect_tn: String): Boolean = {
+    val current_case_party_list = current_case_party_list_org.filter(StringUtils.isNotBlank)
+    val connect_case_party_list = connect_case_party_list_org.filter(StringUtils.isNotBlank)
+
+    if (!lawsuit_tab.contains(current_tn) && !lawsuit_tab.contains(connect_tn)) {
+      val bool = equ(current_case_no, connect_case_no, current_court_name, connect_court_name)
+      if (bool)
+        return bool
+    }
+
+
+    if (current_case_party_list.isEmpty || connect_case_party_list.isEmpty) {
+      return false
+    }
+
+    is_vague_word(current_case_party_list = current_case_party_list, connect_case_party_list = connect_case_party_list) match {
+      case 1 => {
+        vague_match(position = 0, current_case_party_list
+          , connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name
+        )
+      }
+      case 2 => {
+        vague_match(position = 1, current_case_party_list
+          , connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name
+        )
+      }
+      case _ => {
+        precise_match(current_case_party_list, connect_case_party_list, BaseUtil.verify_case_no(connect_case_no))
+      }
+    }
+  }
+
+  private def vague_match(position: Int = 1,
+                          current_case_party_list: Seq[String]
+                          , connect_case_party_list: Seq[String]
+                          , current_case_no: String, connect_case_no: String
+                          , current_court_name: String, connect_court_name: String
+                         ): Boolean = {
+    var current_party: Seq[String] = null
+    var connect_party: Seq[String] = null
+    position match {
+      case 1 => {
+        current_party = connect_case_party_list
+        connect_party = current_case_party_list
+      }
+      case _ => {
+        current_party = current_case_party_list
+        connect_party = connect_case_party_list
+      }
+    }
+
+    for (char <- current_party.mkString("").toCharArray.map(_.toString).filter(vague_word.contains(_))) {
+      for (userName <- current_party) {
+        for (splitUser <- userName.replace(char, "\001").split("\001")) {
+          val all_str = connect_party.mkString("")
+          if (StringUtils.isNotEmpty(splitUser) && (!all_str.contains(splitUser.substring(0, 1)))) {
+            return false
+          }
+        }
+      }
+    }
+    case_no_match(current_case_no = current_case_no, connect_case_no = connect_case_no) || court_name_match(connect_court_name = connect_court_name, current_court_name = current_court_name)
+  }
+
+  private def case_no_match(current_case_no: String, connect_case_no: String): Boolean = current_case_no.equals(connect_case_no)
+
+  private def court_name_match(current_court_name: String, connect_court_name: String): Boolean = current_court_name.equals(connect_court_name)
+
+  private def precise_match(current_case_party_list: Seq[String]
+                            , connect_case_party_list: Seq[String]
+                            , is_case_no: Boolean = false
+                           ): Boolean = {
+    var num = 0
+    for (userName <- connect_case_party_list) {
+      if (current_case_party_list.contains(userName))
+        num = num + 1
+    }
+    if (num == 0)
+      return false
+    if (num > 0 && is_case_no) {
+      return true
+    }
+    num >= connect_case_party_list.size / 2
+  }
+
+  val vague_word: Seq[String] = Seq("某", "*", "*", "x", "ⅹ", "x", "X", "×")
+
+  private def is_vague_word(current_case_party_list: Seq[String], connect_case_party_list: Seq[String]): Int = {
+    val current_vague = current_case_party_list.exists(r => vague_word.exists(r.equals(_)))
+    if (current_vague) {
+      return 1
+    }
+    val connect_vague = connect_case_party_list.exists(r => vague_word.exists(r.contains(_)))
+    if (connect_vague) {
+      return 2
+    }
+    0
+  }
+
+  def main(args: Array[String]): Unit = {
+    val current_case_party_list: Seq[String] = Seq("徐靖良", "黄剑麟")
+    val connect_case_party_list: Seq[String] = Seq("张徐靖良", "黄剑麟", "云南长天商务有限公司", "云南滇沪房地产开发有限公司")
+
+    val current_case_no = "(2019)云民辖终279号"
+    val connect_case_no = "(2019)云民辖终279号"
+    val current_court_name = "云南省高级人民法院"
+    val connect_court_name = "云南省昆明市中级人民法院"
+    val tn1 = "wenshu_detail_v2"
+    val tn2 = "company_send_announcement"
+    //
+    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name, tn1, tn2))
+  }
+}