Add pre- and post-processing for the graph

晏永年 4 years ago
parent
commit
a9381faf19

+ 31 - 16
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -12,6 +12,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
+import org.apache.spark.sql.functions.monotonically_increasing_id
 
 case class GraphX4Judicase(s: SparkSession,
                           project: String, // name of the project (schema) that contains the table
@@ -23,14 +24,33 @@ case class GraphX4Judicase(s: SparkSession,
   justicase_ops()
 
   def calc(): Unit = {
-    val srcAllCols = getColumns(s"$project.$tableName").filter(_ != "ds").toSeq
+    //1. Map ids to the Long vertex ids required by GraphX
+    val twoPart = BaseUtil.getLastPartion("winhc_eci_dev.dwd_judicial_case", spark)
+    sql(
+      s"""
+         |SELECT  id, case_no, tn
+         |FROM    $project.dwd_judicial_case
+         |WHERE   ${twoPart.map(m => s"ds = '${m._1}' AND tn = '${m._2}'").mkString("(", ") OR (", ")")}
+         |""".stripMargin)
+      .withColumn("id_Long",monotonically_increasing_id)
+      .createOrReplaceTempView("tmp_graphx_dwd_judicial_case")
+    //2. Run the graph computation
     val dfRelations = sql(
       s"""
-         |SELECT  *
-         |FROM    $project.$tableName
-         |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
+         |SELECT  id_1, C.id_Long AS id_2
+         |FROM (
+         |  SELECT  B.id_Long AS id_1, id_2, case_no_1, case_no_2, tn_1, tn_2, connect_type
+         |          ,ROW_NUMBER() OVER(PARTITION BY id_1, id_2 ORDER BY id_1) AS num
+         |  FROM    $project.$tableName A
+         |  LEFT JOIN tmp_graphx_dwd_judicial_case B
+         |  ON A.id_1=B.id
+         |  WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
+         |) AB
+         |LEFT JOIN tmp_graphx_dwd_judicial_case C
+         |ON AB.id_2=C.id
+         |WHERE num=1
          |""".stripMargin)
-    val edgeRDD: RDD[Edge[Long]] = dfRelations .select(srcAllCols.map(column => col(column).cast("string")): _*) .rdd.map(r => {
+    val edgeRDD: RDD[Edge[Long]] = dfRelations.select(dfRelations.columns.map(column => col(column).cast("string")): _*).rdd.map(r => {
       val case_no_from = r.getAs[String](fromCol)
       val case_no_to = r.getAs[String](toCol)
       val from = case_no_from.toLong
@@ -54,14 +74,7 @@ case class GraphX4Judicase(s: SparkSession,
         mp
       })
       .map(r => {
-/*
-        The id_1/id_2 fields in ads_judicial_case_relation carry a 2-digit suffix appended to the existing data:
-        01: judgment document;
-        02: person subject to enforcement;
-        03: dishonest debtor.
-        The flag field of ads_judicial_case_relation_graph maps to these 2 digits.
-*/
-      Row(r._1.toString, r._2("judicase_id"), r._1.toString.takeRight(2))
+      Row(r._1.toString, r._2("judicase_id"), "0")
     })
     val schemaJust = StructType(Array(
       StructField("id", StringType),
@@ -70,7 +83,7 @@ case class GraphX4Judicase(s: SparkSession,
     ))
    //this table holds only these 3 fields; the remaining attributes are instantiated later, when merging into the full dataset
     val dfEdgelets = spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
-    //Merge the graph result into the full dataset; the judicial case id for a case_no is taken from the graph
+    //3. Merge the graph result into the full dataset; the judicial case id for a case_no is taken from the graph
     sql(
 /*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
          |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
@@ -88,8 +101,10 @@ case class GraphX4Judicase(s: SparkSession,
          |ON A.case_id=B.id
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
       s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.ads_judicial_case_relation_graph
-         |SELECT id, judicase_id, flag
-         |FROM tmp_edgelets_$tableName
+         |SELECT B.id AS id, judicase_id, B.tn AS flag
+         |FROM tmp_edgelets_$tableName A
+         |LEFT JOIN tmp_graphx_dwd_judicial_case B
+         |ON A.id=B.id_Long
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
   }
 }
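
For context, the hunks above add only the pre-processing (string id to Long surrogate id) and post-processing (Long id back to the original id) around the GraphX computation; the connected-components call itself lies outside this diff. Below is a minimal, self-contained sketch of that surrogate-id pattern with made-up data; the object name and sample values are illustrative and not part of the repository:

```scala
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

object JudicaseGraphSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
    import spark.implicits._

    // Pre-processing: assign a Long surrogate id to each string case id,
    // since GraphX vertex ids must be Long. cache() pins the generated ids
    // so both joins below see the same values.
    val mapping = Seq("a", "b", "c", "d").toDF("id")
      .withColumn("id_Long", monotonically_increasing_id)
      .cache()

    // Relation pairs over the original string ids (toy data).
    val rels = Seq(("a", "b"), ("c", "d")).toDF("id_1", "id_2")

    // Translate both endpoints to their Long surrogates via the mapping.
    val edgeDF = rels
      .join(mapping.select($"id".as("id_1"), $"id_Long".as("src")), Seq("id_1"))
      .join(mapping.select($"id".as("id_2"), $"id_Long".as("dst")), Seq("id_2"))

    val edges = edgeDF.select("src", "dst").rdd
      .map(r => Edge(r.getLong(0), r.getLong(1), 1L))

    // Connected components labels every vertex with the smallest vertex id
    // in its component; the job writes this label out as judicase_id.
    val cc = Graph.fromEdges(edges, 0L).connectedComponents().vertices

    // Post-processing: join the Long vertex ids back to the original ids.
    val labeled = cc.toDF("id_Long", "judicase_id").join(mapping, Seq("id_Long"))
    labeled.show()
    spark.stop()
  }
}
```

The cache() matters because monotonically_increasing_id is only guaranteed unique per evaluation; without pinning the mapping, the two joins in calc() could in principle observe different surrogate ids for the same row.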

+ 28 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -75,6 +75,34 @@ object BaseUtil {
     }
   }
 
+  def getLastPartion(t: String, @transient spark: SparkSession): Array[(String, String)] = {
+    import spark._
+    val sql_s = s"show partitions $t"
+    val ps = sql(sql_s).collect.toList.map(r => {
+      val tmp = r.getString(0).split("/")
+      val Array(x, y) = {
+        if (tmp.length == 2) {
+          tmp
+        }
+        else {
+          Array(tmp(0), "")
+        }
+      }
+      val Array(_, a2) = x.split("=")
+      val Array(_, b2) = if (y != "") y.split("=") else Array("", "")
+      var r1 = ("", "")
+      if (StringUtils.isNumeric(a2)) {
+        r1 = (b2, a2) // dimension name first, date second
+      } else {
+        r1 = (a2, b2)
+      }
+      r1
+    }).groupBy(_._1)
+      .map(m => (m._2.maxBy(n => n._2)._2, m._1))
+      .filter(_._2 != "")
+    ps.toArray
+  }
+
   def nowDate(pattern: String = "yyyy-MM-dd"): String = {
     new SimpleDateFormat(pattern).format(new Date)
   }
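
For reference, a pure-Scala sketch of what getLastPartion computes for the common two-level ds/tn layout, fed with invented SHOW PARTITIONS output (no cluster needed); the object name and sample values are hypothetical:

```scala
object GetLastPartionSketch extends App {
  // Made-up SHOW PARTITIONS rows; real output has one spec string per row,
  // e.g. "ds=20200903/tn=wenshu" (values here are invented).
  val specs = Seq(
    "ds=20200901/tn=wenshu",
    "ds=20200903/tn=wenshu",
    "ds=20200902/tn=zhixing"
  )

  // The same reduction getLastPartion performs for the two-level case:
  // split the spec, keep (dimension, date), then take the latest date
  // per dimension. Lexicographic maxBy works because ds is yyyyMMdd.
  val latest: Array[(String, String)] = specs
    .map { spec =>
      val Array(dsPart, tnPart) = spec.split("/")
      (tnPart.split("=")(1), dsPart.split("=")(1)) // (tn, ds)
    }
    .groupBy(_._1)                                          // one group per tn
    .map { case (tn, pairs) => (pairs.maxBy(_._2)._2, tn) } // latest ds wins
    .toArray

  // Prints e.g. (20200903,wenshu) and (20200902,zhixing); order may vary.
  latest.foreach(println)
}
```

calc() then interpolates each (ds, tn) pair into its WHERE clause, so the job reads only the newest partition of every dimension.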