فهرست منبع

Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
许家凯 4 سال پیش
والد
کامیت
1aad2e9054

+ 198 - 55
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPre2.scala

@@ -1,9 +1,10 @@
 package com.winhc.bigdata.spark.jobs
 
-import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, CourtRank}
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
+
 import scala.collection.mutable
 
 /**
@@ -30,15 +31,87 @@ object JudicialCaseRelationPre2 {
 }
 
 case class JudicialCaseRelationPre2(s: SparkSession, project: String
-                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
   override protected val spark: SparkSession = s
 
+
+  private def getStrToMap(cols: Seq[String]): String = {
+    val set = cols.toSet
+    val str = set.map(e => {
+      s"concat_ws('\001','$e',cast($e as string))"
+    }).mkString(",")
+    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
+  }
+
   def precalc(): Unit = {
     prepareFunctions(spark)
+    val t1 = s"$project.inc_ads_company_court_announcement"
+    val t1_ds = BaseUtil.getPartion(t1, spark)
+
+    //法院公告
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$t1_ds',tn='court_announcement')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |     select
+         |     *,row_number() over(partition by detail_id order by date desc) num
+         |     from (
+         |           select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"2" as flag
+         |            ,concat_ws('',plaintiff,'与',litigant,'法院公告') as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,null as case_reason
+         |            ,case_no
+         |            ,court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,replace_char(plaintiff) as yg_name
+         |            ,replace_char(litigant) as bg_name
+         |            ,concat_ws(' ',publish_date,'00:00:00') as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.inc_ads_company_court_announcement
+         |      where length(case_no) > 0 and ds > '0'
+         |      union all
+         |      select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"2" as flag
+         |            ,concat_ws('',plaintiff,'与',litigant,'法院公告') as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,null as case_reason
+         |            ,case_no
+         |            ,court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,replace_char(plaintiff) as yg_name
+         |            ,replace_char(litigant) as bg_name
+         |            ,concat_ws(' ',publish_date,'00:00:00') as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.ads_company_court_announcement
+         |      where length(case_no) > 0 and ds > '0'
+         |     )
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
     //文书预处理
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='20200924',tn='wenshu')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$t1_ds',tn='wenshu')
          |SELECT  a.judicase_id
          |        ,'0' as flag
          |        ,title
@@ -47,8 +120,6 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |        ,case_no
          |        ,court_name
          |        ,concat_ws('',case_stage(case_no)) as case_stage
-         |        --,'裁判文书' lable
-         |        --,concat_ws('|','民事判决日期',judge_date,case_id) as detail
          |        ,regexp_replace(yg_name,'\n',',') as yg_name
          |        ,regexp_replace(bg_name,'\n',',') as bg_name
          |        ,judge_date as date
@@ -59,19 +130,22 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |            FROM $project.xjk_ads_judicial_case_relation1_tmp
          |        ) a
          |JOIN    (
-         |            SELECT  *
-         |            FROM    $project.ods_justicase
-         |            WHERE   ds = '20200830'
+         |            SELECT * FROM
+         |            (
+         |              SELECT  *,row_number() over(partition by docid order by judge_date desc) num
+         |              FROM    $project.ods_justicase
+         |              WHERE   ds > '0'
+         |            )c
+         |            where num = 1
          |        ) b
          |ON      a.id = b.case_id
          |""".stripMargin).show(10, false)
 
 
-
-    //法院公告预处理
+    //开庭公告预处理
     sql(
       s"""
-         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='20200924',tn='court_open_announcement')
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$t1_ds',tn='court_open_announcement')
          |select
          |  judicase_id
          |  ,flag
@@ -87,44 +161,69 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |  ,detail_id
          |  ,case_amt
          |from (
-         |      select
-         |      md5(cleanup(case_no)) as judicase_id
-         |      ,"1" as flag
-         |      ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
-         |      ,concat_ws('',case_type(case_no)) as case_type
-         |      ,case_reason
-         |      ,case_no
-         |      ,court as court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
-         |      ,plaintiff as yg_name
-         |      ,defendant as bg_name
-         |      ,start_date as date
-         |      ,rowkey as detail_id
-         |      ,0.0 as case_amt
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |    select
+         |    *,
+         |    row_number() over(partition by detail_id order by date desc) num
+         |    from
+         |    (
+         |     select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"1" as flag
+         |            ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,case_reason
+         |            ,case_no
+         |            ,court as court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,replace_char(plaintiff) as yg_name
+         |            ,replace_char(defendant) as bg_name
+         |            ,start_date as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
          |      from $project.inc_ads_company_court_open_announcement
          |      where length(case_no) > 0 and ds > '0'
+         |      union all
+         |      select
+         |            md5(cleanup(case_no)) as judicase_id
+         |            ,"1" as flag
+         |            ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |            ,concat_ws('',case_type(case_no)) as case_type
+         |            ,case_reason
+         |            ,case_no
+         |            ,court as court_name
+         |            ,concat_ws('',case_stage(case_no)) as case_stage
+         |            ,replace_char(plaintiff) as yg_name
+         |            ,replace_char(defendant) as bg_name
+         |            ,start_date as date
+         |            ,rowkey as detail_id
+         |            ,0.0 as case_amt
+         |      from $project.ads_company_court_open_announcement
+         |      where length(case_no) > 0 and ds > '0'
+         |    )
          |   )
          |where num = 1
          |""".stripMargin).show(10, false)
 
-
-
-    //tmp_xf_judicial_case_relation_open_counrt
-    //tmp_xf_judicial_case_relation_wenshu
-
   }
 
   def calc(): Unit = {
     prepareFunctions(spark)
+    map_2_json()
+    registerCourtRank()
     //预处理数据
     //precalc()
+    val cols = Seq("flag", "date", "detail_id")
+
+    val t1 = s"$project.inc_ads_company_court_announcement"
+    val t2 = s"$project.ads_judicial_case_relation_pre"
+    val t1_ds = BaseUtil.getPartion(t1, spark)
+    val t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
 
     //替换司法案件id
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.tmp_xf_judicial_case_relation_replace
-         |SELECT  COALESCE(b.judicase_id,a.judicase_id) judicase_id
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_replace_2 partition (ds = '$t1_ds')
+         |SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
          |        ,a.flag
          |        ,a.title
          |        ,a.case_type
@@ -133,19 +232,22 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |        ,a.court_name
          |        ,a.case_stage
          |        ,case_label(a.flag) lable
-         |        ,concat_ws('|',a.flag,a.date,a.detail_id) as detail
+         |        ,map_2_json(${getStrToMap(cols)}) as detail
          |        ,a.yg_name
          |        ,a.bg_name
          |        ,a.date
          |        ,a.detail_id
          |        ,a.case_amt
          |FROM    (
-         |  select * from $project.ads_judicial_case_relation_pre where ds = '20200924' and tn ='court_open_announcement'
+         |  select
+         |  *,md5(CLEANUP(case_no)) as new_judicase_id
+         |  from $project.ads_judicial_case_relation_pre
+         |  where ds= '$t2_ds' and tn <> 'wenshu'
          |) a
          |LEFT JOIN (
          |  select case_no,max(judicase_id) judicase_id
          |  from $project.ads_judicial_case_relation_pre
-         |  where ds = '20200924' and tn ='wenshu' and  length(trim(case_no)) > 0
+         |  where ds = '$t2_ds' and tn ='wenshu' and  length(trim(case_no)) > 0
          |  group by case_no
          |) b
          |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
@@ -159,19 +261,44 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |        ,court_name
          |        ,case_stage
          |        ,case_label(flag) lable
-         |        ,concat_ws('|',flag,date,detail_id) as detail
+         |        ,map_2_json(${getStrToMap(cols)}) as detail
          |        ,yg_name
          |        ,bg_name
          |        ,date
          |        ,detail_id
          |        ,case_amt
-         |from $project.ads_judicial_case_relation_pre where ds = '20200924' and tn ='wenshu' and length(trim(case_no)) > 0
+         |from $project.ads_judicial_case_relation_pre
+         |where ds = '$t2_ds' and tn ='wenshu' and length(trim(case_no)) > 0
          |""".stripMargin).show(10, false)
 
+    //找出增量数据
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $project.tmp_xf_judicial_case_incr_mapping
+         |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
+         |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
+         |FROM    (
+         |            SELECT  judicase_id
+         |                    ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r1
+         |            FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |            WHERE   ds = '$t1_ds'
+         |            GROUP BY judicase_id
+         |        ) a
+         |FULL JOIN (
+         |              SELECT  judicase_id
+         |                      ,md5(concat_ws('',judicase_id, sort(concat_ws('\001',collect_set(case_no))))) r2
+         |              FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |              WHERE   ds < '$t1_ds'
+         |              GROUP BY judicase_id
+         |          ) b
+         |ON  r1 = r2
+         |WHERE   r1 IS NULL OR r2 IS NULL
+         |""".stripMargin)
+
     //司法案件主表
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.tmp_xf_judicial_case_relation_r1
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_r1
          |SELECT  judicase_id
          |        ,max(first_title) title
          |        ,max(case_type) case_type
@@ -180,18 +307,27 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |        ,concat_ws(',',collect_set(court_name)) court_name
          |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
          |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
-         |        ,concat_ws(',',collect_set(detail)) detail
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(first_yg_name) AS yg_name
          |        ,max(first_bg_name) AS bg_name
          |        ,max(case_amt) AS case_amt
+         |        ,max(date) AS date
+         |        ,court_level(concat_ws(',',collect_set(court_level))) court_level
+         |        ,max(deleted) deleted
          |FROM    (
-         |        SELECT  * ,first_value(yg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_yg_name
-         |                ,first_value(bg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_bg_name
-         |                ,first_value(title)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_title
+         |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
+         |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
+         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
+         |                ,b.deleted
          |        FROM    (
-         |                SELECT  *
-         |                FROM    $project.tmp_xf_judicial_case_relation_replace
-         |                )
+         |                   SELECT  *,court_level(court_name) court_level
+         |                   FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |                   WHERE   ds = '$t1_ds'
+         |                ) a JOIN
+         |                (
+         |                   select *
+         |                   from $project.tmp_xf_judicial_case_incr_mapping
+         |                ) b on a.judicase_id = b.judicase_id
          |        )
          |GROUP BY judicase_id
          |""".stripMargin).show(10, false)
@@ -199,7 +335,7 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
     //明细表
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.tmp_xf_judicial_case_relation_r2
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_judicial_case_relation_r2
          |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no),case_stage)) id
          |        ,judicase_id
          |        ,max(first_title) title
@@ -209,17 +345,24 @@ case class JudicialCaseRelationPre2(s: SparkSession, project: String
          |        ,max(court_name) court_name
          |        ,case_stage
          |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
-         |        ,concat_ws(',',collect_set(detail)) detail
+         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(first_yg_name) yg_name
          |        ,max(first_bg_name) bg_name
+         |        ,max(deleted) deleted
          |FROM    (
-         |        SELECT  * ,first_value(yg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_yg_name
-         |                ,first_value(bg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_bg_name
-         |                ,first_value(title)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_title
+         |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
+         |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
+         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
+         |                ,b.deleted
          |        FROM    (
-         |                SELECT  *
-         |                FROM    $project.tmp_xf_judicial_case_relation_replace
-         |                )
+         |                   SELECT  *
+         |                   FROM    $project.tmp_xf_judicial_case_relation_replace_2
+         |                   WHERE   ds = '$t1_ds'
+         |                )a JOIN
+         |                (
+         |                   select *
+         |                   from $project.tmp_xf_judicial_case_incr_mapping
+         |                )b on a.judicase_id = b.judicase_id
          |)
          |GROUP BY judicase_id
          |         ,case_no

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -47,6 +47,13 @@ trait CompanyMapping {
       label(l)
     })
 
+    spark.udf.register("sort", (s: String) => {
+      sortString(s)
+    })
+
+    spark.udf.register("court_level", (s: String) => {
+      courtLevel(s)
+    })
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 12 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CourtRank.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.udf
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 
@@ -32,4 +33,15 @@ trait CourtRank {
       }).toMap ++ Map("中华人民共和国最高人民法院" -> "最高法院", "最高人民法院" -> "最高法院")
     spark.sparkContext.broadcast(court_name_rank)
   }
+
+  def registerCourtRank(): Unit = {
+    val court_map = courtRank()
+    spark.udf.register("court_level", (name: String) => {
+      if (StringUtils.isNotBlank(name)) {
+        court_map.value.getOrElse(name, "")
+      } else {
+        ""
+      }
+    })
+  }
 }

+ 52 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -35,6 +35,27 @@ object BaseUtil {
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
   }
 
+  def getPartion(t: String, p: String, @transient spark: SparkSession): String = {
+    import spark._
+    val sql_s = s"show partitions " + t
+    val ps = sql(sql_s).collect.toList.map(r => {
+      val Array(x, y) = r.getString(0).split("/")
+      val Array(_, a2) = x.split("=")
+      val Array(_, b2) = y.split("=")
+      var r1 = ("", "")
+      if (StringUtils.isNumeric(a2)) {
+        r1 = (a2, b2)
+      } else {
+        r1 = (b2, a2)
+      }
+      r1
+    }).filter(_._2.equals(p)).map(_._1).seq
+    if (ps.size > 0) {
+      ps.sorted.last
+    } else {
+      ""
+    }
+  }
 
   def getPartion(t: String, @transient spark: SparkSession) = {
     val ps = getPartitions(t, spark)
@@ -204,12 +225,31 @@ object BaseUtil {
     "其它"
   }
 
+  def courtLevel(s: String): String = {
+    var r = ""
+    if (StringUtils.isNotBlank(s)) {
+      r = s.split(",").filter(StringUtils.isNotBlank(_)).mkString(",")
+    }
+    r
+  }
+
   def label(s: String): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
       r = s match {
-        case "0" => "裁判文书"
-        case "1" => "开庭公告"
+        case "0"  => "裁判文书"//企业
+        case "1"  => "开庭公告"//企业
+        case "2"  => "法院公告"//企业
+        case "3"  => "失信人"//企业
+        case "4"  => "送达公告"//企业
+        case "5"  => "限高"//企业
+        case "6"  => "终本"//企业
+        case "7"  => "被执行人"//企业
+        case "8"  => "立案信息"//企业
+        case "9"  => "失信人"//人
+        case "10" => "被执行人"//人
+        case "11" => "限高"//人
+        case "12" => "终本"//人
         case _ => ""
       }
     }
@@ -256,6 +296,14 @@ object BaseUtil {
     } else null
   }
 
+  def sortString(s: String): String = {
+    var r = ""
+    if (StringUtils.isNotBlank(s)) {
+      r = s.split("\\001").sorted.mkString("")
+    }
+    r
+  }
+
   /**
    * 身份证号格式统一
    *
@@ -271,6 +319,8 @@ object BaseUtil {
 
   def main(args: Array[String]): Unit = {
     println(case_no_trim("(2015)怀执字第03601号号"))
+    val seq = Seq("1", "3", "2", "7").mkString("\001")
+    println(sortString(seq))
     println(id_card_trim("41111119990****062x"))
   }