feat: unify upstream data preprocessing for the deadbeat-lookup (查老赖) pipeline

许家凯 committed 4 years ago · commit d55175e0c3

+ 23 - 61
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info.scala

@@ -1,8 +1,7 @@
 package com.winhc.bigdata.spark.jobs.deadbeat
 
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, DateUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
@@ -216,44 +215,27 @@ case class deadbeat_info(s: SparkSession,
     spark.udf.register("get_birth_year", get_birth_year _)
     spark.udf.register("agg_label", new person_agg_label)
     spark.udf.register("get_empty_map", get_empty_map _)
+    is_id_card()
+    id_card_trim_udf()
 
     def toTime(str: String): String = DateUtils.toMillisTimestamp(str, pattern = "yyyy-MM-dd HH:mm:ss")
 
     spark.udf.register("to_millis_timestamp", toTime _)
   }
+
   def personPre(): Unit = {
     //参与预处理的表
-    val mapTables = new mutable.HashMap[String, (String, String, String, String, String, String, String, String)]()
-    mapTables("company_zxr") = ("rowkey", "cname", "card", "case_create_time", "deleted", "case_no", "court", "1")
-    mapTables("company_dishonest_info") = ("rowkey", "name", "card_num", "pub_date", "deleted", "case_no", "court", "2")
-    mapTables("company_zxr_final_case") = ("rowkey", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "3")
-    mapTables("company_zxr_restrict") = ("rowkey", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "4")
-    is_id_card()
-    id_card_trimOrRaw_udf()
-    mapTables.map(m => {
-      val lastDsIncAds = BaseUtil.getPartion("inc_ads_" + m._1 + "_person", spark)
-      spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}个表聚合($lastDsIncAds)")
-      sql(s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_deadbeat_person partition(ds='$lastDsIncAds',tn='${m._1}_person')
-         |SELECT
-         |FROM(
-         |  SELECT ${m._2._1} AS rowkey
-         |  ,${m._2._2} AS name
-         |  ,${m._2._3} AS card_num
-         |  ,${m._2._4} AS publish_date
-         |  ,${m._2._5} AS deleted
-         |  ,${m._2._6} AS case_no
-         |  ,${m._2._7} AS court_name
-         |  ,${m._2._8} AS flag
-         |  ,ROW_NUMBER() OVER (PARTITION BY card_num ORDER BY publish_date DESC) num
-         |  FROM $project.inc_ads_${m._1}_person
-         |  WHERE ds=$lastDsIncAds AND is_id_card(${m._2._3})
-         |)
-         |WHERE num=1
-         |""".stripMargin
-      )
+    val mapTables = new mutable.HashMap[String, (String, String, String, String, String)]()
+    mapTables("company_zxr") = ("rowkey", "cname", "id_card_trim(card)", "case_create_time", "deleted")
+    mapTables("company_dishonest_info") = ("rowkey", "name", "id_card_trim(card_num)", "pub_date", "status")
+    mapTables("company_zxr_final_case") = ("rowkey", "name", "id_card_trim(identity_num)", "case_create_time", "deleted")
+    mapTables("company_zxr_restrict") = ("rowkey", "name", "id_card_trim(identity_num)", "case_create_time", "deleted")
+    mapTables.foreach(m => {
+      spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}")
+      dishonest_info(spark, project = project, table = m._1, rowkey = m._2._1, cid = null, name = m._2._2, card_num = m._2._3, publish_date = m._2._4, deleted = m._2._5).calc()
     })
   }
+
   def person(): Unit = {
     val target_tab = s"${getEnvProjectName(env, project)}.ads_deadbeat_person_out"
     val org_tab = s"$project.ads_deadbeat_person"
@@ -322,40 +304,20 @@ case class deadbeat_info(s: SparkSession,
          |        )
          |""".stripMargin)
   }
+
   def companyPre(): Unit = {
     //参与预处理的表
-    val mapTables = new mutable.HashMap[String, (String, String, String, String, String, String, String, String, String)]()
-    mapTables("company_zxr") = ("rowkey", "cid", "cname", "card", "case_create_time", "deleted", "case_no", "court", "1")
-    mapTables("company_dishonest_info") = ("rowkey", "cid", "name", "card_num", "reg_time", "deleted", "case_no", "court", "2")
-    mapTables("company_zxr_final_case") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "3")
-    mapTables("company_zxr_restrict") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "4")
-    is_id_card()
-    id_card_trimOrRaw_udf()
-    mapTables.map(m => {
-      val lastDsIncOds = BaseUtil.getPartion("inc_ads_" + m._1, spark)
-      spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}($lastDsIncOds)")
-      sql(s"""
-             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_deadbeat_company partition(ds='$lastDsIncOds',tn='${m._1}')
-             |SELECT
-             |FROM(
-             |  SELECT ${m._2._1} AS rowkey
-             |  ,${m._2._2} AS name
-             |  ,${m._2._3} AS name
-             |  ,${m._2._4} AS card_num
-             |  ,${m._2._5} AS publish_date
-             |  ,${m._2._6} AS deleted
-             |  ,${m._2._7} AS case_no
-             |  ,${m._2._8} AS court_name
-             |  ,${m._2._9} AS flag
-             |  ,ROW_NUMBER() OVER (PARTITION BY card_num ORDER BY publish_date DESC ) num
-             |  FROM $project.inc_ads_${m._1}
-             |  WHERE ds=$lastDsIncOds AND ${m._2._1} IS NULL
-             |)
-             |WHERE num=1
-             |""".stripMargin
-      )
+    val mapTables = new mutable.HashMap[String, (String, String, String, String, String, String)]()
+    mapTables("company_zxr") = ("rowkey", "cid", "cname", "card", "case_create_time", "deleted")
+    mapTables("company_dishonest_info") = ("rowkey", "cid", "name", "card_num", "pub_date", "status")
+    mapTables("company_zxr_final_case") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted")
+    mapTables("company_zxr_restrict") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted")
+    mapTables.foreach(m => {
+      spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}")
+      dishonest_info(spark, project = project, table = m._1, rowkey = m._2._1, cid = m._2._2, name = m._2._3, card_num = m._2._4, publish_date = m._2._5, deleted = m._2._6).calc()
     })
   }
+
   def company(): Unit = {
     val target_tab = s"${getEnvProjectName(env, project)}.ads_deadbeat_company_out"
     val org_tab = s"$project.ads_deadbeat_company"
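
With this change, each upstream source handled by deadbeat_info is described by a single map entry and handed to the shared dishonest_info job, so onboarding another source no longer needs its own hand-written INSERT. A minimal sketch of what registering one more person-level source inside personPre() would look like, assuming a hypothetical table company_zxr_extra with the same column layout (the table name is illustrative and not part of this commit):

    // hypothetical extra source, reusing the (rowkey, name, card_num expr, publish_date, deleted) layout
    mapTables("company_zxr_extra") = ("rowkey", "name", "id_card_trim(identity_num)", "case_create_time", "deleted")
    // the existing foreach then dispatches it exactly like the four tables above:
    // dishonest_info(spark, project = project, table = "company_zxr_extra", rowkey = "rowkey", cid = null,
    //   name = "name", card_num = "id_card_trim(identity_num)", publish_date = "case_create_time", deleted = "deleted").calc()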

+ 42 - 62
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/dishonest_info.scala

@@ -1,40 +1,53 @@
 package com.winhc.bigdata.spark.jobs.deadbeat
 
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
-import org.apache.spark.internal.Logging
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
-import scala.collection.mutable
 
 /**
  * @Author: XuJiakai
  * @Date: 2020/10/12 15:26
  * @Description: 失信人企业+个人 增量+全量
  */
-case class dishonest_info(s: SparkSession,
-                          project: String //表所在工程名
-                         ) extends LoggingUtils with Logging {
+case class dishonest_info(s: SparkSession
+                          , project: String //表所在工程名
+                          , table: String
+                          , rowkey: String
+                          , cid: String
+                          , name: String
+                          , card_num: String
+                          , publish_date: String
+                          , deleted: String
+                         ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
   private def is_deleted(status: Int, deleted: Int): String = if ((status + deleted) == 0) "0" else "1"
 
-  private def calc(is_person: Boolean = true): Unit = {
-    val tn = is_person match {
-      case true => "company_dishonest_info_person"
-      case false => "company_dishonest_info"
+  def calc(): Unit = {
+    val org_tab = StringUtils.isEmpty(cid) match {
+      case true => s"${table}_person"
+      case false => table
     }
-    val cid = is_person match {
+
+    val cid_f = StringUtils.isEmpty(cid) match {
       case true => ""
-      case false => ",cid"
+      case false => s",$cid as cid"
     }
 
-    val target_table = is_person match {
+    val target_table = StringUtils.isEmpty(cid) match {
       case true => s"$project.ads_deadbeat_person"
       case false => s"$project.ads_deadbeat_company"
     }
-    val inc_ads_table = s"$project.inc_ads_$tn"
-    val ads_table = s"$project.ads_$tn"
+
+    val verify = StringUtils.isEmpty(cid) match {
+      case true => s"AND      cid is not null"
+      case false => s"AND     is_id_card($card_num)"
+    }
+    val inc_ads_table = s"$project.inc_ads_$org_tab"
+    val ads_table = s"$project.ads_$org_tab"
 
     val intersect_cols = getColumns(inc_ads_table).intersect(getColumns(ads_table))
     val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
@@ -43,22 +56,22 @@ case class dishonest_info(s: SparkSession,
 
     val view_fields =
       s"""
-         |rowkey
-         |${cid}
-         |,name
-         |,card_num
-         |,pub_date as publish_date
-         |,is_deleted(status,deleted) as deleted
+         |$rowkey as rowkey
+         |${cid_f}
+         |,$name as name
+         |,$card_num as card_num
+         |,$publish_date as publish_date
+         |,$deleted as deleted
          |""".stripMargin
 
     def all(): Unit = {
       sql(
         s"""
-           |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$tn')
+           |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
            |SELECT  $view_fields
            |FROM    (
            |            SELECT  *
-           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $rowkey ORDER BY ds DESC ) AS num
            |            FROM    (
            |                        SELECT  ${intersect_cols.mkString(",")}
            |                        FROM    $ads_table
@@ -70,67 +83,34 @@ case class dishonest_info(s: SparkSession,
            |                    ) AS t1
            |        ) AS t2
            |WHERE   t2.num = 1
+           |$verify
            |""".stripMargin)
     }
 
     def inc(last_ds: String): Unit = {
-      val tmp_tab = s"tmp_deadbeat_$tn"
       sql(
         s"""
+           |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
            |SELECT  $view_fields
-           |        ,ds
            |FROM    $inc_ads_table
            |WHERE   ds > '$last_ds'
+           |$verify
            |""".stripMargin)
-        .createOrReplaceTempView(tmp_tab)
-
-      sql(
-        s"""
-           |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$tn')
-           |SELECT  rowkey
-           |        ${cid}
-           |        ,name
-           |        ,card_num
-           |        ,publish_date
-           |        ,deleted
-           |FROM    $tmp_tab
-           |""".stripMargin)
-
     }
 
     val ds_cols = sql(s"show partitions $target_table").collect()
       .map(_.getString(0))
-      .filter(_.contains(tn))
+      .filter(_.contains(org_tab))
       .flatMap(_.split("/"))
       .filter(_.contains("ds"))
       .map(_.split("=")(1))
     if (ds_cols.isEmpty) {
-      println("全量计算,失信人-个人")
+      println("全量计算")
       all()
     } else {
       val max = ds_cols.max
-      println(s"增量计算:$max,失信人-个人")
+      println(s"增量计算:$max")
       inc(max)
     }
   }
-
-  def person(): Unit = calc()
-
-  def company(): Unit = calc(false)
-
-}
-
-object dishonest_info {
-  def main(args: Array[String]): Unit = {
-    val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
-    )
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    val di = dishonest_info(spark, "winhc_eci_dev")
-    di.person()
-    di.company()
-
-    spark.stop()
-  }
 }
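
For reference, the incremental branch of the reworked calc(), fed with the person-level company_zxr entry above, expands to roughly the following statement (a sketch: the project name winhc_eci_dev comes from the removed local main, and the partition values are placeholders, neither is fixed by this commit):

    INSERT OVERWRITE TABLE winhc_eci_dev.ads_deadbeat_person PARTITION(ds='<inc_ads_last_ds>', tn='company_zxr_person')
    SELECT  rowkey as rowkey
            ,cname as name
            ,id_card_trim(card) as card_num
            ,case_create_time as publish_date
            ,deleted as deleted
    FROM    winhc_eci_dev.inc_ads_company_zxr_person
    WHERE   ds > '<last_ds>'
    AND     cid is not null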