Просмотр исходного кода

fix: 司法协助改变去重字段

许家凯 4 года назад
Родитель
Коммит
82805f3a25

+ 10 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/company_judicial_assistance.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.PhoenixConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyCidAndNameUtils, CompanySummaryPro, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
@@ -206,8 +206,10 @@ case class company_judicial_assistance(s: SparkSession,
   def calc(): Unit = {
     val ods_tab = s"$project.ods_company_judicial_assistance"
     val inc_ods_tab = s"$project.inc_ods_company_judicial_assistance"
+
     val inc_ads_tab = s"$project.inc_ads_company_judicial_assistance"
     val ads_tab = s"$project.ads_company_judicial_assistance"
+
     val ads_list_tab = s"$project.ads_company_judicial_assistance_list"
     val inc_ads_list_tab = s"$project.inc_ads_company_judicial_assistance_list"
 
@@ -225,6 +227,7 @@ case class company_judicial_assistance(s: SparkSession,
     val inc_ads_list_last_ds = getLastPartitionsOrElse(inc_ads_list_tab, "0")
 
     val list_tab_row_num = "cleanup(concat_ws('',rowkey,cid,flag,execute_notice_num))"
+    val main_tab_row_num = "cleanup(CONCAT_ws('',cid,executed_person,execute_notice_num,executive_court,type_state))"
 
 
     def all(): Unit = {
@@ -249,9 +252,9 @@ case class company_judicial_assistance(s: SparkSession,
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$ods_last_ds')
            |SELECT  ${ads_cols.diff(Seq("ds")).mkString(",")}
            |FROM    (
-           |            SELECT  MD5(cleanup(CONCAT_ws('',name,executed_person,execute_notice_num))) AS rowkey
+           |            SELECT  MD5($main_tab_row_num) AS rowkey
            |                    ,*
-           |                    ,ROW_NUMBER() OVER(PARTITION BY cleanup(CONCAT_ws('',name,executed_person)) ORDER BY ds DESC ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $main_tab_row_num ORDER BY ds DESC ) AS num
            |            FROM    $new_tab
            |        ) AS t1
            |WHERE   t1.num = 1
@@ -352,9 +355,9 @@ case class company_judicial_assistance(s: SparkSession,
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_last_ds')
            |SELECT  ${ads_cols.diff(Seq("ds")).mkString(",")}
            |FROM    (
-           |            SELECT  MD5(cleanup(CONCAT_ws('',name,executed_person,execute_notice_num))) AS rowkey
+           |            SELECT  MD5($main_tab_row_num) AS rowkey
            |                    ,*
-           |                    ,ROW_NUMBER() OVER(PARTITION BY cleanup(CONCAT_ws('',name,executed_person)) ORDER BY ds DESC ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $main_tab_row_num ORDER BY ds DESC ) AS num
            |            FROM    $new_tab
            |        ) AS t1
            |WHERE   t1.num = 1
@@ -504,12 +507,8 @@ case class company_judicial_assistance(s: SparkSession,
         .jdbc(PhoenixConfig.getPhoenixJDBCUrl, "COMPANY_JUDICIAL_ASSISTANCE_LIST", PhoenixConfig.getPhoenixProperties)
       //        .save2PhoenixByJDBC("COMPANY_JUDICIAL_ASSISTANCE_LIST")
 
-      CompanySummaryPro(s = spark
-        , project = "winhc_eci_dev"
-        , tableName = "company_judicial_assistance_list"
-        , cidField = "split(rowkey,'_')[0]"
-        , where = "deleted = 0"
-      ).calc()
+      CompanySummaryPro.run(spark, "company_judicial_assistance_list")
+
     }