|
@@ -0,0 +1,231 @@
|
|
|
+package com.winhc.bigdata.spark.jobs
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
|
|
|
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
|
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
/**
 * Judicial case pre-processing job: command-line entry point.
 *
 * @author π
 * @date 2020/9/17 14:45
 */
object JudicialCaseRelationPre2 {

  /**
   * Creates a Spark session configured for the target ODPS project, runs
   * [[JudicialCaseRelationPre2.calc]] and always stops the session afterwards.
   *
   * @param args command-line arguments (not read by this job)
   */
  def main(args: Array[String]): Unit = {
    val project = "winhc_eci_dev"
    println(
      s"""
         |project: $project
         |""".stripMargin)

    val config = mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
    )
    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
    // Stop the session even when the job fails, so a failing run does not
    // leave the Spark context alive.
    try {
      JudicialCaseRelationPre2(spark, project).calc()
    } finally {
      spark.stop()
    }
  }
}
|
|
|
+
|
|
|
/**
 * Builds the judicial-case relation tables from judgment documents ("wenshu")
 * and court announcements ("court_open_announcement").
 *
 * All partition values (`ds='20200924'`, `ds = '20200830'`) are hard-coded in
 * the SQL below.
 *
 * @param s       Spark session used to run every statement
 * @param project ODPS project that owns all tables read and written here
 */
