Browse Source

执行人和终本,人的增量,以及司法案件预处理

lyb 4 years ago
parent
commit
a063277ee7

+ 207 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -0,0 +1,207 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+object JudicialCaseRelationPre10 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationPre10(spark, project).precalc()
+    spark.stop()
+  }
+}
+
+
+//司法案件-被执行人数据预处理
+case class JudicialCaseRelationPre10(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf()
+    //被执行人(企业)
+    var lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr", spark)
+    spark.sparkContext.setJobDescription(s"处理zxr($lastDsIncAds)")
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"7" as flag
+         |      ,concat_ws('',cname,'被执行人') as title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,null as case_reason
+         |      ,case_no
+         |      ,court as court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,null as yg_name
+         |      ,cname as bg_name
+         |      ,case_create_time as date
+         |      ,rowkey as detail_id
+         |      ,exec_money as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_zxr
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+
+    //被执行人预处理(个人)
+    val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_person").schema.map(_.name).filter(_!="flag")
+    lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_person", spark)
+    spark.sparkContext.setJobDescription(s"处理zxr_person($lastDsIncAds)")
+    //1、先从被执行人中用name和case_no关联补全身份证号码
+    sql(
+      s"""
+         |--先从company_zxr_person存量、增量表获取
+         |SELECT rowkey
+         |    ,0 AS flag
+         |    ,new_cid
+         |    ,cid
+         |    ,id
+         |    ,cname
+         |    ,sex
+         |    ,card
+         |    ,court
+         |    ,case_create_time
+         |    ,A.case_no AS case_no
+         |    ,gist_id
+         |    ,case_state
+         |    ,exec_money
+         |    ,type
+         |    ,source
+         |    ,status
+         |    ,appro_time
+         |    ,A.create_time AS create_time
+         |    ,A.update_time AS update_time
+         |    ,A.deleted AS deleted
+         |FROM(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        FROM(
+         |            SELECT *
+         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
+         |            FROM(
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.ads_company_zxr_person
+         |                WHERE ds>'0'
+         |                UNION ALL
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.inc_ads_company_zxr_person
+         |                WHERE ds>'0'
+         |            )
+         |        )
+         |        WHERE num=1
+         |    )
+         | ) A
+         |
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_cloze_1")
+
+    //3、再从自身表中根据name和company_name去补全身份证号码
+//    sql(
+//      s"""
+//         |--先从前面结果表获取
+//         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_person_cloze PARTITION(ds=$lastDsIncAds)
+//         |SELECT A.rowkey
+//         |    ,IF(A.card IS NULL AND D.card IS NOT NULL,3,0) AS flag
+//         |    ,A.new_cid
+//         |    ,A.cid
+//         |    ,A.id
+//         |    ,A.cname
+//         |    ,A.sex
+//         |    ,COALESCE(A.card,D.card) AS card
+//         |    ,A.court
+//         |    ,A.case_create_time
+//         |    ,A.case_no
+//         |    ,A.gist_id
+//         |    ,A.case_state
+//         |    ,A.exec_money
+//         |    ,A.type
+//         |    ,A.source
+//         |    ,A.status
+//         |    ,A.appro_time
+//         |    ,A.create_time
+//         |    ,A.update_time
+//         |    ,A.deleted
+//         |FROM tmp_person_cloze_1 A
+//         |LEFT JOIN (tmp_person_cloze_1 D)
+//         |
+//         |ON A.cname=D.cname and d.card is not null  --根据实际数据情况
+//         |""".stripMargin
+//    )//.createOrReplaceTempView("tmp_person_cloze_3")
+
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_person')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"10" as flag
+         |      ,concat_ws('',cname,'被执行人') AS title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL AS case_reason
+         |      ,case_no
+         |      ,court
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,cname as bg_name
+         |      ,case_create_time as date
+         |      ,rowkey as detail_id
+         |      ,exec_money as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from tmp_person_cloze_1
+         |      where length(case_no) > 0
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+  }
+}

+ 226 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre12.scala

