Browse Source

司法案件后置处理

xufei 4 years ago
parent
commit
e563c5defb

+ 149 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelation.scala

@@ -0,0 +1,149 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * Entry point of the judicial-case output table job (司法案件输出表).
+ *
+ * @author π
+ * @date 2020/9/9 9:56
+ */
+object JudicialCaseRelation {
+
+  /**
+   * Checks the arguments, obtains a SparkSession from `SparkUtils.InitEnv`
+   * configured with the given ODPS project name, runs
+   * `JudicialCaseRelation.calc()` and finally stops the session.
+   *
+   * Exits the JVM with status -1 when the argument count is not exactly one.
+   *
+   * @param args a single element: the project name
+   */
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
+      println("请输入 project:项目 !!!")
+      sys.exit(-1)
+    }
+    val Array(project) = args
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // Stop the session even when the job throws, so a failed run does not
+    // leave the session open.
+    try {
+      JudicialCaseRelation(spark, project).calc()
+    } finally {
+      spark.stop()
+    }
+  }
+}
+
+/**
+ * Builds the judicial-case output tables of one project.
+ *
+ * @param s       Spark session used for every query
+ * @param project project name used as the prefix of every table name
+ */
+case class JudicialCaseRelation(s: SparkSession, project: String
+                               ) extends LoggingUtils with CompanyMapping {
+  override protected val spark: SparkSession = s
+
+  /**
+   * Runs the job in four steps, in this order:
+   *  1. joins the relation-id table with one partition of the source table
+   *     and exposes the result as the temp view `mapping`;
+   *  2. overwrites the main table with one aggregated row per `judicase_id`;
+   *  3. overwrites the company table by exploding the party names of the main
+   *     table and joining them with the company mapping on cleaned-up names;
+   *  4. overwrites the detail table, grouped by case id, type and stage.
+   */
+  def calc(): Unit = {
+
+    // Registers the SQL UDFs used below (e.g. name_judge).
+    prepareFunctions(spark)
+
+    val relationIdTable = s"$project.xjk_ads_judicial_case_relation1_tmp" // judicial-case relation id table
+    val sourceTable = s"$project.ods_justicase" // judicial-case source table
+    val mainTable = s"$project.tmp_xf_judicial_case_relation_1" // judicial-case main table
+    val companyMappingTable = s"$project.base_company_mapping" // company name -> cid mapping
+    val companyCaseTable = s"$project.tmp_xf_judicial_case_relation_2" // judicial cases per company
+    val detailTable = s"$project.tmp_xf_judicial_case_relation_3" // judicial-case details
+
+    // Partition values (ds) used to filter the two partitioned inputs.
+    val sourcePartition = BaseUtil.getPartion(sourceTable, spark)
+    val companyMappingPartition = BaseUtil.getPartion(companyMappingTable, spark)
+
+    // Step 1: relation ids joined with the source rows -> temp view "mapping".
+    val mappingQuery =
+      s"""
+         |SELECT  a.judicase_id
+         |        ,b.*
+         |FROM    (
+         |            SELECT  *
+         |            FROM $relationIdTable
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    $sourceTable
+         |            WHERE   ds = '$sourcePartition'
+         |        ) b
+         |ON      a.id = b.case_id
+         |""".stripMargin
+    sql(mappingQuery)
+      .repartition(1024)
+      .createOrReplaceTempView("mapping")
+
+    // Step 2: main table. Aggregates come from all rows of a judicase_id;
+    // yg_name / bg_name come from the row with the latest judge_date.
+    val mainTableInsert =
+      s"""
+         |INSERT OVERWRITE TABLE $mainTable
+         |SELECT  a.*
+         |        ,b.yg_name
+         |        ,b.bg_name
+         |FROM    (
+         |            SELECT  judicase_id
+         |                    ,max(title) title
+         |                    ,max(case_type) case_type
+         |                    ,max(case_reason)case_reason
+         |                    ,concat_ws('\n',collect_set(case_no)) case_no
+         |                    ,concat_ws('\n',collect_set(court_name)) court_name
+         |                    ,max(case_stage) case_stage
+         |                    ,max(concat_ws(' ',case_type,'裁判文书')) lable
+         |                    ,concat_ws('\n',collect_set(concat_ws('',case_type,case_stage,'|民事判决日期:',judge_date))) apps
+         |            FROM    mapping
+         |            GROUP BY judicase_id
+         |        ) a
+         |JOIN    (
+         |            SELECT  judicase_id
+         |                    ,yg_name
+         |                    ,bg_name
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER (PARTITION BY judicase_id ORDER BY judge_date DESC ) num
+         |                        FROM    mapping
+         |                    )
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.judicase_id = b.judicase_id
+         |""".stripMargin
+    sql(mainTableInsert)
+
+    // Step 3: company table. Each newline-separated party name longer than
+    // four characters is matched against the company mapping.
+    val companyCaseInsert =
+      s"""
+         |INSERT OVERWRITE TABLE $companyCaseTable
+         |SELECT  concat_ws('_',b.new_cid,judicase_id) AS rowkey
+         |        ,b.new_cid AS cid
+         |        ,a.*
+         |FROM    (
+         |            SELECT  name_judge(name,yg_name,bg_name) AS name_type
+         |                    ,*
+         |            FROM    (
+         |                        SELECT  *
+         |                        FROM    $mainTable
+         |                        LATERAL VIEW explode(split(concat_ws('\n',yg_name,bg_name) ,'\n')) t AS name
+         |                    )
+         |            WHERE   LENGTH(name) > 4
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    $companyMappingTable
+         |            WHERE   ds = '$companyMappingPartition'
+         |        ) b
+         |ON      cleanup(a.name) = cleanup(b.cname)
+         |""".stripMargin
+    sql(companyCaseInsert)
+
+    // Step 4: detail table, one row per (judicase_id, case_type, case_stage).
+    val detailInsert =
+      s"""
+         |INSERT OVERWRITE TABLE $detailTable
+         |SELECT  judicase_id
+         |        ,case_type
+         |        ,case_stage
+         |        ,max(case_no) case_no
+         |        ,max(case_reason)case_reason
+         |        ,max(yg_name) yg_name
+         |        ,max(bg_name) bg_name
+         |        ,max(court_name) court_name
+         |        ,concat_ws('\n',collect_set(concat_ws(':','民事判决日期',judge_date,case_id))) apps
+         |FROM    mapping
+         |GROUP BY judicase_id
+         |         ,case_type
+         |         ,case_stage
+         |""".stripMargin
+    sql(detailInsert)
+
+  }
+}

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -27,6 +27,10 @@ trait CompanyMapping {
     spark.udf.register("rowkey_trans", (col: String, tab: String) => {
       rowkey_trans(col, tab)
     })
+
+    spark.udf.register("name_judge", (name: String, yg_name: String, bg_name: String) => {
+      nameJudge(name, yg_name, bg_name)
+    })
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 18 - 1
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -115,6 +115,7 @@ object BaseUtil {
       ""
     }
   }
