
Graph processing for judicial cases

晏永年 4 years ago
commit 5a66d5488f

+105 −0
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -0,0 +1,105 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.graphx._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import org.apache.spark.ui.graphx.GraphXTab._
+
+case class GraphX4Judicase(s: SparkSession,
+                           project: String, // project the table lives in
+                           tableName: String, // table name (without prefix/suffix)
+                           fromCol: String, // column holding an edge's source vertex
+                           toCol: String // column holding an edge's destination vertex
+                          ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient @getter) val spark: SparkSession = s
+  justicase_ops()
+
+  def calc(): Unit = {
+//    val allCols = getColumns(s"$project.ods_$tableName").filter(_ != "ds").toSeq
+//    val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
+//    val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
+    val srcAllCols = getColumns(s"$project.xjk_ads_judicial_case_relation1").filter(_ != "ds").toSeq
+    val desAllCols = getColumns(s"$project.ods_justicase").filter(_ != "ds").toSeq
+    val dfRelations = sql(
+      s"""
+         |SELECT  *
+         |FROM    $project.$tableName
+         |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
+         |""".stripMargin)
+    val edgeRDD: RDD[Edge[Long]] = dfRelations
+      .select(srcAllCols.map(column => col(column).cast("string")): _*)
+      .rdd.map(r => {
+        val from = r.getAs[String](fromCol).toLong
+        val to = r.getAs[String](toCol).toLong
+        Edge(from, to)
+      })
+    // Build the graph from the edge RDD
+    val graph = Graph.fromEdges(edgeRDD, defaultValue = 0)
+
+    // Aggregate the vertices of each connected component, flatten them into
+    // (case_no -> judicial case id) pairs, then join back to the source table for the remaining fields
+    val tripleRDD = graph.connectedComponents().vertices
+      .map { case (vertexId, componentId) => (componentId, Set(vertexId)) } // verified over several attempts: the component id must be the key, otherwise the result is not the maximal connected subgraphs
+      .reduceByKey(_ ++ _)
+      .flatMap { case (_, members) =>
+        // hash the sorted member list so the same component always gets the same id
+        val judicase_id = BKDRHash(members.toSeq.sorted.mkString(","))
+        members.map(vid => (vid, judicase_id.toString))
+      }
+      .map { case (vid, judicase_id) => Row(vid.toString, judicase_id, "1") }
+    val schemaJust = StructType(Array(
+      StructField("id", StringType),
+      StructField("judicase_id", StringType),
+      StructField("flag", StringType)
+    ))
+    // the view carries only these 3 fields; the remaining attributes are instantiated later when merging into the full table
+    spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
+    // Merge the graph result into the full dataset; for each case_no the judicial case id from the graph takes precedence
+    sql(
+/*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
+         |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
+         |,IF(B.judicase_id IS NOT NULL,B.flag,A.flag) AS flag
+         |,${desAllCols.mkString(",")}
+         |FROM(
+         |  SELECT  '0' AS flag, *
+         |  FROM    $project.ods_justicase
+         |  WHERE   ds='20200830'
+         |) A
+         |LEFT JOIN
+         |(
+         |  SELECT id, judicase_id, flag FROM tmp_edgelets_$tableName
+         |) B
+         |ON A.case_id=B.id
+         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
+      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.xjk_ads_judicial_case_relation1_tmp
+         |SELECT id, judicase_id, flag
+         |FROM tmp_edgelets_$tableName
+         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
+  }
+}
+
+object GraphX4Judicase {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "2000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    attachSparkUITab(spark.sparkContext)
+    GraphX4Judicase(spark, "winhc_eci_dev", "xjk_ads_judicial_case_relation1", "id_2", "id_1").calc()
+  }
+}
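
For reference, the clustering step above can be reproduced in isolation. Below is a minimal, self-contained sketch of the same GraphX pattern: build a graph from edge pairs, run connectedComponents(), and gather each component's vertex ids under one key. The local SparkSession setup and the sample edges are illustrative only, not part of the job.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object ConnectedComponentsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Two clusters: {1, 2, 3} is chained by edges, {10, 11} is separate.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 0), Edge(2L, 3L, 0), Edge(10L, 11L, 0)))
    val graph = Graph.fromEdges(edges, defaultValue = 0)

    // connectedComponents().vertices yields (vertexId, componentId) pairs;
    // keying by componentId gathers every case id belonging to one judicial case.
    val clusters = graph.connectedComponents().vertices
      .map { case (vid, cid) => (cid, Set(vid)) }
      .reduceByKey(_ ++ _)

    clusters.collect().foreach(println) // e.g. (1,Set(1, 2, 3)), (10,Set(10, 11))
    spark.stop()
  }
}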
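The stable cluster id comes from BaseUtil.BKDRHash, whose implementation is not part of this diff. Assuming it follows the classic BKDR string hash (multiply-by-seed accumulation), a sketch looks like this; the seed value and the sign mask here are assumptions.

// Sketch of the classic BKDR string hash. The actual BaseUtil.BKDRHash is not
// shown in this diff; the seed (131) and the non-negative mask are assumptions.
def bkdrHash(s: String): Long = {
  val seed = 131L // customary BKDR seeds: 31, 131, 1313, 13131, ...
  var hash = 0L
  for (c <- s) {
    hash = hash * seed + c // Long overflow wraps; the final mask keeps the result non-negative
  }
  hash & 0x7FFFFFFFFFFFFFFFL
}

Because the job hashes the sorted, comma-joined member ids of a component, the same set of case ids always produces the same judicase_id regardless of the order in which edges were read.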