晏永年 4 rokov pred
rodič
commit
4bf74fcb19

+ 54 - 26
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -9,6 +9,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.storage.StorageLevel
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -18,11 +19,12 @@ case class GraphX4Judicase(s: SparkSession,
                            tableName: String, //表名(不加前后辍)
                            fromCol: String, //边的起点列名
                            toCol: String //边的终点列名
-                    ) extends LoggingUtils with Logging with BaseFunc {
+                          ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
   justicase_ops()
 
   def calc(): Unit = {
+    spark.sparkContext.setCheckpointDir("graph")
     val srcAllCols = getColumns(s"$project.$tableName").filter(_ != "ds")
     val dfRelations = sql(
       s"""
@@ -30,15 +32,15 @@ case class GraphX4Judicase(s: SparkSession,
          |FROM    $project.$tableName
          |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
          |""".stripMargin)
-    val edgeRDD: RDD[Edge[Long]] = dfRelations .select(srcAllCols.map(column => col(column).cast("string")): _*) .rdd.map(r => {
+    val edgeRDD: RDD[Edge[Long]] = dfRelations.select(srcAllCols.map(column => col(column).cast("string")): _*).rdd.map(r => {
       val case_no_from = r.getAs[String](fromCol)
       val case_no_to = r.getAs[String](toCol)
       val from = case_no_from.toLong
       val to = case_no_to.toLong
       Edge(from, to)
-    })
+    }).repartition(40).asInstanceOf[RDD[Edge[Long]]]
     // 根据边构造图
-    val graph = Graph.fromEdges(edgeRDD, defaultValue = 0)
+    val graph = Graph.fromEdges(edgeRDD, defaultValue = 0, StorageLevel.MEMORY_AND_DISK, StorageLevel.MEMORY_AND_DISK)
 
     // 将同一连通分量中各个边聚合,经过处理形成打平的(case_no->司法案件id)并与原表join补全信息
     val tripleRDD = graph.connectedComponents().vertices
@@ -54,8 +56,8 @@ case class GraphX4Judicase(s: SparkSession,
         mp
       })
       .map(r => {
-      Row(r._1.toString, r._2("judicase_id"), "1")
-    })
+        Row(r._1.toString, r._2("judicase_id"), "1")
+      })
     val schemaJust = StructType(Array(
       StructField("id", StringType),
       StructField("judicase_id", StringType),
@@ -65,28 +67,54 @@ case class GraphX4Judicase(s: SparkSession,
     val dfEdgelets = spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
     //将图结果融入全量数据中,case_no对应的司法案件号以图为准
     cleanup()
+    val ds = BaseUtil.getPartion(s"$project.dwd_judicial_case_2", spark)
     sql(
-/*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
-         |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
-         |,IF(B.judicase_id IS NOT NULL,B.flag,A.flag) AS flag
-         |,${desAllCols.mkString(",")}
-         |FROM(
-         |  SELECT  '0' AS flag, *
-         |  FROM    $project.ods_justicase
-         |  WHERE   ds='20200830'
-         |) A
-         |LEFT JOIN
-         |(
-         |  SELECT id, judicase_id, flag FROM tmp_edgelets_$tableName
-         |) B
-         |ON A.case_id=B.id
-         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
-      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph
-         |SELECT B.id AS id, rowkey, IF(A.judicase_id IS NOT NULL, A.judicase_id, MD5(cleanup(CONCAT_WS('',B.id,B.tn)))) AS judicase_id, B.case_no, B.tn AS flag
+      /*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
+               |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
+               |,IF(B.judicase_id IS NOT NULL,B.flag,A.flag) AS flag
+               |,${desAllCols.mkString(",")}
+               |FROM(
+               |  SELECT  '0' AS flag, *
+               |  FROM    $project.ods_justicase
+               |  WHERE   ds='20200830'
+               |) A
+               |LEFT JOIN
+               |(
+               |  SELECT id, judicase_id, flag FROM tmp_edgelets_$tableName
+               |) B
+               |ON A.case_id=B.id
+               |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
+      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph_pre
+         |SELECT A.id AS id, rowkey, A.judicase_id, B.case_no, B.tn AS flag
          |FROM tmp_edgelets_$tableName A
-         |RIGHT JOIN $project.dwd_judicial_case_2 B
+         |LEFT JOIN (
+         |  SELECT *
+         |  FROM $project.dwd_judicial_case_2
+         |  WHERE ds='$ds'
+         |) B
          |ON A.id=B.id
-         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
+         |""".stripMargin)
+    //将与图中数据通过case_no关联的外部数据统一一个司法案件号
+    sql(
+      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph
+         |SELECT id, rowkey, judicase_id, case_no, flag
+         |FROM(
+         |  SELECT B.id AS id
+         |        ,B.rowkey
+         |        ,IF(A.judicase_id IS NOT NULL,A.judicase_id,MD5(B.id)) AS judicase_id
+         |        ,B.case_no
+         |        ,B.tn AS flag
+         |        ,ROW_NUMBER() OVER (PARTITION BY B.id,B.tn ORDER BY B.id) num
+         |  FROM $project.ads_judicial_case_relation_graph_pre A
+         |  RIGHT JOIN(
+         |    SELECT *
+         |    FROM $project.dwd_judicial_case_2
+         |    WHERE ds='$ds'
+         |  ) B
+         |  ON A.id=B.id
+         |)
+         |WHERE num=1
+         |""".stripMargin) //.createOrReplaceTempView(s"tmp_graphx_$tableName")
   }
 }
 
@@ -97,7 +125,7 @@ object GraphX4Judicase {
       "spark.hadoop.odps.spark.local.partition.amt" -> "2000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    spark.sparkContext.setLogLevel("Warn")
+//    spark.sparkContext.setLogLevel("ERROR")
     GraphX4Judicase(spark, "winhc_eci_dev", "ads_judicial_case_relation", "id_2", "id_1").calc()
   }
 }