Browse Source

按新流程对司法案件的送达公告、限高(包含企业和自然人)数据进行预处理

晏永年 4 years ago
parent
commit
7c07aa0ca4

+ 0 - 107
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/zxr_restrict.scala

@@ -1,107 +0,0 @@
-package com.winhc.bigdata.spark.jobs.deadbeat
-
-import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
-import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.mutable
-
-/**
- * Pre-processing of restricted-high-consumption ("限高") data used by the
- * dishonest-debtor and enforcement-target lookups.
- *
- * @author Yan Yongnian
- * @date 2020/10/13
- */
-object zxr_restrict {
-  /**
-   * Job entry point: creates a Spark session configured for the fixed ODPS
-   * project name and runs `precalc()` on a `zxr_restrict` instance.
-   *
-   * @param args command-line arguments (not read)
-   */
-  def main(args: Array[String]): Unit = {
-    val project = "winhc_eci_dev"
-    println(
-      s"""
-         |project: $project
-         |""".stripMargin)
-
-    val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
-    )
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    // Stop the session even when precalc() throws, so a failed run does not leave it open.
-    try zxr_restrict(spark, project).precalc()
-    finally spark.stop()
-  }
-}
-/**
- * Pre-computes the restricted-consumption ("zxr_restrict") rows for companies and for
- * individuals and writes them into `ads_deadbeat_company` / `ads_deadbeat_person`.
- *
- * @param s       Spark session used to run every statement
- * @param project project name used to qualify all source and target tables
- */
-case class zxr_restrict(s: SparkSession, project: String
-                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
-  override protected val spark: SparkSession = s
-
-  /**
-   * Runs the shared function setup (`prepareFunctions`, `case_no_trim_udf`) and then
-   * refreshes the company partition followed by the person partition.
-   */
-  def precalc(): Unit = {
-    prepareFunctions(spark)
-    case_no_trim_udf()
-    precalcCompany()
-    precalcPerson()
-  }
-
-  /**
-   * Restricted-consumption pre-processing (companies): unions `ads_company_zxr_restrict`
-   * with `inc_ads_company_zxr_restrict`, keeps the row with the greatest `update_time`
-   * per `rowkey`, and writes it to the `tn='zxr_restrict'` partition of `ads_deadbeat_company`.
-   */
-  private def precalcCompany(): Unit = {
-    // Partition value reported by BaseUtil.getPartion for the incremental table; used as target ds.
-    val companyDs = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_restrict", spark)
-    spark.sparkContext.setJobDescription(s"处理zxr_restrict($companyDs)")
-    // update_time is projected by both inner selects because the row_number() window
-    // in the enclosing query orders by it.
-    sql(
-      s"""
-         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_deadbeat_company partition(ds='$companyDs',tn='zxr_restrict')
-         |select
-         |  rowkey
-         |  ,cid
-         |  ,name
-         |  ,card_num
-         |  ,publish_date
-         |  ,deleted
-         |from (
-         |      select
-         |      rowkey
-         |      ,new_cid as cid
-         |      ,coalesce(company_name,company_info) as name
-         |      ,identity_num as card_num
-         |      ,case_create_time AS publish_date
-         |      ,case_no
-         |      ,court_name
-         |      ,deleted
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from (
-         |        select rowkey,new_cid,company_name,company_info,identity_num,case_create_time,case_no,court_name,deleted,update_time
-         |        from $project.ads_company_zxr_restrict
-         |        where length(case_no) > 0 and ds > '0'
-         |        union all
-         |        select rowkey,new_cid,company_name,company_info,identity_num,case_create_time,case_no,court_name,deleted,update_time
-         |        from $project.inc_ads_company_zxr_restrict
-         |        where length(case_no) > 0 and ds > '0'
-         |      )
-         |   )
-         |where num = 1
-         |""".stripMargin).show(10, false)
-  }
-
-  /**
-   * Restricted-consumption pre-processing (individuals): reads one partition of
-   * `ads_company_zxr_restrict_person_cloze`, keeps the row with the greatest `update_time`
-   * per `rowkey`, and writes it to the `tn='zxr_restrict_person'` partition of `ads_deadbeat_person`.
-   */
-  private def precalcPerson(): Unit = {
-    // The same partition value selects the source rows and names the target partition.
-    val personDs = BaseUtil.getPartion(s"$project.ads_company_zxr_restrict_person_cloze", spark)
-    spark.sparkContext.setJobDescription(s"处理zxr_restrict_person($personDs)")
-    // ds is quoted in the filter so it is compared as a string, matching the partition spec.
-    sql(
-      s"""
-         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_deadbeat_person partition(ds='$personDs',tn='zxr_restrict_person')
-         |select
-         |  rowkey
-         |  ,cid
-         |  ,name
-         |  ,card_num
-         |  ,publish_date
-         |  ,deleted
-         |from (
-         |      select
-         |      rowkey
-         |      ,new_cid as cid
-         |      ,coalesce(company_name,company_info) as name
-         |      ,identity_num as card_num
-         |      ,case_create_time AS publish_date
-         |      ,case_no
-         |      ,court_name
-         |      ,deleted
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from $project.ads_company_zxr_restrict_person_cloze
-         |      where length(case_no) > 0 and ds='$personDs'
-         |   )
-         |where num = 1
-         |""".stripMargin).show(10, false)
-  }
-}

