
Unified preprocessing for the deadbeat (dishonest debtor) lookup

晏永年 4 years ago
parent
commit
7b773ade4d

+ 71 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info.scala

@@ -1,7 +1,8 @@
 package com.winhc.bigdata.spark.jobs.deadbeat
 
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, DateUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
@@ -205,7 +206,39 @@ case class deadbeat_info(s: SparkSession,
 
     spark.udf.register("to_millis_timestamp", toTime _)
   }
-
+  def personPre(): Unit = {
+    // tables that take part in the preprocessing
+    val mapTables = new mutable.HashMap[String, (String, String, String, String, String, String, String, String)]()
+    mapTables("company_zxr") = ("rowkey", "cname", "card", "case_create_time", "deleted", "case_no", "court", "1")
+    mapTables("company_dishonest_info") = ("rowkey", "name", "card_num", "reg_time", "deleted", "case_no", "court", "2")
+    mapTables("company_zxr_final_case") = ("rowkey", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "3")
+    mapTables("company_zxr_restrict") = ("rowkey", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "4")
+    is_id_card()
+    id_card_trimOrRaw_udf()
+    mapTables.foreach(m => {
+      val lastDsIncAds = BaseUtil.getPartion("inc_ads_" + m._1 + "_person", spark)
+      spark.sparkContext.setJobDescription(s"Deadbeat data preprocessing: aggregate ${m._1}_person ($lastDsIncAds)")
+      sql(s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_deadbeat_person partition(ds='$lastDsIncAds',tn='${m._1}_person')
+         |SELECT rowkey,name,card_num,publish_date,deleted,case_no,court_name,flag
+         |FROM (
+         |  SELECT ${m._2._1} AS rowkey
+         |  ,${m._2._2} AS name
+         |  ,${m._2._3} AS card_num
+         |  ,${m._2._4} AS publish_date
+         |  ,${m._2._5} AS deleted
+         |  ,${m._2._6} AS case_no
+         |  ,${m._2._7} AS court_name
+         |  ,${m._2._8} AS flag
+         |  ,ROW_NUMBER() OVER (PARTITION BY card_num ORDER BY publish_date DESC) num
+         |  FROM $project.inc_ads_${m._1}_person
+         |  WHERE ds='$lastDsIncAds' AND is_id_card(${m._2._3})
+         |) t
+         |WHERE num=1
+         |""".stripMargin
+      )
+    })
+  }
   def person(): Unit = {
     val target_tab = s"${getEnvProjectName(env, project)}.ads_deadbeat_person_out"
     val org_tab = s"$project.ads_deadbeat_person"
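
Note on the dedup pattern above: personPre keeps only the newest record per card_num via ROW_NUMBER() ... WHERE num=1. A minimal self-contained sketch of the same keep-latest-per-key idea in the DataFrame API (column names mirror the SQL; the data and object name are made up for illustration):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    object KeepLatestPerCard {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
        import spark.implicits._

        // Toy stand-in for inc_ads_company_zxr_person: (rowkey, name, card_num, publish_date)
        val rows = Seq(
          ("r1", "张三", "110101199001011234", "2020-01-01"),
          ("r2", "张三", "110101199001011234", "2020-06-01"), // newer row for the same card
          ("r3", "李四", "310101198505054321", "2019-12-31")
        ).toDF("rowkey", "name", "card_num", "publish_date")

        // Equivalent of ROW_NUMBER() OVER (PARTITION BY card_num ORDER BY publish_date DESC) ... WHERE num = 1
        val w = Window.partitionBy("card_num").orderBy(col("publish_date").desc)
        rows.withColumn("num", row_number().over(w))
          .where(col("num") === 1)
          .drop("num")
          .show(false) // keeps r2 and r3
        spark.stop()
      }
    }
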
@@ -274,8 +307,40 @@ case class deadbeat_info(s: SparkSession,
          |        )
          |""".stripMargin)
   }
-
-
+  def companyPre(): Unit = {
+    // tables that take part in the preprocessing
+    val mapTables = new mutable.HashMap[String, (String, String, String, String, String, String, String, String, String)]()
+    mapTables("company_zxr") = ("rowkey", "cid", "cname", "card", "case_create_time", "deleted", "case_no", "court", "1")
+    mapTables("company_dishonest_info") = ("rowkey", "cid", "name", "card_num", "reg_time", "deleted", "case_no", "court", "2")
+    mapTables("company_zxr_final_case") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "3")
+    mapTables("company_zxr_restrict") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "4")
+    is_id_card()
+    id_card_trimOrRaw_udf()
+    mapTables.foreach(m => {
+      val lastDsIncOds = BaseUtil.getPartion("inc_ads_" + m._1, spark)
+      spark.sparkContext.setJobDescription(s"Deadbeat data preprocessing: ${m._1} ($lastDsIncOds)")
+      sql(s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_deadbeat_company partition(ds='$lastDsIncOds',tn='${m._1}')
+             |SELECT rowkey,cid,name,card_num,publish_date,deleted,case_no,court_name,flag
+             |FROM (
+             |  SELECT ${m._2._1} AS rowkey
+             |  ,${m._2._2} AS cid
+             |  ,${m._2._3} AS name
+             |  ,${m._2._4} AS card_num
+             |  ,${m._2._5} AS publish_date
+             |  ,${m._2._6} AS deleted
+             |  ,${m._2._7} AS case_no
+             |  ,${m._2._8} AS court_name
+             |  ,${m._2._9} AS flag
+             |  ,ROW_NUMBER() OVER (PARTITION BY card_num ORDER BY publish_date DESC) num
+             |  FROM $project.inc_ads_${m._1}
+             |  WHERE ds='$lastDsIncOds' AND ${m._2._2} IS NOT NULL
+             |) t
+             |WHERE num=1
+             |""".stripMargin
+      )
+    })
+  }
   def company(): Unit = {
     val target_tab = s"${getEnvProjectName(env, project)}.ads_deadbeat_company_out"
     val org_tab = s"$project.ads_deadbeat_company"
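
The per-table metadata in personPre/companyPre is carried as an 8- or 9-field tuple, so the SQL fragments address fields positionally (m._2._7, m._2._8). A possible follow-up refactor, sketched here only and not part of this commit, names the fields with a case class (TableMeta and its field names are assumptions mirroring the SELECT aliases above):

    object TableMetaSketch {
      // Hypothetical replacement for the (String, ..., String) tuples.
      final case class TableMeta(
          rowkey: String, cid: String, name: String, cardNum: String,
          publishDate: String, deleted: String, caseNo: String,
          courtName: String, flag: String)

      val mapTables: Map[String, TableMeta] = Map(
        "company_zxr" -> TableMeta("rowkey", "cid", "cname", "card", "case_create_time", "deleted", "case_no", "court", "1"),
        "company_dishonest_info" -> TableMeta("rowkey", "cid", "name", "card_num", "reg_time", "deleted", "case_no", "court", "2"),
        "company_zxr_final_case" -> TableMeta("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "3"),
        "company_zxr_restrict" -> TableMeta("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted", "case_no", "court_name", "4")
      )
    }

A SQL fragment would then read s"${meta.courtName} AS court_name" instead of s"${m._2._8} AS court_name", which is much harder to get wrong.
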
@@ -478,7 +543,9 @@ object deadbeat_info {
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val di = deadbeat_info(spark, "winhc_eci_dev")
     di.reg_udf()
+    di.personPre()
     di.person()
+    di.companyPre()
     di.company()
     spark.stop()
   }
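
Both pre-steps lean on the is_id_card UDF registered from BaseFunc to separate person records (ID-card numbers) from company records. Its real implementation is not part of this diff; below is a minimal sketch of what such a predicate could look like, assuming a plain regex over 15-digit legacy and 18-digit current mainland ID formats (the pattern and object name are illustrative, not the project's actual code):

    import org.apache.spark.sql.SparkSession

    object IsIdCardSketch {
      // Assumption: 15 digits, or 17 digits plus a digit/X check character.
      def isIdCard(s: String): Boolean =
        s != null && s.trim.matches("\\d{15}|\\d{17}[\\dXx]")

      // Registered under the same name the SQL above uses: WHERE ... is_id_card(card)
      def register(spark: SparkSession): Unit =
        spark.udf.register("is_id_card", isIdCard _)
    }
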