@@ -0,0 +1,226 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+object JudicialCaseRelationPre12 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationPre12(spark, project).precalc()
+    spark.stop()
+  }
+}
+
+
+//司法案件,终本数据预处理
+case class JudicialCaseRelationPre12(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    case_no_trim_udf()
+    //终本案件-企业
+    var lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_final_case", spark)
+    spark.sparkContext.setJobDescription(s"处理company_zxr_final_case($lastDsIncAds)")
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_final_case')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"6" as flag
+         |      ,concat_ws('',name,'终本案件') as title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL as case_reason
+         |      ,case_no
+         |      ,court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,name as bg_name
+         |      ,case_create_time as date
+         |      ,rowkey as detail_id
+         |      ,exec_amount as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_zxr_final_case
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+
+    //终本案件预处理(个人)
+    val columns: Seq[String] = spark.table(s"$project.inc_ads_company_zxr_final_case_person").schema.map(_.name).filter(_!="flag")
+    lastDsIncAds = BaseUtil.getPartion(s"$project.inc_ads_company_zxr_final_case_person", spark)
+    spark.sparkContext.setJobDescription(s"处理zxr_final_case_person($lastDsIncAds)")
+    //1、先从被执行人中用name和case_no关联补全身份证号码
+    sql(
+      s"""
+         |--先从company_zxr_final_case_person存量、增量表获取
+         |SELECT rowkey
+         |    ,IF(A.identity_num IS NULL AND B.card IS NOT NULL,1,0) AS flag
+         |    ,new_cid
+         |    ,cid
+         |    ,id
+         |    ,name
+         |    ,sex
+         |    ,COALESCE(A.identity_num,B.card) AS identity_num
+         |    ,court_name
+         |    ,case_final_time
+         |    ,case_create_time
+         |    ,A.case_no AS case_no
+         |    ,description
+         |    ,no_exec_amount
+         |    ,exec_amount
+         |    ,source
+         |    ,status
+         |    ,appro_time
+         |    ,A.create_time AS create_time
+         |    ,A.update_time AS update_time
+         |    ,A.deleted AS deleted
+         |FROM(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        FROM(
+         |            SELECT *
+         |            ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',name,case_no)) ORDER BY update_time DESC ) num
+         |            FROM(
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.ads_company_zxr_final_case_person
+         |                WHERE ds>'0'
+         |                UNION ALL
+         |                SELECT ${columns.mkString(",")}
+         |                FROM $project.inc_ads_company_zxr_final_case_person
+         |                WHERE ds>'0'
+         |            )
+         |        )
+         |        WHERE num=1
+         |    )
+         | ) A
+         |LEFT JOIN
+         |(
+         |    SELECT *
+         |    FROM(
+         |        SELECT *
+         |        ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',cname,case_no)) ORDER BY update_time DESC ) num
+         |        FROM (
+         |            SELECT cname,card,case_no,type,update_time
+         |            FROM $project.ods_company_zxr
+         |            WHERE ds>'0'
+         |            UNION ALL
+         |            SELECT cname,card,case_no,type,update_time
+         |            FROM $project.inc_ods_company_zxr
+         |            WHERE ds>'0'
+         |        )
+         |    )
+         |    WHERE num=1 AND type='1'
+         |) B
+         |ON A.name=B.cname AND A.case_no=B.case_no
+         |""".stripMargin
+    ).createOrReplaceTempView("tmp_person_cloze_1")
+    //2、再从失信人中用name和case_no关联补全身份证号码
+
+    //3、再从自身表中根据name和company_name去补全身份证号码
+//    sql(
+//      s"""
+//         |--先从前面结果表获取
+//         |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_zxr_final_case_person_cloze PARTITION(ds=$lastDsIncAds)
+//         |SELECT A.rowkey
+//         |    ,IF(A.identity_num IS NULL AND D.identity_num IS NOT NULL,3,0) AS flag
+//         |    ,A.new_cid
+//         |    ,A.cid
+//         |    ,A.id
+//         |    ,A.name
+//         |    ,A.sex
+//         |    ,COALESCE(A.identity_num,D.identity_num) AS identity_num
+//         |    ,A.court_name
+//         |    ,A.case_final_time
+//         |    ,A.case_create_time
+//         |    ,A.case_no
+//         |    ,A.description
+//         |    ,A.no_exec_amount
+//         |    ,A.exec_amount
+//         |    ,A.source
+//         |    ,A.status
+//         |    ,A.appro_time
+//         |    ,A.create_time
+//         |    ,A.update_time
+//         |    ,A.deleted
+//         |FROM tmp_person_cloze_1 A
+//         |LEFT JOIN
+//         |tmp_person_cloze_1 D
+//         |ON A.name=D.name --根据实际数据情况
+//         |""".stripMargin
+//    )//.createOrReplaceTempView("tmp_person_cloze_3")
+
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='$lastDsIncAds',tn='zxr_final_case_person')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"12" as flag
+         |      ,concat_ws('',name,'终本案件') AS title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,NULL AS case_reason
+         |      ,case_no
+         |      ,court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,NULL as yg_name
+         |      ,name as bg_name
+         |      ,case_create_time as date--目前天眼查数据没有执行日期,先以此代替
+         |      ,rowkey as detail_id
+         |      ,exec_amount as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from tmp_person_cloze_1
+         |      where length(case_no) > 0
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+  }
+}

