xufei пре 4 година
родитељ
комит
89c3b4d4d4

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPreCourtRegister.scala

@@ -1,4 +1,5 @@
 package com.winhc.bigdata.spark.jobs
+
 import com.winhc.bigdata.spark.udf.CourtRank
 import com.winhc.bigdata.spark.utils.{BaseUtil, SparkUtils}
 import scala.collection.mutable

+ 94 - 21
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPreNew.scala

@@ -62,7 +62,10 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
   def precalc(): Unit = {
     prepareFunctions(spark)
     val t1 = s"$project.inc_ads_company_court_announcement"
-    val t1_ds = BaseUtil.getPartion(t1, spark)
+    var t1_ds = ds
+    if(StringUtils.isBlank(ds)){
+      t1_ds = BaseUtil.getPartion(t1, spark)
+    }
 
     //法院公告
     sql(
@@ -89,7 +92,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |           select
          |            md5(cleanup(case_no)) as judicase_id
          |            ,"2" as flag
-         |            ,concat_ws('',plaintiff,'与',litigant,'法院公告') as title
+         |            ,title(plaintiff,litigant,'法院公告') as title
          |            ,concat_ws('',case_type(case_no)) as case_type
          |            ,null as case_reason
          |            ,case_no
@@ -106,7 +109,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |      select
          |            md5(cleanup(case_no)) as judicase_id
          |            ,"2" as flag
-         |            ,concat_ws('',plaintiff,'与',litigant,'法院公告') as title
+         |            ,title(plaintiff,litigant,'法院公告') as title
          |            ,concat_ws('',case_type(case_no)) as case_type
          |            ,null as case_reason
          |            ,case_no
@@ -185,7 +188,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |     select
          |            md5(cleanup(case_no)) as judicase_id
          |            ,"1" as flag
-         |            ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |            ,title(plaintiff,defendant,case_reason) as title
          |            ,concat_ws('',case_type(case_no)) as case_type
          |            ,case_reason
          |            ,case_no
@@ -202,7 +205,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |      select
          |            md5(cleanup(case_no)) as judicase_id
          |            ,"1" as flag
-         |            ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |            ,title(plaintiff,defendant,case_reason) as title
          |            ,concat_ws('',case_type(case_no)) as case_type
          |            ,case_reason
          |            ,case_no
@@ -220,11 +223,73 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |where num = 1
          |""".stripMargin).show(10, false)
 
+
+    //立案信息预处理
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$t1_ds',tn='court_register')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |     select
+         |     *,row_number() over(partition by detail_id order by date desc) num
+         |     from (
+         |           select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"8" as flag
+         |            ,title(plaintiff,defendant,case_reason) as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,case_reason
+         |            ,case_no
+         |            ,court as court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,plaintiff as yg_name
+         |            ,defendant as bg_name
+         |            ,filing_date as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.inc_ads_company_court_register
+         |      where length(case_no) > 0 and ds > '0'
+         |      union all
+         |      select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"8" as flag
+         |            ,title(plaintiff,defendant,case_reason) as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,case_reason
+         |            ,case_no
+         |            ,court as court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,plaintiff as yg_name
+         |            ,defendant as bg_name
+         |            ,filing_date as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.ads_company_court_register
+         |      where length(case_no) > 0 and ds > '0'
+         |     )
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
   }
 
   def calc(): Unit = {
     prepareFunctions(spark)
     map_2_json()
+    case_no_trim_udf()
     registerCourtRank()
     //预处理数据
     val cols = Seq("flag", "date", "detail_id")
@@ -252,7 +317,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,a.title
          |        ,a.case_type
          |        ,a.case_reason
-         |        ,a.case_no
+         |        ,case_no_trim(a.case_no) as case_no
          |        ,a.court_name
          |        ,a.case_stage
          |        ,case_label(a.flag) lable
@@ -264,14 +329,28 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,a.case_amt
          |FROM    (
          |  select
-         |  *,md5(CLEANUP(case_no)) as new_judicase_id
+         |     judicase_id
+         |     ,flag
+         |     ,title
+         |     ,case_type
+         |     ,case_reason
+         |     ,case_no_trim(case_no) as case_no
+         |     ,court_name
+         |     ,case_stage
+         |     ,yg_name
+         |     ,bg_name
+         |     ,date
+         |     ,detail_id
+         |     ,case_amt
+         |     ,md5(CLEANUP(case_no)) as new_judicase_id
          |  from $project.ads_judicial_case_relation_pre
-         |  where ds= '$t2_ds' and tn <> 'wenshu'
+         |  where ds= '$t2_ds' and tn <> 'wenshu' and case_no_trim(case_no) is not null
+         |        and date is not null and length(date) = 19
          |) a
          |LEFT JOIN (
-         |  select case_no,max(judicase_id) judicase_id
+         |  select case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
          |  from $project.ads_judicial_case_relation_pre
-         |  where ds = '$t2_ds' and tn ='wenshu' and  length(trim(case_no)) > 0
+         |  where ds = '$t2_ds' and tn ='wenshu' and case_no_trim(case_no) is not null
          |  group by case_no
          |) b
          |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
@@ -281,7 +360,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,title
          |        ,case_type
          |        ,case_reason
-         |        ,case_no
+         |        ,case_no_trim(case_no) as case_no
          |        ,court_name
          |        ,case_stage
          |        ,case_label(flag) lable
@@ -292,13 +371,14 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,detail_id
          |        ,case_amt
          |from $project.ads_judicial_case_relation_pre
-         |where ds = '$t2_ds' and tn ='wenshu' and length(trim(case_no)) > 0
+         |where ds = '$t2_ds' and tn ='wenshu'  and case_no_trim(case_no) is not null
+         |      and date is not null and length(date) = 19
          |""".stripMargin).show(10, false)
 
     //找出增量数据
-    sql(
+     sql(
       s"""
-         |INSERT OVERWRITE TABLE $project.$t4
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t4
          |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
          |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
          |FROM    (
@@ -319,13 +399,6 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |WHERE   r1 IS NULL OR r2 IS NULL
          |""".stripMargin)
 
-    //    sql(
-    //      s"""
-    //         |SELECT  court_name,court_level(court_name) court_level
-    //         |FROM    $project.$t3
-    //         |WHERE   ds = '$t1_ds'
-    //         |""".stripMargin).show(200, false)
-
     //司法案件主表
     sql(
       s"""

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -54,6 +54,10 @@ trait CompanyMapping {
     spark.udf.register("trim_black", (s: String) => {
       trimBlack(s)
     })
+
+    spark.udf.register("title", (yg: String, bg: String, reason: String) => {
+      title(yg, bg, reason)
+    })
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 14 - 8
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -225,6 +225,10 @@ object BaseUtil {
     "其它"
   }
 
+  def title(ygname: String, bgname: String, reason: String): String = {
+     Seq(ygname,bgname,reason).filter(s=>StringUtils.isNotBlank(s)).mkString(",")
+  }
+
   def trimBlack(s: String): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
@@ -321,21 +325,23 @@ object BaseUtil {
 
   /**
    * 提取省市区
+   *
    * @param addr
    * @return
    */
   def parseAddress(addr: String): (String, String, String) = {
-      val matcher = pat.pattern.matcher(addr)
-      if (!matcher.find()) {
-        return null
-      }
-      val province = matcher.group("province")
-      val city = matcher.group("city")
-      val district = matcher.group("district")
-      (province, city, district)
+    val matcher = pat.pattern.matcher(addr)
+    if (!matcher.find()) {
+      return null
+    }
+    val province = matcher.group("province")
+    val city = matcher.group("city")
+    val district = matcher.group("district")
+    (province, city, district)
   }
 
   def main(args: Array[String]): Unit = {
+    println(title("xx",null,"reason"))
     println(parseAddress("大石桥市人民法院"))
     println(case_no_trim("(2015)怀执字第03601号号"))
     val seq = Seq("1", "3", "2", "7").mkString("\001")