Browse Source

feat:司法案件之送达公告、限制消费令(分企业和自然人2种)预处理

晏永年 4 years ago
parent
commit
1adcc0142b

+ 0 - 124
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre45.scala

@@ -1,124 +0,0 @@
-package com.winhc.bigdata.spark.jobs.judicial
-
-import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
-import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
-import org.apache.spark.sql.SparkSession
-
-import scala.collection.mutable
-
-/**
- * @Description:司法案件预处理
- * @author Yan Yongnian
- * @date 2020/9/27
- */
-object JudicialCaseRelationPre45 {
-  def main(args: Array[String]): Unit = {
-    val project = "winhc_eci_dev"
-    println(
-      s"""
-         |project: $project
-         |""".stripMargin)
-
-    val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> s"$project",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
-    )
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    JudicialCaseRelationPre45(spark, project).precalc()
-    spark.stop()
-  }
-}
-case class JudicialCaseRelationPre45(s: SparkSession, project: String
-                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
-  override protected val spark: SparkSession = s
-
-  def precalc(): Unit = {
-    prepareFunctions(spark)
-    //送达公告预处理
-    sql(
-      s"""
-         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='20200927',tn='send_announcement')
-         |select
-         |  judicase_id
-         |  ,flag
-         |  ,title
-         |  ,case_type
-         |  ,case_reason
-         |  ,case_no
-         |  ,court_name
-         |  ,case_stage
-         |  ,yg_name
-         |  ,bg_name
-         |  ,date
-         |  ,detail_id
-         |  ,case_amt
-         |from (
-         |      select
-         |      md5(cleanup(case_no)) as judicase_id
-         |      ,"4" as flag
-         |      ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
-         |      ,concat_ws('',case_type(case_no)) as case_type
-         |      ,case_reason
-         |      ,case_no
-         |      ,court as court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
-         |      ,plaintiff as yg_name
-         |      ,defendant as bg_name
-         |      ,start_date as date
-         |      ,rowkey as detail_id
-         |      ,0.0 as case_amt
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from $project.inc_ads_company_send_announcement
-         |      where length(case_no) > 0 and ds > '0'
-         |   )
-         |where num = 1
-         |""".stripMargin).show(10, false)
-
-    //限制高消费预处理
-    sql(
-      s"""
-         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='20200927',tn='zxr_restrict')
-         |select
-         |  judicase_id
-         |  ,flag
-         |  ,title
-         |  ,case_type
-         |  ,case_reason
-         |  ,case_no
-         |  ,court_name
-         |  ,case_stage
-         |  ,yg_name
-         |  ,bg_name
-         |  ,date
-         |  ,detail_id
-         |  ,case_amt
-         |from (
-         |      select
-         |      md5(cleanup(case_no)) as judicase_id
-         |      ,"5" as flag
-         |      ,if(length(company_name)>3 OR length(company_info)>3, concat_ws('',coalesce(company_name,company_info),'及',name,'被采取限制消费措施'),concat_ws('',name,'被采取限制消费措施')) AS title
-         |      ,concat_ws('',case_type(case_no)) as case_type
-         |      ,NULL AS case_reason
-         |      ,case_no
-         |      ,court_name
-         |      ,concat_ws('',case_stage(case_no)) as case_stage
-         |      ,NULL as yg_name
-         |      ,if(length(company_name)>3 OR length(company_info)>3, coalesce(company_name,company_info),name) as bg_name
-         |      ,appro_time as date--目前天眼查数据没有执行日期,先以此代替
-         |      ,rowkey as detail_id
-         |      ,0.0 as case_amt
-         |      ,row_number() over(partition by rowkey order by update_time desc) num
-         |      from $project.inc_ads_company_zxr_restrict
-         |      where length(case_no) > 0 and ds > '0'
-         |   )
-         |where num = 1
-         |""".stripMargin).show(10, false)
-
-
-
-    //tmp_xf_judicial_case_relation_open_counrt
-    //tmp_xf_judicial_case_relation_wenshu
-
-  }
-}

