
fix: publish 司法案件_文书 (judicial case documents)

许家凯, 4 years ago
Commit 3743dade36
1 file changed, 47 insertions and 82 deletions

src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala  (+47 -82)

@@ -20,50 +20,75 @@ case class JudicialCaseRelation(s: SparkSession,
                                 project: String // project (workspace) the tables belong to
                                ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
-  val table_id_map = Map("justicase" -> "case_id")
-  val pat = ".*\\d+.*".r
+  private val table_id_map = Map("justicase" -> "case_id")
+  private val pat = ".*\\d+.*".r
 
   import spark.implicits._
 
-  def all(tableName: String): Unit = {
+  def etl(): Unit = {
+    val ds = "20200913"
+    etl_wenshu(ds)
+    relationByGroup()
+  }
+
+  private def etl_wenshu(ds: String): Unit = {
+    val tableName = "justicase"
+
     val table_id = table_id_map(tableName)
-    val ods_table_name = s"ods_$tableName"
-    val ods_last_ds = getLastPartitionsOrElse(ods_table_name, "0")
-    //    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
-    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name")
+    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name") ++ Seq(table_id,"ds","connect_case_no")
+
+    val ods_end_ds = getLastPartitionsOrElse(s"winhc_eci_dev.ods_$tableName", "0")
+    val tmp_tab = s"all_${tableName}_tmp_$ods_end_ds"
 
     sql(
       s"""
          |SELECT  *
-         |FROM    winhc_eci_dev.$ods_table_name lateral view explode(split(connect_case_no,'\n')) t as single_connect_case_no
-         |WHERE   ds = '$ods_last_ds'
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  ${other_cols.mkString(",")}
+         |                        FROM    winhc_eci_dev.ods_$tableName
+         |                        WHERE   ds = '$ods_end_ds'
+         |                        UNION ALL
+         |                        SELECT  ${other_cols.mkString(",")}
+         |                        FROM    winhc_eci_dev.inc_ods_$tableName
+         |                        WHERE   ds > '$ods_end_ds'
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
          |""".stripMargin)
-      .createOrReplaceTempView(s"all_case_tmp_$tableName")
+      .createOrReplaceTempView(tmp_tab)
+
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ods_last_ds',tn='$tableName')
+         |SELECT  *
+         |FROM    $tmp_tab lateral view explode(split(connect_case_no,'\\n')) t as single_connect_case_no
+         |""".stripMargin)
+      .cache()
+      .createOrReplaceTempView(s"explode_$tmp_tab")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tableName')
          |SELECT  $table_id as id
          |        , 1 as main_case_no
          |        ,case_no
          |        ,${getStrToMap(other_cols)} as case_attribute
-         |FROM    all_case_tmp_$tableName
+         |FROM    explode_$tmp_tab
          |UNION ALL
          |SELECT  $table_id as id
          |        , 0 as main_case_no
          |        ,single_connect_case_no as case_no
          |        ,${getStrToMap(other_cols)} as case_attribute
-         |FROM    all_case_tmp_$tableName
+         |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
          |""".stripMargin)
   }
 
 
-  def inc(tableName: String, ds: String): Unit = {
-  }
-
-
-  def relationByGroup(): Unit = {
+  private def relationByGroup(): Unit = {
     val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
     spark.udf.register("case_equ", case_equ _)
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
@@ -174,66 +199,7 @@ case class JudicialCaseRelation(s: SparkSession,
 
   }
 
-  /* def relation(): Unit = {
-     spark.udf.register("case_equ", case_equ _)
-     spark.udf.register("str_sort", sort _)
-     spark.udf.register("match_case_no", match_case_no _)
-     val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
-     val ignoreCaseNo = JudicialCaseRelation.getTopCaseNo()
-     sql(
-       s"""
-          | SELECT  *
-          | FROM    winhc_eci_dev.dwd_judicial_case
-          | WHERE   ds = '$dwd_last_ds'
-          | AND     case_no IS NOT NULL
-          | AND     case_no <> ''
-          | AND     match_case_no(case_no)
-          | ${
-         ignoreCaseNo.isEmpty match {
-           case true => ""
-
-           case false => s"AND case_no not in (${ignoreCaseNo.map(ss => "\"" + ss + "\"").mkString(",")})"
-
-         }
-       }
-          |""".stripMargin)
-       .cache()
-       .createOrReplaceTempView("dwd_judicial_case_tmp")
-     sql(
-       s"""
-          |--- INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
-          | SELECT  id_1
-          |         ,id_2
-          |         ,case_no_1
-          |         ,case_no_2
-          |         ,tn_1
-          |         ,tn_2
-          |         ,connect_type
-          | FROM    (
-          |             SELECT  *
-          |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
-          |             FROM    (
-          |                        SELECT  t1.id AS id_1
-          |                                ,t2.id AS id_2
-          |                                ,t1.case_no AS case_no_1
-          |                                ,t2.case_no AS case_no_2
-          |                                ,t1.tn AS tn_1
-          |                                ,t2.tn AS tn_2
-          |                                ,1 as connect_type
-          |                                ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
-          |                        FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
-          |                        FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
-          |                        ON      t1.case_no = t2.case_no
-          |                        AND     t1.id <> t2.id
-          |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
-          |                     ) AS t1
-          |         ) AS t2
-          | WHERE   t2.num = 1
-          |""".stripMargin)
-   }*/
-
-
-  def getStrToMap(cols: Seq[String]): String = {
+  private def getStrToMap(cols: Seq[String]): String = {
     val set = cols.toSet
     val str = set.map(e => {
       s"concat_ws('\001','$e',cast($e as string))"
@@ -243,7 +209,7 @@ case class JudicialCaseRelation(s: SparkSession,
 
   private def getVal(map: Map[String, String], key: String): String = map.getOrElse(key, "")
 
-  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+  private def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
     try {
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
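
case_equ builds each case's party list from the newline-separated yg_name (plaintiffs) and bg_name (defendants), but the hunk cuts off before the comparison itself. Purely as a sketch, one plausible rule is to treat two cases as related when their cleaned party sets overlap; the overlap criterion below is an assumption, not the production logic:

// Sketch only: the actual comparison in case_equ is outside this hunk.
private def caseEquSketch(m1: Map[String, String], m2: Map[String, String]): Boolean = {
  def parties(m: Map[String, String]): Set[String] =
    (m.getOrElse("yg_name", "").split("\n") ++ m.getOrElse("bg_name", "").split("\n"))
      .map(_.trim).filter(_.nonEmpty).toSet

  val p1 = parties(m1)
  val p2 = parties(m2)
  // Assumed rule: both cases name at least one party, and the sets intersect.
  p1.nonEmpty && p2.nonEmpty && (p1 intersect p2).nonEmpty
}
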
@@ -276,9 +242,8 @@ object JudicialCaseRelation {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
-    //    jcr.all("justicase")
-    //    jcr.relation()
-    jcr.relationByGroup()
+    jcr.etl()
+//    jcr.relationByGroup()
     spark.stop()
   }
 }
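
With etl() now writing the ds='20200913' partition of dwd_judicial_case, a quick way to sanity-check the publish is to count main (main_case_no = 1) versus connected (main_case_no = 0) rows in that partition. Table, partition columns (ds, tn) and main_case_no all come from the diff above; run inside the same SparkSession, e.g. just before spark.stop() in JudicialCaseRelation.main:

// Post-publish sanity check (sketch): both row kinds should be present.
spark.sql(
  """
    |SELECT  main_case_no, COUNT(1) AS cnt
    |FROM    winhc_eci_dev.dwd_judicial_case
    |WHERE   ds = '20200913' AND tn = 'justicase'
    |GROUP BY main_case_no
    |""".stripMargin).show()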