ソースを参照

立案信息到司法案件中=

yandawei 4 年 前
コミット
553e89996c

+ 286 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPreCourtRegister.scala

@@ -0,0 +1,286 @@
+import com.winhc.bigdata.spark.udf.CourtRank
+import com.winhc.bigdata.spark.utils.{BaseUtil, SparkUtils}
+
+import scala.collection.mutable
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import org.apache.spark.sql.SparkSession
+
+
+
+object JudicialCaseRelationPre2 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    val r = JudicialCaseRelationPre2(spark, project)
+    //r.precalc()
+    r.calc()
+    spark.stop()
+  }
+}
+
+case class JudicialCaseRelationPre2(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+
+  private def getStrToMap(cols: Seq[String]): String = {
+    val set = cols.toSet
+    val str = set.map(e => {
+      s"concat_ws('\001','$e',cast($e as string))"
+    }).mkString(",")
+    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
+  }
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    val t1 = s"$project.inc_ads_company_court_register"
+    val t1_ds = BaseUtil.getPartion(t1, spark)
+
+    //立案信息预处理
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$t1_ds',tn='court_register')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |     select
+         |     *,row_number() over(partition by detail_id order by date desc) num
+         |     from (
+         |           select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"1" as flag
+         |            ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,case_reason
+         |            ,case_no
+         |            ,court as court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,plaintiff as yg_name
+         |            ,defendant as bg_name
+         |            ,start_time as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.inc_ads_company_court_register
+         |      where length(case_no) > 0 and ds > '0'
+         |      union all
+         |      select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"1" as flag
+         |            ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,case_reason
+         |            ,case_no
+         |            ,court as court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,plaintiff as yg_name
+         |            ,defendant as bg_name
+         |            ,start_time as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.ads_company_court_register
+         |      where length(case_no) > 0 and ds > '0'
+         |     )
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+  }
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    map_2_json()
+    registerCourtRank()
+    //预处理数据
+    //precalc()
+    val cols = Seq("flag", "date", "detail_id")
+
+    val t1 = s"$project.inc_ads_company_court_register"
+    val t2 = s"$project.ads_judicial_case_relation_pre"
+    val t1_ds = BaseUtil.getPartion(t1, spark)
+    val t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
+
+    //替换司法案件id
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_replace_2 partition (ds = '$t1_ds')
+         |SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
+         |        ,a.flag
+         |        ,a.title
+         |        ,a.case_type
+         |        ,a.case_reason
+         |        ,a.case_no
+         |        ,a.court_name
+         |        ,a.case_stage
+         |        ,case_label(a.flag) lable
+         |        ,map_2_json(${getStrToMap(cols)}) as detail
+         |        ,a.yg_name
+         |        ,a.bg_name
+         |        ,a.date
+         |        ,a.detail_id
+         |        ,a.case_amt
+         |FROM    (
+         |  select
+         |  *,md5(CLEANUP(case_no)) as new_judicase_id
+         |  from $project.ads_judicial_case_relation_pre
+         |  where ds= '$t2_ds' and tn <> 'wenshu'
+         |) a
+         |LEFT JOIN (
+         |  select case_no,max(judicase_id) judicase_id
+         |  from $project.ads_judicial_case_relation_pre
+         |  where ds = '$t2_ds' and tn ='wenshu' and  length(trim(case_no)) > 0
+         |  group by case_no
+         |) b
+         |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
+         |union all
+         |SELECT   judicase_id
+         |        ,flag
+         |        ,title
+         |        ,case_type
+         |        ,case_reason
+         |        ,case_no
+         |        ,court_name
+         |        ,case_stage
+         |        ,case_label(flag) lable
+         |        ,map_2_json(${getStrToMap(cols)}) as detail
+         |        ,yg_name
+         |        ,bg_name
+         |        ,date
+         |        ,detail_id
+         |        ,case_amt
+         |from $project.ads_judicial_case_relation_pre
+         |where ds = '$t2_ds' and tn ='wenshu' and length(trim(case_no)) > 0
+         |""".stripMargin).show(10, false)
+
+    //找出增量数据
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $project.tmp_xf_judicial_case_incr_mapping
+         |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
+         |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
+         |FROM    (
+         |            SELECT  judicase_id
+         |                    ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r1
+         |            FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |            WHERE   ds = '$t1_ds'
+         |            GROUP BY judicase_id
+         |        ) a
+         |FULL JOIN (
+         |              SELECT  judicase_id
+         |                      ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r2
+         |              FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |              WHERE   ds < '$t1_ds'
+         |              GROUP BY judicase_id
+         |          ) b
+         |ON  r1 = r2
+         |WHERE   r1 IS NULL OR r2 IS NULL
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |SELECT  court_name,court_level(court_name) court_level
+         |FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |WHERE   ds = '$t1_ds'
+         |""".stripMargin).show(200, false)
+
+    //司法案件主表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_r1
+         |SELECT  judicase_id
+         |        ,max(first_title) title
+         |        ,max(case_type) case_type
+         |        ,max(case_reason) case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,concat_ws(',',collect_set(court_name)) court_name
+         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
+         |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
+         |        ,max(first_yg_name) AS yg_name
+         |        ,max(first_bg_name) AS bg_name
+         |        ,max(case_amt) AS case_amt
+         |        ,max(date) AS date
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,max(deleted) deleted
+         |FROM    (
+         |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
+         |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
+         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
+         |                ,b.deleted
+         |        FROM    (
+         |                   SELECT  *,court_level(court_name) court_level
+         |                   FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |                   WHERE   ds = '$t1_ds'
+         |                ) a JOIN
+         |                (
+         |                   select *
+         |                   from $project.tmp_xf_judicial_case_incr_mapping
+         |                ) b on a.judicase_id = b.judicase_id
+         |        )
+         |GROUP BY judicase_id
+         |""".stripMargin).show(20, false)
+
+    //明细表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_r2
+         |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no),case_stage)) id
+         |        ,judicase_id
+         |        ,max(first_title) title
+         |        ,max(case_type) case_type
+         |        ,max(case_reason) case_reason
+         |        ,case_no
+         |        ,max(court_name) court_name
+         |        ,case_stage
+         |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
+         |        ,max(first_yg_name) yg_name
+         |        ,max(first_bg_name) bg_name
+         |        ,max(deleted) deleted
+         |FROM    (
+         |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
+         |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
+         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
+         |                ,b.deleted
+         |        FROM    (
+         |                   SELECT  *
+         |                   FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |                   WHERE   ds = '$t1_ds'
+         |                )a JOIN
+         |                (
+         |                   select *
+         |                   from $project.tmp_xf_judicial_case_incr_mapping
+         |                )b on a.judicase_id = b.judicase_id
+         |)
+         |GROUP BY judicase_id
+         |         ,case_no
+         |         ,case_stage
+         |""".stripMargin).show(10, false)
+
+  }
+
+}