ソースを参照

图前置还是交由上游处理

晏永年 4 年 前
コミット
3c7c26cca2

+ 18 - 50
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import com.winhc.bigdata.spark.utils.BaseUtil.{MD5hash, isWindows}
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.graphx._
 import org.apache.spark.internal.Logging
@@ -12,7 +12,6 @@ import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
-import org.apache.spark.sql.functions.monotonically_increasing_id
 
 case class GraphX4Judicase(s: SparkSession,
                            project: String, //表所在工程名
@@ -24,50 +23,18 @@ case class GraphX4Judicase(s: SparkSession,
   justicase_ops()
 
   def calc(): Unit = {
-    //1.将id换成图所需的Long类型
-    val twoPart = BaseUtil.getLastPartion("winhc_eci_dev.dwd_judicial_case", spark)
-    sql(
-      s"""
-         |SELECT  id, case_no, tn
-         |FROM(
-         |  SELECT id, case_no, tn, ROW_NUMBER() OVER(PARTITION BY id, main_case_no, tn ORDER BY id) AS num
-         |  FROM    $project.dwd_judicial_case
-         |  WHERE   ${twoPart.map(m=>{s"ds = '${m._2}' AND tn = '${m._1}'"}).mkString("(", ") OR (", ")")}
-         |)
-         |WHERE num=1
-         |""".stripMargin)
-      .withColumn("id_Long",monotonically_increasing_id)
-      .createOrReplaceTempView("tmp_graphx_dwd_judicial_case")
-    //2.跑图计算
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph_pre
-         |SELECT  id_1, id_2, id_1_long, C.id_Long AS id_2_long, case_no_1, case_no_2, tn_1, tn_2
-         |FROM(
-         |  SELECT   id_1, id_2, B.id_Long AS id_1_long, case_no_1, case_no_2, tn_1, tn_2, connect_type
-         |          ,ROW_NUMBER() OVER(PARTITION BY id_1, id_2 ORDER BY id_1) AS num
-         |  FROM(
-         |      SELECT *
-         |      FROM $project.$tableName
-         |      WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
-         |   ) A
-         |  LEFT JOIN tmp_graphx_dwd_judicial_case B
-         |  ON A.id_1=B.id
-         |) AB
-         |LEFT JOIN tmp_graphx_dwd_judicial_case C
-         |ON AB.id_2=C.id
-         |WHERE num=1 AND id_1 IS NOT NULL AND C.id_Long IS NOT NULL
-         |""".stripMargin)
+    val srcAllCols = getColumns(s"$project.$tableName").filter(_ != "ds")
     val dfRelations = sql(
       s"""
-         |SELECT  id_1_long, id_2_long
-         |FROM    $project.ads_judicial_case_relation_graph_pre
+         |SELECT  *
+         |FROM    $project.$tableName
+         |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
          |""".stripMargin)
-    val edgeRDD: RDD[Edge[Long]] = dfRelations /*.select(dfRelations.columns.map(column => col(column).cast("string")): _*) */.rdd.map(r => {
-      val case_no_from = r.getAs[Long]("id_2_long")
-      val case_no_to = r.getAs[Long]("id_1_long")
-      val from = case_no_from//.toLong
-      val to = case_no_to//.toLong
+    val edgeRDD: RDD[Edge[Long]] = dfRelations .select(srcAllCols.map(column => col(column).cast("string")): _*) .rdd.map(r => {
+      val case_no_from = r.getAs[String](fromCol)
+      val case_no_to = r.getAs[String](toCol)
+      val from = case_no_from.toLong
+      val to = case_no_to.toLong
       Edge(from, to)
     })
     // 根据边构造图
@@ -79,15 +46,15 @@ case class GraphX4Judicase(s: SparkSession,
       .map(r => (r._1, Set(r._2)))
       .reduceByKey(_ ++ _)
       .flatMap(r => {
-        val judicase_id = BKDRHash(r._2.toSeq.sorted.mkString(","))
+        val judicase_id = MD5hash(r._2.toSeq.sorted.mkString(","))
         var mp: Map[Long, Map[String, String]] = Map()
         r._2.map(r => {
-          mp = mp ++ Map(r -> Map("judicase_id" -> judicase_id.toString))
+          mp = mp ++ Map(r -> Map("judicase_id" -> judicase_id))
         })
         mp
       })
       .map(r => {
-      Row(r._1.toString, r._2("judicase_id"), "0")
+      Row(r._1.toString, r._2("judicase_id"), "1")
     })
     val schemaJust = StructType(Array(
       StructField("id", StringType),
@@ -96,7 +63,8 @@ case class GraphX4Judicase(s: SparkSession,
     ))
     //仅包含这3个字段的表在后面融入全量时再实例其他属性
     val dfEdgelets = spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
-    //3.将图结果融入全量数据中,case_no对应的司法案件号以图为准
+    //将图结果融入全量数据中,case_no对应的司法案件号以图为准
+    cleanup()
     sql(
 /*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
          |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
@@ -114,10 +82,10 @@ case class GraphX4Judicase(s: SparkSession,
          |ON A.case_id=B.id
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
       s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph
-         |SELECT B.id AS id, judicase_id, case_no, B.tn AS flag
+         |SELECT B.id AS id, rowkey, IF(A.judicase_id IS NOT NULL, A.judicase_id, MD5(cleanup(CONCAT_WS('',B.id,B.tn)))) AS judicase_id, B.case_no, B.tn AS flag
          |FROM tmp_edgelets_$tableName A
-         |LEFT JOIN tmp_graphx_dwd_judicial_case B
-         |ON A.id=B.id_Long
+         |RIGHT JOIN $project.dwd_judicial_case_2 B
+         |ON A.id=B.id
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
   }
 }

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
+import java.security.MessageDigest
 import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
@@ -183,6 +184,11 @@ object BaseUtil {
     }
     return hash
   }
+  def MD5hash(content: String): String = {
+    val md5 = MessageDigest.getInstance("MD5")
+    val encoded = md5.digest((content).getBytes)
+    encoded.map("%02x".format(_)).mkString
+  }
 
   def nameJudge(name: String, yg_name: String, bg_name: String): String = {
     if (StringUtils.isNotBlank(name)) {