
GraphX processing for judicial cases

晏永年 4 years ago
Commit 8255285ff4

+ 109 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

@@ -0,0 +1,109 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.graphx._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import org.apache.spark.sql.types.{LongType, MapType, StringType, StructField, StructType}
+import com.winhc.bigdata.spark.udf.BaseFunc
+import org.apache.spark.sql.functions.col
+
+case class JustiCase(s: SparkSession,
+                     project: String, // project the table belongs to
+                     tableName: String, // table name (without prefix or suffix)
+                     fromCol: String, // column holding the edge source
+                     toCol: String // column holding the edge target
+                    ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  justicase_ops()
+
+  def calc(): Unit = {
+    val allCols = getColumns(s"$project.ods_$tableName").toSet
+    val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
+    val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
+    val dfRelations = sql(
+      s"""
+         |SELECT  *
+         |FROM    $project.ods_$tableName
+         |WHERE   ds='$ods_ds' AND ${toCol} IS NOT NULL
+         |UNION
+         |SELECT  *
+         |FROM    $project.inc_ods_$tableName
+         |WHERE   ds='$inc_ods_ds' AND ${toCol} IS NOT NULL
+         |""".stripMargin)
+    val edgeRDD = dfRelations.select(allCols.map(column => col(column).cast("string")).toSeq: _*).rdd.flatMap(r => {
+      val case_no_from = r.getAs[String](fromCol)
+      val case_no_tos = r.getAs[String](toCol)
+      //      val allColsMap = allCols.map(f => (f, r.getAs[String](f))).toMap
+      val from = BKDRHash(case_no_from)
+      var edges: Set[Edge[String]] = Set[Edge[String]]()
+      for (each <- case_no_tos.split(",")) {
+        val to = BKDRHash(each)
+        edges += Edge(from, to, "1")
+      }
+      edges
+    })
+    // Build the graph from the edge list
+    val graph: Graph[String, String] = Graph.fromEdges(edgeRDD, defaultValue = "")
+
+    // Compute the connected components
+    val connectedGraph: Graph[VertexId, String] = graph.connectedComponents()
+
+    // Aggregate the edges of each connected component, flatten them into (case_no -> judicial case id) pairs, and join back to the source table to fill in the remaining columns
+    val tripleRDD = connectedGraph.triplets.map(t => (t.srcAttr, Set((t.dstId, t.attr))))
+      .reduceByKey(_ ++ _)
+      .flatMap(r => {
+        val ss = Set((r._1, "0")) ++ r._2
+        // The judicial-case id is the hash of the sorted member vertex ids
+        val justicase_id = BKDRHash(ss.map(_._1).toSeq.sorted.mkString(","))
+        // Flatten to (vertex id -> justicase_id) pairs
+        ss.map(v => (v._1, justicase_id.toString))
+      }).map(r => {
+        Row(r._1.toString, r._2)
+      })
+    val schemaJust = StructType(Array(
+      StructField(toCol,StringType),
+      StructField("justicase_id",StringType)
+    ))
+    val dfJust = spark.createDataFrame(tripleRDD, schemaJust)
+    dfJust.join(dfRelations,"case_no")//有边的case_no补全信息
+      .union(sql(//孤立的case_no
+        s"""
+           |SELECT  get_justicase_id(CASE_NO) AS justicase_id, *
+           |FROM    $project.ods_$tableName
+           |WHERE   ds=ods_ds  AND ${toCol} == NULL
+           |UNION
+           |SELECT  *
+           |FROM    $project.inc_ods_$tableName
+           |WHERE   ds=inc_ods_ds AND ${toCol} != NULL
+           |""".stripMargin))//.show(100)
+      .createOrReplaceTempView(s"tmp_graphx_$tableName")
+
+    val ds ="20200802"
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_${tableName}_graphx PARTITION(ds='$inc_ods_ds')
+         |SELECT *
+         |FROM
+         |    tmp_graphx_$tableName
+         |""".stripMargin)
+  }
+}
+
+object JustiCase {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    JustiCase(spark, "winhc_eci_dev", "wenshu_detail", "CONNECT_CASE_NO", "CASE_NO").calc()
+  }
+}
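
As a reading aid, the following is a minimal local sketch of the grouping idea implemented by JustiCase.calc(): case numbers linked through the edge column fall into one connected component, and every member of that component gets the same justicase_id, the BKDR hash of the sorted member ids. The object name ConnectedCaseSketch, the sample case numbers, and the local-mode SparkSession are illustrative assumptions, not part of the committed job.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object ConnectedCaseSketch {
  // Same BKDR string hash as BaseUtil.BKDRHash, repeated here so the sketch is self-contained
  def bkdrHash(str: String): Long = {
    var hash: Long = 0
    str.foreach(c => hash = (hash * 131 + c) & 0x7FFFFFFF)
    hash
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ConnectedCaseSketch").getOrCreate()
    val sc = spark.sparkContext

    // (case_no, connect_case_no) pairs; A-B and B-C should collapse into one judicial case
    val relations = Seq(
      ("(2020)京01民初1号", "(2019)京01民初9号"),
      ("(2019)京01民初9号", "(2018)京01民初8号"))

    // One edge per relation; vertex ids are the hashed case numbers
    val edges = sc.parallelize(relations.map { case (from, to) => Edge(bkdrHash(from), bkdrHash(to), "1") })
    val graph = Graph.fromEdges(edges, defaultValue = "")

    // componentId -> member vertex ids; all members of a component share one justicase_id
    graph.connectedComponents().vertices
      .map { case (vid, componentId) => (componentId, Set(vid)) }
      .reduceByKey(_ ++ _)
      .map { case (_, members) => (members, bkdrHash(members.toSeq.sorted.mkString(","))) }
      .collect()
      .foreach { case (members, id) => println(s"justicase_id=$id members=$members") }

    spark.stop()
  }
}

The sketch only prints each component's members and the shared id; the job above turns the same pairs into a DataFrame and joins them back to the source table.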

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -9,6 +9,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
 import scala.annotation.meta.getter
+import com.winhc.bigdata.spark.utils.BaseUtil._
 
 /**
  * @Author: XuJiakai
@@ -140,4 +141,10 @@ trait BaseFunc {
       }
     })
   }
+  // Register get_justicase_id: hashes the sorted, comma-joined case numbers into one judicial-case id
+  def justicase_ops(): Unit = {
+    spark.udf.register("get_justicase_id", (case_nos: String) => {
+      BKDRHash(case_nos.split(",").sorted.mkString(","))
+    })
+  }
+
 }
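
A short usage sketch for the new UDF: once justicase_ops() has been called on a class that mixes in BaseFunc and holds the SparkSession (as JustiCase does in its constructor), get_justicase_id is available in SQL. The table name tmp_cases below is a placeholder, not a real table in this project.

justicase_ops() // registers get_justicase_id on the session
spark.sql(
  """
    |SELECT CASE_NO,
    |       get_justicase_id(CASE_NO) AS justicase_id
    |FROM   tmp_cases
    |""".stripMargin).show()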

+ 9 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -115,6 +115,15 @@ object BaseUtil {
       ""
     }
   }
+  def BKDRHash(str: String): Long = {
+    val seed: Long = 131 // 31 131 1313 13131 131313 etc.
+    var hash: Long = 0
+    for (c <- str) {
+      hash = hash * seed + c
+      hash = hash & 0x7FFFFFFF
+    }
+    hash
+  }
 
   def main(args: Array[String]): Unit = {
     println(getYesterday())
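
A quick sanity sketch of BKDRHash and of why get_justicase_id sorts the comma-separated case numbers before hashing; BKDRHashDemo and the sample strings are illustrative assumptions, not part of the commit.

import com.winhc.bigdata.spark.utils.BaseUtil.BKDRHash

object BKDRHashDemo {
  def main(args: Array[String]): Unit = {
    val a = "(2019)京01民初9号,(2020)京01民初1号"
    val b = "(2020)京01民初1号,(2019)京01民初9号" // same case numbers, different order

    // get_justicase_id splits, sorts and re-joins before hashing,
    // so both orderings map to the same judicial-case id
    def caseId(s: String): Long = BKDRHash(s.split(",").sorted.mkString(","))

    println(BKDRHash(a) == BKDRHash(b)) // generally false: the raw hash is order-sensitive
    println(caseId(a) == caseId(b))     // true: sorting first makes the id order-insensitive
    println(BKDRHash(a) >= 0)           // true: the 0x7FFFFFFF mask keeps the hash non-negative
  }
}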