Quellcode durchsuchen

fix: ds和update_time同时排序

许家凯 vor 3 Jahren
Ursprung
Commit
d0a108ccdd

+ 13 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

@@ -117,8 +117,15 @@ case class CompanyIndexJob(s: SparkSession,
   }
 
   private def inc(): Unit = {
-    val target_ds = getLastPartitionsOrElse(target_tab, null)
+    var target_ds = getLastPartitionsOrElse(target_tab, null)
     val insert_ds = getLastPartitionsOrElse(s"winhc_ng.inc_${org_prefix}_company", null)
+    if (insert_ds.equals(target_ds)) {
+      target_ds = getSecondLastPartitionOrElse(target_tab, null)
+    }
+    if (target_ds == null) {
+      all()
+      return
+    }
 
     if (target_ds == null) {
       print("target tab is not exists !!!")
@@ -418,12 +425,16 @@ case class CompanyIndexJob(s: SparkSession,
     val inc_org_tab = s"winhc_ng.inc_${org_prefix}_$tab"
     val cols = getColumns(org_tab).intersect(getColumns(inc_org_tab)).map(f => s"cast($f as string) as $f").mkString(",")
 
+    val up = cols.contains("update_time") match {
+      case true => " DESC,update_time"
+      case false => ""
+    }
     sql(
       s"""
          |SELECT  *
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY ${partition_by.mkString(",")} ORDER BY $order_by DESC ) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY ${partition_by.mkString(",")} ORDER BY $order_by$up DESC ) AS num
          |            FROM    (
          |                        SELECT  ${cols}
          |                        FROM    $org_tab

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -57,7 +57,7 @@ case class general_handler(s: SparkSession,
     val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
     val up = inter_cols.contains("update_time") match {
-      case true => ",update_time"
+      case true => " DESC,update_time"
       case false => ""
     }
 
@@ -114,7 +114,7 @@ case class general_handler(s: SparkSession,
     val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
     val up = inter_cols.contains("update_time") match {
-      case true => ",update_time"
+      case true => " DESC,update_time"
       case false => ""
     }
 

+ 13 - 4
src/main/scala/com/winhc/bigdata/spark/ng/jobs/inc_company_ng.scala

@@ -32,13 +32,18 @@ case class inc_company_ng(s: SparkSession,
       return
     }
 
+    val cols = getColumns(inc_ads_tab).diff(Seq("ds"))
+    val up = cols.contains("update_time") match {
+      case true => " DESC,update_time"
+      case false => ""
+    }
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$inc_ods_ds')
          |SELECT  ${getColumns(ads_tab).diff(Seq("ds")).mkString(",")}
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds${up} DESC) AS num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    $ods_tab
@@ -81,15 +86,19 @@ case class inc_company_ng(s: SparkSession,
         org_ds = inc_ads_sec_ds
       }
     }
-
+    val cols = getColumns(inc_ads_tab).diff(Seq("ds"))
+    val up = cols.contains("update_time") match {
+      case true => " DESC,update_time"
+      case false => ""
+    }
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$target_ds')
-         |SELECT  ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
+         |SELECT  ${cols.mkString(",")}
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds${up} DESC) AS num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    $inc_ods_tab