
fix: add a unique id to upstream judicial case data
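Before this change, etc_dwd_judicial_case generated the id at insert time with
DENSE_RANK() over md5(concat_ws('', id, tn)), and the per-table writers filled
id directly from the source tables. The writers now emit 0 as id and carry the
source-table primary key in rowkey instead; etc_dwd_judicial_case then tags
every row with monotonically_increasing_id(), groups by (rowkey, tn), assigns
one shared id per group, and writes the result to the new
winhc_eci_dev.dwd_judicial_case_2 table, which relationByGroup() now reads.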

许家凯 4 years ago
parent commit
851ffab236
1 changed file with 59 additions and 16 deletions

+59 -16  src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -5,7 +5,9 @@ import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils, case_connect_utils}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -51,19 +53,58 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
   }
 
   private def etc_dwd_judicial_case(ds: String): Unit = {
-    sql(
+
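+    // Re-read today's partition, tag each row with a per-row id, then collapse
+    // to a single shared id per (rowkey, tn) group.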
+    val rdd = sql(
       s"""
-         |INSERT OVERWRITE TABLE winhc_eci_dev.dwd_judicial_case PARTITION(ds,tn)
-         |SELECT  DENSE_RANK() OVER(PARTITION BY md5(concat_ws('',id,tn)) ORDER BY id) AS id
-         |        ,main_case_no
+         |SELECT  main_case_no
          |        ,case_no
-         |        ,id AS rowkey
+         |        ,rowkey
          |        ,case_attribute
          |        ,ds
          |        ,tn
          |FROM    winhc_eci_dev.dwd_judicial_case
          |WHERE   ds = '$ds'
          |""".stripMargin)
+      .withColumn("id", monotonically_increasing_id()) // unique per row, not yet per key
+      .rdd
+      .map(r => (s"${r.getAs[String]("rowkey")}_${r.getAs[String]("tn")}", r))
+      .groupByKey()
+      .flatMap(r => {
+      val li = r._2
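+      // pick one row's id and reuse it for every row in this (rowkey, tn) group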
+      val id = li.last.getAs[Long]("id")
+      li.map(row => {
+        val main_case_no = row.getAs[Long]("main_case_no")
+        val case_no = row.getAs[String]("case_no")
+        val rowkey = row.getAs[String]("rowkey")
+        val case_attribute = row.getAs[Map[String, String]]("case_attribute")
+        val ds = row.getAs[String]("ds")
+        val tn = row.getAs[String]("tn")
+        Row(id, main_case_no, case_no, rowkey, case_attribute, ds, tn)
+      })
+    })
+    val schema = StructType(Array(
+      StructField("id", LongType),
+      StructField("main_case_no", LongType),
+      StructField("case_no", StringType),
+      StructField("rowkey", StringType),
+      StructField("case_attribute", MapType(StringType, StringType)),
+      StructField("ds", StringType),
+      StructField("tn", StringType)
+    ))
+
+    spark.createDataFrame(rdd, schema).createOrReplaceTempView("all_tmp_xjk")
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE winhc_eci_dev.dwd_judicial_case_2 PARTITION(ds='$ds')
+         |SELECT  id
+         |        ,main_case_no
+         |        ,case_no
+         |        ,rowkey
+         |        ,case_attribute
+         |        ,tn
+         |FROM    all_tmp_xjk
+         |""".stripMargin)
   }
 
   private def etl_company_zxf(ds: String): Unit = {
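
The hunk above is the heart of the commit. Below is a minimal, self-contained sketch of the same technique on hypothetical local data (not the production winhc_eci_dev tables): every row sharing a (rowkey, tn) key ends up with the same surrogate id.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

object SharedIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("shared-id").getOrCreate()
    import spark.implicits._

    val df = Seq(
      ("r1", "company_zxr", "(2020)粤03民初1号"),
      ("r1", "company_zxr", "(2019)粤03执2号"), // same (rowkey, tn) => same id
      ("r2", "company_zxr", "(2020)粤03民初1号")
    ).toDF("rowkey", "tn", "case_no")
      .withColumn("id", monotonically_increasing_id()) // unique per row, not per key

    val shared = df.rdd
      .map(r => (s"${r.getAs[String]("rowkey")}_${r.getAs[String]("tn")}", r))
      .groupByKey()
      .flatMap { case (_, rows) =>
        val id = rows.last.getAs[Long]("id") // one id for the whole group
        rows.map(r => (id, r.getAs[String]("rowkey"), r.getAs[String]("tn"), r.getAs[String]("case_no")))
      }

    shared.toDF("id", "rowkey", "tn", "case_no").show(false)
    spark.stop()
  }
}

Note that monotonically_increasing_id() guarantees only uniqueness, not dense or rerun-stable values, which is presumably why the assigned ids are materialized once into dwd_judicial_case_2 rather than recomputed by downstream readers.
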
@@ -210,17 +251,17 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tn')
-         |SELECT  id
+         |SELECT  0 as id
          |        , 1 as main_case_no
          |        ,case_no
-         |        ,0 as rowkey
+         |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |UNION ALL
-         |SELECT  id
+         |SELECT  0 as id
          |        , 0 as main_case_no
          |        ,connect_case_no as case_no
-         |        ,0 as rowkey
+         |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |WHERE   connect_case_no is not null
@@ -293,17 +334,17 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tableName')
-         |SELECT  $table_id as id
+         |SELECT  0 as id
          |        , 1 as main_case_no
          |        ,case_no
-         |        ,0 as rowkey
+         |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |UNION ALL
-         |SELECT  $table_id as id
+         |SELECT  0 as id
          |        , 0 as main_case_no
          |        ,single_connect_case_no as case_no
-         |        ,0 as rowkey
+         |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
@@ -312,7 +353,9 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
 
 
   private def relationByGroup(): Unit = {
-    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+    val org_tab = s"$project.dwd_judicial_case_2" // produced by etc_dwd_judicial_case
+
+    val dwd_last_ds = getLastPartitionsOrElse(org_tab, "0")
     spark.udf.register("case_equ", case_equ _)
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
     spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
@@ -320,7 +363,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     sql(
       s"""
          | SELECT  *
-         | FROM    winhc_eci_dev.dwd_judicial_case
+         | FROM    $org_tab
          | WHERE   ds = '$dwd_last_ds'
          | AND     case_no IS NOT NULL
          | AND     case_no <> ''