
Filter dirty data

xufei 4 years ago
parent commit 6d1ffab302

+ 16 - 11
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.implicits
 
 import com.winhc.bigdata.spark.config.HBaseConfig
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -20,18 +21,22 @@ object DataFrame2HBaseHelper {
       val jobConf = HBaseConfig.HBaseOutputJobConf(tableName)
 
       val stringDf = df.select(df.columns.map(column => col(column).cast("string")): _*)
-      stringDf.rdd.map(row => {
-        val id = row.getAs[String](rowkeyFieldName.toLowerCase())
-        val put = new Put(Bytes.toBytes(id))
-        for (f <- fields) {
-          val v = row.getAs[String](f.toLowerCase)
-          if (v != null) {
-            put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
+      stringDf.rdd.mapPartitions(it => {
+        it.filter(row => {
+          StringUtils.isNotBlank(row.getAs[String](rowkeyFieldName.toLowerCase()))
+        }).map(row => {
+          val id = row.getAs[String](rowkeyFieldName.toLowerCase())
+          val put = new Put(Bytes.toBytes(id))
+          for (f <- fields) {
+            val v = row.getAs[String](f.toLowerCase)
+            if (v != null) {
+              put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
+            }
           }
-        }
-        (new ImmutableBytesWritable, put)
-      }).filter(_ != null)
-        .saveAsHadoopDataset(jobConf)
+          (new ImmutableBytesWritable, put)
+        }).filter(!_._2.isEmpty)
+      }).saveAsHadoopDataset(jobConf)
     }
   }
+
 }
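
The change above replaces the per-row `map` with `mapPartitions`, drops rows whose rowkey field is blank before the `Put` is built, and discards `Put`s that collected no columns (`!_._2.isEmpty`), so dirty rows never reach `saveAsHadoopDataset`. Below is a minimal, self-contained sketch of the same filtering idea; the sample data, column names, and the "F" column family are assumptions for illustration only, not the project's actual configuration.

```scala
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Hypothetical standalone sketch of the dirty-data filtering applied in this commit.
object DirtyDataFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("filter-dirty-rows").getOrCreate()
    import spark.implicits._

    // Assumed sample data: the second and third rows are "dirty" (blank / null rowkey).
    val df = Seq(
      ("1001", "Acme Co", "2020-01-01"),
      ("", "No Key Ltd", "2020-01-02"),
      (null.asInstanceOf[String], "Null Key Ltd", "2020-01-03")
    ).toDF("id", "name", "update_time")

    val rowkeyFieldName = "id"              // assumed rowkey column
    val fields = Seq("name", "update_time") // assumed value columns
    val f_bytes = Bytes.toBytes("F")        // assumed column family

    // Cast every column to string, as the helper does before building Puts.
    val stringDf = df.select(df.columns.map(c => col(c).cast("string")): _*)

    val puts = stringDf.rdd.mapPartitions(it => {
      it.filter(row => StringUtils.isNotBlank(row.getAs[String](rowkeyFieldName))) // drop blank rowkeys
        .map(row => {
          val put = new Put(Bytes.toBytes(row.getAs[String](rowkeyFieldName)))
          for (f <- fields) {
            val v = row.getAs[String](f)
            if (v != null) {
              put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
            }
          }
          (new ImmutableBytesWritable, put)
        })
        .filter(!_._2.isEmpty) // drop Puts that collected no columns
    })

    // The real helper writes these pairs with saveAsHadoopDataset(jobConf);
    // here we only count the surviving rows to show the filtering effect.
    println(s"clean rows: ${puts.count()}") // expected: 1

    spark.stop()
  }
}
```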