|
@@ -1,5 +1,6 @@
|
|
|
package com.winhc.bigdata.spark.ng.judicial
|
|
|
|
|
|
+import com.alibaba.fastjson.JSON
|
|
|
import com.winhc.bigdata.spark.implicits.BaseHelper._
|
|
|
import com.winhc.bigdata.spark.implicits.RegexUtils.RichRegex
|
|
|
import com.winhc.bigdata.spark.udf.BaseFunc
|
|
@@ -40,6 +41,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
is_id_card_udf()
|
|
|
json_parse_udf()
|
|
|
spark.udf.register("case_equ", case_equ _)
|
|
|
+ spark.udf.register("case_no_split", case_no_split _)
|
|
|
spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
|
|
|
spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
|
|
|
|
|
@@ -245,7 +247,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
val inc_org_tab = s"winhc_ng.inc_ads_$tableName" + "_v9"
|
|
|
|
|
|
val table_id = "rowkey"
|
|
|
- val other_cols = Seq("plaintiff_info", "court_name", "case_no", "litigant_info", "defendant_info") ++ Seq(table_id, "ds", "connect_case_no")
|
|
|
+ val other_cols = Seq("plaintiff_info", "court_name", "case_no", "litigant_info", "defendant_info") ++ Seq(table_id, "ds", "connect_case_no","update_time")
|
|
|
|
|
|
val ods_end_ds = getLastPartitionsOrElse(org_tab, "0")
|
|
|
val tmp_tab = s"all_${tableName}_tmp_$ods_end_ds"
|
|
@@ -257,7 +259,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
|SELECT *
|
|
|
|FROM (
|
|
|
| SELECT *
|
|
|
- | ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ,update_time DESC) AS num
|
|
|
| FROM (
|
|
|
| SELECT ${other_cols.mkString(",")}
|
|
|
| FROM $inc_org_tab
|
|
@@ -273,7 +275,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
|SELECT *
|
|
|
|FROM (
|
|
|
| SELECT *
|
|
|
- | ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ,update_time DESC) AS num
|
|
|
| FROM (
|
|
|
| SELECT ${other_cols.mkString(",")}
|
|
|
| FROM $org_tab
|
|
@@ -292,7 +294,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
sql(
|
|
|
s"""
|
|
|
|SELECT *
|
|
|
- |FROM $tmp_tab lateral view OUTER explode(split(connect_case_no,'\\n')) t as single_connect_case_no
|
|
|
+ |FROM $tmp_tab lateral view OUTER explode(case_no_split(connect_case_no)) t as single_connect_case_no
|
|
|
|""".stripMargin)
|
|
|
.createTempView(s"explode_$tmp_tab")
|
|
|
|
|
@@ -331,7 +333,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
|SELECT $view
|
|
|
|FROM (
|
|
|
| SELECT *
|
|
|
- | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ,update_time DESC) AS num
|
|
|
| FROM (
|
|
|
| SELECT *
|
|
|
| FROM $org_inc_ads_tab
|
|
@@ -349,7 +351,7 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
|SELECT $view
|
|
|
|FROM (
|
|
|
| SELECT *
|
|
|
- | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ,update_time DESC) AS num
|
|
|
| FROM (
|
|
|
| SELECT ${intersect_cols.mkString(",")}
|
|
|
| FROM $org_ads_tab
|
|
@@ -787,6 +789,19 @@ case class JudicialCaseRelationRowkeyRelation_v3(s: SparkSession,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+  /**
|
|
|
+   * Parses a JSON-array string of case numbers into a List[String].
|
|
|
+   * Blank/null input yields Nil. Registered as the `case_no_split` UDF.
|
|
|
+   */
|
|
|
+  private def case_no_split(no: String): List[String] = {
|
|
|
+    if (StringUtils.isBlank(no)) Nil
|
|
|
+    else {
|
|
|
+      val array = JSON.parseArray(no)
|
|
|
+      (0 until array.size()).map(i => array.getString(i)).toList
|
|
|
+    }
|
|
|
+  }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
object JudicialCaseRelationRowkeyRelation_v3 {
|