+ 327 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre456.scala

@@ -0,0 +1,327 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description:司法案件预处理
+ * @author Yan Yongnian
+ * @date 2020/9/27
+ */
+object JudicialCaseRelationPre456 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationPre456(spark, project).precalc()
+    spark.stop()
+  }
+}
+case class JudicialCaseRelationPre456(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf()
+    //送达公告预处理
+    var lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_send_announcement", spark)
+    spark.sparkContext.setJobDescription(s"处理send_announcement($lastDsIncAds)")
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='send_announcement')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"4" as flag
+         |      ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,case_reason
+         |      ,case_no
+         |      ,court as court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,plaintiff as yg_name
+         |      ,defendant as bg_name
+         |      ,start_date as date
+         |      ,rowkey as detail_id
+         |      ,0.0 as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_send_announcement
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+    //限制高消费预处理(企业)
+    lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_restrict", spark)
+    spark.sparkContext.setJobDescription(s"处理zxr_restrict($lastDsIncAds)")
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_restrict')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"5" as flag
+         |      ,if(length(company_name)>3 OR length(company_info)>3, concat_ws('',coalesce(company_name,company_info),'及',name,'被采取限制消费措施'),concat_ws('',name,'被采取限制消费措施')) AS title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL AS case_reason
+         |      ,case_no
+         |      ,court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,if(length(company_name)>3 OR length(company_info)>3, coalesce(company_name,company_info),name) as bg_name
+         |      ,appro_time as date--目前天眼查数据没有执行日期,先以此代替
+         |      ,rowkey as detail_id
+         |      ,0.0 as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_zxr_restrict
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+    //限制高消费预处理(个人)
+    val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_restrict_person").schema.map(_.name).filter(_!="flag")
+    lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_restrict_person", spark)
+    spark.sparkContext.setJobDescription(s"处理zxr_restrict_person($lastDsIncAds)")
+    //1、先从被执行人中用name和case_no关联补全身份证号码
+    sql(
+      s"""
+         |--先从company_zxr_restrict_person存量、增量表获取
+         |SELECT rowkey
+         |    ,IF(A.identity_num IS NULL AND B.card IS NOT NULL,1,0) AS flag
+         |    ,new_cid
+         |    ,cid
+         |    ,id
+         |    ,name_hid
+         |    ,name
+         |    ,sex
+         |    ,COALESCE(A.identity_num,B.card) AS identity_num
+         |    ,court_name
+         |    ,court_code
+         |    ,case_create_time
+         |    ,A.case_no AS case_no
+         |    ,content
+         |    ,oss_path
+         |    ,file_path
+         |    ,province
+         |    ,xgid
+         |    ,company_name
+         |    ,company_info
+         |    ,source
+         |    ,status
+         |    ,appro_time
+         |    ,A.create_time AS create_time
+         |    ,A.update_time AS update_time
+         |    ,A.deleted AS deleted
+         |FROM(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        FROM(
+         |            SELECT *
+         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',name,case_no)) ORDER BY update_time DESC ) num
+         |            FROM(
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.ads_company_zxr_restrict_person
+         |                WHERE ds>'0'
+         |                UNION ALL
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.inc_ads_company_zxr_restrict_person
+         |                WHERE ds>'0'
+         |            )
+         |        )
+         |        WHERE num=1
+         |    )
+         | ) A
+         |LEFT JOIN
+         |(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
+         |        FROM (
+         |            SELECT cname,card,case_no,type,update_time
+         |            FROM $project.ods_company_zxr
+         |            WHERE ds>'0'
+         |            UNION ALL
+         |            SELECT cname,card,case_no,type,update_time
+         |            FROM $project.inc_ods_company_zxr
+         |            WHERE ds>'0'
+         |        )
+         |    )
+         |    WHERE num=1 AND type='1'
+         |) B
+         |ON A.name=B.cname AND A.case_no=B.case_no
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_cloze_1")
+    //2、再从失信人中用name和case_no关联补全身份证号码
+    sql(
+      s"""
+         |--再从前面结果表获取
+         |SELECT rowkey
+         |    ,IF(A.identity_num IS NULL AND C.card_num IS NOT NULL,2,0) AS flag
+         |    ,A.new_cid
+         |    ,A.cid
+         |    ,A.id
+         |    ,name_hid
+         |    ,A.name
+         |    ,A.sex
+         |    ,COALESCE(A.identity_num,C.card_num) AS identity_num
+         |    ,A.court_name
+         |    ,A.court_code
+         |    ,A.case_create_time
+         |    ,A.case_no
+         |    ,A.content
+         |    ,A.oss_path
+         |    ,A.file_path
+         |    ,A.province
+         |    ,A.xgid
+         |    ,A.company_name
+         |    ,A.company_info
+         |    ,A.source
+         |    ,A.status
+         |    ,A.appro_time
+         |    ,A.create_time
+         |    ,A.update_time
+         |    ,A.deleted
+         |FROM tmp_person_cloze_1 A
+         |LEFT JOIN
+         |(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',name,case_no)) ORDER BY update_time DESC ) num
+         |        FROM (
+         |            SELECT name,card_num,case_no,update_time
+         |            FROM $project.ods_company_dishonest_info
+         |            WHERE ds>'0'
+         |            UNION ALL
+         |            SELECT name,card_num,case_no,update_time
+         |            FROM $project.inc_ods_company_dishonest_info
+         |            WHERE ds>'0'
+         |        )
+         |    )
+         |    WHERE num=1
+         |) C
+         |ON A.name=C.name AND A.case_no=C.case_no
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_cloze_2")
+    //3、再从自身表中根据name和company_name去补全身份证号码
+    sql(
+      s"""
+         |--先从前面结果表获取
+         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_restrict_person_cloze PARTITION(ds=$lastDsIncAds)
+         |SELECT A.rowkey
+         |    ,IF(A.identity_num IS NULL AND D.identity_num IS NOT NULL,3,0) AS flag
+         |    ,A.new_cid
+         |    ,A.cid
+         |    ,A.id
+         |    ,A.name_hid
+         |    ,A.name
+         |    ,A.sex
+         |    ,COALESCE(A.identity_num,D.identity_num) AS identity_num
+         |    ,A.court_name
+         |    ,A.court_code
+         |    ,A.case_create_time
+         |    ,A.case_no
+         |    ,A.content
+         |    ,A.oss_path
+         |    ,A.file_path
+         |    ,A.province
+         |    ,A.xgid
+         |    ,A.company_name
+         |    ,A.company_info
+         |    ,A.source
+         |    ,A.status
+         |    ,A.appro_time
+         |    ,A.create_time
+         |    ,A.update_time
+         |    ,A.deleted
+         |FROM tmp_person_cloze_2 A
+         |LEFT JOIN
+         |tmp_person_cloze_2 D
+         |ON A.name=D.name AND (cleanup(A.company_name)=cleanup(D.company_name) OR (cleanup(A.company_info)=cleanup(D.company_info)))--根据实际数据情况
+         |""".stripMargin
+    )//.createOrReplaceTempView("tmp_person_cloze_3")
+
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_restrict_person')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"5" as flag
+         |      ,concat_ws('',name,'被采取限制消费措施') AS title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL AS case_reason
+         |      ,case_no
+         |      ,court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,name as bg_name
+         |      ,appro_time as date--目前天眼查数据没有执行日期,先以此代替
+         |      ,rowkey as detail_id
+         |      ,0.0 as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.ads_company_zxr_restrict_person_cloze
+         |      where length(case_no) > 0 and ds=$lastDsIncAds
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+  }
+}