+ 129 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForPersonCidsUtils.scala

@@ -0,0 +1,129 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author lyb
+ * @Date 2020/9/28
+ * @Description 增量ods到ads的仅针对人员的同步
+ */
+case class CompanyIncrForPersonCidsUtils(s: SparkSession,
+                                         project: String, //表所在工程名
+                                         tableName: String, //表名(不加前后辍)
+                                         dupliCols: Seq[String], // 去重列
+                                         updateCol: String = "update_time" //ROW_NUMBER窗口函数的ORDER BY字段,默认(可以不传参数)为update_time
+                                              ) extends LoggingUtils with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
+    val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
+    val ads_company_tb = s"${project}.ads_${tableName}" //存量ads表
+    val ods_company_tb = s"${project}.ods_$tableName" //增量ods表
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
+    val inc_ads_company_tb = s"${project}.inc_ads_${tableName}_person" //增量ads表
+
+    //存量表ads最新分区
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    //增量ads最后一个分区
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
+    //增量ods第一个分区
+    val firstDsIncOds = list.head
+    //增量ods最后一个分区//落表分区
+    val lastDsIncOds = list.last
+    //执行分区
+    var runDs = ""
+    //table字段
+    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("cids") && !s.equals("rowkey")
+    })
+    //第一次run
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { //非第一次分区时间 + 1天
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
+
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+
+    //增量ods和增量ads最后一个分区相等,跳出
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
+      runDs = lastDsIncOds
+      //sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |cols_md5:$cols_md5
+         |remainDs:$remainDs
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
+         |SELECT  rowkey
+         |        ,flag
+         |        ,new_cid
+         |        ,cid
+         |        ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols(0)})),MD5(cleanup(${cols_md5.drop(1).mkString("")}))) AS rowkey
+         |                    ,flag
+         |                    ,MD5(cleanup(${dupliCols(0)})) AS new_cid
+         |                    ,null AS cid
+         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
+         |            FROM    (
+         |                        SELECT  "1" AS flag
+         |                                ,${columns.mkString(",")}
+         |                        FROM    ${inc_ods_company_tb}
+         |                        WHERE   ds >= ${runDs}
+         |                        AND     cids IS NULL
+         |                        AND     ${dupliCols.mkString(""," IS NOT NULL AND "," IS NOT NULL")}
+         |                    ) a
+         |        ) b
+         |WHERE   num = 1
+         |AND     CONCAT_WS('',${cols_md5.mkString(",")}) IS NOT NULL
+         |AND     trim(CONCAT_WS('',${cols_md5.mkString(",")})) <> ''
+         |""".stripMargin)
+
+    val colsTotal = columns ++ Seq("new_cid")
+
+    MaxComputer2Phoenix(
+      spark,
+      colsTotal,
+      inc_ads_company_tb,
+      tableName+"_PERSON",
+      lastDsIncOds,
+      s"CONCAT_WS('_',new_cid,MD5(cleanup(${cols_md5.drop(1).mkString("")})))"
+    ).syn()
+
+//    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
+
+
+
+
+

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForPersonUtils.scala

@@ -150,6 +150,8 @@ object CompanyIncrForPersonUtils {
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     flag match {
       case "cid" => CompanyIncrForPersonUtils(spark, project, tableName, (dupliCols.split(",").toSeq), updateCol).calc()
+      case "cids" => CompanyIncrForPersonCidsUtils(spark, project, tableName, (dupliCols.split(",").toSeq), updateCol).calc()
+
     }
     spark.stop()
   }