Ver código fonte

Spark GraphX的案件串联Demo

晏永年 4 anos atrás
pai
commit
43f0d0bb04

+ 6 - 0
pom.xml

@@ -307,6 +307,12 @@
             <artifactId>fastjson</artifactId>
             <version>1.2.72</version>
         </dependency>
+        <!--图处理框架-->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_${scala.binary.version}</artifactId>
+            <version>2.4.6</version>
+        </dependency>
 
     </dependencies>
 

+ 68 - 0
src/main/scala/com/winhc/bigdata/spark/test/Justicase.scala

@@ -0,0 +1,68 @@
+package com.winhc.bigdata.spark.test
+
+import org.apache.spark.graphx._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.{SparkConf, SparkContext}
+import org.json4s.jackson.JsonMethods.parse
+import org.json4s.jackson.Serialization
+import org.json4s.{JValue, ShortTypeHints}
+
+object Justicase {
+  implicit val formats = Serialization.formats(ShortTypeHints(List()))
+  def getJsonValue(parseJson: JValue, key: String): String = {
+    (parseJson \ key).extractOrElse[String]("0")//后面以Edge构造图,故孤立的顶点都连上编号为0的顶点以形成边
+  }
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf()
+    sparkConf.setAppName("JusticaseTest")
+    sparkConf.setMaster("local[*]")
+    val sparkContext = new SparkContext(sparkConf)
+
+    // 文书或公告
+    val case_relations: RDD[String] = sparkContext.parallelize(
+      Seq("""{"case_no":"3","from":"2"}"""
+        ,"""{"case_no":"2","from":"1"}"""
+        ,"""{"case_no":"4","from":"3"}"""
+
+        ,"""{"case_no":"8","from":"7"}"""
+
+        ,"""{"case_no":"14","from":"13"}"""
+        ,"""{"case_no":"13","from":"12"}"""
+
+        ,"""{"case_no":"19"}"""
+
+        ,"""{"case_no":"28","from":"27"}"""
+        ,"""{"case_no":"27","from":"26"}"""
+        ,"""{"case_no":"26","from":"25"}"""
+        ,"""{"case_no":"29","from":"28"}"""
+        ,"""{"case_no":"30","from":"29"}"""
+        ,"""{"case_no":"31","from":"30"}"""
+        ,"""{"case_no":"55","from":"30"}"""
+      )
+    )
+
+    // 构造GraphX中Edge对象
+    val edges = case_relations.map(relation => {
+      val jv = parse(relation)
+
+      val from = getJsonValue(jv, "from").toLong
+      val to = getJsonValue(jv, "case_no").toLong
+      Edge(from, to, relation)
+    })
+
+    // 根据边构造图
+    val graph: Graph[String, String] = Graph.fromEdges(edges, defaultValue = "")
+
+    // 获取连通分量
+    val connetedGraph: Graph[VertexId, String] = graph.connectedComponents()
+
+    // 将同一连通分量中各个边聚合
+    val tripleGroup: RDD[(VertexId, Set[VertexId])] = connetedGraph.triplets.map(t => (t.srcAttr, Set(t.dstId)))
+      .reduceByKey(_ ++ _)
+
+    //逐一输出所有极大连通子图结果:起点Vertex,与其连通的Vertex集合。Vertex可以是案号形成的Long
+    tripleGroup.foreach(i=>{
+      println(i._1 + ": " + i._2.mkString(","))
+    })
+  }
+}