Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
Parent
Commit
dd475976c0

+ 16 - 11
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.implicits
 
 import com.winhc.bigdata.spark.config.HBaseConfig
+import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -20,18 +21,22 @@ object DataFrame2HBaseHelper {
       val jobConf = HBaseConfig.HBaseOutputJobConf(tableName)
 
       val stringDf = df.select(df.columns.map(column => col(column).cast("string")): _*)
-      stringDf.rdd.map(row => {
-        val id = row.getAs[String](rowkeyFieldName.toLowerCase())
-        val put = new Put(Bytes.toBytes(id))
-        for (f <- fields) {
-          val v = row.getAs[String](f.toLowerCase)
-          if (v != null) {
-            put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
+      stringDf.rdd.mapPartitions(it => {
+        it.filter(row => {
+          StringUtils.isNotBlank(row.getAs[String](rowkeyFieldName.toLowerCase()))
+        }).map(row => {
+          val id = row.getAs[String](rowkeyFieldName.toLowerCase())
+          val put = new Put(Bytes.toBytes(id))
+          for (f <- fields) {
+            val v = row.getAs[String](f.toLowerCase)
+            if (v != null) {
+              put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
+            }
           }
-        }
-        (new ImmutableBytesWritable, put)
-      }).filter(_ != null)
-        .saveAsHadoopDataset(jobConf)
+          (new ImmutableBytesWritable, put)
+        }).filter(!_._2.isEmpty)
+      }).saveAsHadoopDataset(jobConf)
     }
   }
+
 }
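
Note: for readers outside the repo, a self-contained sketch of the new write path (the object/method names and the "F" column family are assumptions; the rest mirrors the helper above).

import com.winhc.bigdata.spark.config.HBaseConfig
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object Df2HBaseSketch {
  // Sketch of the change: filter blank rowkeys inside each partition,
  // build one Put per row, and drop Puts that ended up with no columns.
  def save2HBase(df: DataFrame, tableName: String, rowkeyFieldName: String, fields: Seq[String]): Unit = {
    val f_bytes = Bytes.toBytes("F") // column family is an assumption here
    val jobConf = HBaseConfig.HBaseOutputJobConf(tableName)
    val stringDf = df.select(df.columns.map(c => col(c).cast("string")): _*)
    stringDf.rdd.mapPartitions(it => {
      it.filter(row => StringUtils.isNotBlank(row.getAs[String](rowkeyFieldName.toLowerCase())))
        .map(row => {
          val put = new Put(Bytes.toBytes(row.getAs[String](rowkeyFieldName.toLowerCase())))
          for (f <- fields) {
            val v = row.getAs[String](f.toLowerCase)
            if (v != null) put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
          }
          (new ImmutableBytesWritable, put)
        })
        .filter(!_._2.isEmpty) // skip Puts that carry no columns
    }).saveAsHadoopDataset(jobConf)
  }
}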

+ 9 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -303,7 +303,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     spark.udf.register("name_aggs", new NameAggs(1000))
     spark.udf.register("case_reason", new CaseReasonAggs(1000))
    // preprocess the data
-    val cols = Seq("flag", "date", "detail_id","name")
+    val cols = Seq("flag", "date", "detail_id", "name")
     val t1 = s"$project.inc_ads_company_court_announcement"
     val t2 = s"ads_judicial_case_relation_pre"
     var t2_ds = ds
@@ -487,14 +487,14 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
          |FROM    (
          |            SELECT  judicase_id
-         |                    ,md5(concat_ws('',judicase_id, sort(concat_ws(',',collect_set(case_no)),','), sort(concat_ws(',',collect_set(cids)),','))) r1
+         |                    ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r1
          |            FROM    $project.$t6
          |            WHERE   ds = '$t1_ds'
          |            GROUP BY judicase_id
          |        ) a
          |FULL JOIN (
          |              SELECT  judicase_id
-         |                      ,md5(concat_ws('',judicase_id, sort(concat_ws(',',collect_set(case_no)),','), sort(concat_ws(',',collect_set(cids)),','))) r2
+         |                      ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r2
          |              FROM    $project.$t6
          |              WHERE   ds = '$second_ds'
          |              GROUP BY judicase_id
@@ -506,9 +506,9 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     //司法案件主表
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r1
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r1
          |SELECT
-         |    x.judicase_id ,
+         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -561,10 +561,10 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     //明细表
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r2
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r2
          |SELECT
-         |    id    ,
-         |    x.judicase_id ,
+         |    id,
+         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -579,7 +579,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    y.deleted
          |FROM
          |(
-         |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
+         |SELECT  md5(concat_ws('',concat_ws('',judicase_id,${t1_ds.substring(2)}),CLEANUP(case_no))) id
          |        ,judicase_id
          |        ,max(first_title) title
          |        ,case_type(max(case_no)) as case_type
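
Note: a rough Scala analogue of the two changes above, under an assumed example partition value; in the job itself this logic lives in the SQL (sort() and CLEANUP are project UDFs).

import java.security.MessageDigest

object JudicialIdSketch {
  // t1_ds.substring(2) drops the century digits of the partition,
  // e.g. "20200915" -> "200915"; that suffix is appended to judicase_id
  // just like concat_ws('') does in the SQL above.
  val t1_ds = "20200915" // example partition value, assumption
  def suffixedJudicaseId(judicaseId: String): String = judicaseId + t1_ds.substring(2)

  // Rough analogue of the new r1/r2 fingerprint: hash the sorted distinct
  // flag+detail_id values together with the sorted distinct cids, so the digest
  // changes whenever the detail rows of a case differ between the two partitions.
  def caseFingerprint(flagDetailIds: Seq[String], cids: Seq[String]): String = {
    val content = flagDetailIds.distinct.sorted.mkString(",") + cids.distinct.sorted.mkString(",")
    MessageDigest.getInstance("MD5").digest(content.getBytes("UTF-8")).map("%02x".format(_)).mkString
  }
}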

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -187,7 +187,7 @@ object BaseUtil {
 
   def MD5hash(content: String): String = {
     val md5 = MessageDigest.getInstance("MD5")
-    val encoded = md5.digest((content).getBytes)
+    val encoded = md5.digest(cleanup(content).getBytes)
     encoded.map("%02x".format(_)).mkString
   }
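
Note: a minimal sketch of the normalize-then-hash pattern this change adopts; `normalize` here is a stand-in for the project's cleanup(), whose real rules live in BaseUtil.

import java.security.MessageDigest

object Md5Sketch {
  // Normalize the input before hashing so cosmetically different strings
  // (e.g. case numbers with stray whitespace) map to one digest.
  def md5Of(content: String, normalize: String => String = _.replaceAll("\\s+", "")): String =
    MessageDigest.getInstance("MD5").digest(normalize(content).getBytes("UTF-8"))
      .map("%02x".format(_)).mkString
}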
 

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/utils/IDCard_Completion_Utils.scala

@@ -25,6 +25,7 @@ case class IDCard_Completion_Utils(s: SparkSession,
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     prepareFunctions(spark)
+    is_id_card_udf()
 
    // tables that take part in the completion
     var mapTables = new mutable.HashMap[String, (String, String, String, String, String, String, String)]()
@@ -62,7 +63,6 @@ case class IDCard_Completion_Utils(s: SparkSession,
       println("not all tables have the same partition of newest !!!")
       sys.exit(-1)
     }
-    is_id_card_udf()
     id_card_trim_udf()
     lastDsIncOds = minDs
     spark.sparkContext.setJobDescription(s"补全身份证号码:${mapTables.size}个表聚合($lastDsIncOds)")
@@ -70,11 +70,11 @@ case class IDCard_Completion_Utils(s: SparkSession,
       s"""
          |SELECT ${m._2._2} AS name, ${m._2._3} AS identity_num, ${m._2._4} AS company_name, ${m._2._5} AS case_no, ${m._2._6} AS court_name, ${m._2._7} AS source, ${m._2._7} AS flag
          |FROM $project.ods_${m._1}
-         |WHERE ds>'0' AND ${m._2._1} IS NULL
+         |WHERE ds>'0' AND is_id_card(${m._2._3}) -- strictly require a well-formed ID card number
          |UNION ALL
          |SELECT ${m._2._2} AS name, ${m._2._3} AS identity_num, ${m._2._4} AS company_name, ${m._2._5} AS case_no, ${m._2._6} AS court_name, ${m._2._7} AS source, ${m._2._7} AS flag
          |FROM $project.inc_ods_${m._1}
-         |WHERE ds>'0' AND ${m._2._1} IS NULL
+         |WHERE ds>'0' AND is_id_card(${m._2._3}) -- strictly require a well-formed ID card number
          |""".stripMargin
     }).toArray.mkString(" UNION ALL ")
     ).where("name IS NOT NULL AND case_no IS NOT NULL AND LENGTH(name)>0 AND LENGTH(case_no)>0")
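
Note: illustrative only; the real is_id_card_udf() is registered by the project's helpers. This sketch shows the kind of predicate the WHERE clause now relies on.

import org.apache.spark.sql.SparkSession

object IsIdCardSketch {
  // Register a simple validity check for mainland ID numbers:
  // 17 digits followed by a digit or X/x, full-string match.
  def register(spark: SparkSession): Unit =
    spark.udf.register("is_id_card", (s: String) => s != null && s.trim.matches("\\d{17}[0-9Xx]"))
}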