case class JudicialCaseRelationPre2(s: SparkSession, project: String)
  extends LoggingUtils with CompanyMapping with BaseFunc {
  override protected val spark: SparkSession = s

  /** INSERT mode keyword: append (`INTO`) when `isWindows` is true, otherwise `OVERWRITE`. */
  private def insertKeyword: String = if (isWindows) "INTO" else "OVERWRITE"

  /**
   * Fills the `wenshu` and `court_open_announcement` partitions of
   * `ads_judicial_case_relation_pre` (ds='20200924').
   *
   * Not invoked by [[calc]] at the moment (the call there is commented out).
   */
  def precalc(): Unit = {
    // NOTE(review): presumably registers the SQL functions used below
    // (case_stage, case_type, cleanup, ...) - confirm against BaseFunc/CompanyMapping.
    prepareFunctions(spark)

    // Pre-process judgment documents (wenshu).
    // Note: the s-interpolator expands '\n' into a real newline character
    // before the statement text is handed to Spark.
    sql(
      s"""
         |INSERT ${insertKeyword} table $project.ads_judicial_case_relation_pre partition(ds='20200924',tn='wenshu')
         |SELECT a.judicase_id
         | ,'0' as flag
         | ,title
         | ,case_type
         | ,case_reason
         | ,case_no
         | ,court_name
         | ,concat_ws('',case_stage(case_no)) as case_stage
         | --,'裁判文书' lable
         | --,concat_ws('|','民事判决日期',judge_date,case_id) as detail
         | ,regexp_replace(yg_name,'\n',',') as yg_name
         | ,regexp_replace(bg_name,'\n',',') as bg_name
         | ,judge_date as date
         | ,case_id as detail_id
         | ,case_amt
         |FROM (
         | SELECT *
         | FROM $project.xjk_ads_judicial_case_relation1_tmp
         | ) a
         |JOIN (
         | SELECT *
         | FROM $project.ods_justicase
         | WHERE ds = '20200830'
         | ) b
         |ON a.id = b.case_id
         |""".stripMargin).show(10, false)

    // Pre-process court announcements: keep only the most recently updated
    // row per rowkey (row_number ... order by update_time desc, num = 1).
    sql(
      s"""
         |insert ${insertKeyword} table $project.ads_judicial_case_relation_pre partition(ds='20200924',tn='court_open_announcement')
         |select
         | judicase_id
         | ,flag
         | ,title
         | ,case_type
         | ,case_reason
         | ,case_no
         | ,court_name
         | ,case_stage
         | ,yg_name
         | ,bg_name
         | ,date
         | ,detail_id
         | ,case_amt
         |from (
         | select
         | md5(cleanup(case_no)) as judicase_id
         | ,"1" as flag
         | ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
         | ,concat_ws('',case_type(case_no)) as case_type
         | ,case_reason
         | ,case_no
         | ,court as court_name
         | ,concat_ws('',case_stage(case_no)) as case_stage
         | ,plaintiff as yg_name
         | ,defendant as bg_name
         | ,start_date as date
         | ,rowkey as detail_id
         | ,0.0 as case_amt
         | ,row_number() over(partition by rowkey order by update_time desc) num
         | from $project.inc_ads_company_court_open_announcement
         | where length(case_no) > 0 and ds > '0'
         | )
         |where num = 1
         |""".stripMargin).show(10, false)

    // NOTE(review): the two names below are left over from the original file;
    // presumably earlier temp-table names. Nothing in this class references them.
    //tmp_xf_judicial_case_relation_open_counrt
    //tmp_xf_judicial_case_relation_wenshu

  }

  /**
   * Runs the three-step build on top of `ads_judicial_case_relation_pre`:
   *
   *  1. `tmp_xf_judicial_case_relation_replace` - announcement rows take the
   *     judicase_id of a wenshu row with a matching cleaned-up case_no (falling
   *     back to their own id), unioned with the wenshu rows themselves.
   *  2. `tmp_xf_judicial_case_relation_r1` - one aggregated row per judicase_id.
   *  3. `tmp_xf_judicial_case_relation_r2` - one aggregated row per
   *     (judicase_id, case_no, case_stage).
   *
   * Every table is addressed through `project`, so the intermediate table
   * written in step 1 is the same one read in steps 2 and 3.
   */
  def calc(): Unit = {
    prepareFunctions(spark)
    // Pre-process data (disabled; the pre partitions are expected to exist already).
    //precalc()

    // Replace the judicial case id.
    // NOTE(review): sub-query b groups by the raw case_no but the join key is
    // CLEANUP(case_no); if CLEANUP can map different case_no values to the same
    // key, rows of a would be duplicated - confirm.
    sql(
      s"""
         |INSERT ${insertKeyword} TABLE $project.tmp_xf_judicial_case_relation_replace
         |SELECT COALESCE(b.judicase_id,a.judicase_id) judicase_id
         | ,a.flag
         | ,a.title
         | ,a.case_type
         | ,a.case_reason
         | ,a.case_no
         | ,a.court_name
         | ,a.case_stage
         | ,case_label(a.flag) lable
         | ,concat_ws('|',a.flag,a.date,a.detail_id) as detail
         | ,a.yg_name
         | ,a.bg_name
         | ,a.date
         | ,a.detail_id
         | ,a.case_amt
         |FROM (
         | select * from $project.ads_judicial_case_relation_pre where ds = '20200924' and tn ='court_open_announcement'
         |) a
         |LEFT JOIN (
         | select case_no,max(judicase_id) judicase_id
         | from $project.ads_judicial_case_relation_pre
         | where ds = '20200924' and tn ='wenshu' and length(trim(case_no)) > 0
         | group by case_no
         |) b
         |ON CLEANUP(a.case_no) = CLEANUP(b.case_no)
         |union all
         |SELECT judicase_id
         | ,flag
         | ,title
         | ,case_type
         | ,case_reason
         | ,case_no
         | ,court_name
         | ,case_stage
         | ,case_label(flag) lable
         | ,concat_ws('|',flag,date,detail_id) as detail
         | ,yg_name
         | ,bg_name
         | ,date
         | ,detail_id
         | ,case_amt
         |from $project.ads_judicial_case_relation_pre where ds = '20200924' and tn ='wenshu' and length(trim(case_no)) > 0
         |""".stripMargin).show(10, false)

    // Judicial case main table: one row per judicase_id. Title and party
    // names come from the first_value window ordered by date ascending.
    sql(
      s"""
         |INSERT ${insertKeyword} TABLE $project.tmp_xf_judicial_case_relation_r1
         |SELECT judicase_id
         | ,max(first_title) title
         | ,max(case_type) case_type
         | ,max(case_reason) case_reason
         | ,concat_ws(',',collect_set(case_no)) case_no
         | ,concat_ws(',',collect_set(court_name)) court_name
         | ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
         | ,concat_ws(',',max(case_type),collect_set(lable)) lable
         | ,concat_ws(',',collect_set(detail)) detail
         | ,max(first_yg_name) AS yg_name
         | ,max(first_bg_name) AS bg_name
         | ,max(case_amt) AS case_amt
         |FROM (
         | SELECT * ,first_value(yg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_yg_name
         | ,first_value(bg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_bg_name
         | ,first_value(title)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_title
         | FROM (
         | SELECT *
         | FROM $project.tmp_xf_judicial_case_relation_replace
         | )
         | )
         |GROUP BY judicase_id
         |""".stripMargin).show(10, false)

    // Detail table: one row per (judicase_id, case_no, case_stage); the id is
    // an md5 over those three values (case_no passed through CLEANUP).
    sql(
      s"""
         |INSERT ${insertKeyword} TABLE $project.tmp_xf_judicial_case_relation_r2
         |SELECT md5(concat_ws('',judicase_id,CLEANUP(case_no),case_stage)) id
         | ,judicase_id
         | ,max(first_title) title
         | ,max(case_type) case_type
         | ,max(case_reason) case_reason
         | ,case_no
         | ,max(court_name) court_name
         | ,case_stage
         | ,concat_ws(',',max(case_type),collect_set(lable)) lable
         | ,concat_ws(',',collect_set(detail)) detail
         | ,max(first_yg_name) yg_name
         | ,max(first_bg_name) bg_name
         |FROM (
         | SELECT * ,first_value(yg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_yg_name
         | ,first_value(bg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_bg_name
         | ,first_value(title)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_title
         | FROM (
         | SELECT *
         | FROM $project.tmp_xf_judicial_case_relation_replace
         | )
         |)
         |GROUP BY judicase_id
         | ,case_no
         | ,case_stage
         |""".stripMargin).show(10, false)

  }

}
|