xufei пре 4 година
родитељ
комит
72a97c5c52

+ 108 - 55
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -40,59 +40,99 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
     spark.sparkContext.setJobDescription(s"处理zxr($lastDsIncAds)")
     sql(
       s"""
-         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr')
-         |select
-         |  judicase_id
-         |  ,flag
-         |  ,title
-         |  ,case_type
-         |  ,case_reason
-         |  ,case_no
-         |  ,court_name
-         |  ,case_stage
-         |  ,yg_name
-         |  ,bg_name
-         |  ,date
-         |  ,detail_id
-         |  ,case_amt
-         |from (
-         |      select
-         |         md5(cleanup(case_no)) as judicase_id
-         |         ,"7" as flag
-         |         ,concat_ws('',cname,'被执行人') as title
-         |         ,concat_ws('',case_type(case_no)) as case_type
-         |         ,null as case_reason
-         |         ,case_no
-         |         ,court as court_name
-         |         ,concat_ws('',case_stage(case_no)) as case_stage
-         |         ,null as yg_name
-         |         ,cname as bg_name
-         |         ,case_create_time as date
-         |         ,rowkey as detail_id
-         |         ,exec_money as case_amt
-         |         ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from $project.inc_ads_company_zxr
-         |      where length(case_no) > 0 and ds > '0'
-         |      union all
-         |      select
-         |         md5(cleanup(case_no)) as judicase_id
-         |         ,"7" as flag
-         |         ,concat_ws('',cname,'被执行人') as title
-         |         ,concat_ws('',case_type(case_no)) as case_type
-         |         ,null as case_reason
-         |         ,case_no
-         |         ,court as court_name
-         |         ,concat_ws('',case_stage(case_no)) as case_stage
-         |         ,null as yg_name
-         |         ,cname as bg_name
-         |         ,case_create_time as date
-         |         ,rowkey as detail_id
-         |         ,exec_money as case_amt
-         |         ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from $project.ads_company_zxr
-         |      where length(case_no) > 0 and ds > '0'
-         |   )
-         |where num = 1
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$lastDsIncAds',tn='zxr')
+         |SELECT
+         |        judicase_id
+         |        ,flag
+         |        ,title
+         |        ,case_type
+         |        ,case_reason
+         |        ,case_no
+         |        ,court_name
+         |        ,case_stage
+         |        ,yg_name
+         |        ,bg_name
+         |        ,DATE
+         |        ,detail_id
+         |        ,case_amt
+         |FROM    (
+         |        SELECT
+         |                A.*
+         |                ,COALESCE(C.judicase_id,md5(cleanup(case_no))) AS judicase_id
+         |                ,row_number() OVER(PARTITION BY detail_id,A.case_no ORDER BY DATE DESC ) num
+         |        FROM    (
+         |                SELECT
+         |                        "7" AS flag
+         |                        ,concat_ws('',cname,'被执行人') AS title
+         |                        ,concat_ws('',case_type(case_no)) AS case_type
+         |                        ,NULL AS case_reason
+         |                        ,case_no
+         |                        ,court AS court_name
+         |                        ,concat_ws('',case_stage(case_no)) AS case_stage
+         |                        ,NULL AS yg_name
+         |                        ,cname AS bg_name
+         |                        ,case_create_time AS DATE
+         |                        ,rowkey AS detail_id
+         |                        ,exec_money AS case_amt
+         |                FROM    $project.inc_ads_company_zxr
+         |                WHERE   length(case_no) >0 AND ds> '0'
+         |                UNION ALL
+         |                SELECT
+         |                        "7" AS flag
+         |                        ,concat_ws('',cname,'被执行人') AS title
+         |                        ,concat_ws('',case_type(case_no)) AS case_type
+         |                        ,NULL AS case_reason
+         |                        ,case_no
+         |                        ,court AS court_name
+         |                        ,concat_ws('',case_stage(case_no)) AS case_stage
+         |                        ,NULL AS yg_name
+         |                        ,cname AS bg_name
+         |                        ,case_create_time AS DATE
+         |                        ,rowkey AS detail_id
+         |                        ,exec_money AS case_amt
+         |                FROM    $project.ads_company_zxr
+         |                WHERE   length(case_no) >0 AND ds> '0'
+         |                UNION ALL
+         |                SELECT
+         |                        "7" AS flag
+         |                        ,concat_ws('',cname,'被执行人') AS title
+         |                        ,concat_ws('',case_type(case_no)) AS case_type
+         |                        ,NULL AS case_reason
+         |                        ,gist_id as case_no
+         |                        ,court AS court_name
+         |                        ,concat_ws('',case_stage(case_no)) AS case_stage
+         |                        ,NULL AS yg_name
+         |                        ,cname AS bg_name
+         |                        ,case_create_time AS DATE
+         |                        ,rowkey AS detail_id
+         |                        ,exec_money AS case_amt
+         |                FROM    $project.inc_ads_company_zxr
+         |                WHERE   length(gist_id) >0 AND ds> '0'
+         |                UNION ALL
+         |                SELECT
+         |                        "7" AS flag
+         |                        ,concat_ws('',cname,'被执行人') AS title
+         |                        ,concat_ws('',case_type(case_no)) AS case_type
+         |                        ,NULL AS case_reason
+         |                        ,gist_id as case_no
+         |                        ,court AS court_name
+         |                        ,concat_ws('',case_stage(case_no)) AS case_stage
+         |                        ,NULL AS yg_name
+         |                        ,cname AS bg_name
+         |                        ,case_create_time AS DATE
+         |                        ,rowkey AS detail_id
+         |                        ,exec_money AS case_amt
+         |                FROM    $project.ads_company_zxr
+         |                WHERE   length(gist_id) >0 AND ds> '0'
+         |                ) A
+         |        LEFT JOIN (
+         |                SELECT  *
+         |                FROM    $project.ads_judicial_case_relation_graph
+         |                WHERE   flag = 'company_zxr'
+         |                  ) C
+         |        ON      A.detail_id = C.id
+         |        )
+         |WHERE   num =1
          |""".stripMargin).show(10, false)
 
 
