
fix: resolve an issue with partition selection

许家凯 committed 3 years ago · commit 6a819e55f0

+ 7 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -101,14 +101,19 @@ case class general_handler(s: SparkSession,
     val ads_ds = getLastPartitionsOrElse(ads_tab, null)
 
 
-    if (inc_ods_ds == null || inc_ads_ds == null) {
+    if (inc_ods_ds == null) {
      // no data in inc_ods_tab, rerun the full ods load directly
       all()
       return
     }
     target_ds = inc_ods_ds
 
-    org_ds = inc_ads_ds
+    if (inc_ads_ds == null) {
+      org_ds = ads_ds
+    } else {
+      org_ds = inc_ads_ds
+    }
+
 
     if (org_ds.equals(target_ds)) {
       val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
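
The change above relaxes the bail-out condition: before this commit, a missing inc_ads partition forced a full rerun via all() even when incremental ods data existed. After it, only a missing inc_ods partition triggers all(); a missing inc_ads partition instead falls back to the last full ads partition. A minimal standalone sketch of that rule, assuming the partition ids have already been read via getLastPartitionsOrElse (resolvePartitions and Partitions are hypothetical names, not part of this repo):

    // Hypothetical model of the partition-selection rule this hunk introduces.
    case class Partitions(target: String, org: String)

    def resolvePartitions(incOdsDs: String,
                          incAdsDs: String,
                          adsDs: String): Option[Partitions] = {
      // No incremental ods partition at all: the caller reruns the full load (all()).
      if (incOdsDs == null) return None
      // Prefer the latest incremental ads partition; if inc_ads is still empty,
      // fall back to the last full ads partition (the pre-fix code gave up here).
      val org = Option(incAdsDs).getOrElse(adsDs)
      Some(Partitions(target = incOdsDs, org = org))
    }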

+ 6 - 3
src/main/scala/com/winhc/bigdata/spark/ng/jobs/inc_company_ng.scala

@@ -77,15 +77,18 @@ case class inc_company_ng(s: SparkSession,
     val ads_ds = getLastPartitionsOrElse(ads_tab, null)
 
 
-    if (inc_ods_ds == null || inc_ads_ds == null) {
+    if (inc_ods_ds == null) {
      // no data in inc_ods_tab, rerun the full ods load directly
       all()
       return
     }
     target_ds = inc_ods_ds
 
-
-    org_ds = inc_ads_ds
+    if (inc_ads_ds == null) {
+      org_ds = ads_ds
+    } else {
+      org_ds = inc_ads_ds
+    }
 
     if (org_ds.equals(target_ds)) {
       val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
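
The second hunk applies the identical fallback in inc_company_ng. Under the hypothetical sketch above, both patched call sites behave like this (partition values are illustrative only):

    resolvePartitions(incOdsDs = "20210102", incAdsDs = null, adsDs = "20210101")
    // => Some(Partitions("20210102", "20210101")): fall back to the full ads partition
    resolvePartitions(incOdsDs = null, incAdsDs = null, adsDs = "20210101")
    // => None: rerun all()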

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.udf
 
 import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
-import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
 import com.winhc.bigdata.spark.utils.BaseUtil._
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
@@ -16,7 +16,7 @@ import scala.annotation.meta.getter
  * @Date: 2020/7/10 13:49
  * @Description:
  */
-trait BaseFunc {
+trait BaseFunc extends LoggingUtils {
   @(transient@getter) protected val spark: SparkSession
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
 
@@ -28,7 +28,7 @@ trait BaseFunc {
    }*/
 
   def addEmptyPartitionOrSkip(tab:String,ds:String): Unit ={
-    spark.sql(
+    sql(
       s"""
          |ALTER TABLE $tab ADD IF NOT EXISTS PARTITION(ds='$ds')
          |""".stripMargin)