Sfoglia il codice sorgente

有数据调试后的修改

晏永年 4 anni fa
parent
commit
e1c72271a8

+ 23 - 23
src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

@@ -17,31 +17,31 @@ case class JustiCase(s: SparkSession,
                      tableName: String, //表名(不加前后辍)
                      fromCol: String, //边的起点列名
                      toCol: String //边的终点列名
-                    ) extends LoggingUtils with Logging  with BaseFunc{
+                    ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
   justicase_ops()
 
   def calc(): Unit = {
-    val allCols = getColumns(s"$project.ods_$tableName").toSet
+    val allCols = getColumns(s"$project.ods_$tableName").filter(_ != "ds").toSeq
     val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
     val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
     val dfRelations = sql(
       s"""
-         |SELECT  *
+         |SELECT  *,get_justicase_id(case_no) AS case_no_hash
          |FROM    $project.ods_$tableName
-         |WHERE   ds=${ods_ds} AND ${toCol} != NULL
+         |WHERE   ds=${ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
          |UNION
-         |SELECT  *
+         |SELECT  *,get_justicase_id(case_no) AS case_no_hash
          |FROM    $project.inc_ods_$tableName
-         |WHERE   ds=${inc_ods_ds} AND ${toCol} != NULL
+         |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
          |""".stripMargin)
-    val edgeRDD = dfRelations.select(allCols.map(column => col(column).cast("string")).toSeq: _*).rdd.flatMap(r => {
+    val edgeRDD = dfRelations.select(allCols.map(column => col(column).cast("string")): _*).rdd.flatMap(r => {
       val case_no_from = r.getAs[String](fromCol)
       val case_no_tos = r.getAs[String](toCol)
       //      val allColsMap = allCols.map(f => (f, r.getAs[String](f))).toMap
       val from = BKDRHash(case_no_from)
       var edges: Set[Edge[String]] = Set[Edge[String]]()
-      for (each <- case_no_tos.split(",")) {
+      for (each <- case_no_tos.split("\n")) {
         val to = BKDRHash(each)
         edges += Edge(from, to, "1")
       }
@@ -61,35 +61,35 @@ case class JustiCase(s: SparkSession,
         val justicase_id = BKDRHash(ss.map(_._1).toSeq.sorted.mkString(","))
         var mp: Map[Long, Map[String, String]] = Map()
         ss.map(r => {
-          mp ++ Map(r._1 -> Map("justicase_id"->justicase_id.toString))
+          mp = mp ++ Map(r._1 -> Map("justicase_id" -> justicase_id.toString))
         })
         mp
-      }).map(r=>{
-      Row(r._1,r._2("justicase_id"))
+      }).map(r => {
+      Row(r._1.toString, r._2("justicase_id"))
     })
     val schemaJust = StructType(Array(
-      StructField(toCol,StringType),
-      StructField("justicase_id",StringType)
+      StructField("case_no_hash", StringType),
+      StructField("justicase_id", StringType)
     ))
     val dfJust = spark.createDataFrame(tripleRDD, schemaJust)
-    dfJust.join(dfRelations,"case_no")//有边的case_no补全信息
-      .union(sql(//孤立的case_no
+    dfJust.join(dfRelations, "case_no_hash") //有边的case_no补全信息
+      .drop("case_no_hash")
+      .union(sql( //孤立的case_no
         s"""
            |SELECT  get_justicase_id(CASE_NO) AS justicase_id, *
            |FROM    $project.ods_$tableName
-           |WHERE   ds=${ods_ds}  AND ${toCol} == NULL
+           |WHERE   ds=${ods_ds}  AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})<=1
            |UNION
-           |SELECT  *
+           |SELECT  get_justicase_id(CASE_NO) AS justicase_id, *
            |FROM    $project.inc_ods_$tableName
-           |WHERE   ds=${inc_ods_ds} AND ${toCol} == NULL
-           |""".stripMargin))//.show(100)
+           |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})<=1
+           |""".stripMargin))
       .createOrReplaceTempView(s"tmp_graphx_$tableName")
 
-    val ds ="20200802"
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_${tableName}_graphx PARTITION(ds='$inc_ods_ds')
-         |SELECT *
+         |SELECT case_no, justicase_id, ${allCols.filter(_ != "case_no").mkString(",")}
          |FROM
          |    tmp_graphx_$tableName
          |""".stripMargin)
@@ -100,10 +100,10 @@ object JustiCase {
   def main(args: Array[String]): Unit = {
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "2000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
-    JustiCase(spark, "winhc_eci_dev", "wenshu_detail", "CONNECT_CASE_NO", "CASE_NO").calc()
+    JustiCase(spark, "winhc_eci_dev", "justicase", "connect_case_no", "case_no").calc()
   }
 }

+ 0 - 68
src/main/scala/com/winhc/bigdata/spark/test/Justicase.scala

@@ -1,68 +0,0 @@
-package com.winhc.bigdata.spark.test
-
-import org.apache.spark.graphx._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkConf, SparkContext}
-import org.json4s.jackson.JsonMethods.parse
-import org.json4s.jackson.Serialization
-import org.json4s.{JValue, ShortTypeHints}
-
-object Justicase {
-  implicit val formats = Serialization.formats(ShortTypeHints(List()))
-  def getJsonValue(parseJson: JValue, key: String): String = {
-    (parseJson \ key).extractOrElse[String]("0")//后面以Edge构造图,故孤立的顶点都连上编号为0的顶点以形成边
-  }
-  def main(args: Array[String]): Unit = {
-    val sparkConf = new SparkConf()
-    sparkConf.setAppName("JusticaseTest")
-    sparkConf.setMaster("local[*]")
-    val sparkContext = new SparkContext(sparkConf)
-
-    // 文书或公告
-    val case_relations: RDD[String] = sparkContext.parallelize(
-      Seq("""{"case_no":"3","from":"2"}"""
-        ,"""{"case_no":"2","from":"1"}"""
-        ,"""{"case_no":"4","from":"3"}"""
-
-        ,"""{"case_no":"8","from":"7"}"""
-
-        ,"""{"case_no":"14","from":"13"}"""
-        ,"""{"case_no":"13","from":"12"}"""
-
-        ,"""{"case_no":"19"}"""
-
-        ,"""{"case_no":"28","from":"27"}"""
-        ,"""{"case_no":"27","from":"26"}"""
-        ,"""{"case_no":"26","from":"25"}"""
-        ,"""{"case_no":"29","from":"28"}"""
-        ,"""{"case_no":"30","from":"29"}"""
-        ,"""{"case_no":"31","from":"30"}"""
-        ,"""{"case_no":"55","from":"30"}"""
-      )
-    )
-
-    // 构造GraphX中Edge对象
-    val edges = case_relations.map(relation => {
-      val jv = parse(relation)
-
-      val from = getJsonValue(jv, "from").toLong
-      val to = getJsonValue(jv, "case_no").toLong
-      Edge(from, to, relation)
-    })
-
-    // 根据边构造图
-    val graph: Graph[String, String] = Graph.fromEdges(edges, defaultValue = "")
-
-    // 获取连通分量
-    val connetedGraph: Graph[VertexId, String] = graph.connectedComponents()
-
-    // 将同一连通分量中各个边聚合
-    val tripleGroup: RDD[(VertexId, Set[VertexId])] = connetedGraph.triplets.map(t => (t.srcAttr, Set(t.dstId)))
-      .reduceByKey(_ ++ _)
-
-    //逐一输出所有极大连通子图结果:起点Vertex,与其连通的Vertex集合。Vertex可以是案号形成的Long
-    tripleGroup.foreach(i=>{
-      println(i._1 + ": " + i._2.mkString(","))
-    })
-  }
-}