晏永年 4 years ago
parent
commit
e72a261f17
1 changed file with 11 additions and 17 deletions
  1. 11 17
      src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

+ 11 - 17
src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

@@ -9,6 +9,7 @@ import scala.annotation.meta.getter
 import scala.collection.mutable
 import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.functions.col
 import com.winhc.bigdata.spark.udf.BaseFunc
 
 case class JustiCase(s: SparkSession,
@@ -34,27 +35,20 @@ case class JustiCase(s: SparkSession,
          |FROM    $project.inc_ods_$tableName
          |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
          |""".stripMargin)
-    /*
+/*
           s"""SELECT *,get_justicase_id(case_no) AS case_no_hash FROM (
-             |SELECT '(2016)赣04民终1687号' AS case_no, '(2016)赣0429民初650号' AS connect_case_no
+             |SELECT '(2016)闽02刑更704号' AS case_no, '(2008)厦刑初字第69号\n(2009)闽刑终字第133号\n(2012)厦刑执字第628号\n(2015)厦刑执字第485号' AS connect_case_no
              |UNION
-             |SELECT '(2020)鄂0606民初2872号' AS case_no, '(2020)冀0984执486号' AS connect_case_no
-             |UNION
-             |SELECT '(2020)赣0302财保10号' AS case_no, '(2020)冀0984执486号' AS connect_case_no
-             |UNION
-             |SELECT '(2017)粤0608民初2531号' AS case_no, '(2017)粤0608执1658号' AS connect_case_no
-             |UNION
-             |SELECT '(2016)赣0429民初650号' AS case_no, '(2020)赣0302财保10号' AS connect_case_no)
+             |SELECT '(2015)厦刑执字第485号' AS case_no, '(2008)厦刑初字第69号\n(2009)闽刑终字第133号\n(2012)厦刑执字第628号' AS connect_case_no)
              |""".stripMargin)
-    */
-    val edgeRDD = dfRelations /*.select(allCols.map(column => col(column).cast("string")): _*)*/ .rdd.flatMap(r => {
-      val case_no_from = r.getAs[String](fromCol)
-      val case_no_tos = r.getAs[String](toCol)
-      //      val allColsMap = allCols.map(f => (f, r.getAs[String](f))).toMap
-      val from = BKDRHash(case_no_from)
+*/
+    val edgeRDD = dfRelations .select(allCols.map(column => col(column).cast("string")): _*) .rdd.flatMap(r => {
+      val case_no_froms = r.getAs[String](fromCol)
+      val case_no_to = r.getAs[String](toCol)
+      val to = BKDRHash(case_no_to)
       var edges: Set[Edge[String]] = Set[Edge[String]]()
-      for (each <- case_no_tos.split("\n")) {
-        val to = BKDRHash(each)
+      for (each <- case_no_froms.split("\n")) {
+        val from = BKDRHash(each)
         edges += Edge(from, to)
       }
       edges