Browse Source

fix: base_company_mapping表支持跳分区 (base_company_mapping table now supports skipped partitions: read the latest existing partition instead of assuming yesterday's partition exists)

许家凯 3 years ago
parent
commit
e7543e0b86

+ 9 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyBaseMapping.scala

@@ -96,7 +96,12 @@ case class CompanyBaseMapping(s: SparkSession,
   }
 
   def inc(ds: String): Unit = {
-    val other_cols = getColumns(target_tab).diff(Seq("ds","cid","cname","new_cid","update_time"))
+    val other_cols = getColumns(target_tab).diff(Seq("ds", "cid", "cname", "new_cid", "update_time"))
+    val last_ds = getLastPartitionsOrElse(target_tab, null)
+    if (last_ds == null) {
+      println(s"$target_tab last partition is null !!!")
+      sys.exit(-1)
+    }
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='$ds')
@@ -104,7 +109,7 @@ case class CompanyBaseMapping(s: SparkSession,
          |        ,t1.cname AS cname
          |        ,COALESCE(t2.new_cid,t1.new_cid) AS new_cid
          |        ,COALESCE(t2.update_time,t1.update_time) AS update_time
-         |        ,${other_cols.map(f=>s"t1.$f").mkString(",")}
+         |        ,${other_cols.map(f => s"t1.$f").mkString(",")}
          |FROM    (
          |            SELECT  ${getColumns(target_tab).diff(Seq("ds")).mkString(",")}
          |            FROM    (
@@ -113,7 +118,7 @@ case class CompanyBaseMapping(s: SparkSession,
          |                        FROM    (
          |                                    SELECT  cid,cname,new_cid,update_time,${other_cols.mkString(",")}
          |                                    FROM    $target_tab
-         |                                    WHERE   ds = '${BaseUtil.atDaysAfter(-1, ds)}'
+         |                                    WHERE   ds = '$last_ds'
          |                                    UNION ALL
          |                                    SELECT  cid
          |                                            ,name AS cname
@@ -163,7 +168,7 @@ object CompanyBaseMapping {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv("inc_company_mapping", config)
-    CompanyBaseMapping(s=spark,"winhc_eci_dev").calc
+    CompanyBaseMapping(s = spark, "winhc_eci_dev").calc
 
     spark.stop()
   }