Browse source code

Merge remote-tracking branch 'origin/master'

xufei 4 years ago
parent
commit
cbfb136a4e

+ 18 - 50
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import com.winhc.bigdata.spark.utils.BaseUtil.{MD5hash, isWindows}
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.graphx._
 import org.apache.spark.internal.Logging
@@ -12,7 +12,6 @@ import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
-import org.apache.spark.sql.functions.monotonically_increasing_id
 
 case class GraphX4Judicase(s: SparkSession,
                           project: String, //name of the project the table lives in
@@ -24,50 +23,18 @@ case class GraphX4Judicase(s: SparkSession,
   justicase_ops()
 
   def calc(): Unit = {
-    //1. Convert id to the Long type the graph needs
-    val twoPart = BaseUtil.getLastPartion("winhc_eci_dev.dwd_judicial_case", spark)
-    sql(
-      s"""
-         |SELECT  id, case_no, tn
-         |FROM(
-         |  SELECT id, case_no, tn, ROW_NUMBER() OVER(PARTITION BY id, main_case_no, tn ORDER BY id) AS num
-         |  FROM    $project.dwd_judicial_case
-         |  WHERE   ${twoPart.map(m=>{s"ds = '${m._2}' AND tn = '${m._1}'"}).mkString("(", ") OR (", ")")}
-         |)
-         |WHERE num=1
-         |""".stripMargin)
-      .withColumn("id_Long",monotonically_increasing_id)
-      .createOrReplaceTempView("tmp_graphx_dwd_judicial_case")
-    //2. Run the graph computation
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph_pre
-         |SELECT  id_1, id_2, id_1_long, C.id_Long AS id_2_long, case_no_1, case_no_2, tn_1, tn_2
-         |FROM(
-         |  SELECT   id_1, id_2, B.id_Long AS id_1_long, case_no_1, case_no_2, tn_1, tn_2, connect_type
-         |          ,ROW_NUMBER() OVER(PARTITION BY id_1, id_2 ORDER BY id_1) AS num
-         |  FROM(
-         |      SELECT *
-         |      FROM $project.$tableName
-         |      WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
-         |   ) A
-         |  LEFT JOIN tmp_graphx_dwd_judicial_case B
-         |  ON A.id_1=B.id
-         |) AB
-         |LEFT JOIN tmp_graphx_dwd_judicial_case C
-         |ON AB.id_2=C.id
-         |WHERE num=1 AND id_1 IS NOT NULL AND C.id_Long IS NOT NULL
-         |""".stripMargin)
+    val srcAllCols = getColumns(s"$project.$tableName").filter(_ != "ds")
     val dfRelations = sql(
       s"""
-         |SELECT  id_1_long, id_2_long
-         |FROM    $project.ads_judicial_case_relation_graph_pre
+         |SELECT  *
+         |FROM    $project.$tableName
+         |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
          |""".stripMargin)
-    val edgeRDD: RDD[Edge[Long]] = dfRelations /*.select(dfRelations.columns.map(column => col(column).cast("string")): _*) */.rdd.map(r => {
-      val case_no_from = r.getAs[Long]("id_2_long")
-      val case_no_to = r.getAs[Long]("id_1_long")
-      val from = case_no_from//.toLong
-      val to = case_no_to//.toLong
+    val edgeRDD: RDD[Edge[Long]] = dfRelations.select(srcAllCols.map(column => col(column).cast("string")): _*).rdd.map(r => {
+      val case_no_from = r.getAs[String](fromCol)
+      val case_no_to = r.getAs[String](toCol)
+      val from = case_no_from.toLong
+      val to = case_no_to.toLong
       Edge(from, to)
     })
     // Build the graph from the edges
@@ -79,15 +46,15 @@ case class GraphX4Judicase(s: SparkSession,
       .map(r => (r._1, Set(r._2)))
       .reduceByKey(_ ++ _)
       .flatMap(r => {
-        val judicase_id = BKDRHash(r._2.toSeq.sorted.mkString(","))
+        val judicase_id = MD5hash(r._2.toSeq.sorted.mkString(","))
         var mp: Map[Long, Map[String, String]] = Map()
         r._2.map(r => {
-          mp = mp ++ Map(r -> Map("judicase_id" -> judicase_id.toString))
+          mp = mp ++ Map(r -> Map("judicase_id" -> judicase_id))
         })
         mp
       })
       .map(r => {
-      Row(r._1.toString, r._2("judicase_id"), "0")
+      Row(r._1.toString, r._2("judicase_id"), "1")
     })
     val schemaJust = StructType(Array(
       StructField("id", StringType),
@@ -96,7 +63,8 @@ case class GraphX4Judicase(s: SparkSession,
     ))
     //The table with only these 3 fields gets its other attributes later, when merged into the full data
     val dfEdgelets = spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
-    //3. Merge the graph result into the full data; the judicial case number behind case_no follows the graph
+    //Merge the graph result into the full data; the judicial case number behind case_no follows the graph
+    cleanup()
     sql(
 /*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
          |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
@@ -114,10 +82,10 @@ case class GraphX4Judicase(s: SparkSession,
          |ON A.case_id=B.id
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
       s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph
-         |SELECT B.id AS id, judicase_id, case_no, B.tn AS flag
+         |SELECT B.id AS id, rowkey, IF(A.judicase_id IS NOT NULL, A.judicase_id, MD5(cleanup(CONCAT_WS('',B.id,B.tn)))) AS judicase_id, B.case_no, B.tn AS flag
          |FROM tmp_edgelets_$tableName A
-         |LEFT JOIN tmp_graphx_dwd_judicial_case B
-         |ON A.id=B.id_Long
+         |RIGHT JOIN $project.dwd_judicial_case_2 B
+         |ON A.id=B.id
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
   }
 }
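
Note on the rewritten calc() above: it drops the ads_judicial_case_relation_graph_pre staging step, builds the GraphX edge RDD straight from $project.$tableName, and collapses each connected component into one judicase_id via MD5 over the sorted member ids. A minimal, self-contained sketch of that connected-components + MD5 grouping pattern (toy edges and a local SparkSession, not the production tables):

import java.security.MessageDigest

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ConnectedCaseSketch {
  // same hex-digest shape as BaseUtil.MD5hash
  def md5(content: String): String =
    MessageDigest.getInstance("MD5").digest(content.getBytes).map("%02x".format(_)).mkString

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-sketch").getOrCreate()
    val sc = spark.sparkContext

    // toy edges; in the job these come from fromCol/toCol of $project.$tableName
    val edgeRDD: RDD[Edge[Long]] = sc.parallelize(Seq((1L, 2L), (2L, 3L), (10L, 11L)))
      .map { case (from, to) => Edge(from, to, 1L) }

    val graph = Graph.fromEdges(edgeRDD, 0L)
    // connectedComponents() tags every vertex with the lowest vertex id of its component
    val judicaseIds = graph.connectedComponents().vertices // RDD[(vertexId, componentId)]
      .map { case (vid, component) => (component, Set(vid)) }
      .reduceByKey(_ ++ _)
      .flatMap { case (_, members) =>
        val judicase_id = md5(members.toSeq.sorted.mkString(",")) // one stable id per component
        members.map(id => (id.toString, judicase_id))
      }

    judicaseIds.collect().foreach(println)
    spark.stop()
  }
}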

+ 59 - 16
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -5,7 +5,9 @@ import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils, case_connect_utils}
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.monotonically_increasing_id
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -51,19 +53,58 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
   }
 
   private def etc_dwd_judicial_case(ds: String): Unit = {
-    sql(
+
+    val rdd = sql(
       s"""
-         |INSERT OVERWRITE TABLE winhc_eci_dev.dwd_judicial_case PARTITION(ds,tn)
-         |SELECT  DENSE_RANK() OVER(PARTITION BY md5(concat_ws('',id,tn)) ORDER BY id) AS id
-         |        ,main_case_no
+         |SELECT  main_case_no
          |        ,case_no
-         |        ,id AS rowkey
+         |        ,rowkey
          |        ,case_attribute
          |        ,ds
          |        ,tn
          |FROM    winhc_eci_dev.dwd_judicial_case
          |WHERE   ds = '$ds'
          |""".stripMargin)
+      .withColumn("id", monotonically_increasing_id)
+      .rdd.map(r =>
+      (s"${r.getAs[String]("rowkey")}_${r.getAs[String]("tn")}", r)
+    ).groupByKey().flatMap(r => {
+      val li = r._2
+      val id = li.last.getAs[Long]("id")
+      li.map(r => {
+        val main_case_no = r.getAs[Long]("main_case_no")
+        val case_no = r.getAs[String]("case_no")
+        val rowkey = r.getAs[String]("rowkey")
+        val case_attribute = r.getAs[Map[String, String]]("case_attribute")
+        val ds = r.getAs[String]("ds")
+        val tn = r.getAs[String]("tn")
+        Row(id, main_case_no, case_no, rowkey, case_attribute, ds, tn)
+      })
+    })
+    val schema = StructType(Array(
+      StructField("id", LongType),
+      StructField("main_case_no", LongType),
+      StructField("case_no", StringType),
+      StructField("rowkey", StringType),
+      StructField("case_attribute", MapType(StringType, StringType)),
+      StructField("ds", StringType),
+      StructField("tn", StringType)
+    ))
+
+    spark.createDataFrame(rdd, schema).createOrReplaceTempView("all_tmp_xjk")
+
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE winhc_eci_dev.dwd_judicial_case_2 PARTITION(ds='$ds')
+         |select id
+         |       ,main_case_no
+         |       ,case_no
+         |       ,rowkey
+         |       ,case_attribute
+         |       ,tn
+         |from   all_tmp_xjk
+         |""".stripMargin)
   }
 
   private def etl_company_zxf(ds: String): Unit = {
@@ -210,17 +251,17 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tn')
-         |SELECT  id
+         |SELECT  0 as id
          |        , 1 as main_case_no
          |        ,case_no
-         |        ,0 as rowkey
+         |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |UNION ALL
-         |SELECT  id
+         |SELECT  0 as id
          |        , 0 as main_case_no
          |        ,connect_case_no as case_no
-         |        ,0 as rowkey
+         |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |WHERE   connect_case_no is not null
@@ -293,17 +334,17 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tableName')
-         |SELECT  $table_id as id
+         |SELECT  0 as id
          |        , 1 as main_case_no
          |        ,case_no
-         |        ,0 as rowkey
+         |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |UNION ALL
-         |SELECT  $table_id as id
+         |SELECT  0 as id
          |        , 0 as main_case_no
          |        ,single_connect_case_no as case_no
-         |        ,0 as rowkey
+         |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
@@ -312,7 +353,9 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
 
 
   private def relationByGroup(): Unit = {
-    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+    val org_tab = s"$project.dwd_judicial_case_2"
+
+    val dwd_last_ds = getLastPartitionsOrElse(org_tab, "0")
     spark.udf.register("case_equ", case_equ _)
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
     spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
@@ -320,7 +363,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     sql(
       s"""
          | SELECT  *
-         | FROM    winhc_eci_dev.dwd_judicial_case
+         | FROM    $org_tab
          | WHERE   ds = '$dwd_last_ds'
          | AND     case_no IS NOT NULL
          | AND     case_no <> ''
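
Note on etc_dwd_judicial_case above: the DENSE_RANK id is replaced by monotonically_increasing_id plus a groupByKey on rowkey_tn, so every row sharing the same (rowkey, tn) carries one id before being written to dwd_judicial_case_2. A minimal sketch of that id-assignment pattern (toy in-memory data, a local SparkSession, and a reduced column set, all hypothetical):

import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}

object GroupIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("group-id-sketch").getOrCreate()
    import spark.implicits._

    val df = Seq(("rk1", "t1", "c1"), ("rk1", "t1", "c2"), ("rk2", "t2", "c3"))
      .toDF("rowkey", "tn", "case_no")
      .withColumn("id", monotonically_increasing_id)

    val rdd = df.rdd
      .map(r => (s"${r.getAs[String]("rowkey")}_${r.getAs[String]("tn")}", r))
      .groupByKey()
      .flatMap { case (_, rows) =>
        val id = rows.last.getAs[Long]("id") // one id reused for the whole (rowkey, tn) group
        rows.map(r => Row(id, r.getAs[String]("rowkey"), r.getAs[String]("case_no"), r.getAs[String]("tn")))
      }

    val schema = StructType(Array(
      StructField("id", LongType),
      StructField("rowkey", StringType),
      StructField("case_no", StringType),
      StructField("tn", StringType)
    ))
    spark.createDataFrame(rdd, schema).show(false)
    spark.stop()
  }
}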

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
+import java.security.MessageDigest
 import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
@@ -183,6 +184,11 @@ object BaseUtil {
     }
     return hash
   }
+  def MD5hash(content: String): String = {
+    val md5 = MessageDigest.getInstance("MD5")
+    val encoded = md5.digest((content).getBytes)
+    encoded.map("%02x".format(_)).mkString
+  }
 
   def nameJudge(name: String, yg_name: String, bg_name: String): String = {
     if (StringUtils.isNotBlank(name)) {
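
Usage note for the new MD5hash helper: unlike BKDRHash, which yields a numeric hash, MD5hash returns the 32-character lowercase hex digest, so its output lines up with the SQL MD5() expression used in the ads_judicial_case_relation_graph insert above. A hypothetical spark-shell style check (toy input, assuming the package is on the classpath):

import com.winhc.bigdata.spark.utils.BaseUtil.MD5hash

val judicase_id = MD5hash("id_a,id_b,id_c") // the job hashes the sorted component member ids
assert(judicase_id.length == 32)            // hex string, not a Long like BKDRHash
println(judicase_id)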