@@ -0,0 +1,105 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.graphx._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import org.apache.spark.ui.graphx.GraphXTab._
+
+case class GraphX4Judicase(s: SparkSession,
+                           project: String, // project that owns the tables
+                           tableName: String, // table name (without prefix or suffix)
+                           fromCol: String, // column holding the source vertex of an edge
+                           toCol: String // column holding the target vertex of an edge
+                          ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  justicase_ops()
+
+  def calc(): Unit = {
+//    val allCols = getColumns(s"$project.ods_$tableName").filter(_ != "ds").toSeq
+//    val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
+//    val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
+    val srcAllCols = getColumns(s"$project.xjk_ads_judicial_case_relation1").filter(_ != "ds").toSeq
+    val desAllCols = getColumns(s"$project.ods_justicase").filter(_ != "ds").toSeq
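+    // srcAllCols drives the edge SELECT below; desAllCols is only referenced by the commented-out merge SQL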
+    val dfRelations = sql(
+      s"""
+         |SELECT *
+         |FROM $project.$tableName
+         |WHERE $toCol IS NOT NULL AND $fromCol IS NOT NULL
+         |""".stripMargin)
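+    // Every surviving row becomes one edge; the WHERE clause above guarantees both endpoints are present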
+    val edgeRDD: RDD[Edge[Long]] = dfRelations
+      .select(srcAllCols.map(column => col(column).cast("string")): _*)
+      .rdd.map(r => {
+        val case_no_from = r.getAs[String](fromCol)
+        val case_no_to = r.getAs[String](toCol)
+        val from = case_no_from.toLong
+        val to = case_no_to.toLong
+        Edge(from, to)
+      })
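+    // Note: this assumes fromCol/toCol hold numeric ids that fit in a Long;
+    // a non-numeric case number would make toLong throw NumberFormatException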
+    // Build the graph from the edge list; every vertex referenced by an edge is
+    // created implicitly with defaultValue (0) as its attribute
+    val graph = Graph.fromEdges(edgeRDD, defaultValue = 0)
+
+    // Aggregate the vertices of each connected component, flatten them into
+    // (case_no -> judicial case id) pairs, then join back to the source table to fill in the remaining columns
+    val tripleRDD = graph.connectedComponents().vertices
+      .map(tp => (tp._2, tp._1)) // swap to (componentId, vertexId); repeated trials confirmed this order is required, otherwise the result is not the maximal connected subgraphs
+      .map(r => (r._1, Set(r._2)))
+      .reduceByKey(_ ++ _)
+      .flatMap(r => {
+        // one stable id per component: hash of the sorted member list
+        val judicase_id = BKDRHash(r._2.toSeq.sorted.mkString(","))
+        r._2.map(vid => (vid, Map("judicase_id" -> judicase_id.toString)))
+      })
+      .map(r => {
+        Row(r._1.toString, r._2("judicase_id"), "1")
+      })
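+    // Each Row is (vertex id, component hash id, flag); flag "1" marks graph-derived rows,
+    // matching the "0" the (commented-out) merge SQL assigns to original rows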
+    val schemaJust = StructType(Array(
+      StructField("id", StringType),
+      StructField("judicase_id", StringType),
+      StructField("flag", StringType)
+    ))
+    // The temp view only carries these three columns; the remaining attributes are
+    // materialized later, when the graph result is merged into the full table
+    spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
+    // Merge the graph result into the full data; the judicial case id of each case_no follows the graph
+    sql(
+/*    s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
+         |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
+         |,IF(B.judicase_id IS NOT NULL,B.flag,A.flag) AS flag
+         |,${desAllCols.mkString(",")}
+         |FROM(
+         |  SELECT '0' AS flag, *
+         |  FROM $project.ods_justicase
+         |  WHERE ds='20200830'
+         |) A
+         |LEFT JOIN
+         |(
+         |  SELECT id, judicase_id, flag FROM tmp_edgelets_$tableName
+         |) B
+         |ON A.case_id=B.id
+         |""".stripMargin) //.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
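+      // INSERT INTO when running locally on Windows, INSERT OVERWRITE on the cluster (per BaseUtil.isWindows)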
+ s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.xjk_ads_judicial_case_relation1_tmp
|
|
|
+ |SELECT id, judicase_id, flag
|
|
|
+ |FROM tmp_edgelets_$tableName
|
|
|
+ |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
|
|
|
+  }
+}
+
+object GraphX4Judicase {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "2000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // "ALL" is the most verbose log4j level; handy while debugging, very noisy otherwise
+    spark.sparkContext.setLogLevel("ALL")
+    GraphX4Judicase(spark, "winhc_eci_dev", "xjk_ads_judicial_case_relation1", "id_2", "id_1").calc()
+  }
+}