瀏覽代碼

feat: 司法案件v2

许家凯 3 年之前
父節點
當前提交
e709843d3a

+ 970 - 0
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationRowkeyRelation_v2.scala

@@ -0,0 +1,970 @@
+package com.winhc.bigdata.spark.ng.judicial
+
+import com.winhc.bigdata.spark.implicits.BaseHelper._
+import com.winhc.bigdata.spark.implicits.RegexUtils.RichRegex
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils._
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/28 16:52
+ * @Description:
+ */
+case class JudicialCaseRelationRowkeyRelation_v2(s: SparkSession,
+                                                 project: String //表所在工程名
+                                                ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s // session supplied by the caller
+  private val pat = ".*\\d+.*".r // any text containing at least one digit; backs the match_case_no UDF
+
+  private val separation = "@@" // joins rowkey and table name into one id (see agg_test_1 / agg_test_2)
+
+  import spark.implicits._
+
+  private val data_extraction_tab = "winhc_ng.dwd_judicial_case_v2" // extraction output: case number -> source rowkey, partitioned by ds and tn
+  private val out_tab = "winhc_ng.bds_judicial_case_relation_v2" // relation output, partitioned by ds
+
+  private val lawsuit_tab = "wenshu_detail_v2" // lawsuit source table name; etl_lawsuit prefixes it with ads_ / inc_ads_
+
+  init() // registers the UDFs and creates both tables when the instance is constructed
+
+  /**
+   * Registers the SQL UDFs used by this job and creates the extraction and relation tables
+   * when they do not exist yet.
+   *
+   * Court aliases are read through the MysqlHelper implicit from `all_court_info_ali`,
+   * collected to the driver and broadcast, so `get_standard_court_name` can look them up
+   * inside the UDF.
+   */
+  private def init(): Unit = {
+    case_no_trim_udf()
+    is_id_card_udf()
+    json_parse_udf()
+    spark.udf.register("case_equ", case_equ _)
+    spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
+    spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
+
+
+    import com.winhc.bigdata.spark.implicits.MysqlHelper._
+    // court_name holds comma-separated aliases; each alias maps to the row's standard name.
+    // A null court_name contributes no aliases instead of throwing a NullPointerException.
+    val map = spark.read("all_court_info_ali").collect().flatMap(r => {
+      val standard_name = r.getAs[String]("standard_court_name")
+      val aliases = Option(r.getAs[String]("court_name")).map(_.split(',')).getOrElse(Array.empty[String])
+      aliases.map(alias => (alias, standard_name))
+    }).toMap
+    val broad_map = spark.sparkContext.broadcast(map)
+
+    // Unknown names are returned unchanged; null or empty input yields null.
+    def get_standard_court_name(name: String): String = {
+      if (StringUtils.isEmpty(name)) null else broad_map.value.getOrElse(name, name)
+    }
+
+
+    spark.udf.register("get_standard_court_name", get_standard_court_name _)
+
+
+    // Splits a raw party string on the listed separators, drops empty pieces and duplicates,
+    // and joins the rest with \001.
+    def parse_litigant(name: String): String =
+      if (StringUtils.isEmpty(name))
+        null
+      else
+        name.split("[\n,,;;]").filter(StringUtils.isNotEmpty).distinct.toSeq.mkStringOrNull("\001")
+
+
+    spark.udf.register("parse_litigant", parse_litigant _)
+
+
+    // Returns false only when both case numbers contain "执" and the main one is not a "执恢"
+    // case; every other combination (including nulls) is accepted.
+    def company_lawsuit_case_no_equ(case_no: String, connect_case_no: String): Boolean = {
+      val case_no_std = if (case_no == null) "" else case_no
+      val connect_case_no_std = if (connect_case_no == null) "" else connect_case_no
+
+      val both_enforcement = case_no_std.contains("执") && connect_case_no_std.contains("执")
+      !both_enforcement || case_no_std.contains("执恢")
+    }
+
+    spark.udf.register("company_lawsuit_case_no_equ", company_lawsuit_case_no_equ _)
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $data_extraction_tab (
+         |  `main_case_no` BIGINT COMMENT '是否是主案号,1 主案号',
+         |  `case_no` STRING COMMENT '案号',
+         |  `litigant` STRING COMMENT '当事人, 001 分割',
+         |  `court_name` STRING COMMENT '法院名称',
+         |  `rowkey` STRING COMMENT '原表rowkey')
+         | COMMENT '案号与表id关系表'
+         |PARTITIONED BY (
+         |  `ds` STRING COMMENT '分区',
+         |  `tn` STRING COMMENT '表名')
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $out_tab (
+         |  `rowkey_1` STRING COMMENT 'FIELD',
+         |  `rowkey_2` STRING COMMENT 'FIELD',
+         |  `case_no_1` STRING COMMENT 'FIELD',
+         |  `case_no_2` STRING COMMENT 'FIELD',
+         |  `litigant_1` STRING COMMENT 'FIELD',
+         |  `litigant_2` STRING COMMENT 'FIELD',
+         |  `court_name_1` STRING COMMENT 'FIELD',
+         |  `court_name_2` STRING COMMENT 'FIELD',
+         |  `tn_1` STRING COMMENT 'FIELD',
+         |  `tn_2` STRING COMMENT 'FIELD',
+         |  `connect_type` BIGINT COMMENT 'FIELD')
+         | COMMENT 'TABLE COMMENT'
+         | PARTITIONED BY (
+         |  `ds` STRING COMMENT '分区')
+         |""".stripMargin)
+
+  }
+
+
+  /**
+   * Step 1: extracts every source table into `data_extraction_tab`.
+   *
+   * A full run (`inc = false`) first drops all partitions of the extraction table. Each source
+   * is described by the SELECT list that maps its columns onto the common layout
+   * (rowkey, court_name, case_no, connect_court_name, connect_case_no, litigant, ds); all
+   * extractions are then handed to `AsyncExtract.startAndWait` in one call.
+   *
+   * @param ds  partition to write
+   * @param inc true for an incremental run
+   */
+  private def step_01_DataExtraction(ds: String, inc: Boolean = false): Unit = {
+    if (!inc) dropAllPartitions(data_extraction_tab)
+
+    val dishonest_cols =
+      s"""
+         | rowkey
+         | ,court as court_name
+         | ,case_no
+         | ,gist_unit as connect_court_name
+         | ,gist_dd as connect_case_no
+         | ,parse_litigant(name) as litigant
+         | ,ds
+         |""".stripMargin
+
+    val zxr_cols =
+      s"""
+         | rowkey
+         | ,court as court_name
+         | ,case_no
+         | ,court as connect_court_name
+         | ,gist_id as connect_case_no
+         | ,parse_litigant(name) as litigant
+         | ,ds
+         |""".stripMargin
+
+    val zxr_restrict_cols =
+      """
+        | rowkey
+        | ,court_name as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',parse_litigant(person_name),parse_litigant(company_name))) as litigant
+        | ,ds
+        |""".stripMargin
+
+    val zxr_final_case_cols =
+      """
+        | rowkey
+        | ,court_name as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(name) as litigant
+        | ,ds
+        |""".stripMargin
+
+    val court_announcement_cols =
+      """
+        | rowkey
+        | ,court_name as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name')))) as litigant
+        | ,ds
+        |""".stripMargin
+
+    // Shared by the open-announcement, send-announcement and court-register sources,
+    // whose SELECT lists are identical.
+    val three_party_cols =
+      """
+        | rowkey
+        | ,court as court_name
+        | ,case_no
+        | ,null as connect_court_name
+        | ,null as connect_case_no
+        | ,parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant
+        | ,ds
+        |""".stripMargin
+
+    // (task label, source table, SELECT list, whether the source carries a connected case number)
+    val detail_jobs = Seq(
+      ("dishonest etl...", "company_dishonest_info", dishonest_cols, true),
+      ("company_zxr etl...", "company_zxr", zxr_cols, true),
+      ("company_zxr_restrict etl...", "company_zxr_restrict", zxr_restrict_cols, false),
+      ("company_zxr_final_case etl...", "company_zxr_final_case", zxr_final_case_cols, false),
+      ("company_court_announcement etl...", "company_court_announcement", court_announcement_cols, false),
+      ("company_court_open_announcement etl...", "company_court_open_announcement", three_party_cols, false),
+      ("company_send_announcement etl...", "company_send_announcement", three_party_cols, false),
+      ("company_court_register etl...", "company_court_register", three_party_cols, false)
+    )
+
+    val detail_tasks = detail_jobs.map { case (label, tn, cols, has_connect) =>
+      (label, () => {
+        detail_etl(ds, tn, cols, inc, is_contain_connect_case_no = has_connect)
+        true
+      })
+    }
+
+    val lawsuit_task = ("company_lawsuit etl...", () => {
+      etl_lawsuit(ds, inc)
+      true
+    })
+
+    AsyncExtract.startAndWait(spark, detail_tasks :+ lawsuit_task)
+  }
+
+
+  /**
+   * Runs the whole job for partition `ds`.
+   *
+   * The run is incremental when the relation table already has a partition; otherwise the
+   * relations are rebuilt from scratch with `relationByGroup`.
+   *
+   * @param ds partition to write
+   */
+  def etl(ds: String): Unit = {
+    val incremental = getLastPartitionsOrElse(out_tab, null) != null
+
+    step_01_DataExtraction(ds, incremental)
+    if (!incremental) relationByGroup(ds)
+    else inc_func(ds)
+  }
+
+
+  /**
+   * Extracts the lawsuit table (`lawsuit_tab`) into partition (ds, tn = lawsuit_tab) of
+   * `data_extraction_tab`.
+   *
+   * The newest row per rowkey is taken from the source. One main row (main_case_no = 1) is
+   * written per document, plus one row (main_case_no = 0) for every newline-separated entry of
+   * connect_case_no that trims to a non-null case number and passes
+   * `company_lawsuit_case_no_equ`.
+   *
+   * @param ds  partition to write
+   * @param inc when true only partitions of the inc table newer than the last partition of the
+   *            extraction table are read; otherwise the ads and inc tables are read in full
+   */
+  private def etl_lawsuit(ds: String, inc: Boolean = false): Unit = {
+    val tableName = lawsuit_tab
+
+    val org_tab = s"winhc_ng.ads_$tableName"
+    val inc_org_tab = s"winhc_ng.inc_ads_$tableName"
+
+    val table_id = "rowkey"
+    val other_cols = Seq("plaintiff_info", "court_name", "case_no", "litigant_info", "defendant_info") ++ Seq(table_id, "ds", "connect_case_no")
+
+    // The last source partition is only used to make the temp view name unique.
+    val ods_end_ds = getLastPartitionsOrElse(org_tab, "0")
+    val tmp_tab = s"all_${tableName}_tmp_$ods_end_ds"
+
+    // Kept as a plain (non-interpolated) literal so that '$.name' reaches SQL untouched.
+    val litigant_col = ",parse_litigant(concat_ws(',',concat_ws(',',json_2_array(litigant_info,'$.name'),json_2_array(plaintiff_info,'$.name'),concat_ws(',',json_2_array(defendant_info,'$.name'))))) as litigant"
+
+    if (inc) {
+      val last_ds = getLastPartitionsOrElse(data_extraction_tab, "0")
+      // ds is a STRING partition column: compare it with a quoted literal, as detail_etl does.
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${other_cols.mkString(",")}
+           |                        FROM    $inc_org_tab
+           |                        WHERE   ds > '$last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    } else {
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${other_cols.mkString(",")}
+           |                        FROM    $org_tab
+           |                        WHERE   ds > 0
+           |                        UNION ALL
+           |                        SELECT  ${other_cols.mkString(",")}
+           |                        FROM    $inc_org_tab
+           |                        WHERE   ds > 0
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    }
+
+    // One row per connected case number (OUTER keeps documents without any).
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    $tmp_tab lateral view OUTER explode(split(connect_case_no,'\\n')) t as single_connect_case_no
+         |""".stripMargin)
+      .createTempView(s"explode_$tmp_tab")
+
+    // Main rows are read from the un-exploded view: reading them from the exploded view would
+    // write the same main row once per connected case number.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $data_extraction_tab  PARTITION(ds='$ds',tn='$tableName')
+         |SELECT  1 as main_case_no
+         |        ,case_no_trim(case_no) as case_no
+         |        $litigant_col
+         |        ,court_name
+         |        ,rowkey
+         |FROM    $tmp_tab
+         |UNION ALL
+         |SELECT   0 as main_case_no
+         |        ,case_no_trim(single_connect_case_no) as case_no
+         |        $litigant_col
+         |        ,null as court_name
+         |        ,rowkey
+         |FROM    explode_$tmp_tab
+         |where   case_no_trim(single_connect_case_no) is not null
+         |and     company_lawsuit_case_no_equ(case_no,single_connect_case_no)
+         |""".stripMargin)
+  }
+
+
+  /**
+   * Extracts one source table into partition (ds, tn) of `data_extraction_tab`.
+   *
+   * Source rows come from `$project.ads_$tn` and/or `$project.inc_ads_$tn`, are reduced to the
+   * newest row per rowkey (ROW_NUMBER ordered by ds DESC) and projected through `view`.
+   *
+   * @param ds   partition to write
+   * @param tn   source table name without the ads_ / inc_ads_ prefix; also the tn partition value
+   * @param view SELECT list applied to the deduplicated rows; the INSERT statements read
+   *             case_no, litigant, court_name and rowkey from its output (plus connect_case_no
+   *             and connect_court_name when is_contain_connect_case_no is true)
+   * @param inc  when true only inc partitions newer than the last partition of the extraction
+   *             table are read
+   * @param is_contain_connect_case_no when true a second row with main_case_no = 0 is written
+   *             for the connected case number of every source row
+   */ private def detail_etl(ds: String, tn: String, view: String, inc: Boolean = false, is_contain_connect_case_no: Boolean = true): Unit = {
+    val tmp_tab = s"all_etl_${tn}_tmp" // temp view holding the projected, deduplicated source rows
+
+    val org_ads_tab = s"$project.ads_$tn"
+    val org_inc_ads_tab = s"$project.inc_ads_$tn"
+
+    if (inc) { // incremental: only the inc table is read
+      val last_ds = getLastPartitionsOrElse(data_extraction_tab, "0") // NOTE(review): read from the table that the sibling tasks of step_01_DataExtraction write to; if one of them has already created partition ds, this filter may exclude everything - confirm
+      sql(
+        s"""
+           |SELECT $view
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                    SELECT  *
+           |                    FROM    $org_inc_ads_tab
+           |                    WHERE   ds > '$last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    } else { // full: last ads partition plus every newer inc partition
+      val ads_last_ds = getLastPartitionsOrElse(org_ads_tab, "0")
+      val intersect_cols = getColumns(org_ads_tab).intersect(getColumns(org_inc_ads_tab)) // only columns present in both tables can be UNIONed
+      sql(
+        s"""
+           |SELECT $view
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${intersect_cols.mkString(",")}
+           |                        FROM    $org_ads_tab
+           |                        WHERE   ds = '$ads_last_ds'
+           |                        UNION ALL
+           |                        SELECT  ${intersect_cols.mkString(",")}
+           |                        FROM    $org_inc_ads_tab
+           |                        WHERE   ds > '$ads_last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+        .createTempView(tmp_tab)
+    }
+    if (is_contain_connect_case_no) { // write the main row and the connected-case row of every record
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $data_extraction_tab  PARTITION(ds='$ds',tn='$tn')
+           |SELECT   1 as main_case_no
+           |        ,case_no_trim(case_no) as case_no
+           |        ,litigant
+           |        ,court_name
+           |        ,rowkey
+           |FROM    $tmp_tab
+           |UNION ALL
+           |SELECT   0 as main_case_no
+           |        ,case_no_trim(connect_case_no) as case_no
+           |        ,litigant
+           |        ,connect_court_name as court_name
+           |        ,rowkey
+           |FROM    $tmp_tab
+           |""".stripMargin)
+    } else { // source has no connected case number: main rows only
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $data_extraction_tab  PARTITION(ds='$ds',tn='$tn')
+           |SELECT   1 as main_case_no
+           |        ,case_no_trim(case_no) as case_no
+           |        ,litigant
+           |        ,court_name
+           |        ,rowkey
+           |FROM    $tmp_tab
+           |""".stripMargin)
+
+    }
+
+
+  }
+
+
+  /**
+   * Full build of the relation table for partition `ds`.
+   *
+   * Rows of the latest extraction partition whose trimmed case_no is not null are exposed as
+   * temp view `dwd_judicial_case_tmp` (court names standardised); `agg_test_2` derives
+   * `connect_tmp_1` and `connect_tmp_2` from it. The final INSERT unions both views with the
+   * rows whose case_no trims to null (connect_type -1, no partner) and keeps one row per
+   * `xjk_sorted` key, preferring the smallest connect_type.
+   *
+   * @param ds partition of `out_tab` to write
+   */ private def relationByGroup(ds: String): Unit = {
+    val org_tab = data_extraction_tab
+
+    val dwd_last_ds = getLastPartitionsOrElse(org_tab, "0")
+
+    sql(
+      s"""
+         | SELECT  main_case_no
+         |         ,case_no_trim(case_no) as case_no
+         |         ,litigant
+         |         ,get_standard_court_name(court_name) as court_name
+         |         ,rowkey
+         |         ,ds
+         |         ,tn
+         | FROM    $org_tab
+         | WHERE   ds = '$dwd_last_ds'
+         | AND     case_no_trim(case_no) IS NOT NULL
+         |""".stripMargin)
+      .repartition(500)
+      //      .cache()
+      .createTempView("dwd_judicial_case_tmp")
+
+    // The GROUP BY variants have to be told apart; only one of them is used
+    agg_test_2
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $out_tab PARTITION(ds='$ds')
+         |SELECT  rowkey_1
+         |        ,rowkey_2
+         |        ,case_no_1
+         |        ,case_no_2
+         |        ,null as litigant_1
+         |        ,null as litigant_2
+         |        ,null as court_name_1
+         |        ,null as court_name_2
+         |        ,tn_1
+         |        ,tn_2
+         |        ,connect_type
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY connect_type) AS num
+         |            FROM    (
+         |                        SELECT  rowkey_1
+         |                                ,rowkey_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,str_sort(concat_ws('',rowkey_1,tn_1),concat_ws('',rowkey_2,tn_2)) AS xjk_sorted
+         |                        FROM    connect_tmp_1
+         |                        UNION ALL
+         |                        SELECT  rowkey_1
+         |                                ,rowkey_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,xjk_sorted
+         |                        FROM    connect_tmp_2
+         |                        UNION ALL
+         |                        SELECT  rowkey as rowkey_1
+         |                                ,null as rowkey_2
+         |                                ,case_no_trim(case_no) as case_no_1
+         |                                ,null as case_no_2
+         |                                ,tn as tn_1
+         |                                ,null as tn_2
+         |                                ,-1 as connect_type
+         |                                ,concat_ws('',rowkey,tn) as xjk_sorted
+         |                        FROM    $org_tab
+         |                        WHERE   ds = '$dwd_last_ds'
+         |                        AND     case_no_trim(case_no) IS NULL
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin)
+
+  }
+
+
+  /**
+   * Builds temp views `connect_tmp_1` and `connect_tmp_2` from `dwd_judicial_case_tmp`.
+   *
+   * `connect_tmp_1` (connect_type 2): ids (`rowkey@@tn`) are collected per
+   * (case_no, court_name, party) for parties longer than 4 characters, and per
+   * (case_no, court_name) for every table except the lawsuit table; neighbouring ids of each
+   * collected set are then emitted as pairs.
+   *
+   * `connect_tmp_2` (connect_type 1): lawsuit rows are FULL JOINed with the remaining rows on
+   * equal case_no, excluding a row joined with itself and requiring the `case_equ` UDF to hold;
+   * main lawsuit rows and connected lawsuit rows are handled by the two UNIONed halves.
+   */ private def agg_test_2 = {
+    sql(
+      s"""
+         |SELECT  case_no,party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',rowkey,tn) as id
+         |                    ,case_no
+         |                    ,court_name
+         |                    ,tn
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW OUTER explode(split(litigant ,'\\001')) t AS party
+         |        )
+         |WHERE   length(party) > 4
+         |GROUP BY case_no,court_name,party
+         |UNION ALL
+         |SELECT  case_no,null as party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',rowkey,tn) as id
+         |                    ,case_no
+         |                    ,court_name
+         |                    ,tn
+         |---                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |---             LATERAL VIEW OUTER explode(split(litigant ,'\\001')) t AS party
+         |            WHERE   tn <> '$lawsuit_tab'
+         |        )
+         |GROUP BY case_no,court_name
+         |""".stripMargin)
+
+      .rdd // the collected id sets are turned into pairs on the RDD side
+      .flatMap(r => {
+        val case_no = r.getAs[String]("case_no")
+        val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
+        val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
+        if (connect_case_id.length < 2) { // lone record: emitted with an empty right-hand side
+          val e_1 = connect_case_id.head.split(separation)
+          list.append((e_1(0), null, case_no, null, e_1(1), null, 2))
+        }
+        for (i <- 0 to connect_case_id.length - 2) { // pair element i with element i + 1
+          val e_1 = connect_case_id(i).split(separation)
+          val e_2 = connect_case_id(i + 1).split(separation)
+          list.append((e_1(0), e_2(0), case_no, case_no, e_1(1), e_2(1), 2))
+        }
+        list
+      })
+      .toDF("rowkey_1", "rowkey_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
+      .createTempView("connect_tmp_1")
+
+
+    sql(
+      s"""
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |        ,str_sort(
+         |            concat_ws('',t1.rowkey,t1.tn)
+         |            ,concat_ws('',t2.rowkey,t2.tn)
+         |        ) AS xjk_sorted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    dwd_judicial_case_tmp
+         |            WHERE   main_case_no = 1
+         |            AND     tn = '$lawsuit_tab'
+         |        ) AS t1
+         |FULL JOIN (
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 0
+         |              UNION ALL
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 1
+         |              AND     tn <> '$lawsuit_tab'
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     not (t1.tn=t2.tn and t1.rowkey = t2.rowkey)
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |
+         |UNION ALL
+         |
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |        ,str_sort(
+         |            concat_ws('',t1.rowkey,t1.tn)
+         |            ,concat_ws('',t2.rowkey,t2.tn)
+         |        ) AS xjk_sorted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    dwd_judicial_case_tmp
+         |            WHERE   main_case_no = 0
+         |            AND     tn = '$lawsuit_tab'
+         |        ) AS t1
+         |FULL JOIN (
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 1
+         |              UNION ALL
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 0
+         |              AND     tn <> '$lawsuit_tab'
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     not (t1.tn=t2.tn and t1.rowkey = t2.rowkey)
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |
+         |""".stripMargin)
+      .createTempView("connect_tmp_2")
+  }
+
+  /**
+   * Alternative to `agg_test_2`: builds temp views `connect_tmp_1` and `connect_tmp_2` from
+   * `dwd_judicial_case_tmp`.
+   *
+   * Differences from `agg_test_2` visible in the SQL: the party grouping only looks at main
+   * rows (main_case_no = 1) and groups by (case_no, party) without court_name, and the join
+   * excludes pairs with equal rowkey regardless of table name.
+   *
+   * The second sub-query has no LATERAL VIEW, so it must not select `party`; the column is
+   * commented out exactly as in `agg_test_2` (selecting it made the statement unresolvable).
+   */
+  private def agg_test_1 = {
+    sql(
+      s"""
+         |SELECT  case_no,party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',rowkey,tn) as id
+         |                    ,case_no
+         |                    ,tn
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW OUTER explode(split(litigant ,'\\001')) t AS party
+         |            WHERE   main_case_no = 1
+         |        ) AS t1
+         |WHERE   length(t1.party) > 4
+         |GROUP BY case_no,party
+         |UNION ALL
+         |SELECT  case_no,null as party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',rowkey,tn) as id
+         |                    ,case_no
+         |                    ,tn
+         |                    ,court_name
+         |---                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |---             LATERAL VIEW OUTER explode(split(litigant ,'\\001')) t AS party
+         |            WHERE   tn <> '$lawsuit_tab'
+         |        ) AS t1
+         |GROUP BY case_no,court_name
+         |""".stripMargin).rdd
+      .flatMap(r => {
+        val case_no = r.getAs[String]("case_no")
+        val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
+        val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
+        // A lone record is emitted with an empty right-hand side.
+        if (connect_case_id.length < 2) {
+          val e_1 = connect_case_id.head.split(separation)
+          list.append((e_1(0), null, case_no, null, e_1(1), null, 2))
+        }
+        // Pair element i with element i + 1.
+        for (i <- 0 to connect_case_id.length - 2) {
+          val e_1 = connect_case_id(i).split(separation)
+          val e_2 = connect_case_id(i + 1).split(separation)
+          list.append((e_1(0), e_2(0), case_no, case_no, e_1(1), e_2(1), 2))
+        }
+        list
+      })
+      .toDF("rowkey_1", "rowkey_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
+      .createTempView("connect_tmp_1")
+
+    sql(
+      s"""
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |        ,str_sort(
+         |            concat_ws('',t1.rowkey,t1.tn)
+         |            ,concat_ws('',t2.rowkey,t2.tn)
+         |        ) AS xjk_sorted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    dwd_judicial_case_tmp
+         |            WHERE   main_case_no = 1
+         |            AND     tn = '$lawsuit_tab'
+         |        ) AS t1
+         |FULL JOIN (
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 0
+         |              UNION ALL
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 1
+         |              AND     tn <> '$lawsuit_tab'
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     t1.rowkey <> t2.rowkey
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |
+         |UNION ALL
+         |
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |        ,str_sort(
+         |            concat_ws('',t1.rowkey,t1.tn)
+         |            ,concat_ws('',t2.rowkey,t2.tn)
+         |        ) AS xjk_sorted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    dwd_judicial_case_tmp
+         |            WHERE   main_case_no = 0
+         |            AND     tn = '$lawsuit_tab'
+         |        ) AS t1
+         |FULL JOIN (
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 1
+         |              UNION ALL
+         |              SELECT  *
+         |              FROM    dwd_judicial_case_tmp
+         |              WHERE   main_case_no = 0
+         |              AND     tn <> '$lawsuit_tab'
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     t1.rowkey <> t2.rowkey
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |
+         |""".stripMargin)
+      .createTempView("connect_tmp_2")
+  }
+
+  /**
+   * Back-fills the litigant and court-name columns of `out_tab` from the latest partition of
+   * `data_extraction_tab`, in two passes.
+   *
+   * Pass 1 joins on (rowkey_1, tn_1) and fills litigant_1 / court_name_1; pass 2 joins on
+   * (rowkey_2, tn_2) and fills litigant_2 / court_name_2. In each pass the rows whose join key
+   * is NULL are copied through unchanged, and the extraction rows are reduced to one row per
+   * (rowkey, tn) before joining.
+   *
+   * NOTE(review): both statements INSERT OVERWRITE `out_tab` without a PARTITION clause and
+   * without a ds column in the SELECT list, although `init` declares the table as partitioned
+   * by ds; they also read from the table they overwrite. Confirm this is accepted by the
+   * target engine before relying on it.
+   */ def add_other_info(): Unit = {
+    val org_tab = data_extraction_tab
+    val dwd_last_ds = getLastPartitionsOrElse(org_tab, "0")
+    sql( // pass 1: fill the "_1" side
+      s"""
+         |
+         |INSERT OVERWRITE TABLE $out_tab
+         |  SELECT t1.rowkey_1
+         |  ,      t1.rowkey_2
+         |  ,      t1.case_no_1
+         |  ,      t1.case_no_2
+         |  ,      t2.litigant AS litigant_1
+         |  ,      t1.litigant_2
+         |  ,      t2.court_name AS court_name_1
+         |  ,      t1.court_name_2
+         |  ,      t1.tn_1
+         |  ,      t1.tn_2
+         |  ,      t1.connect_type
+         |  FROM (
+         |    SELECT *
+         |    FROM $out_tab
+         |    WHERE rowkey_1 IS NOT NULL
+         |  ) AS t1
+         |  LEFT JOIN (
+         |    SELECT  *
+         |    FROM    (
+         |      SELECT  *
+         |      ,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY rowkey) AS num
+         |      FROM    (
+         |        SELECT  *
+         |        FROM    $data_extraction_tab
+         |        WHERE   ds = '$dwd_last_ds'
+         |      )
+         |    )
+         |    WHERE   num = 1
+         |  ) AS t2
+         |  ON t1.rowkey_1 = t2.rowkey AND t1.tn_1 = t2.tn
+         |  UNION ALL
+         |  SELECT rowkey_1
+         |  ,      rowkey_2
+         |  ,      case_no_1
+         |  ,      case_no_2
+         |  ,      litigant_1
+         |  ,      litigant_2
+         |  ,      court_name_1
+         |  ,      court_name_2
+         |  ,      tn_1
+         |  ,      tn_2
+         |  ,      connect_type
+         |  FROM $out_tab
+         |  WHERE rowkey_1 IS NULL
+         |""".stripMargin)
+
+
+    sql( // pass 2: fill the "_2" side
+      s"""
+         |INSERT OVERWRITE TABLE $out_tab
+         |  SELECT t1.rowkey_1
+         |  ,      t1.rowkey_2
+         |  ,      t1.case_no_1
+         |  ,      t1.case_no_2
+         |  ,      t1.litigant_1
+         |  ,      t2.litigant AS litigant_2
+         |  ,      t1.court_name_1
+         |  ,      t2.court_name AS court_name_2
+         |  ,      t1.tn_1
+         |  ,      t1.tn_2
+         |  ,      t1.connect_type
+         |  FROM (
+         |    SELECT *
+         |    FROM $out_tab
+         |    WHERE rowkey_2 IS NOT NULL
+         |  ) AS t1
+         |  LEFT JOIN (
+         |    SELECT  *
+         |    FROM    (
+         |      SELECT  *
+         |      ,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY rowkey) AS num
+         |      FROM    (
+         |        SELECT  *
+         |        FROM    $data_extraction_tab
+         |        WHERE   ds = '$dwd_last_ds'
+         |      )
+         |    )
+         |    WHERE   num = 1
+         |  ) AS t2
+         |  ON t1.rowkey_2 = t2.rowkey AND t1.tn_2 = t2.tn
+         |  UNION ALL
+         |  SELECT rowkey_1
+         |  ,      rowkey_2
+         |  ,      case_no_1
+         |  ,      case_no_2
+         |  ,      litigant_1
+         |  ,      litigant_2
+         |  ,      court_name_1
+         |  ,      court_name_2
+         |  ,      tn_1
+         |  ,      tn_2
+         |  ,      connect_type
+         |  FROM $out_tab
+         |  WHERE rowkey_2 IS NULL
+         |""".stripMargin)
+  }
+
+  /**
+   * Writes partition `ds` of `out_tab` with case-relation rows derived from `data_extraction_tab`.
+   *
+   * The statement is a UNION ALL of two parts:
+   *  - rows of partition `ds` that have a `case_no`, LEFT JOINed against the newest record
+   *    (by `ds`) per (rowkey, tn, main_case_no) among all partitions with `ds > 0`; a pair matches
+   *    when the case numbers are equal, the two sides are not the same (tn, rowkey) record and the
+   *    `case_equ` UDF accepts them. These rows get `connect_type = 1`.
+   *  - rows of partition `ds` without a `case_no`, emitted with no counterpart and `connect_type = 3`.
+   *
+   * The litigant and court-name columns are written as NULL by this statement.
+   *
+   * @param ds partition value to read from `data_extraction_tab` and to write into `out_tab`
+   */
+  def inc_func(ds: String): Unit = {
+    // INSERT INTO when isWindows is true, INSERT OVERWRITE otherwise.
+    val writeMode = if (isWindows) "INTO" else "OVERWRITE"
+
+    val relationSql =
+      s"""
+         |INSERT $writeMode TABLE $out_tab  PARTITION(ds='$ds')
+         |SELECT  t1.rowkey AS rowkey_1
+         |        ,t2.rowkey AS rowkey_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,null as litigant_1
+         |        ,null as litigant_2
+         |        ,null as court_name_1
+         |        ,null as court_name_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 AS connect_type
+         |FROM    (
+         |            SELECT  *
+         |            FROM    $data_extraction_tab
+         |            WHERE   ds = '$ds'
+         |            AND case_no is not null
+         |        ) AS t1
+         |LEFT JOIN (
+         |              select * from (
+         |                       SELECT  *
+         |                               ,ROW_NUMBER()OVER (PARTITION BY rowkey, tn, main_case_no ORDER BY ds DESC) AS num
+         |                       FROM    (
+         |                                   SELECT  *
+         |                                   FROM    $data_extraction_tab
+         |                                   WHERE   ds > 0
+         |                                   AND case_no is not null
+         |                               )
+         |              ) where num = 1
+         |          ) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     NOT (t1.tn = t2.tn AND t1.rowkey = t2.rowkey)
+         |AND     case_equ(t1.litigant,t2.litigant,t1.case_no,t2.case_no,t1.court_name,t2.court_name ,t1.tn,t2.tn)
+         |UNION all
+         |SELECT  rowkey AS rowkey_1
+         |        ,null AS rowkey_2
+         |        ,case_no AS case_no_1
+         |        ,null AS case_no_2
+         |        ,null as litigant_1
+         |        ,null as litigant_2
+         |        ,null as court_name_1
+         |        ,null as court_name_2
+         |        ,tn AS tn_1
+         |        ,NULL  AS tn_2
+         |        ,3 AS connect_type
+         |FROM    $data_extraction_tab
+         |WHERE   ds = '$ds'
+         |AND case_no is null
+         |""".stripMargin
+
+    sql(relationSql)
+  }
+
+  /**
+   * Decides whether two case records are connected by delegating to `case_connect_utils.isConnect`.
+   * Registered in `init()` as the SQL UDF `case_equ`.
+   *
+   * @param litigant_1   parties of the first record joined with the `\001` control character; may be null
+   * @param litigant_2   parties of the second record, same encoding; may be null
+   * @param case_no_1    case number of the first record
+   * @param case_no_2    case number of the second record
+   * @param court_name_1 court name of the first record
+   * @param court_name_2 court name of the second record
+   * @param tn1          `tn` value of the first record
+   * @param tn2          `tn` value of the second record
+   * @return the result of `case_connect_utils.isConnect` for the two records
+   * @throws RuntimeException wrapping any exception raised while comparing; a failure is never
+   *                          reported as `false`
+   */
+  private def case_equ(litigant_1: String, litigant_2: String, case_no_1: String, case_no_2: String, court_name_1: String, court_name_2: String, tn1: String, tn2: String): Boolean = {
+    // A null litigant column is treated as "no parties"; otherwise split on the \001 separator.
+    def parties(raw: String): Seq[String] =
+      Option(raw).map(_.split("\001").toSeq).getOrElse(Seq.empty)
+
+    try {
+      case_connect_utils.isConnect(parties(litigant_1), parties(litigant_2), case_no_1, case_no_2, court_name_1, court_name_2, tn1, tn2)
+    } catch {
+      case ex: Exception =>
+        // Log the offending inputs together with the stack trace in a single record, then fail
+        // the task. The previous trailing `false` was unreachable because this branch always throws.
+        logError(s"case_equ failed: litigant_1=$litigant_1, litigant_2=$litigant_2, case_no_1=$case_no_1, case_no_2=$case_no_2", ex)
+        throw new RuntimeException(ex)
+    }
+  }
+
+}
+
+
+/**
+ * Entry point that runs `etl` of [[JudicialCaseRelationRowkeyRelation_v2]] against the
+ * `winhc_ng` project for the date returned by `BaseUtil.getYesterday()`.
+ */
+object JudicialCaseRelationRowkeyRelation_v2 {
+  /**
+   * Builds the Spark session, runs the job and always stops the session afterwards.
+   *
+   * @param args command-line arguments; currently not used
+   */
+  def main(args: Array[String]): Unit = {
+
+    //    val Array(ds) = args
+    // To run a fixed partition instead, replace the right-hand side with a literal such as "20210610".
+    val ds = BaseUtil.getYesterday()
+
+    println(
+      s"""
+         |ds: $ds
+         |""".stripMargin)
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    try {
+      val jcr = JudicialCaseRelationRowkeyRelation_v2(spark, project = "winhc_ng")
+      jcr.etl(ds)
+      // Alternative entry points kept for manual runs:
+      //    jcr.etl_lawsuit("20210524", false)
+      //    jcr.relationByGroup(ds)
+      //        jcr.inc_func(ds)
+    } finally {
+      // Stop the session even when construction or the ETL fails, so it is not leaked.
+      spark.stop()
+    }
+  }
+}
+