
Merge remote-tracking branch 'origin/master'

xufei, 3 years ago
parent
commit f625cd49c5

+ 16 - 3
src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

@@ -117,8 +117,15 @@ case class CompanyIndexJob(s: SparkSession,
   }
 
   private def inc(): Unit = {
-    val target_ds = getLastPartitionsOrElse(target_tab, null)
+    var target_ds = getLastPartitionsOrElse(target_tab, null)
     val insert_ds = getLastPartitionsOrElse(s"winhc_ng.inc_${org_prefix}_company", null)
+    if (insert_ds.equals(target_ds)) {
+      target_ds = getSecondLastPartitionOrElse(target_tab, null)
+    }
+    if (target_ds == null) {
+      all()
+      return
+    }
 
     if (target_ds == null) {
       print("target tab is not exists !!!")
@@ -242,7 +249,7 @@ case class CompanyIndexJob(s: SparkSession,
          |        ,logo
          |        ,reg_number
          |        ,company_score_weight
-         |        ,deleted
+         |        ,COALESCE(deleted,'0') AS deleted
          |from    $all_tab
          |""".stripMargin)
 
@@ -272,6 +279,8 @@ case class CompanyIndexJob(s: SparkSession,
          |    ds='$target_ds'
          |""".stripMargin)
 
+    addEmptyPartitionOrSkip(target_tab, target_ds)
+    addEmptyPartitionOrSkip(target_tab_simp, target_ds)
   }
 
   private def tab_verify(out_f: Seq[String], tab: String, ignore_f: Seq[String] = Seq("ds")): Unit = {
@@ -418,12 +427,16 @@ case class CompanyIndexJob(s: SparkSession,
     val inc_org_tab = s"winhc_ng.inc_${org_prefix}_$tab"
     val cols = getColumns(org_tab).intersect(getColumns(inc_org_tab)).map(f => s"cast($f as string) as $f").mkString(",")
 
+    val up = cols.contains("update_time") match {
+      case true => " DESC,update_time"
+      case false => ""
+    }
     sql(
       s"""
          |SELECT  *
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY ${partition_by.mkString(",")} ORDER BY $order_by DESC ) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY ${partition_by.mkString(",")} ORDER BY $order_by$up DESC ) AS num
          |            FROM    (
          |                        SELECT  ${cols}
          |                        FROM    $org_tab
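
Note on the ORDER BY assembly: up is either empty or " DESC,update_time", so the rendered window clause gains update_time as a tiebreaker within the same partition value. A minimal sketch of the string it produces (assuming order_by resolves to ds):

    // Sketch: how the window ordering is built when the table has update_time.
    val order_by = "ds"                         // assumption for this example
    val up = " DESC,update_time"                // value picked when cols contains "update_time"
    val clause = s"ORDER BY $order_by$up DESC"  // == "ORDER BY ds DESC,update_time DESC"
    // within one ds, the row with the newest update_time now gets ROW_NUMBER() = 1
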

+ 29 - 6
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -57,14 +57,22 @@ case class general_handler(s: SparkSession,
     val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
     val up = inter_cols.contains("update_time") match {
-      case true => ",update_time"
+      case true => " DESC,update_time"
       case false => ""
     }
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$inc_ods_ds')
-         |SELECT  ${getColumns(ads_tab).diff(Seq("ds")).mkString(",")}
+         |SELECT  ${
+        getColumns(ads_tab).diff(Seq("ds"))
+          .map(f => {
+            if (f.equalsIgnoreCase("deleted"))
+              "COALESCE(deleted,0) as deleted"
+            else
+              f
+          }).mkString(",")
+      }
          |FROM    (
          |            SELECT  *
          |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
@@ -93,14 +101,19 @@ case class general_handler(s: SparkSession,
     val ads_ds = getLastPartitionsOrElse(ads_tab, null)
 
 
-    if (inc_ods_ds == null || inc_ads_ds == null) {
+    if (inc_ods_ds == null) {
      // no inc_ods_tab data, so just rerun the full ods data
       all()
       return
     }
     target_ds = inc_ods_ds
 
-    org_ds = inc_ads_ds
+    if (inc_ads_ds == null) {
+      org_ds = ads_ds
+    } else {
+      org_ds = inc_ads_ds
+    }
+
 
     if (org_ds.equals(target_ds)) {
       val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
@@ -114,7 +127,7 @@ case class general_handler(s: SparkSession,
     val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
     val up = inter_cols.contains("update_time") match {
-      case true => ",update_time"
+      case true => " DESC,update_time"
       case false => ""
     }
 
@@ -122,7 +135,15 @@ case class general_handler(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$target_ds')
-         |SELECT  ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
+         |SELECT  ${
+        getColumns(inc_ads_tab).diff(Seq("ds"))
+          .map(f => {
+            if (f.equalsIgnoreCase("deleted"))
+              "COALESCE(deleted,0) as deleted"
+            else
+              f
+          }).mkString(",")
+      }
          |FROM    (
          |            SELECT  *
          |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
@@ -135,6 +156,8 @@ case class general_handler(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
+
+    addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
   }
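
Note: the new org_ds branch covers the first incremental run, when inc_ads_tab has no partitions yet but the full ads snapshot does. As a tiny sketch (chooseOrgDs is a hypothetical helper; the dates are made up):

    // Sketch: prefer the incremental ads partition, otherwise fall back to the full snapshot.
    def chooseOrgDs(incAdsDs: Option[String], adsDs: Option[String]): Option[String] =
      incAdsDs.orElse(adsDs)

    // chooseOrgDs(Some("20210717"), Some("20210716")) == Some("20210717")  // normal daily run
    // chooseOrgDs(None, Some("20210716"))             == Some("20210716")  // first incremental run
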
 
 

+ 37 - 8
src/main/scala/com/winhc/bigdata/spark/ng/jobs/inc_company_ng.scala

@@ -32,13 +32,27 @@ case class inc_company_ng(s: SparkSession,
       return
     }
 
+    val cols = getColumns(inc_ads_tab).diff(Seq("ds"))
+    val up = cols.contains("update_time") match {
+      case true => " DESC,update_time"
+      case false => ""
+    }
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$inc_ods_ds')
-         |SELECT  ${getColumns(ads_tab).diff(Seq("ds")).mkString(",")}
+         |SELECT  ${
+        getColumns(ads_tab).diff(Seq("ds"))
+          .map(f => {
+            if (f.equalsIgnoreCase("deleted"))
+              "COALESCE(deleted,0) as deleted"
+            else
+              f
+          })
+          .mkString(",")
+      }
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds${up} DESC) AS num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    $ods_tab
@@ -63,15 +77,18 @@ case class inc_company_ng(s: SparkSession,
     val ads_ds = getLastPartitionsOrElse(ads_tab, null)
 
 
-    if (inc_ods_ds == null || inc_ads_ds == null) {
+    if (inc_ods_ds == null ) {
      // no inc_ods_tab data, so just rerun the full ods data
       all()
       return
     }
     target_ds = inc_ods_ds
 
-
-    org_ds = inc_ads_ds
+    if (inc_ads_ds == null) {
+      org_ds = ads_ds
+    } else {
+      org_ds = inc_ads_ds
+    }
 
     if (org_ds.equals(target_ds)) {
       val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
@@ -81,15 +98,26 @@ case class inc_company_ng(s: SparkSession,
         org_ds = inc_ads_sec_ds
       }
     }
-
+    val cols = getColumns(inc_ads_tab).diff(Seq("ds"))
+    val up = cols.contains("update_time") match {
+      case true => " DESC,update_time"
+      case false => ""
+    }
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$target_ds')
-         |SELECT  ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
+         |SELECT  ${
+        cols.map(f => {
+          if (f.equalsIgnoreCase("deleted"))
+            "COALESCE(deleted,0) as deleted"
+          else
+            f
+        }).mkString(",")
+      }
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds${up} DESC) AS num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    $inc_ods_tab
@@ -98,6 +126,7 @@ case class inc_company_ng(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
+    addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
   }
 
   def calc(): Unit = {
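
Note: the same deleted-defaulting projection now appears in all three jobs; extracted as a sketch (withDeletedDefault is a hypothetical helper, not part of the commit):

    // Sketch: pass every output column through unchanged, but default the deleted flag to 0
    // so it can never be NULL in the written partition.
    def withDeletedDefault(cols: Seq[String]): String =
      cols.map {
        case f if f.equalsIgnoreCase("deleted") => "COALESCE(deleted,0) as deleted"
        case f                                  => f
      }.mkString(",")

    // withDeletedDefault(Seq("company_id", "name", "deleted"))
    //   == "company_id,name,COALESCE(deleted,0) as deleted"
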

+ 9 - 2
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.udf
 
 import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
-import com.winhc.bigdata.spark.utils.BaseUtil
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
 import com.winhc.bigdata.spark.utils.BaseUtil._
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
@@ -16,7 +16,7 @@ import scala.annotation.meta.getter
  * @Date: 2020/7/10 13:49
  * @Description:
  */
-trait BaseFunc {
+trait BaseFunc extends LoggingUtils{
   @(transient@getter) protected val spark: SparkSession
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
 
@@ -27,6 +27,13 @@ trait BaseFunc {
      })
    }*/
 
+  def addEmptyPartitionOrSkip(tab:String,ds:String): Unit ={
+    sql(
+      s"""
+         |ALTER TABLE $tab ADD IF NOT EXISTS PARTITION(ds='$ds')
+         |""".stripMargin)
+  }
+
   def case_no_trim_udf(): Unit = {
     spark.udf.register("case_no_trim", case_no_trim _)
   }
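
Usage note for the new helper (table name and ds below are illustrative, not taken from the commit):

    // Illustrative call:
    addEmptyPartitionOrSkip("winhc_ng.ads_company", "20210717")
    // emits: ALTER TABLE winhc_ng.ads_company ADD IF NOT EXISTS PARTITION(ds='20210717')
    // IF NOT EXISTS keeps the call idempotent, and creating the (possibly empty) partition
    // right after the INSERT means later "last partition" lookups still advance to the new ds
    // even when no rows were written for it.
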

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -44,7 +44,7 @@ object DateUtils {
       p = "yyyy-MM-dd"
     }
     val fm = new SimpleDateFormat(p)
-    fm.parse(date).getTime + 28800000L + ""
+    fm.parse(date).getTime +  ""
   }
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
@@ -163,8 +163,8 @@ object DateUtils {
   }
 
   def main(args: Array[String]): Unit = {
-    //    println(DateUtils.toMillisTimestamp(date = "2020-09-17 18:02:02"))
-    println(getNotNullStr(null, null))
+        println(DateUtils.toMillisTimestamp(date = "2001-06-05 00:00:00"))
+//    println(getNotNullStr(null, null))
     //    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }