|
@@ -1,6 +1,6 @@
|
|
|
package com.winhc.bigdata.spark.ng.judicial
|
|
|
|
|
|
-import com.winhc.bigdata.spark.udf.{BaseFunc, CaseAmtAggs, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs, NameAggsPlus}
|
|
|
+import com.winhc.bigdata.spark.udf.{AllNamePlus, AllNamePlusV2, BaseFunc, CaseAmtAggs, CaseAmtAggsPlus, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs, NameAggsPlus}
|
|
|
import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
|
import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
|
|
|
import org.apache.commons.lang3.StringUtils
|
|
@@ -188,8 +188,10 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping"
|
|
|
//主表
|
|
|
val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1"
|
|
|
- //明细表
|
|
|
- val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
|
|
|
+//    //Detail table (r2) -- retired; the r3 detail table declared below is used instead
|
|
|
+// val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
|
|
|
+  //Detail table (enhanced)
|
|
|
+ val ads_judicial_case_relation_r3 = s" $project.ads_judicial_case_relation_r3"
|
|
|
//案件移除表
|
|
|
val ads_judicial_case_id_mapping_r1_deleted = s" $project.ads_judicial_case_id_mapping_r1_deleted"
|
|
|
//案件移除表
|
|
@@ -279,12 +281,12 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
| FROM (
|
|
|
| SELECT rowkey_1 AS start_id,tn_1 AS tn
|
|
|
| FROM $bds_judicial_case_relation
|
|
|
- | WHERE ds = '20210604'
|
|
|
+ | WHERE ds = '$calc_ds'
|
|
|
| AND rowkey_1 IS NOT NULL AND tn_1 IS NOT NULL
|
|
|
| UNION ALL
|
|
|
| SELECT rowkey_2 AS start_id,tn_2 AS tn
|
|
|
| FROM $bds_judicial_case_relation
|
|
|
- | WHERE ds = '20210604'
|
|
|
+ | WHERE ds = '$calc_ds'
|
|
|
| AND rowkey_2 IS NOT NULL AND tn_2 IS NOT NULL
|
|
|
| )
|
|
|
| )
|
|
@@ -293,21 +295,26 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
|${if (isWindows) "LIMIT 1000" else ""}
|
|
|
|""".stripMargin).show(1000,false)
|
|
|
|
|
|
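+    //Deduplicate: the insert below keeps one row per combine_id(start_id,end_id) (num = 1); TODO confirm whether further de-duplication is still needed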
|
|
|
sql(
|
|
|
s"""
|
|
|
|INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds')
|
|
|
- |SELECT *
|
|
|
- | ,to_json(MAP('start_id',start_id,'end_id',end_id,"topic_type",topic_type,"connect_type",connect_type)) relation_json
|
|
|
- |FROM (
|
|
|
- | SELECT concat_ws('_',rowkey_1, tn_flag(tn_1)) start_id
|
|
|
- | ,concat_ws('_',rowkey_2, tn_flag(tn_2)) end_id
|
|
|
- | ,"400" AS topic_type
|
|
|
- | ,connect_type
|
|
|
- | FROM $bds_judicial_case_relation
|
|
|
- | WHERE ds = '20210604'
|
|
|
- | AND rowkey_1 IS NOT NULL AND rowkey_2 IS NOT NULL
|
|
|
- | AND tn_1 IS NOT NULL AND tn_2 IS NOT NULL
|
|
|
- | )
|
|
|
+ |SELECT start_id, end_id, topic_type, connect_type, relation_json
|
|
|
+ |FROM (
|
|
|
+ | SELECT start_id, end_id, topic_type, connect_type
|
|
|
+ | ,to_json(MAP('start_id',start_id,'end_id',end_id,"topic_type",topic_type,"connect_type",connect_type)) relation_json
|
|
|
+ | ,ROW_NUMBER() OVER (PARTITION by combine_id(start_id,end_id) ORDER by start_id desc) num
|
|
|
+ | FROM (
|
|
|
+ | SELECT concat_ws('_',rowkey_1, tn_flag(tn_1)) start_id
|
|
|
+ | ,concat_ws('_',rowkey_2, tn_flag(tn_2)) end_id
|
|
|
+ | ,"400" AS topic_type
|
|
|
+ | ,connect_type
|
|
|
+ | FROM $bds_judicial_case_relation
|
|
|
+ | WHERE ds = '$calc_ds'
|
|
|
+ | AND rowkey_1 IS NOT NULL AND rowkey_2 IS NOT NULL
|
|
|
+ | AND tn_1 IS NOT NULL AND tn_2 IS NOT NULL
|
|
|
+ | )
|
|
|
+ |) WHERE num = 1
|
|
|
|${if (isWindows) "LIMIT 1000" else ""}
|
|
|
|""".stripMargin).show(1000,false)
|
|
|
|
|
@@ -374,13 +381,18 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
| FROM (
|
|
|
| SELECT id, judicase_id
|
|
|
| ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY ds DESC) num
|
|
|
- | FROM $ads_judicial_case_relation_r2
|
|
|
+ | FROM $ads_judicial_case_relation_r3
|
|
|
| WHERE ds < '$calc_ds'
|
|
|
| )
|
|
|
| WHERE num = 1
|
|
|
| ) b
|
|
|
|ON a.old_id = b.judicase_id
|
|
|
|""".stripMargin)
|
|
|
+
|
|
|
+    //If the partition for calc_ds does not exist, add an empty one
|
|
|
+ addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r1_deleted, calc_ds)
|
|
|
+ addEmptyPartitionOrSkip(ads_judicial_case_id_mapping_r2_deleted, calc_ds)
|
|
|
+
|
|
|
}
|
|
|
|
|
|
def calc(): Unit = {
|
|
@@ -390,6 +402,9 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
spark.udf.register("name_aggs", new NameAggsPlus(1000))
|
|
|
spark.udf.register("case_reason", new CaseReasonAggs(1000))
|
|
|
spark.udf.register("case_amt", new CaseAmtAggs(1000))
|
|
|
+ spark.udf.register("case_amt_plus", new CaseAmtAggsPlus(1000))
|
|
|
+ spark.udf.register("all_name_plus", new AllNamePlus(1000))
|
|
|
+ spark.udf.register("all_name_plus_v2", new AllNamePlusV2(100))
|
|
|
|
|
|
//detail 文书id
|
|
|
//替换司法案件id
|
|
@@ -423,7 +438,7 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
| SELECT *, md5(concat_ws('',detail_id,tn)) row_id
|
|
|
| ,ROW_NUMBER() OVER (PARTITION BY detail_id,tn ORDER BY ds DESC) num
|
|
|
| FROM $ads_judicial_case_relation_pre
|
|
|
- | WHERE ds > ${calc_last_ds(ads_judicial_case_relation_id)}
|
|
|
+ | WHERE ds > ${calc_last_ds(ads_judicial_case_relation_id)} AND case_no_trim(case_no) is not null AND date is not null
|
|
|
| )
|
|
|
| WHERE num = 1
|
|
|
| ) a
|
|
@@ -439,59 +454,10 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
|ON a.row_id = b.row_id
|
|
|
|""".stripMargin)
|
|
|
|
|
|
- //司法案件主表
|
|
|
- sql(
|
|
|
- s"""
|
|
|
- |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
|
|
|
- |SELECT
|
|
|
- | judicase_id,
|
|
|
- | title ,
|
|
|
- | case_type ,
|
|
|
- | case_reason ,
|
|
|
- | case_no ,
|
|
|
- | court_name ,
|
|
|
- | case_stage ,
|
|
|
- | lable ,
|
|
|
- | name_aggs['yg_name'] yg_name,
|
|
|
- | name_aggs['bg_name'] bg_name,
|
|
|
- | all_name,
|
|
|
- | case_amt ,
|
|
|
- | date ,
|
|
|
- | court_level ,
|
|
|
- | 0 deleted
|
|
|
- |FROM
|
|
|
- |(
|
|
|
- |SELECT judicase_id
|
|
|
- | ,max(title) title
|
|
|
- | ,concat_ws(',',collect_set(case_type)) case_type
|
|
|
- | ,case_reason(case_reason,date,flag) case_reason
|
|
|
- | ,concat_ws(',',collect_set(case_no)) case_no
|
|
|
- | ,concat_ws(',',collect_set(court_name)) court_name
|
|
|
- | ,max(last_stage) case_stage
|
|
|
- | ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
|
|
|
- | ,case_amt(case_amt) AS case_amt
|
|
|
- | ,max(date) AS date
|
|
|
- | ,trim_black(concat_ws(',',collect_set(court_level))) court_level
|
|
|
- | ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
|
|
|
- | ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
|
|
|
- |FROM (
|
|
|
- | SELECT a.*
|
|
|
- | FROM (
|
|
|
- | SELECT judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
|
|
|
- | ,court_level(court_name) court_level,data
|
|
|
- | ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY data['date'] DESC ) AS last_stage
|
|
|
- | FROM $ads_judicial_case_relation_id
|
|
|
- | WHERE ds = '$calc_ds'
|
|
|
- | ) a
|
|
|
- | )
|
|
|
- |GROUP BY judicase_id
|
|
|
- |)x
|
|
|
- |""".stripMargin).show(20, false)
|
|
|
-
|
|
|
//明细表
|
|
|
sql(
|
|
|
s"""
|
|
|
- |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r2 PARTITION(ds='$calc_ds')
|
|
|
+ |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r3 PARTITION(ds='$calc_ds')
|
|
|
|SELECT
|
|
|
| id,
|
|
|
| judicase_id,
|
|
@@ -506,7 +472,10 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
| name_aggs['yg_name'] yg_name,
|
|
|
| name_aggs['bg_name'] bg_name,
|
|
|
| last_date ,
|
|
|
- | 0 deleted
|
|
|
+ | 0 deleted ,
|
|
|
+ | all_name ,
|
|
|
+ | case_amt ,
|
|
|
+ | court_level
|
|
|
|FROM
|
|
|
|(
|
|
|
|SELECT md5(concat_ws('',concat_ws('',judicase_id),CLEANUP(case_no))) id
|
|
@@ -515,16 +484,23 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
| ,case_type(max(case_no)) as case_type
|
|
|
| ,case_reason(case_reason,date,flag) case_reason
|
|
|
| ,case_no
|
|
|
- | ,max(court_name) court_name
|
|
|
+ | -- ,max(court_name) court_name
|
|
|
+ | ,concat_ws(',',collect_set(court_name)) court_name
|
|
|
| ,case_stage(max(case_no)) as case_stage
|
|
|
| ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
|
|
|
| ,concat('[',concat_ws(',',collect_set(detail)),']') detail
|
|
|
| ,max(last_date) last_date
|
|
|
| ,name_aggs(yg_name,bg_name,flag,data['date']) name_aggs
|
|
|
+ | -- ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
|
|
|
+ | ,all_name_plus_v2(all_name) all_name
|
|
|
+ | -- ,case_amt(case_amt, cast(data['date'] as string), flag) AS case_amt
|
|
|
+ | ,case_amt_plus(cast(case_amt as string), cast(data['date'] as string), flag) AS case_amt
|
|
|
+ | ,trim_black(concat_ws(',',collect_set(court_level))) court_level
|
|
|
|FROM (
|
|
|
| SELECT a.*
|
|
|
| ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
|
|
|
| ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
|
|
|
+ | ,court_level(court_name) court_level
|
|
|
| FROM (
|
|
|
| SELECT *
|
|
|
| FROM $ads_judicial_case_relation_id
|
|
@@ -536,9 +512,64 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
|
|
|
|) x
|
|
|
|""".stripMargin).show(10, false)
|
|
|
|
|
|
+
|
|
|
+    //Judicial case main table (r1), aggregated per judicase_id from the r3 detail partition for calc_ds
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
|
|
|
+ |SELECT
|
|
|
+ | judicase_id,
|
|
|
+ | title ,
|
|
|
+ | case_type ,
|
|
|
+ | case_reason ,
|
|
|
+ | case_no ,
|
|
|
+ | court_name ,
|
|
|
+ | case_stage ,
|
|
|
+ | lable ,
|
|
|
+ | name_aggs['yg_name'] yg_name,
|
|
|
+ | name_aggs['bg_name'] bg_name,
|
|
|
+ | all_name,
|
|
|
+ | case_amt ,
|
|
|
+ | date ,
|
|
|
+ | court_level ,
|
|
|
+ | 0 deleted
|
|
|
+ |FROM
|
|
|
+ |(
|
|
|
+ |SELECT judicase_id
|
|
|
+ | ,max(title) title
|
|
|
+ | ,concat_ws(',',collect_set(case_type)) case_type
|
|
|
+ | ,case_reason(case_reason,date,'0') case_reason
|
|
|
+ | ,concat_ws(',',collect_set(case_no)) case_no
|
|
|
+ | ,trim_black(concat_ws(',',collect_set(court_name))) court_name
|
|
|
+ | ,max(last_stage) case_stage
|
|
|
+ | ,trim_black(concat_ws(',', collect_set(lable)) ) lable
|
|
|
+ | -- ,max(case_amt) AS case_amt
|
|
|
+ | ,max(first_case_amt) case_amt
|
|
|
+ | -- ,cast(case_amt_plus(case_amt['case_amt'], case_amt['date'], case_amt['flag'])['case_amt'] as double) AS case_amt
|
|
|
+ | ,max(date) AS date
|
|
|
+ | ,trim_black(concat_ws(',',collect_set(court_level))) court_level
|
|
|
+ | ,name_aggs(yg_name,bg_name,'0',date) name_aggs
|
|
|
+ | -- ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
|
|
|
+ | ,all_name_plus_v2(all_name) all_name
|
|
|
+ | -- ,null all_name
|
|
|
+ |FROM (
|
|
|
+ | SELECT a.*
|
|
|
+ | FROM (
|
|
|
+ | SELECT judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
|
|
|
+ | ,court_level(court_name) court_level
|
|
|
+ | ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY date DESC ) AS last_stage
|
|
|
+ | ,first_value(case_amt['case_amt']) OVER (PARTITION BY judicase_id ORDER BY case_amt['flag'] DESC ) AS first_case_amt
|
|
|
+ | FROM $ads_judicial_case_relation_r3
|
|
|
+ | WHERE ds = '$calc_ds'
|
|
|
+ | ) a
|
|
|
+ | )
|
|
|
+ |GROUP BY judicase_id
|
|
|
+ |)x
|
|
|
+ |""".stripMargin).show(20, false)
|
|
|
+
|
|
|
//分区不存在,插入空分区
|
|
|
addEmptyPartitionOrSkip(ads_judicial_case_relation_r1, calc_ds)
|
|
|
- addEmptyPartitionOrSkip(ads_judicial_case_relation_r2, calc_ds)
|
|
|
+ addEmptyPartitionOrSkip(ads_judicial_case_relation_r3, calc_ds)
|
|
|
}
|
|
|
|
|
|
private def get_partition_order_by(): String = {
|