Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 years ago
parent
commit
02bc7328ea

+ 16 - 11
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.implicits
 
 import com.winhc.bigdata.spark.config.HBaseConfig
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -20,18 +21,22 @@ object DataFrame2HBaseHelper {
       val jobConf = HBaseConfig.HBaseOutputJobConf(tableName)
 
       val stringDf = df.select(df.columns.map(column => col(column).cast("string")): _*)
-      stringDf.rdd.map(row => {
-        val id = row.getAs[String](rowkeyFieldName.toLowerCase())
-        val put = new Put(Bytes.toBytes(id))
-        for (f <- fields) {
-          val v = row.getAs[String](f.toLowerCase)
-          if (v != null) {
-            put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
+      stringDf.rdd.mapPartitions(it => {
+        it.filter(row => {
+          StringUtils.isNotBlank(row.getAs[String](rowkeyFieldName.toLowerCase()))
+        }).map(row => {
+          val id = row.getAs[String](rowkeyFieldName.toLowerCase())
+          val put = new Put(Bytes.toBytes(id))
+          for (f <- fields) {
+            val v = row.getAs[String](f.toLowerCase)
+            if (v != null) {
+              put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
+            }
           }
-        }
-        (new ImmutableBytesWritable, put)
-      }).filter(_ != null)
-        .saveAsHadoopDataset(jobConf)
+          (new ImmutableBytesWritable, put)
+        }).filter(!_._2.isEmpty)
+      }).saveAsHadoopDataset(jobConf)
     }
   }
+
 }
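
The rewrite above moves the HBase write from a per-row map to mapPartitions: rows with a blank rowkey are filtered out before any Put is constructed, and rows whose Put ends up with no columns are dropped via !_._2.isEmpty instead of the old filter(_ != null), which was a no-op because the mapped tuples were never null. A minimal standalone sketch of the same pattern; the object name, the column family "F", and the method signature are illustrative assumptions, not the project's actual configuration:

import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object HBasePutSketch {
  // Hypothetical column family; the real helper takes it from HBaseConfig.
  private val f_bytes = Bytes.toBytes("F")

  def toPuts(df: DataFrame, rowkeyFieldName: String, fields: Seq[String]): RDD[(ImmutableBytesWritable, Put)] = {
    // Cast every column to string so getAs[String] works uniformly below.
    val stringDf = df.select(df.columns.map(c => col(c).cast("string")): _*)
    stringDf.rdd.mapPartitions(it => {
      // Drop rows with a blank rowkey before building any Put.
      it.filter(row => StringUtils.isNotBlank(row.getAs[String](rowkeyFieldName.toLowerCase())))
        .map(row => {
          val put = new Put(Bytes.toBytes(row.getAs[String](rowkeyFieldName.toLowerCase())))
          for (f <- fields) {
            val v = row.getAs[String](f.toLowerCase)
            if (v != null) {
              put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
            }
          }
          (new ImmutableBytesWritable, put)
        })
        .filter(!_._2.isEmpty) // skip rows where every field was null
    })
  }
}

The caller then writes the result with saveAsHadoopDataset(jobConf), exactly as the patched helper does.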

+ 7 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -303,7 +303,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     spark.udf.register("name_aggs", new NameAggs(1000))
     spark.udf.register("case_reason", new CaseReasonAggs(1000))
    //preprocess the data
-    val cols = Seq("flag", "date", "detail_id","name")
+    val cols = Seq("flag", "date", "detail_id", "name")
     val t1 = s"$project.inc_ads_company_court_announcement"
     val t2 = s"ads_judicial_case_relation_pre"
     var t2_ds = ds
@@ -487,14 +487,14 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
          |FROM    (
          |            SELECT  judicase_id
-         |                    ,md5(concat_ws('',judicase_id, sort(concat_ws(',',collect_set(case_no)),','), sort(concat_ws(',',collect_set(cids)),','))) r1
+         |                    ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r1
          |            FROM    $project.$t6
          |            WHERE   ds = '$t1_ds'
          |            GROUP BY judicase_id
          |        ) a
          |FULL JOIN (
          |              SELECT  judicase_id
-         |                      ,md5(concat_ws('',judicase_id, sort(concat_ws(',',collect_set(case_no)),','), sort(concat_ws(',',collect_set(cids)),','))) r2
+         |                      ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r2
          |              FROM    $project.$t6
          |              WHERE   ds = '$second_ds'
          |              GROUP BY judicase_id
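
The fingerprint hunks change what the per-group md5 covers: previously it hashed judicase_id plus the sorted set of case_no values; now it hashes the sorted set of flag+detail_id members (plus the cids set) and drops judicase_id itself, so the r1/r2 comparison detects any change in a group's underlying detail records, not just in its case numbers. A plain-Scala analogue of the new fingerprint, assuming sort(s, ',') is a project UDF that returns the comma-joined tokens of s in sorted order, with hypothetical group data:

import java.security.MessageDigest

object FingerprintSketch {
  def md5Hex(s: String): String =
    MessageDigest.getInstance("MD5")
      .digest(s.getBytes("UTF-8"))
      .map("%02x".format(_)).mkString

  // One judicase_id group: its (flag, detail_id) members and its cids.
  // Mirrors md5(concat_ws('', sort(concat_ws(',', collect_set(concat_ws('', flag, detail_id))), ','),
  //                           sort(concat_ws(',', collect_set(cids)), ',')))
  def groupFingerprint(members: Seq[(String, String)], cids: Seq[String]): String = {
    val memberKey = members.map { case (flag, detailId) => flag + detailId }
      .distinct.sorted.mkString(",")
    val cidKey = cids.distinct.sorted.mkString(",")
    md5Hex(memberKey + cidKey)
  }

  def main(args: Array[String]): Unit = {
    // Same membership in a different order yields the same fingerprint.
    val a = groupFingerprint(Seq(("1", "d1"), ("2", "d2")), Seq("c1"))
    val b = groupFingerprint(Seq(("2", "d2"), ("1", "d1")), Seq("c1"))
    println(a == b) // true
  }
}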
@@ -508,7 +508,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r1
          |SELECT
-         |    x.judicase_id ,
+         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -563,8 +563,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r2
          |SELECT
-         |    id    ,
-         |    x.judicase_id ,
+         |    id,
+         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -579,7 +579,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    y.deleted
          |FROM
          |(
-         |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
+         |SELECT  md5(concat_ws('',concat_ws('',judicase_id,${t1_ds.substring(2)}),CLEANUP(case_no))) id
          |        ,judicase_id
          |        ,max(first_title) title
          |        ,case_type(max(case_no)) as case_type
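
Both the r1 and r2 outputs also suffix judicase_id with ${t1_ds.substring(2)}, i.e. the partition date minus its first two characters, so the published ids are versioned by run date. A tiny illustration with hypothetical values:

// Hypothetical values; only the substring/concat behavior is the point.
val t1_ds = "20200620"   // partition date, yyyyMMdd
val judicaseId = "12345"
// Matches the SQL: concat_ws('', x.judicase_id, ${t1_ds.substring(2)})
val versionedId = s"$judicaseId${t1_ds.substring(2)}"  // "12345200620"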