@@ -122,7 +162,7 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |  ,case_amt
          |from (
          |      select
-         |      md5(cleanup(A.case_no)) as judicase_id
+         |      COALESCE(C.judicase_id,md5(cleanup(A.case_no))) as judicase_id
          |      ,"10" as flag
          |      ,concat_ws('',A.cname,'被执行人') AS title
          |      ,concat_ws('',case_type(A.case_no)) as case_type
@@ -135,7 +175,7 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |      ,case_create_time as date--目前天眼查数据没有执行日期,先以此代替
          |      ,rowkey as detail_id
          |      ,exec_money as case_amt
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      ,row_number() over(partition by rowkey,A.case_no order by update_time desc) num
          |      from (
          |        select case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
          |        from $project.ads_company_zxr_person
@@ -144,6 +184,14 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |        select case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
          |        from $project.inc_ads_company_zxr_person
          |        where length(case_no) > 0 and ds>'0'
+         |        union all
+         |        select gist_id as case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
+         |        from $project.ads_company_zxr_person
+         |        where length(gist_id) > 0 and ds>'0'
+         |        union all
+         |        select gist_id as case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
+         |        from $project.inc_ads_company_zxr_person
+         |        where length(gist_id) > 0 and ds>'0'
          |      ) A
          |      left join(
          |        select name, identity_num, case_no
@@ -151,6 +199,11 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |        where ds=${personIdDs}
          |      ) B
          |      on A.cname=B.name AND A.case_no=B.case_no
+         |      left join(
+         |        SELECT  *
+         |            FROM $project.ads_judicial_case_relation_graph WHERE flag = 'company_zxr_person'
+         |      ) C
+         |      on A.rowkey=C.id
          |      where is_id_card(A.card) OR is_id_card(B.identity_num)
          |   )
          |where num = 1

