Ver código fonte

Merge remote-tracking branch 'origin/master'

xufei 3 anos atrás
pai
commit
02eb72d7e3

+ 20 - 14
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -46,6 +46,23 @@ case class general_handler(s: SparkSession,
     case true => s"company_id"
     case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
   }
+  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey")) // columns returned for both ods_tab and inc_ods_tab, with rowkey removed
+  // clean_up is a SQL predicate fragment spliced after AND into the WHERE clauses built in this class: it keeps rows whose company_id is non-null, non-blank and differs from 0, and whose md5_fields columns are all non-null with a non-blank concatenation.
+  // NOTE(review): if md5_fields is empty, the map/mkString interpolation below renders as an empty string and the predicate contains two consecutive ANDs -- confirm md5_fields is never empty for tables that reach this code.
+  val clean_up =
+    s"""
+       |company_id <> '0'
+       |AND company_id is not null
+       |AND trim(company_id) <> ''
+       |AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}
+       |AND trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
+       |""".stripMargin
+  // up is a SQL suffix that adds update_time as an extra sort key only when update_time is among inter_cols, otherwise it is empty; presumably appended inside an ORDER BY clause -- the splice point is not visible in this excerpt, TODO confirm.
+  val up = inter_cols.contains("update_time") match {
+    case true => " DESC,update_time"
+    case false => ""
+  }
+
 
   def all(): Unit = {
     val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
@@ -54,12 +71,6 @@ case class general_handler(s: SparkSession,
       println("ds is null !!!")
       return
     }
-    val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
-
-    val up = inter_cols.contains("update_time") match {
-      case true => " DESC,update_time"
-      case false => ""
-    }
 
     sql(
       s"""
@@ -81,11 +92,13 @@ case class general_handler(s: SparkSession,
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $ods_tab
          |                        WHERE   ds > 0
+         |                        AND     $clean_up
          |                        UNION ALL
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > 0
+         |                        AND     $clean_up
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
@@ -114,7 +127,6 @@ case class general_handler(s: SparkSession,
       org_ds = inc_ads_ds
     }
 
-
     if (org_ds.equals(target_ds)) {
       val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
       if (inc_ads_sec_ds == null) {
@@ -124,13 +136,6 @@ case class general_handler(s: SparkSession,
       }
     }
 
-    val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
-
-    val up = inter_cols.contains("update_time") match {
-      case true => " DESC,update_time"
-      case false => ""
-    }
-
 
     sql(
       s"""
@@ -152,6 +157,7 @@ case class general_handler(s: SparkSession,
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > $org_ds
+         |                        AND     $clean_up
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1

+ 12 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/inc_company_ng.scala

@@ -57,10 +57,18 @@ case class inc_company_ng(s: SparkSession,
          |                        SELECT  *
          |                        FROM    $ods_tab
          |                        WHERE   ds > 0
+         |                        AND     company_id is not null
+         |                        AND     trim(company_id) <> ''
+         |                        AND     name is not null
+         |                        AND     trim(name) <> ''
          |                        UNION ALL
          |                        SELECT  *
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > 0
+         |                        AND     company_id is not null
+         |                        AND     trim(company_id) <> ''
+         |                        AND     name is not null
+         |                        AND     trim(name) <> ''
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
@@ -122,6 +130,10 @@ case class inc_company_ng(s: SparkSession,
          |                        SELECT  *
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > $org_ds
+         |                        AND     company_id is not null
+         |                        AND     trim(company_id) <> ''
+         |                        AND     name is not null
+         |                        AND     trim(name) <> ''
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1