
Merge remote-tracking branch 'origin/master'

许家凯 committed 4 years ago
Parent
Current commit
b97ad989b0

+ 6 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -126,9 +126,10 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |                WHERE   length(gist_id) >0 AND ds> '0'
          |                ) A
          |        LEFT JOIN (
-         |                SELECT  *
+         |                SELECT  id,max(judicase_id) judicase_id
          |                FROM    $project.ads_judicial_case_relation_graph
          |                WHERE   flag = 'company_zxr'
+         |                GROUP BY id
          |                  ) C
          |        ON      A.detail_id = C.id
          |        )
@@ -200,8 +201,10 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |      ) B
          |      on A.cname=B.name AND A.case_no=B.case_no
          |      left join(
-         |        SELECT  *
-         |            FROM $project.ads_judicial_case_relation_graph WHERE flag = 'company_zxr_person'
+         |        SELECT  id,max(judicase_id) judicase_id
+         |        FROM $project.ads_judicial_case_relation_graph
+         |        WHERE flag = 'company_zxr_person'
+         |        GROUP BY id
          |      ) C
          |      on A.rowkey=C.id
          |      where is_id_card(A.card) OR is_id_card(B.identity_num)
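For context, a minimal local sketch (hypothetical table names and data, not the production views) of why the lookup side is collapsed to one row per id: without the GROUP BY, a duplicated id in ads_judicial_case_relation_graph would fan out every matching row of the LEFT JOIN.

```scala
import org.apache.spark.sql.SparkSession

object DedupJoinSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dedup-join-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical graph rows: id "d1" is duplicated, as it can be in the real table.
    Seq(("d1", "case-a"), ("d1", "case-b"), ("d2", "case-c"))
      .toDF("id", "judicase_id")
      .createOrReplaceTempView("graph")

    Seq("d1", "d2").toDF("detail_id").createOrReplaceTempView("detail")

    // Same shape as the patched subquery: one judicase_id per id, so the
    // LEFT JOIN returns exactly one row per detail_id instead of fanning out.
    spark.sql(
      """
        |SELECT a.detail_id, c.judicase_id
        |FROM detail a
        |LEFT JOIN (
        |    SELECT id, max(judicase_id) judicase_id
        |    FROM graph
        |    GROUP BY id
        |) c
        |ON a.detail_id = c.id
        |""".stripMargin).show(false)

    spark.stop()
  }
}
```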

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre39.scala

@@ -39,9 +39,10 @@ case class JudicialCaseRelationPre39(s: SparkSession,
 
       sql(
         s"""
-           |SELECT  *
+           |SELECT  id,max(judicase_id) judicase_id
            |FROM    winhc_eci_dev.ads_judicial_case_relation_graph
            |WHERE   flag = '$table_name'
+           |group by id
            |""".stripMargin)
         .createOrReplaceTempView("all_judicial_mapping_tmp")
 

+ 58 - 19
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.jobs.judicial
 
-import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, CourtRank}
+import com.winhc.bigdata.spark.udf.{BaseFunc, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs}
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
@@ -152,8 +152,9 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,case_id as detail_id
          |        ,case_amt
          |FROM    (
-         |            SELECT  *
+         |            SELECT  id,max(judicase_id) judicase_id
          |            FROM $project.ads_judicial_case_relation_graph WHERE flag = 'wenshu_detail'
+         |            GROUP BY id
          |        ) a
          |RIGHT JOIN (
          |            SELECT *,md5(cleanup(case_no_trim(case_no))) as new_judicase_id
@@ -205,7 +206,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |            ,replace_char(plaintiff) as yg_name
          |            ,replace_char(defendant) as bg_name
          |            ,start_date as date
-         |            ,rowkey as detail_id
+         |            ,md5(cleanup(CONCAT_WS('',case_no,start_date))) as detail_id
          |            ,0.0 as case_amt
          |      from $project.inc_ads_company_court_open_announcement
          |      where length(case_no) > 0 and ds > '0'
@@ -221,7 +222,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |            ,concat_ws('',case_stage(case_no)) as case_stage
          |            ,replace_char(plaintiff) as yg_name
          |            ,replace_char(defendant) as bg_name
         |            ,start_date as date
-         |            ,rowkey as detail_id
+         |            ,md5(cleanup(CONCAT_WS('',case_no,start_date))) as detail_id
          |            ,0.0 as case_amt
          |      from $project.ads_company_court_open_announcement
@@ -299,6 +300,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     map_2_json()
     case_no_trim_udf()
     registerCourtRank()
+    spark.udf.register("name_aggs", new NameAggs(1000))
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
     //预处理数据
     val cols = Seq("flag", "date", "detail_id")
 
@@ -354,13 +357,15 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |     ,case_amt
          |     ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
          |  from $project.$t2
-         |  where ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person') and case_no_trim(case_no) is not null
+         |  where ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+         |        and case_no_trim(case_no) is not null
          |        and date is not null and length(date) = 19
          |) a
          |LEFT JOIN (
          |  select case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
          |  from $project.$t2
-         |  where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person') and case_no_trim(case_no) is not null
+         |  where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+         |  and case_no_trim(case_no) is not null
          |  group by case_no
          |) b
          |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
@@ -381,7 +386,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,detail_id
          |        ,case_amt
          |from $project.$t2
-         |where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person') and case_no_trim(case_no) is not null
+         |where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+         |      and case_no_trim(case_no) is not null
          |      and date is not null and length(date) = 19
          |""".stripMargin).show(10, false)
 
@@ -482,25 +488,42 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r1
+         |SELECT
+         |    judicase_id ,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    detail      ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    case_amt    ,
+         |    date        ,
+         |    court_level ,
+         |    deleted     ,
+         |    cids
+         |FROM
+         |(
          |SELECT  judicase_id
          |        ,max(first_title) title
          |        ,max(case_type) case_type
-         |        ,max(case_reason) case_reason
+         |        ,case_reason(case_reason,date,flag) case_reason
          |        ,concat_ws(',',collect_set(case_no)) case_no
          |        ,concat_ws(',',collect_set(court_name)) court_name
          |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
          |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
-         |        ,max(first_yg_name) AS yg_name
-         |        ,max(first_bg_name) AS bg_name
          |        ,max(case_amt) AS case_amt
          |        ,max(date) AS date
          |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
          |        ,max(deleted) deleted
          |        ,concat_ws(',',collect_set(cids)) cids
+         |        ,name_aggs(yg_name,bg_name,flag,date) name_aggs
          |FROM    (
-         |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
-         |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
+         |        SELECT  a.*
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
          |                ,b.deleted
          |        FROM    (
@@ -514,32 +537,47 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |                ) b on a.judicase_id = b.judicase_id
          |        )
          |GROUP BY judicase_id
+         |)
          |""".stripMargin).show(20, false)
 
     //明细表
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r2
+         |SELECT
+         |    id    ,
+         |    judicase_id ,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    detail      ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    last_date   ,
+         |    deleted
+         |FROM
+         |(
          |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
          |        ,judicase_id
          |        ,max(first_title) title
          |        ,max(case_type) case_type
-         |        ,max(last_case_reason) case_reason
+         |        ,case_reason(case_reason,date,flag) case_reason
          |        ,case_no
          |        ,max(court_name) court_name
-         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
+         |        ,case_stage(max(case_no)) as case_stage
          |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
-         |        ,max(first_yg_name) yg_name
-         |        ,max(first_bg_name) bg_name
          |        ,max(last_date) last_date
          |        ,max(deleted) deleted
+         |        ,name_aggs(yg_name,bg_name,flag,date) name_aggs
          |FROM    (
-         |        SELECT  a.* ,first_value(yg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_yg_name
-         |                ,first_value(bg_name) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_bg_name
+         |        SELECT  a.*
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
          |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
-         |                ,max(case_reason) OVER (PARTITION BY a.judicase_id ) AS last_case_reason
          |                ,b.deleted
          |        FROM    (
          |                   SELECT  *
@@ -553,6 +591,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |)
          |GROUP BY judicase_id
          |         ,case_no
+         |)
          |""".stripMargin).show(10, false)
 
   }
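A minimal sketch of the new deterministic detail_id for the court open-announcement rows. `cleanup` below is a hypothetical stand-in for the project's UDF (assumed to normalise whitespace and punctuation); the point is only that md5(cleanup(concat_ws('', case_no, start_date))) yields a stable surrogate key derived from the record's own fields rather than from rowkey.

```scala
import org.apache.spark.sql.SparkSession

object DetailIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("detail-id-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical stand-in for the project's cleanup UDF: drop whitespace and bracket punctuation.
    spark.udf.register("cleanup", (s: String) => if (s == null) "" else s.replaceAll("[\\s（）()]+", ""))

    Seq(("（2020）京01民初123号", "2020-05-01 00:00:00"))
      .toDF("case_no", "start_date")
      .createOrReplaceTempView("announcement")

    // Same expression as the patch: a deterministic id from case_no + start_date.
    spark.sql(
      """
        |SELECT case_no,
        |       md5(cleanup(CONCAT_WS('', case_no, start_date))) AS detail_id
        |FROM announcement
        |""".stripMargin).show(false)

    spark.stop()
  }
}
```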

+ 75 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CaseReasonAggs.scala

@@ -0,0 +1,75 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+import scala.collection.mutable
+
+/**
+ * @Description:案由聚合
+ * @author π
+ * @date 2020/10/26 15:15
+ */
+
+class CaseReasonAggs(max: Int) extends UserDefinedAggregateFunction {
+
+  val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("case_reason", DataTypes.StringType)
+    , StructField("bus_date", DataTypes.StringType)
+    , StructField("flag", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.StringType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, String]())
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    if (buffer.size >= max) {
+      return
+    }
+    val case_reason = input.getString(0)
+    val bus_date = input.getString(1)
+    val flag = input.getString(2)
+    if (StringUtils.isBlank(case_reason)) {
+      return
+    }
+    if (!flags.contains(flag)) {
+      return
+    }
+
+    buffer(0) = mutable.Map(bus_date -> s"$case_reason") ++= scala.collection.mutable.Map[String, String](buffer.getMap[String, String](0).toMap.toSeq: _*)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    if (buffer1.size >= max) {
+      return
+    }
+    buffer1(0) = buffer1.getAs[Map[String, String]](0) ++ buffer2.getAs[Map[String, String]](0)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var case_reason = ""
+    val m0: Map[String, String] = buffer.getAs[Map[String, String]](0)
+    if (m0.isEmpty) {
+      return case_reason
+    } else {
+      val key = m0.keySet.toSeq.sorted.head
+      case_reason = m0(key)
+    }
+    case_reason
+  }
+}
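A small local check (hypothetical data) of the intended behaviour: rows whose flag is outside the whitelist are skipped, and among the remaining rows the case_reason attached to the earliest bus_date wins, since evaluate sorts the map keys as strings and takes the head (so all dates must share one format).

```scala
import com.winhc.bigdata.spark.udf.CaseReasonAggs
import org.apache.spark.sql.SparkSession

object CaseReasonAggsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("case-reason-sketch").getOrCreate()
    import spark.implicits._

    spark.udf.register("case_reason", new CaseReasonAggs(1000))

    Seq(
      ("j1", "民间借贷纠纷", "2020-03-01 00:00:00", "0"),
      ("j1", "合同纠纷",     "2020-01-01 00:00:00", "1"),
      ("j1", "排除的案由",   "2019-01-01 00:00:00", "7")   // flag not whitelisted, ignored
    ).toDF("judicase_id", "case_reason", "date", "flag")
      .createOrReplaceTempView("rows")

    // Expected: 合同纠纷 (earliest date among whitelisted flags 0/1/2/4/8).
    spark.sql(
      """
        |SELECT judicase_id, case_reason(case_reason, date, flag) AS case_reason
        |FROM rows
        |GROUP BY judicase_id
        |""".stripMargin).show(false)

    spark.stop()
  }
}
```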

+ 97 - 0
src/main/scala/com/winhc/bigdata/spark/udf/NameAggs.scala

@@ -0,0 +1,97 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+/**
+ * @Description:原告,被告聚合
+ * @author π
+ * @date 2020/10/26 15:15
+ */
+
+class NameAggs(max: Int) extends UserDefinedAggregateFunction {
+
+  val flags = Seq("0", "1", "2", "4", "8")
+  val split = "\u0001"
+
+  override def inputSchema: StructType = StructType(Array[StructField](
+    StructField("yg_name", DataTypes.StringType)
+    , StructField("bg_name", DataTypes.StringType)
+    , StructField("flag", DataTypes.StringType)
+    , StructField("bus_date", DataTypes.StringType)
+  ))
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](
+      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+      ,StructField("t2", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+    )
+  )
+
+  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, String]())
+    buffer.update(1, Map[String, String]())
+  }
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+
+    if (buffer.size >= max) {
+      return
+    }
+    val yg_name = input.getString(0)
+    val bg_name = input.getString(1)
+    val flag = input.getString(2)
+    val bus_date = input.getString(3)
+
+    if (StringUtils.isBlank(yg_name) && StringUtils.isBlank(bg_name)) {
+      return
+    }
+    if (!flags.contains(flag)) {
+      return
+    }
+    val map0 = buffer.getMap[String, String](0).toMap
+    val map1 = buffer.getMap[String, String](1).toMap
+    var map_new0 = scala.collection.mutable.Map[String, String](map0.toSeq: _*)
+    var map_new1 = scala.collection.mutable.Map[String, String](map1.toSeq: _*)
+    if (StringUtils.isNotBlank(yg_name) && StringUtils.isNotBlank(bg_name)) {
+      map_new0 ++= Map(bus_date -> s"$yg_name$split$bg_name")
+    } else {
+      map_new1 ++= Map(bus_date -> s"$yg_name$split$bg_name")
+    }
+    buffer.update(0, map_new0)
+    buffer.update(1, map_new1)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1(0) = buffer1.getAs[Map[String, String]](0) ++ buffer2.getAs[Map[String, String]](0)
+    buffer1(1) = buffer1.getAs[Map[String, String]](1) ++ buffer2.getAs[Map[String, String]](1)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    var yg_name = ""
+    var bg_name = ""
+    val m0: Map[String, String] = buffer.getAs[Map[String, String]](0)
+    val m1: Map[String, String] = buffer.getAs[Map[String, String]](1)
+    println("m0" + m0 + "m1" + m1)
+    if (m0.isEmpty && m1.isEmpty) {
+      return Map("yg_name" -> yg_name, "bg_name" -> bg_name)
+    }else if(!m0.isEmpty){
+      val key = m0.keySet.toSeq.sorted.head
+      val Array(a, b) = m0(key).split(s"$split",-1)
+      yg_name = a
+      bg_name = b
+    }else{
+      val key = m1.keySet.toSeq.sorted.head
+      val Array(x, y) = m1(key).split(s"$split",-1)
+      yg_name = x
+      bg_name = y
+    }
+    Map("yg_name" -> yg_name, "bg_name" -> bg_name)
+  }
+}
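And the matching register-and-unpack pattern for the party names, mirroring the r1/r2 inserts above (hypothetical data): rows where both parties are known take priority over partial rows, and the outer query reads the returned map via name_aggs['yg_name'] / name_aggs['bg_name'].

```scala
import com.winhc.bigdata.spark.udf.NameAggs
import org.apache.spark.sql.SparkSession

object NameAggsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("name-aggs-sketch").getOrCreate()
    import spark.implicits._

    spark.udf.register("name_aggs", new NameAggs(1000))

    Seq(
      ("j1", "",       "某被告", "1", "2020-01-01 00:00:00"),  // only one side known
      ("j1", "某原告", "某被告", "0", "2020-03-01 00:00:00")   // both sides known, wins
    ).toDF("judicase_id", "yg_name", "bg_name", "flag", "date")
      .createOrReplaceTempView("rows")

    // Expected: yg_name = 某原告, bg_name = 某被告 (the complete pair is preferred).
    spark.sql(
      """
        |SELECT judicase_id,
        |       name_aggs['yg_name'] AS yg_name,
        |       name_aggs['bg_name'] AS bg_name
        |FROM (
        |  SELECT judicase_id, name_aggs(yg_name, bg_name, flag, date) AS name_aggs
        |  FROM rows
        |  GROUP BY judicase_id
        |) t
        |""".stripMargin).show(false)

    spark.stop()
  }
}
```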

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -274,8 +274,8 @@ object BaseUtil {
         case "2" => "法院公告" //企业
         case "3" => "失信人" //企业
         case "4" => "送达公告" //企业
-        case "5" => "限高" //企业
-        case "6" => "终本" //企业
+        case "5" => "限消费" //企业
+        case "6" => "终本案件" //企业
         case "7" => "被执行人" //企业
         case "8" => "立案信息" //企业
         case "9" => "失信人" //人