Просмотр исходного кода

fix: 司法案件上游fix bugs

许家凯 4 лет назад
Родитель
Сommit
0fbcc4cb22

+ 32 - 26
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -29,6 +29,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
 
   import spark.implicits._
 
+  case_no_trim_udf()
   is_id_card_udf()
 
   def etl(ds: String): Unit = {
@@ -37,11 +38,12 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         etl_company_dishonest_info(ds)
         true
       })
-      , ("wenshu etl...", () => {
-        etl_wenshu(ds)
-        true
-      })
       ,
+//      ("wenshu etl...", () => {
+//        etl_wenshu(ds)
+//        true
+//      })
+//      ,
       ("company_zxr etl...", () => {
         etl_company_zxf(ds)
         true
@@ -68,7 +70,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
       .withColumn("id", monotonically_increasing_id)
       .rdd.map(r =>
       (s"${r.getAs[String]("rowkey")}_${r.getAs[String]("tn")}", r)
-    ).groupByKey().flatMap(r => {
+    ).groupByKey(500).flatMap(r => {
       val li = r._2
       val id = li.last.getAs[Long]("id")
       li.map(r => {
@@ -91,7 +93,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
       StructField("tn", StringType)
     ))
 
-    spark.createDataFrame(rdd, schema).createOrReplaceTempView("all_tmp_xjk")
+    spark.createDataFrame(rdd, schema).createTempView("all_tmp_xjk")
 
 
     sql(
@@ -116,7 +118,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          | ,court as connect_court_name
          | ,gist_id as connect_case_no
          | ,null as yg_name
-         | ,null as bg_name
+         | ,cname as bg_name
          | ,ds
          |""".stripMargin
 
@@ -150,7 +152,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
            |        ) AS t2
            |WHERE   t2.num = 1
            |$verify
-           |""".stripMargin).createOrReplaceTempView("xjk_tmp")
+           |""".stripMargin).createTempView("xjk_tmp")
 
       var other_cols: Seq[String] = null
       var result_tab: String = null
@@ -164,7 +166,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
              |FROM    $res_tab
              |""".stripMargin)
           .cache()
-          .createOrReplaceTempView(result_tab)
+          .createTempView(result_tab)
       } else {
         result_tab = s"xjk_tmp_xjk_tmp"
         sql(
@@ -172,7 +174,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
              |SELECT  $view
              |FROM    xjk_tmp
              |""".stripMargin)
-          .createOrReplaceTempView(result_tab)
+          .createTempView(result_tab)
       }
       other_cols = getColumns(result_tab)
 
@@ -245,26 +247,28 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-      .createOrReplaceTempView(tmp_tab)
+      .createTempView(tmp_tab)
     val other_cols = getColumns(tmp_tab)
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tn')
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 1 as main_case_no
-         |        ,case_no
+         |        ,case_no_trim(case_no) as case_no
          |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
+         |WHERE   case_no_trim(case_no) is  not null
          |UNION ALL
-         |SELECT  0 as id
-         |        , 0 as main_case_no
-         |        ,connect_case_no as case_no
+         |SELECT  null as id
+         |        , 1 as main_case_no
+         |        ,case_no_trim(connect_case_no) as case_no
          |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |WHERE   connect_case_no is not null
+         |AND     case_no_trim(connect_case_no) is not null
          |""".stripMargin)
   }
 
@@ -277,7 +281,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          | ,case_no
          | ,gist_unit as connect_court_name
          | ,gist_dd as connect_case_no
-         | ,name as yg_name
+         | ,null as yg_name
          | ,name as bg_name
          | ,ds
          |""".stripMargin
@@ -320,7 +324,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-      .createOrReplaceTempView(tmp_tab)
+      .createTempView(tmp_tab)
 
 
     sql(
@@ -329,25 +333,27 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |FROM    $tmp_tab lateral view explode(split(connect_case_no,'\\n')) t as single_connect_case_no
          |""".stripMargin)
       //      .cache()
-      .createOrReplaceTempView(s"explode_$tmp_tab")
+      .createTempView(s"explode_$tmp_tab")
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tableName')
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 1 as main_case_no
-         |        ,case_no
+         |        ,case_no_trim(case_no) as case_no
          |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
+         |WHERE   case_no_trim(case_no) is not null
          |UNION ALL
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 0 as main_case_no
-         |        ,single_connect_case_no as case_no
+         |        ,case_no_trim(single_connect_case_no) as case_no
          |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
+         |AND     case_no_trim(single_connect_case_no) is not null
          |""".stripMargin)
   }
 
@@ -371,7 +377,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |""".stripMargin)
       .repartition(500)
       //      .cache()
-      .createOrReplaceTempView("dwd_judicial_case_tmp")
+      .createTempView("dwd_judicial_case_tmp")
 
     //需要区分group by ,只用一个
     sql(
@@ -408,7 +414,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         list
       })
       .toDF("id_1", "id_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
-      .createOrReplaceTempView("connect_tmp_1")
+      .createTempView("connect_tmp_1")
 
     sql(
       s"""
@@ -426,7 +432,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |AND     t1.id <> t2.id
          |AND     case_equ(t1.case_attribute , t2.case_attribute)
          |""".stripMargin)
-      .createOrReplaceTempView("connect_tmp_2")
+      .createTempView("connect_tmp_2")
 
     sql(
       s"""

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyCidAndNameUtils.scala

@@ -27,7 +27,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |""".stripMargin)
       .repartition(500)
       .cache()
-      .createOrReplaceTempView("all_company_tmp")
+      .createTempView("all_company_tmp")
   }
 
   def addNewNameByCid(org_table_name: String, cidField: String, addName: String): String = {
@@ -54,7 +54,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |FROM    $org_table_name
          |WHERE   $cidField IS NULL
          |""".stripMargin)
-      .createOrReplaceTempView(res_tab)
+      .createTempView(res_tab)
     res_tab
   }
 
@@ -88,7 +88,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |FROM    $org_table_name
          |WHERE   $cidField IS NULL
          |""".stripMargin)
-      .createOrReplaceTempView(res_tab)
+      .createTempView(res_tab)
     res_tab
   }
 
@@ -121,7 +121,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |FROM    $org_table_name
          |WHERE   $cidField IS NULL
          |""".stripMargin)
-      .createOrReplaceTempView(res_tab)
+      .createTempView(res_tab)
     res_tab
   }
 }