@@ -8,9 +8,8 @@ import org.apache.spark.sql.{Row, SparkSession}
import scala.annotation.meta.getter
import scala.collection.mutable
import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
-import org.apache.spark.sql.types.{LongType, MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
import com.winhc.bigdata.spark.udf.BaseFunc
-import org.apache.spark.sql.functions.col

case class JustiCase(s: SparkSession,
project: String, // name of the project the table belongs to
@@ -35,7 +34,20 @@ case class JustiCase(s: SparkSession,
|FROM $project.inc_ods_$tableName
|WHERE ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
|""".stripMargin)
- val edgeRDD = dfRelations.select(allCols.map(column => col(column).cast("string")): _*).rdd.flatMap(r => {
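+ // Kept for debugging: hand-written sample pairs of linked case numbers for checking the component grouping.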
+ /*
+ s"""SELECT *,get_justicase_id(case_no) AS case_no_hash FROM (
+ |SELECT '(2016)赣04民终1687号' AS case_no, '(2016)赣0429民初650号' AS connect_case_no
+ |UNION
+ |SELECT '(2020)鄂0606民初2872号' AS case_no, '(2020)冀0984执486号' AS connect_case_no
+ |UNION
+ |SELECT '(2020)赣0302财保10号' AS case_no, '(2020)冀0984执486号' AS connect_case_no
+ |UNION
+ |SELECT '(2017)粤0608民初2531号' AS case_no, '(2017)粤0608执1658号' AS connect_case_no
+ |UNION
+ |SELECT '(2016)赣0429民初650号' AS case_no, '(2020)赣0302财保10号' AS connect_case_no)
+ |""".stripMargin)
+ */
+ val edgeRDD = dfRelations /*.select(allCols.map(column => col(column).cast("string")): _*)*/ .rdd.flatMap(r => {
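+ // fan out: one edge per linked case number; BKDRHash maps each case-number string to the Long vertex id GraphX needs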
val case_no_from = r.getAs[String](fromCol)
val case_no_tos = r.getAs[String](toCol)
// val allColsMap = allCols.map(f => (f, r.getAs[String](f))).toMap
@@ -43,56 +55,56 @@ case class JustiCase(s: SparkSession,
var edges: Set[Edge[String]] = Set[Edge[String]]()
for (each <- case_no_tos.split("\n")) {
val to = BKDRHash(each)
- edges += Edge(from, to, "1")
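+ // the "1" edge attribute was never used downstream, so it is dropped in favor of Edge's default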
+ edges += Edge(from, to)
}
edges
})
// build the graph from the edges
- val graph: Graph[String, String] = Graph.fromEdges(edgeRDD, defaultValue = "")
-
- // get the connected components
- val connetedGraph: Graph[VertexId, String] = graph.connectedComponents()
+ val graph = Graph.fromEdges(edgeRDD, defaultValue = "")

// aggregate the edges of each connected component into a flat (case_no -> judicial-case id) mapping, then join back to the source table to fill in the remaining columns
- val tripleRDD = connetedGraph.triplets.map(t => (t.srcAttr, Set((t.dstId, t.attr))))
+ val tripleRDD = graph.connectedComponents().vertices
+ .map(tp => (tp._2, tp._1)) // vertices are (vertexId, componentId); keying on the component id is required, otherwise the grouping below does not produce the maximal connected components
+ .map(r => (r._1, Set(r._2)))
.reduceByKey(_ ++ _)
.flatMap(r => {
- val ss = Set((r._1, "0")) ++ r._2
- val justicase_id = BKDRHash(ss.map(_._1).toSeq.sorted.mkString(","))
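+ // deterministic component id: hash the sorted member hashes so the id does not depend on partitioning or traversal order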
+ val justicase_id = BKDRHash(r._2.toSeq.sorted.mkString(","))
var mp: Map[Long, Map[String, String]] = Map()
- ss.map(r => {
- mp = mp ++ Map(r._1 -> Map("justicase_id" -> justicase_id.toString))
+ r._2.map(r => {
+ mp = mp ++ Map(r -> Map("justicase_id" -> justicase_id.toString))
})
mp
}).map(r => {
- Row(r._1.toString, r._2("justicase_id"))
+ Row(r._1.toString, r._2("justicase_id"), "1")
})
val schemaJust = StructType(Array(
StructField("case_no_hash", StringType),
- StructField("justicase_id", StringType)
+ StructField("justicase_id", StringType),
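+ // flag "1" = case_no was linked through the graph; standalone case_nos get "0" in the merge below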
+ StructField("flag", StringType)
))
- val dfJust = spark.createDataFrame(tripleRDD, schemaJust)
- dfJust.join(dfRelations, "case_no_hash") // fill in the columns for case_nos that have edges
- .drop("case_no_hash")
- .union(sql( // standalone case_nos
- s"""
- |SELECT get_justicase_id(CASE_NO) AS justicase_id, *
- |FROM $project.ods_$tableName
- |WHERE ds=${ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})<=1
- |UNION
- |SELECT get_justicase_id(CASE_NO) AS justicase_id, *
- |FROM $project.inc_ods_$tableName
- |WHERE ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})<=1
- |""".stripMargin))
- .createOrReplaceTempView(s"tmp_graphx_$tableName")
-
+ // this view carries only these 3 columns; the remaining attributes are filled in when it is merged into the full data below
+ spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
+ // merge the graph result into the full data; for each case_no, the judicial-case id from the graph takes precedence
sql(
- s"""
- |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_${tableName}_graphx PARTITION(ds='$inc_ods_ds')
- |SELECT case_no, justicase_id, ${allCols.filter(_ != "case_no").mkString(",")}
- |FROM
- | tmp_graphx_$tableName
- |""".stripMargin)
+ s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_${tableName}_graphx PARTITION(ds='$inc_ods_ds')
+ |SELECT IF(B.case_no_hash IS NOT NULL,B.justicase_id,A.case_no_hash) AS justicase_id
+ |,IF(B.case_no_hash IS NOT NULL,B.flag,A.flag) AS flag
+ |,${allCols.mkString(",")}
+ |FROM(
+ | SELECT get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
+ | FROM $project.ods_$tableName
+ | WHERE ds=${ods_ds} AND ${toCol} IS NOT NULL
+ | UNION
+ | SELECT get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
+ | FROM $project.inc_ods_$tableName
+ | WHERE ds=${inc_ods_ds} AND ${toCol} IS NOT NULL
+ |) A
+ |LEFT JOIN
+ |(
+ | SELECT case_no_hash, justicase_id, flag FROM tmp_edgelets_$tableName
+ |) B
+ |ON A.case_no_hash=B.case_no_hash
+ |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
}
}
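
For reference, the swap-and-group trick that the new tripleRDD code relies on can be checked in isolation. A minimal sketch, assuming a local SparkSession; ComponentsSketch and the toy vertex ids (stand-ins for BKDRHash values of case numbers) are illustrative and not part of the patch:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object ComponentsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-sketch").getOrCreate()
    val sc = spark.sparkContext
    // two expected components: {1, 2, 3} and {10, 11}
    val edges = sc.parallelize(Seq(Edge(1L, 2L, ""), Edge(2L, 3L, ""), Edge(10L, 11L, "")))
    val graph = Graph.fromEdges(edges, defaultValue = "")
    graph.connectedComponents().vertices          // (vertexId, componentId)
      .map { case (vid, cid) => (cid, Set(vid)) } // swap: key by component id
      .reduceByKey(_ ++ _)                        // gather every member of a component
      .collect()
      .sortBy(_._1)
      .foreach(println)                           // e.g. (1,Set(1, 2, 3)) and (10,Set(10, 11))
    spark.stop()
  }
}

connectedComponents() labels every vertex with the smallest vertex id in its component, so keying the pair by the component id (the swap) is exactly what gathers the members of one judicial case together.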