Explorar el Código

fix: 司法案件上游数据添加唯一id

许家凯 hace 4 años
padre
commit
65e79deda3

+ 24 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -45,9 +45,27 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         true
       })
     ))
+
+    etc_dwd_judicial_case(ds)
     relationByGroup()
   }
 
+  /**
+   * Rewrites the `ds` data of `winhc_eci_dev.dwd_judicial_case`, replacing `id` with a
+   * generated surrogate key and keeping the previous `id` value in `rowkey`.
+   *
+   * The surrogate key is a dense rank over `(tn, id)`: rows that share one source
+   * `(tn, id)` pair receive the same key, and different pairs receive different keys.
+   * A window partitioned by `id, tn` and ordered by `id` cannot do this, because every
+   * row ties for first place inside its own partition and the rank is always 1.
+   *
+   * @param ds partition value that is read and written back
+   */
+  private def etc_dwd_judicial_case(ds: String): Unit = {
+    // NOTE(review): a window without PARTITION BY is evaluated in a single Spark
+    // partition; confirm this is acceptable for the size of this table.
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE winhc_eci_dev.dwd_judicial_case PARTITION(ds,tn)
+         |SELECT  DENSE_RANK() OVER(ORDER BY tn,id) AS id
+         |        ,main_case_no
+         |        ,case_no
+         |        ,id AS rowkey
+         |        ,case_attribute
+         |        ,ds
+         |        ,tn
+         |FROM    winhc_eci_dev.dwd_judicial_case
+         |WHERE   ds = '$ds'
+         |""".stripMargin)
+  }
+
   private def etl_company_zxf(ds: String): Unit = {
     val view =
       s"""
@@ -195,12 +213,14 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |SELECT  id
          |        , 1 as main_case_no
          |        ,case_no
+         |        ,0 as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |UNION ALL
          |SELECT  id
          |        , 0 as main_case_no
          |        ,connect_case_no as case_no
+         |        ,0 as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |WHERE   connect_case_no is not null
@@ -267,7 +287,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |SELECT  *
          |FROM    $tmp_tab lateral view explode(split(connect_case_no,'\\n')) t as single_connect_case_no
          |""".stripMargin)
-//      .cache()
+      //      .cache()
       .createOrReplaceTempView(s"explode_$tmp_tab")
 
     sql(
@@ -276,12 +296,14 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |SELECT  $table_id as id
          |        , 1 as main_case_no
          |        ,case_no
+         |        ,0 as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |UNION ALL
          |SELECT  $table_id as id
          |        , 0 as main_case_no
          |        ,single_connect_case_no as case_no
+         |        ,0 as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
@@ -305,7 +327,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          | AND     match_case_no(case_no)
          |""".stripMargin)
       .repartition(500)
-//      .cache()
+      //      .cache()
       .createOrReplaceTempView("dwd_judicial_case_tmp")
 
     //需要区分group by ,只用一个