
fix: filter out dirty data uploaded by crawlers

许家凯 3 years ago
parent
commit
828a052131

+ 22 - 14
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -46,6 +46,25 @@ case class general_handler(s: SparkSession,
     case true => s"company_id"
     case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
   }
+  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+
+
+  val clean_up =
+    s"""
+       |${
+      inter_cols.contains("company_id") match {
+        case true => "company_id <> '0' AND"
+        case false => ""
+      }
+    }
+       |trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
+       |""".stripMargin
+
+  val up = inter_cols.contains("update_time") match {
+    case true => " DESC,update_time"
+    case false => ""
+  }
+
 
   def all(): Unit = {
     val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
@@ -54,12 +73,6 @@ case class general_handler(s: SparkSession,
       println("ds is null !!!")
       return
     }
-    val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
-
-    val up = inter_cols.contains("update_time") match {
-      case true => " DESC,update_time"
-      case false => ""
-    }
 
     sql(
       s"""
@@ -81,11 +94,13 @@ case class general_handler(s: SparkSession,
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $ods_tab
          |                        WHERE   ds > 0
+         |                        AND     $clean_up
          |                        UNION ALL
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > 0
+         |                        AND     $clean_up
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
@@ -114,7 +129,6 @@ case class general_handler(s: SparkSession,
       org_ds = inc_ads_ds
     }
 
-
     if (org_ds.equals(target_ds)) {
       val inc_ads_sec_ds = getSecondLastPartitionOrElse(inc_ads_tab, null)
       if (inc_ads_sec_ds == null) {
@@ -124,13 +138,6 @@ case class general_handler(s: SparkSession,
       }
     }
 
-    val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
-
-    val up = inter_cols.contains("update_time") match {
-      case true => " DESC,update_time"
-      case false => ""
-    }
-
 
     sql(
       s"""
@@ -152,6 +159,7 @@ case class general_handler(s: SparkSession,
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > $org_ds
+         |                        AND     $clean_up
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
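
Note: this change hoists the shared-column list (`inter_cols`) and the `update_time` ordering suffix (`up`) out of the two methods into class-level fields, and adds a `clean_up` predicate that drops crawler rows whose `company_id` is `'0'` or whose md5 key fields are all blank. The sketch below is a minimal, standalone illustration of how those two SQL fragments are assembled; the column names and md5 fields are assumed placeholders, not the real schema, and how `up` is spliced into the window's ORDER BY is not shown in this diff.

```scala
// Minimal sketch (not the production job): assembling the clean_up predicate
// and the optional update_time ordering suffix from the shared columns.
// interCols and md5Fields below are hypothetical placeholders.
object CleanUpSketch {
  def main(args: Array[String]): Unit = {
    val interCols = Seq("company_id", "name", "update_time") // assumed shared columns
    val md5Fields = Seq("name", "credit_code")                // assumed md5 key fields

    // Crawler rows with company_id = '0' or with all key fields blank are
    // treated as dirty and filtered out before deduplication.
    val cleanUp =
      s"""
         |${if (interCols.contains("company_id")) "company_id <> '0' AND" else ""}
         |trim(concat_ws('',${md5Fields.mkString(",")})) <> ''
         |""".stripMargin

    // update_time is used as a secondary sort key only when the table has it.
    val up = if (interCols.contains("update_time")) " DESC,update_time" else ""

    println(cleanUp)
    println(s"order-by suffix: '$up'")
  }
}
```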

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/inc_company_ng.scala

@@ -122,6 +122,8 @@ case class inc_company_ng(s: SparkSession,
          |                        SELECT  *
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > $org_ds
+         |                        AND     name is not null
+         |                        AND     trim(name) <> ''
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
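
For the company increment job, the added predicate simply drops rows with a missing or blank `name` before ranking. A minimal sketch of the same guard in plain Scala, using hypothetical sample values in place of the `name` column:

```scala
// Sketch only: Scala equivalent of the SQL predicate
//   name IS NOT NULL AND trim(name) <> ''
// The sample values are made up for illustration, not production data.
object NameGuardSketch extends App {
  val names = Seq(Some("  Example Co., Ltd.  "), Some("   "), Some(""), None)

  // Keep only rows whose name is present and non-blank after trimming.
  val kept = names.filter(_.exists(_.trim.nonEmpty))

  println(kept) // List(Some(  Example Co., Ltd.  ))
}
```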