Ver código fonte

去重以便进图数据规模最小

晏永年 4 anos atrás
pai
commit
39963b3494

+ 25 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -29,32 +29,45 @@ case class GraphX4Judicase(s: SparkSession,
     sql(
       s"""
          |SELECT  id, case_no, tn
-         |FROM    $project.dwd_judicial_case
-         |WHERE   ${twoPart.map(m=>{s"ds = '${m._2}' AND tn = '${m._1}'"}).mkString("(", ") OR (", ")")}
+         |FROM(
+         |  SELECT id, case_no, tn, ROW_NUMBER() OVER(PARTITION BY id, main_case_no, tn ORDER BY id) AS num
+         |  FROM    $project.dwd_judicial_case
+         |  WHERE   ${twoPart.map(m=>{s"ds = '${m._2}' AND tn = '${m._1}'"}).mkString("(", ") OR (", ")")}
+         |)
+         |WHERE num=1
          |""".stripMargin)
       .withColumn("id_Long",monotonically_increasing_id)
       .createOrReplaceTempView("tmp_graphx_dwd_judicial_case")
     //2.跑图计算
-    val dfRelations = sql(
+    sql(
       s"""
-         |SELECT  id_1, C.id_Long AS id_2
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph_pre
+         |SELECT  id_1, id_2, id_1_long, C.id_Long AS id_2_long, case_no_1, case_no_2, tn_1, tn_2
          |FROM(
-         |  SELECT  B.id_Long AS id_1, id_2, case_no_1, case_no_2, tn_1, tn_2, connect_type
+         |  SELECT   id_1, id_2, B.id_Long AS id_1_long, case_no_1, case_no_2, tn_1, tn_2, connect_type
          |          ,ROW_NUMBER() OVER(PARTITION BY id_1, id_2 ORDER BY id_1) AS num
-         |  FROM    $project.$tableName A
+         |  FROM(
+         |      SELECT *
+         |      FROM $project.$tableName
+         |      WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
+         |   ) A
          |  LEFT JOIN tmp_graphx_dwd_judicial_case B
          |  ON A.id_1=B.id
-         |  WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
          |) AB
          |LEFT JOIN tmp_graphx_dwd_judicial_case C
          |ON AB.id_2=C.id
          |WHERE num=1 AND id_1 IS NOT NULL AND C.id_Long IS NOT NULL
          |""".stripMargin)
-    val edgeRDD: RDD[Edge[Long]] = dfRelations .select(dfRelations.columns.map(column => col(column).cast("string")): _*) .rdd.map(r => {
-      val case_no_from = r.getAs[String](fromCol)
-      val case_no_to = r.getAs[String](toCol)
-      val from = case_no_from.toLong
-      val to = case_no_to.toLong
+    val dfRelations = sql(
+      s"""
+         |SELECT  id_1_long, id_2_long
+         |FROM    $project.ads_judicial_case_relation_graph_pre
+         |""".stripMargin)
+    val edgeRDD: RDD[Edge[Long]] = dfRelations /*.select(dfRelations.columns.map(column => col(column).cast("string")): _*) */.rdd.map(r => {
+      val case_no_from = r.getAs[Long]("id_2_long")
+      val case_no_to = r.getAs[Long]("id_1_long")
+      val from = case_no_from//.toLong
+      val to = case_no_to//.toLong
       Edge(from, to)
     })
     // 根据边构造图