
feat: judicial case relation extraction

- LoggingUtils getPartitions handles tables with multi-level partitions
许家凯 4 years ago
parent commit b729d92c48

+ 184 - 66
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.jobs.judicial
 
+import com.winhc.bigdata.spark.implicits.RegexUtils.RichRegex
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils, case_connect_utils}
@@ -8,6 +9,7 @@ import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * @Author: XuJiakai
@@ -19,22 +21,16 @@ case class JudicialCaseRelation(s: SparkSession,
                                ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
   val table_id_map = Map("justicase" -> "case_id")
+  val pat = ".*\\d+.*".r
 
-
-  def getStrToMap(cols: Seq[String]): String = {
-    val set = cols.toSet
-    val str = set.map(e => {
-      s"concat_ws('\001','$e',cast($e as string))"
-    }).mkString(",")
-    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
-  }
-
+  import spark.implicits._
 
   def all(tableName: String): Unit = {
     val table_id = table_id_map(tableName)
     val ods_table_name = s"ods_$tableName"
     val ods_last_ds = getLastPartitionsOrElse(ods_table_name, "0")
-    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
+    //    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
+    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name")
 
     sql(
       s"""
@@ -64,21 +60,190 @@ case class JudicialCaseRelation(s: SparkSession,
 
 
   def inc(tableName: String, ds: String): Unit = {
+  }
 
 
-  }
+  def relationByGroup(): Unit = {
+    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+    spark.udf.register("case_equ", case_equ _)
+    spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
+    spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
+
+    sql(
+      s"""
+         | SELECT  *
+         | FROM    winhc_eci_dev.dwd_judicial_case
+         | WHERE   ds = '$dwd_last_ds'
+         | AND     case_no IS NOT NULL
+         | AND     case_no <> ''
+         | AND     match_case_no(case_no)
+         |""".stripMargin)
+      .cache()
+      .createOrReplaceTempView("dwd_judicial_case_tmp")
+
+    sql(
+      s"""
+         |SELECT  case_no,party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('_',id,tn) as id
+         |                    ,case_no
+         |                    ,tn
+         |                    ,main_case_no
+         |                    ,case_attribute
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW explode(split(concat_ws('\\n',case_attribute['yg_name'],case_attribute['bg_name']) ,'\\n')) t AS party
+         |        ) AS t1
+         |WHERE   length(t1.party) > 4
+         |GROUP BY case_no,party
+         |""".stripMargin).rdd
+      .flatMap(r => {
+        val case_no = r.getAs[String]("case_no")
+        val party = r.getAs[String]("party")
+        val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
+        val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
+        if (connect_case_id.length < 2) {
+          val e_1 = connect_case_id.head.split("_")
+          list.append((e_1(0), null, case_no, null, e_1(1), null, 2))
+        }
+        for (i <- 0 to connect_case_id.length - 2) {
+          val e_1 = connect_case_id(i).split("_")
+          val e_2 = connect_case_id(i + 1).split("_")
+          list.append((e_1(0), e_2(0), case_no, case_no, e_1(1), e_2(1), 2))
+        }
+        list
+      })
+      .toDF("id_1", "id_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
+      .createOrReplaceTempView("connect_tmp_1")
+
+    sql(
+      s"""
+         |SELECT  t1.id AS id_1
+         |        ,t2.id AS id_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 as connect_type
+         |        ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
+         |FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
+         |FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     t1.id <> t2.id
+         |AND     case_equ(t1.case_attribute , t2.case_attribute)
+         |""".stripMargin)
+      .createOrReplaceTempView("connect_tmp_2")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation1
+         |SELECT  id_1
+         |        ,id_2
+         |        ,case_no_1
+         |        ,case_no_2
+         |        ,tn_1
+         |        ,tn_2
+         |        ,connect_type
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY connect_type) AS num
+         |            FROM    (
+         |                        SELECT  id_1
+         |                                ,id_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,str_sort(concat_ws('',id_1,tn_1),concat_ws('',id_2,tn_2)) AS xjk_sorted
+         |                        FROM    connect_tmp_1
+         |                        UNION ALL
+         |                        SELECT  id_1
+         |                                ,id_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,xjk_sorted
+         |                        FROM    connect_tmp_2
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin)
 
-  private def getVal(map: Map[String, String], key: String): String = {
-    map.getOrElse(key, "")
   }
 
-  def sort(v1: String, v2: String): String = {
-    val seq = Seq(v1, v2)
-    seq.filter(_ != null).sorted.mkString("")
+  /* def relation(): Unit = {
+     spark.udf.register("case_equ", case_equ _)
+     spark.udf.register("str_sort", sort _)
+     spark.udf.register("match_case_no", match_case_no _)
+     val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+     val ignoreCaseNo = JudicialCaseRelation.getTopCaseNo()
+     sql(
+       s"""
+          | SELECT  *
+          | FROM    winhc_eci_dev.dwd_judicial_case
+          | WHERE   ds = '$dwd_last_ds'
+          | AND     case_no IS NOT NULL
+          | AND     case_no <> ''
+          | AND     match_case_no(case_no)
+          | ${
+         ignoreCaseNo.isEmpty match {
+           case true => ""
+
+           case false => s"AND case_no not in (${ignoreCaseNo.map(ss => "\"" + ss + "\"").mkString(",")})"
+
+         }
+       }
+          |""".stripMargin)
+       .cache()
+       .createOrReplaceTempView("dwd_judicial_case_tmp")
+     sql(
+       s"""
+          |--- INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
+          | SELECT  id_1
+          |         ,id_2
+          |         ,case_no_1
+          |         ,case_no_2
+          |         ,tn_1
+          |         ,tn_2
+          |         ,connect_type
+          | FROM    (
+          |             SELECT  *
+          |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
+          |             FROM    (
+          |                        SELECT  t1.id AS id_1
+          |                                ,t2.id AS id_2
+          |                                ,t1.case_no AS case_no_1
+          |                                ,t2.case_no AS case_no_2
+          |                                ,t1.tn AS tn_1
+          |                                ,t2.tn AS tn_2
+          |                                ,1 as connect_type
+          |                                ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
+          |                        FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
+          |                        FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
+          |                        ON      t1.case_no = t2.case_no
+          |                        AND     t1.id <> t2.id
+          |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
+          |                     ) AS t1
+          |         ) AS t2
+          | WHERE   t2.num = 1
+          |""".stripMargin)
+   }*/
+
+
+  def getStrToMap(cols: Seq[String]): String = {
+    val set = cols.toSet
+    val str = set.map(e => {
+      s"concat_ws('\001','$e',cast($e as string))"
+    }).mkString(",")
+    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
   }
 
-  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+  private def getVal(map: Map[String, String], key: String): String = map.getOrElse(key, "")
 
+  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
     try {
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
@@ -100,54 +265,6 @@ case class JudicialCaseRelation(s: SparkSession,
     }
   }
 
-  def relation(): Unit = {
-    spark.udf.register("case_equ", case_equ _)
-    spark.udf.register("str_sort", sort _)
-    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
-         | SELECT  id_1
-         |         ,id_2
-         |         ,case_no_1
-         |         ,case_no_2
-         |         ,tn_t1
-         |         ,tn_t2
-         | FROM    (
-         |             SELECT  *
-         |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
-         |             FROM    (
-         |                        SELECT  t1.id AS id_1
-         |                                ,t2.id AS id_2
-         |                                ,t1.case_no AS case_no_1
-         |                                ,t2.case_no AS case_no_2
-         |                                ,t1.tn AS tn_t1
-         |                                ,t2.tn AS tn_t2
-         |                                ,concat(concat(t1.id,t1.tn),concat(t2.id,t2.tn)) as xjk_sorted
-         |                        FROM    (
-         |                                    SELECT  *
-         |                                    FROM    winhc_eci_dev.dwd_judicial_case
-         |                                    WHERE   ds = '$dwd_last_ds'
-         |                                    AND     case_no IS NOT NULL
-         |                                    AND     case_no <> ''
-         |                                    AND     case_no RLIKE '\\d+'
-         |                                ) AS t1
-         |                        FULL JOIN (
-         |                                      SELECT  *
-         |                                      FROM    winhc_eci_dev.dwd_judicial_case
-         |                                      WHERE   ds = '$dwd_last_ds'
-         |                                      AND     case_no IS NOT NULL
-         |                                      AND     case_no <> ''
-         |                                      AND     case_no RLIKE '\\d+'
-         |                                  ) AS t2
-         |                        ON      t1.case_no = t2.case_no
-         |                        AND     t1.id <> t2.id
-         |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
-         |                     ) AS t1
-         |         ) AS t2
-         | WHERE   t2.num = 1
-         |""".stripMargin)
-  }
 }
 
 object JudicialCaseRelation {
@@ -160,7 +277,8 @@ object JudicialCaseRelation {
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
     //    jcr.all("justicase")
-    jcr.relation()
+    //    jcr.relation()
+    jcr.relationByGroup()
     spark.stop()
   }
 }
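
Note: a minimal standalone sketch of the str_sort UDF registered in relationByGroup() and of why the ROW_NUMBER() over xjk_sorted drops the mirrored (t2, t1) pair; the id/tn values here are made-up examples, not data from the tables.

// Sketch of the str_sort key used for pair deduplication: the key is identical
// for (t1, t2) and (t2, t1), so ROW_NUMBER() OVER (PARTITION BY xjk_sorted ...)
// keeps only one row per unordered pair.
object StrSortSketch {
  def strSort(v1: String, v2: String): String =
    Seq(v1, v2).filter(_ != null).sorted.mkString("")

  def main(args: Array[String]): Unit = {
    val a = "123" + "justicase" // concat_ws('', id_1, tn_1), hypothetical values
    val b = "456" + "justicase" // concat_ws('', id_2, tn_2), hypothetical values
    println(strSort(a, b)) // 123justicase456justicase
    println(strSort(b, a)) // same key, so the reversed pair collapses to one row
  }
}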

+ 7 - 1
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -118,7 +118,13 @@ trait LoggingUtils extends Logging {
 
   def getPartitions(t: String): Seq[String] = {
     val sql_s = s"show partitions " + t
-    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
+    sql(sql_s).collect.toList.map(r => r.getString(0)).map(r => {
+      if (r.contains("/")) {
+        r.split("/").find(s => s.contains("ds")).map(s => s.split("=")(1)).orNull
+      } else {
+        r.split("=")(1)
+      }
+    })
   }
 
   def getSecondLastPartitionOrElse(t: String, default: String): String = {
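
Note: a minimal standalone sketch of the new parsing in getPartitions, assuming SHOW PARTITIONS returns partition specs like the hypothetical strings below.

// Single-level specs keep the old behaviour; multi-level specs ("ds=.../tn=...")
// pick out the ds value, or null if no ds key is present in the spec.
object GetPartitionsSketch {
  def parse(partition: String): String =
    if (partition.contains("/"))
      partition.split("/").find(_.contains("ds")).map(_.split("=")(1)).orNull
    else
      partition.split("=")(1)

  def main(args: Array[String]): Unit = {
    println(parse("ds=20200708"))               // 20200708
    println(parse("ds=20200708/tn=justicase"))  // 20200708
    println(parse("pt=20200708/tn=justicase"))  // null (no ds key in the spec)
  }
}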

+ 9 - 20
src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils.scala

@@ -95,26 +95,15 @@ object case_connect_utils {
     0
   }
 
-  def sort(v1: String, v2: String): String = {
-    val seq = Seq(v1, v2)
-    seq.filter(_ != null).sorted.mkString("")
-  }
-
   def main(args: Array[String]): Unit = {
-    for(e<-Seq(("a","b"),("b","a"))){
-      println(sort(e._1,e._2))
-    }
-
-
-
-//    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
-//    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
-//
-//    val current_case_no = ""
-//    val connect_case_no = ""
-//    val current_court_name = ""
-//    val connect_court_name = ""
-//
-//    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
+    //    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
+    //    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
+    //
+    //    val current_case_no = ""
+    //    val connect_case_no = ""
+    //    val current_court_name = ""
+    //    val connect_court_name = ""
+    //
+    //    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
   }
 }
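
Note: the commented-out block in main corresponds to a call like the following sketch; it assumes isConnect keeps the six-argument signature used in that comment, and reuses the sample party names from it.

import com.winhc.bigdata.spark.utils.case_connect_utils

object IsConnectUsageSketch {
  def main(args: Array[String]): Unit = {
    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
    val current_case_no = ""
    val connect_case_no = ""
    val current_court_name = ""
    val connect_court_name = ""
    // true/false depending on party overlap, case numbers and court names
    println(case_connect_utils.isConnect(current_case_party_list, connect_case_party_list,
      current_case_no, connect_case_no, current_court_name, connect_court_name))
  }
}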