|
@@ -1,6 +1,6 @@
|
|
|
package com.winhc.bigdata.spark.jobs.judicial
|
|
|
|
|
|
-import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, CourtRank}
|
|
|
+import com.winhc.bigdata.spark.udf.{BaseFunc, CaseReasonAggsTmp, CaseReasonAggs, CompanyMapping, CourtRank, NameAggsTmp, NameAggs}
|
|
|
import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
|
import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
|
|
|
import org.apache.commons.lang3.StringUtils
|
|
@@ -205,7 +205,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
|
|
|
| ,replace_char(plaintiff) as yg_name
|
|
|
| ,replace_char(defendant) as bg_name
|
|
|
| ,start_date as date
|
|
|
- | ,rowkey as detail_id
|
|
|
+ | ,md5(cleanup(CONCAT_WS('',case_no,start_date))) as detail_id
|
|
|
| ,0.0 as case_amt
|
|
|
| from $project.inc_ads_company_court_open_announcement
|
|
|
| where length(case_no) > 0 and ds > '0'
|
|
@@ -221,7 +221,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
|
|
|
| ,concat_ws('',case_stage(case_no)) as case_stage
|
|
|
| ,replace_char(plaintiff) as yg_name
|
|
|
| ,replace_char(defendant) as bg_name
|
|
|
- | ,start_date as date
|
|
|
+ | ,md5(cleanup(CONCAT_WS('',case_no,start_date))) as detail_id
|
|
|
| ,rowkey as detail_id
|
|
|
| ,0.0 as case_amt
|
|
|
| from $project.ads_company_court_open_announcement
|
|
@@ -299,6 +299,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
|
|
|
map_2_json()
|
|
|
case_no_trim_udf()
|
|
|
registerCourtRank()
|
|
|
+ spark.udf.register("name_aggs", new NameAggs(1000))
|
|
|
+ spark.udf.register("case_reason", new CaseReasonAggs(1000))
|
|
|
//预处理数据
|
|
|
val cols = Seq("flag", "date", "detail_id")
|
|
|
|
|
@@ -482,25 +484,42 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
|
|
|
sql(
|
|
|
s"""
|
|
|
|INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r1
|
|
|
+ |SELECT
|
|
|
+ | judicase_id ,
|
|
|
+ | title ,
|
|
|
+ | case_type ,
|
|
|
+ | case_reason ,
|
|
|
+ | case_no ,
|
|
|
+ | court_name ,
|
|
|
+ | case_stage ,
|
|
|
+ | lable ,
|
|
|
+ | detail ,
|
|
|
+ | name_aggs['yg_name'] yg_name,
|
|
|
+ | name_aggs['bg_name'] bg_name,
|
|
|
+ | case_amt ,
|
|
|
+ | date ,
|
|
|
+ | court_level ,
|
|
|
+ | deleted ,
|
|
|
+ | cids
|
|
|
+ |FROM
|
|
|
+ |(
|
|
|
|SELECT judicase_id
|
|
|
| ,max(first_title) title
|
|
|
| ,max(case_type) case_type
|
|
|
- | ,max(case_reason) case_reason
|
|
|
+ | ,case_reason(case_reason,date,flag) case_reason
|
|
|
| ,concat_ws(',',collect_set(case_no)) case_no
|
|
|
| ,concat_ws(',',collect_set(court_name)) court_name
|
|
|
| ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
|
|
|
| ,concat_ws(',',max(case_type),collect_set(lable)) lable
|
|
|
| ,concat('[',concat_ws(',',collect_set(detail)),']') detail
|
|
|
- | ,max(first_yg_name) AS yg_name
|
|
|
- | ,max(first_bg_name) AS bg_name
|
|
|
| ,max(case_amt) AS case_amt
|
|
|
| ,max(date) AS date
|
|
|
| ,trim_black(concat_ws(',',collect_set(court_level))) court_level
|
|
|
| ,max(deleted) deleted
|
|
|
| ,concat_ws(',',collect_set(cids)) cids
|
|
|
+ | ,name_aggs(yg_name,bg_name,flag,date) name_aggs
|
|
|
|FROM (
|
|
|
- | SELECT a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
|
|
|
- | ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
|
|
|
+ | SELECT a.*
|
|
|
| ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
|
|
|
| ,b.deleted
|
|
|
| FROM (
|
|
@@ -514,32 +533,47 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
|
|
|
| ) b on a.judicase_id = b.judicase_id
|
|
|
| )
|
|
|
|GROUP BY judicase_id
|
|
|
+ |)
|
|
|
|""".stripMargin).show(20, false)
|
|
|
|
|
|
//明细表
|
|
|
sql(
|
|
|
s"""
|
|
|
|INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r2
|
|
|
+ |SELECT
|
|
|
+ | id ,
|
|
|
+ | judicase_id ,
|
|
|
+ | title ,
|
|
|
+ | case_type ,
|
|
|
+ | case_reason ,
|
|
|
+ | case_no ,
|
|
|
+ | court_name ,
|
|
|
+ | case_stage ,
|
|
|
+ | lable ,
|
|
|
+ | detail ,
|
|
|
+ | name_aggs['yg_name'] yg_name,
|
|
|
+ | name_aggs['bg_name'] bg_name,
|
|
|
+ | last_date ,
|
|
|
+ | deleted
|
|
|
+ |FROM
|
|
|
+ |(
|
|
|
|SELECT md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
|
|
|
| ,judicase_id
|
|
|
| ,max(first_title) title
|
|
|
| ,max(case_type) case_type
|
|
|
- | ,max(last_case_reason) case_reason
|
|
|
+ | ,case_reason(case_reason,date,flag) case_reason
|
|
|
| ,case_no
|
|
|
| ,max(court_name) court_name
|
|
|
- | ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
|
|
|
+ | ,case_stage(max(case_no)) as case_stage
|
|
|
| ,concat_ws(',',max(case_type),collect_set(lable)) lable
|
|
|
| ,concat('[',concat_ws(',',collect_set(detail)),']') detail
|
|
|
- | ,max(first_yg_name) yg_name
|
|
|
- | ,max(first_bg_name) bg_name
|
|
|
| ,max(last_date) last_date
|
|
|
| ,max(deleted) deleted
|
|
|
+ | ,name_aggs(yg_name,bg_name,flag,date) name_aggs
|
|
|
|FROM (
|
|
|
- | SELECT a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
|
|
|
- | ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
|
|
|
+ | SELECT a.*
|
|
|
| ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
|
|
|
| ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
|
|
|
- | ,max(case_reason) OVER (PARTITION BY a.judicase_id ) AS last_case_reason
|
|
|
| ,b.deleted
|
|
|
| FROM (
|
|
|
| SELECT *
|
|
@@ -553,6 +587,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
|
|
|
|)
|
|
|
|GROUP BY judicase_id
|
|
|
| ,case_no
|
|
|
+ |)
|
|
|
|""".stripMargin).show(10, false)
|
|
|
|
|
|
}
|