+ 26 - 8
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre456.scala

@@ -135,6 +135,10 @@ case class JudicialCaseRelationPre456(s: SparkSession, project: String
          |""".stripMargin).show(10, false)
          |""".stripMargin).show(10, false)
     //限制高消费预处理(个人)
     //限制高消费预处理(个人)
     lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_restrict_person", spark)
     lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_restrict_person", spark)
+    //身份证补全表最新分区
+    val personIdDs = BaseUtil.getPartion("ads_person_idcard_cloze", spark)
+    is_id_card()
+    spark.sparkContext.setJobDescription(s"处理zxr_restrict_person($lastDsIncAds)")
     sql(
     sql(
       s"""
       s"""
          |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_restrict_person')
          |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_restrict_person')
@@ -154,22 +158,36 @@ case class JudicialCaseRelationPre456(s: SparkSession, project: String
          |  ,case_amt
          |  ,case_amt
          |from (
          |from (
          |      select
          |      select
-         |      md5(cleanup(case_no)) as judicase_id
+         |      md5(cleanup(A.case_no)) as judicase_id
          |      ,"11" as flag
          |      ,"11" as flag
-         |      ,concat_ws('',name,'被采取限制消费措施') AS title
-         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,concat_ws('',A.name,'被采取限制消费措施') AS title
+         |      ,concat_ws('',case_type(A.case_no)) as case_type
          |      ,NULL AS case_reason
          |      ,NULL AS case_reason
-         |      ,case_no
+         |      ,A.case_no
          |      ,court_name
          |      ,court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,concat_ws('',case_stage(A.case_no)) as case_stage
          |      ,NULL as yg_name
          |      ,NULL as yg_name
-         |      ,name as bg_name
+         |      ,A.name as bg_name
          |      ,case_create_time as date--目前天眼查数据没有执行日期,先以此代替
          |      ,case_create_time as date--目前天眼查数据没有执行日期,先以此代替
          |      ,rowkey as detail_id
          |      ,rowkey as detail_id
          |      ,0.0 as case_amt
          |      ,0.0 as case_amt
          |      ,row_number() over(partition by rowkey order by update_time desc) num
          |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from $project.ads_company_zxr_restrict_person_cloze
-         |      where length(case_no) > 0 and ds=$lastDsIncAds
+         |      from (
+         |        select case_no,court_name,name,case_create_time,rowkey,update_time,identity_num
+         |        from $project.ads_company_zxr_restrict_person
+         |        where length(case_no) > 0 and ds>'0'
+         |        union all
+         |        select case_no,court_name,name,case_create_time,rowkey,update_time,identity_num
+         |        from $project.inc_ads_company_zxr_restrict_person
+         |        where length(case_no) > 0 and ds>'0'
+         |      ) A
+         |      left join(
+         |        select name, identity_num, case_no
+         |        from $project.ads_person_idcard_cloze
+         |        where ds=${personIdDs}
+         |      ) B
+         |      on A.name=B.name AND A.case_no=B.case_no
+         |      where is_id_card(A.identity_num) OR is_id_card(B.identity_num)
          |   )
          |   )
          |where num = 1
          |where num = 1
          |""".stripMargin).show(10, false)
          |""".stripMargin).show(10, false)