+
   def BKDRHash(str: String): Long = {
     val seed: Long = 1313131313 // 31 131 1313 13131 131313 etc..
     var hash: Long = 0
@@ -125,7 +126,23 @@ object BaseUtil {
     return hash
   }
 
+  /**
+   * Tells on which side of a case `name` appears.
+   *
+   * An exact match against a newline-separated entry wins over a plain
+   * substring hit. This matters because the SQL caller produces `name` by
+   * splitting the two lists on "\n", and one party's name can be contained
+   * in another's (e.g. "甲公司" inside "甲公司分公司"). When no entry matches
+   * exactly, the substring check is used as a fallback.
+   *
+   * @param name    party name to classify; null or blank yields ""
+   * @param yg_name newline-separated party list checked first
+   *                (presumably the plaintiffs, 原告 - confirm)
+   * @param bg_name newline-separated party list checked second
+   *                (presumably the defendants, 被告 - confirm)
+   * @return "y" when `name` is found in `yg_name`, "b" when it is found in
+   *         `bg_name`, "" otherwise
+   */
+  def nameJudge(name: String, yg_name: String, bg_name: String): String = {
+    // Same predicate as StringUtils.isNotBlank: non-null and at least one
+    // character that is not whitespace.
+    def notBlank(text: String): Boolean =
+      text != null && text.exists(c => !Character.isWhitespace(c))
+
+    // True when one whole entry of the list equals name.
+    def listsExactly(parties: String): Boolean =
+      notBlank(parties) && parties.split("\n").contains(name)
+
+    // True when name occurs anywhere inside the list text.
+    def mentions(parties: String): Boolean =
+      notBlank(parties) && parties.contains(name)
+
+    if (!notBlank(name)) ""
+    else if (listsExactly(yg_name)) "y"
+    else if (listsExactly(bg_name)) "b"
+    else if (mentions(yg_name)) "y"
+    else if (mentions(bg_name)) "b"
+    else ""
+  }
+
   def main(args: Array[String]): Unit = {
-    println(getYesterday())
+    // Ad-hoc check of nameJudge with a null name, which prints an empty line.
+    val plaintiffs = Seq("魏贤永", "马亚燕", "魏国民", "钟红辉", "魏亚德", "陈裕梅").mkString("\n")
+    val defendant = "中国农业银行股份有限公司化州市支行"
+    println(nameJudge(null, plaintiffs, defendant))
   }
 }