+ 5 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre12.scala

@@ -56,6 +56,10 @@ case class JudicialCaseRelationPre12(s: SparkSession, project: String
          |  ,detail_id
          |  ,case_amt
          |from (
+         |   select
+         |      * ,
+         |      row_number() over(partition by detail_id order by date desc) num
+         |   from (
          |      select
          |         md5(cleanup(case_no)) as judicase_id
          |         ,"6" as flag
@@ -70,7 +74,6 @@ case class JudicialCaseRelationPre12(s: SparkSession, project: String
          |         ,case_create_time as date
          |         ,rowkey as detail_id
          |         ,exec_amount as case_amt
-         |         ,row_number() over(partition by rowkey order by update_time desc) num
          |      from $project.inc_ads_company_zxr_final_case
          |      where length(case_no) > 0 and ds > '0'
          |      union all
@@ -88,9 +91,9 @@ case class JudicialCaseRelationPre12(s: SparkSession, project: String
          |         ,case_create_time as date
          |         ,rowkey as detail_id
          |         ,exec_amount as case_amt
-         |         ,row_number() over(partition by rowkey order by update_time desc) num
          |      from $project.ads_company_zxr_final_case
          |      where length(case_no) > 0 and ds > '0'
+         |     )
          |   )
          |where num = 1
          |""".stripMargin).show(10, false)

+ 9 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -153,7 +153,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,case_amt
          |FROM    (
          |            SELECT  *
-         |            FROM $project.ads_judicial_case_relation_graph
+         |            FROM $project.ads_judicial_case_relation_graph WHERE flag = 'wenshu_detail'
          |        ) a
          |RIGHT JOIN (
          |            SELECT *,md5(cleanup(case_no_trim(case_no))) as new_judicase_id
@@ -303,7 +303,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     val cols = Seq("flag", "date", "detail_id")
 
     val t1 = s"$project.inc_ads_company_court_announcement"
-    val t2 = s"$project.ads_judicial_case_relation_pre"
+    val t2 = s"ads_judicial_case_relation_pre"
     var t2_ds = ds
     var t1_ds = ds
     if (StringUtils.isBlank(ds)) {
@@ -354,13 +354,13 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |     ,case_amt
          |     ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
          |  from $project.$t2
-         |  where ds= '$t2_ds' and tn <> 'wenshu' and case_no_trim(case_no) is not null
+         |  where ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person') and case_no_trim(case_no) is not null
          |        and date is not null and length(date) = 19
          |) a
          |LEFT JOIN (
          |  select case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
          |  from $project.$t2
-         |  where ds = '$t2_ds' and tn ='wenshu' and case_no_trim(case_no) is not null
+         |  where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person') and case_no_trim(case_no) is not null
          |  group by case_no
          |) b
          |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
@@ -381,7 +381,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,detail_id
          |        ,case_amt
          |from $project.$t2
-         |where ds = '$t2_ds' and tn ='wenshu'  and case_no_trim(case_no) is not null
+         |where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person') and case_no_trim(case_no) is not null
          |      and date is not null and length(date) = 19
          |""".stripMargin).show(10, false)
 
@@ -520,14 +520,14 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r2
-         |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no),case_stage)) id
+         |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
          |        ,judicase_id
          |        ,max(first_title) title
          |        ,max(case_type) case_type
-         |        ,max(case_reason) case_reason
+         |        ,max(last_case_reason) case_reason
          |        ,case_no
          |        ,max(court_name) court_name
-         |        ,case_stage
+         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
          |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(first_yg_name) yg_name
@@ -539,6 +539,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
          |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
+         |                ,max(case_reason) OVER (PARTITION BY a.judicase_id ) AS last_case_reason
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *
@@ -552,7 +553,6 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |)
          |GROUP BY judicase_id
          |         ,case_no
-         |         ,case_stage
          |""".stripMargin).show(10, false)
 
   }