@@ -0,0 +1,166 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils, case_connect_utils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/28 16:52
+ * @Description: Builds judicial case relations: loads case records into dwd_judicial_case and links records that share a case number and matching parties.
+ */
+case class JudicialCaseRelation(s: SparkSession,
+                                project: String // project that the source tables live in
+                               ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  val table_id_map = Map("justicase" -> "case_id")
+
+
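+  /**
+   * Builds a SQL expression that packs the given columns into a map<string,string>:
+   * each column becomes a "<name>\001<value>" entry and the entries are joined with
+   * \002 before being handed to str_to_map.
+   */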
+  def getStrToMap(cols: Seq[String]): String = {
+    val set = cols.toSet
+    val str = set.map(e => {
+      s"concat_ws('\001','$e',cast($e as string))"
+    }).mkString(",")
+    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
+  }
+
+
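+  /**
+   * Full load: reads the latest partition of ods_<tableName>, explodes connect_case_no
+   * into single case numbers, and writes both the main and the connected case rows into
+   * winhc_eci_dev.dwd_judicial_case.
+   */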
+  def all(tableName: String): Unit = {
+    val table_id = table_id_map(tableName)
+    val ods_table_name = s"ods_$tableName"
+    val ods_last_ds = getLastPartitionsOrElse(ods_table_name, "0")
+    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
+
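+    // Explode the newline-separated connect_case_no field into one row per connected case number.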
+    sql(
+      s"""
+         |SELECT *
+         |FROM winhc_eci_dev.$ods_table_name lateral view outer explode(split(connect_case_no,'\n')) t as single_connect_case_no
+         |WHERE ds = '$ods_last_ds'
+         |""".stripMargin)
+      .createOrReplaceTempView(s"all_case_tmp_$tableName")
+
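+    // Insert both the original rows (main_case_no = 1) and one row per connected case
+    // number (main_case_no = 0); the remaining columns are packed into case_attribute.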
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case PARTITION(ds='$ods_last_ds',tn='$tableName')
+         |SELECT $table_id as id
+         |       , 1 as main_case_no
+         |       ,case_no
+         |       ,${getStrToMap(other_cols)} as case_attribute
+         |FROM all_case_tmp_$tableName
+         |UNION ALL
+         |SELECT $table_id as id
+         |       , 0 as main_case_no
+         |       ,single_connect_case_no as case_no
+         |       ,${getStrToMap(other_cols)} as case_attribute
+         |FROM all_case_tmp_$tableName
+         |WHERE single_connect_case_no is not null
+         |""".stripMargin)
+  }
+
+
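+  /**
+   * Incremental load for the given partition `ds`; not implemented yet.
+   */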
+  def inc(tableName: String, ds: String): Unit = {
+
+
+  }
+
+  private def getVal(map: Map[String, String], key: String): String = {
+    map.getOrElse(key, "")
+  }
+
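+  /**
+   * Sorts two strings and concatenates them, so the same pair yields the same key
+   * regardless of argument order; registered below as the str_sort UDF.
+   */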
+  def sort(v1: String, v2: String): String = {
+    val seq = Seq(v1, v2)
+    seq.filter(_ != null).sorted.mkString("")
+  }
+
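+  /**
+   * Returns true when the two case-attribute maps describe the same case: compares the
+   * party lists (yg_name / bg_name), case numbers and court names via
+   * case_connect_utils.isConnect; logs and returns false on any exception.
+   */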
+  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+
+    try {
+      val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
+      val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
+
+      val current_case_no = getVal(m1, "case_no")
+      val connect_case_no = getVal(m2, "case_no")
+      val current_court_name = getVal(m1, "court_name")
+      val connect_court_name = getVal(m2, "court_name")
+
+      case_connect_utils.isConnect(current_case_party_list_org, connect_case_party_list_org, current_case_no, connect_case_no, current_court_name, connect_court_name)
+    } catch {
+      case ex: Exception =>
+        logError(ex.getMessage)
+        println("error")
+        println(m1)
+        println(m2)
+        false
+    }
+  }
+
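+  /**
+   * Registers case_equ and str_sort as UDFs, self-joins the latest partition of
+   * dwd_judicial_case on case_no, keeps pairs whose attributes match according to
+   * case_equ, deduplicates with ROW_NUMBER over the xjk_sorted key, and writes the
+   * result to winhc_eci_dev.xjk_ads_judicial_case_relation3.
+   */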
+  def relation(): Unit = {
+    spark.udf.register("case_equ", case_equ _)
+    spark.udf.register("str_sort", sort _)
+    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
+         |SELECT id_1
+         |       ,id_2
+         |       ,case_no_1
+         |       ,case_no_2
+         |       ,tn_t1
+         |       ,tn_t2
+         |FROM (
+         |    SELECT *
+         |           ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
+         |    FROM (
+         |        SELECT t1.id AS id_1
+         |               ,t2.id AS id_2
+         |               ,t1.case_no AS case_no_1
+         |               ,t2.case_no AS case_no_2
+         |               ,t1.tn AS tn_t1
+         |               ,t2.tn AS tn_t2
+         |               ,str_sort(concat(t1.id,t1.tn),concat(t2.id,t2.tn)) as xjk_sorted
+         |        FROM (
+         |            SELECT *
+         |            FROM winhc_eci_dev.dwd_judicial_case
+         |            WHERE ds = '$dwd_last_ds'
+         |            AND case_no IS NOT NULL
+         |            AND case_no <> ''
+         |            AND case_no RLIKE '\\d+'
+         |        ) AS t1
+         |        FULL JOIN (
+         |            SELECT *
+         |            FROM winhc_eci_dev.dwd_judicial_case
+         |            WHERE ds = '$dwd_last_ds'
+         |            AND case_no IS NOT NULL
+         |            AND case_no <> ''
+         |            AND case_no RLIKE '\\d+'
+         |        ) AS t2
+         |        ON t1.case_no = t2.case_no
+         |        AND t1.id <> t2.id
+         |        AND case_equ(t1.case_attribute , t2.case_attribute)
+         |    ) AS t1
+         |) AS t2
+         |WHERE t2.num = 1
+         |""".stripMargin)
+  }
+}
+
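+/**
+ * Local entry point: runs relation() against the winhc_eci_dev project
+ * (the full load via jcr.all("justicase") is left commented out).
+ */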
+object JudicialCaseRelation {
+
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
+    // jcr.all("justicase")
+    jcr.relation()
+    spark.stop()
+  }
+}