Browse Source

fix: 查老赖上游数据统一处理

许家凯 4 years ago
parent
commit
efbe298296

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info.scala

@@ -232,7 +232,7 @@ case class deadbeat_info(s: SparkSession,
     mapTables("company_zxr_restrict") = ("rowkey", "name", "id_card_trim(identity_num)", "case_create_time", "deleted")
     mapTables.foreach(m => {
       spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}")
-      dishonest_info(spark, project = project, table = m._1, rowkey = m._2._1, cid = null, name = m._2._2, card_num = m._2._3, publish_date = m._2._4, deleted = m._2._5).calc()
+      deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = null, name = m._2._2, card_num = m._2._3, publish_date = m._2._4, deleted = m._2._5).calc()
     })
   }
 
@@ -314,7 +314,7 @@ case class deadbeat_info(s: SparkSession,
     mapTables("company_zxr_restrict") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted")
     mapTables.foreach(m => {
       spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}")
-      dishonest_info(spark, project = project, table = m._1, rowkey = m._2._1, cid = m._2._2, name = m._2._3, card_num = m._2._4, publish_date = m._2._5, deleted = m._2._6).calc()
+      deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = m._2._2, name = m._2._3, card_num = m._2._4, publish_date = m._2._5, deleted = m._2._6).calc()
     })
   }
 

+ 19 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/dishonest_info.scala

@@ -12,15 +12,15 @@ import scala.annotation.meta.getter
  * @Date: 2020/10/12 15:26
  * @Description: 失信人企业+个人 增量+全量
  */
-case class dishonest_info(s: SparkSession
-                          , project: String //表所在工程名
-                          , table: String
-                          , rowkey: String
-                          , cid: String
-                          , name: String
-                          , card_num: String
-                          , publish_date: String
-                          , deleted: String
+case class deadbeat_info_pre(s: SparkSession
+                             , project: String //表所在工程名
+                             , table: String
+                             , rowkey: String
+                             , cid: String
+                             , name: String
+                             , card_num: String
+                             , publish_date: String
+                             , deleted: String
                          ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
@@ -92,8 +92,16 @@ case class dishonest_info(s: SparkSession
         s"""
            |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
            |SELECT  $view_fields
-           |FROM    $inc_ads_table
-           |WHERE   ds > '$last_ds'
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $rowkey ORDER BY ds DESC ) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $inc_ads_table
+           |                        WHERE   ds > '$last_ds'
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
            |$verify
            |""".stripMargin)
     }