
fix: debug upstream/downstream data for the 查老赖 (deadbeat lookup) jobs

许家凯 committed 4 years ago
commit 9c586cf8e1

+ 28 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.jobs.deadbeat
 
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, DateUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
 import org.apache.spark.sql.types._
@@ -30,8 +30,9 @@ case class deadbeat_info(s: SparkSession,
   )
 
   private def is_con(s: String): Boolean = {
+
     for (e <- filter_ele)
-      if (s.startsWith(e))
+      if (s.split("@@")(0).equals(e))
         return true
     false
   }
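The exact match on the segment before "@@" replaces the looser startsWith check: table names in this job share prefixes (company_zxr vs company_zxr_final_case / company_zxr_restrict), so a prefix test could accept the wrong table. A small self-contained sketch; filter_ele's real contents are defined above this hunk, and the "<table>@@<suffix>" key layout is only assumed from the new split("@@") logic.

object IsConCheck {
  // Assumed filter element and key layout, for illustration only.
  private val filter_ele = Seq("company_zxr")

  private def is_con_old(s: String): Boolean = filter_ele.exists(e => s.startsWith(e))
  private def is_con_new(s: String): Boolean = filter_ele.exists(e => s.split("@@")(0).equals(e))

  def main(args: Array[String]): Unit = {
    val key = "company_zxr_final_case@@some_rowkey" // hypothetical composite key
    println(is_con_old(key)) // true  -> prefix collision, false positive
    println(is_con_new(key)) // false -> only an exact table-name match passes
  }
}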
@@ -226,12 +227,12 @@ case class deadbeat_info(s: SparkSession,
   def personPre(): Unit = {
     //参与预处理的表
     val mapTables = new mutable.HashMap[String, (String, String, String, String, String)]()
-    mapTables("company_zxr") = ("rowkey", "cname", "id_card_trim(card)", "case_create_time", "deleted")
-    mapTables("company_dishonest_info") = ("rowkey", "name", "id_card_trim(card_num)", "pub_date", "status")
-    mapTables("company_zxr_final_case") = ("rowkey", "name", "id_card_trim(identity_num)", "case_create_time", "deleted")
-    mapTables("company_zxr_restrict") = ("rowkey", "name", "id_card_trim(identity_num)", "case_create_time", "deleted")
+    mapTables("company_zxr") = ("rowkey", "cname", "card", "case_create_time", "deleted")
+    mapTables("company_dishonest_info") = ("rowkey", "name", "card_num", "pub_date", "status")
+    mapTables("company_zxr_final_case") = ("rowkey", "name", "identity_num", "case_create_time", "deleted")
+    mapTables("company_zxr_restrict") = ("rowkey", "name", "identity_num", "case_create_time", "deleted")
     mapTables.foreach(m => {
-      spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}")
+      spark.sparkContext.setJobDescription(s"查老赖(个人)数据预处理:${m._1}")
       deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = null, name = m._2._2, card_num = m._2._3, publish_date = m._2._4, deleted = m._2._5).calc()
     })
   }
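With this change the person-side pre-processing passes the raw card columns through; id_card_trim(...) is now applied downstream in deadbeat_info_pre (second file in this commit), and only for tables without a cid. Purely as an illustration of the kind of normalization such a UDF usually performs, a sketch that is not the project's real id_card_trim (registered via BaseFunc), assuming it only strips whitespace and upper-cases a trailing check digit:

import org.apache.spark.sql.SparkSession

object IdCardTrimSketch {
  // Hypothetical stand-in for the project's id_card_trim UDF.
  def register(spark: SparkSession): Unit = {
    spark.udf.register("id_card_trim", (card: String) => {
      if (card == null) null
      else {
        val c = card.replaceAll("\\s", "") // drop all whitespace
        if (c.isEmpty) null else c.toUpperCase // normalize trailing 'x' check digit
      }
    })
  }
}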
@@ -241,7 +242,7 @@ case class deadbeat_info(s: SparkSession,
     val org_tab = s"$project.ads_deadbeat_person"
     val org_last_ds = getLastPartitionsOrElse(org_tab, "0")
     val target_last_ds = getLastPartitionsOrElse(target_tab, "0")
-
+    spark.sparkContext.setJobDescription(s"查老赖聚合(个人)")
     sql(
       s"""
          |SELECT  *
@@ -308,12 +309,13 @@ case class deadbeat_info(s: SparkSession,
   def companyPre(): Unit = {
     //参与预处理的表
     val mapTables = new mutable.HashMap[String, (String, String, String, String, String, String)]()
-    mapTables("company_zxr") = ("rowkey", "cid", "cname", "card", "case_create_time", "deleted")
+    mapTables("company_zxr") = ("rowkey", "cids", "cname", "card", "case_create_time", "deleted")
     mapTables("company_dishonest_info") = ("rowkey", "cid", "name", "card_num", "pub_date", "status")
     mapTables("company_zxr_final_case") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted")
     mapTables("company_zxr_restrict") = ("rowkey", "cid", "name", "identity_num", "case_create_time", "deleted")
     mapTables.foreach(m => {
-      spark.sparkContext.setJobDescription(s"查老赖数据预处理:${m._1}")
+      println(m)
+      spark.sparkContext.setJobDescription(s"查老赖(企业)数据预处理:${m._1}")
       deadbeat_info_pre(spark, project = project, table = m._1, rowkey = m._2._1, cid = m._2._2, name = m._2._3, card_num = m._2._4, publish_date = m._2._5, deleted = m._2._6).calc()
     })
   }
@@ -326,7 +328,7 @@ case class deadbeat_info(s: SparkSession,
 
     val company_last_ds = getLastPartitionsOrElse(s"$project.ads_company", "0")
     val intersect_company_cols = getColumns(s"$project.ads_company").intersect(getColumns(s"$project.inc_ads_company"))
-
+    spark.sparkContext.setJobDescription(s"查老赖聚合(企业)")
     sql(
       s"""
          |SELECT  *
@@ -520,10 +522,21 @@ object deadbeat_info {
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val di = deadbeat_info(spark, "winhc_eci_dev")
     di.reg_udf()
-    di.personPre()
-    di.person()
-    di.companyPre()
-    di.company()
+    AsyncExtract.startAndWait(spark, Seq(
+      ("前置处理。。。", () => {
+        di.personPre()
+        di.companyPre()
+        true
+      })
+    ))
+
+    AsyncExtract.startAndWait(spark, Seq(
+      ("下游处理。。。", () => {
+        di.person()
+        di.company()
+        true
+      })
+    ))
     spark.stop()
   }
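Only the call shape of AsyncExtract.startAndWait is visible in this change: it takes the SparkSession and a Seq of labelled () => Boolean tasks, and main() now runs the two pre-processing jobs as one stage and the two downstream aggregations as a second stage. A rough, non-authoritative sketch of a runner with that shape (the project's real AsyncExtract may differ in threading, error handling, and logging):

import org.apache.spark.sql.SparkSession

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object AsyncStageRunnerSketch {
  // Each (label, task) pair runs in its own Future; the driver blocks until
  // all of them have completed before the next stage is started.
  def startAndWait(spark: SparkSession, jobs: Seq[(String, () => Boolean)]): Unit = {
    val futures = jobs.map { case (label, task) =>
      Future {
        spark.sparkContext.setJobDescription(label)
        task()
      }
    }
    futures.foreach(f => Await.result(f, Duration.Inf))
  }
}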
 

+ 26 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info_pre.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.jobs.deadbeat
 
 import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.LoggingUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
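The new BaseUtil.isWindows import is used in the two INSERT statements further down to switch INSERT OVERWRITE to INSERT INTO when the job is debugged locally. A guess at how such a flag is commonly derived (the project's BaseUtil may well differ):

object IsWindowsSketch {
  // Assumed detection: local debugging happens on a Windows workstation.
  val isWindows: Boolean =
    System.getProperty("os.name", "").toLowerCase.contains("windows")

  def main(args: Array[String]): Unit = {
    val insertMode = if (isWindows) "INTO" else "OVERWRITE"
    println(s"INSERT $insertMode TABLE target PARTITION(ds='20210701',tn='company_zxr')")
  }
}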
@@ -43,9 +44,13 @@ case class deadbeat_info_pre(s: SparkSession
     }
 
     val verify = StringUtils.isEmpty(cid) match {
-      case true => s"AND      cid is not null"
-      case false => s"AND     is_id_card($card_num)"
+      case false => s"AND      $cid is not null"
+      case true => s"AND      is_id_card($card_num)"
     }
+     val card_num_f = StringUtils.isEmpty(cid) match {
+       case true => s"id_card_trim($card_num) AS card_num"
+       case false => s"$card_num AS card_num"
+     }
     val inc_ads_table = s"$project.inc_ads_$org_tab"
     val ads_table = s"$project.ads_$org_tab"
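The previous match had its branches swapped: an empty cid (the person tables) produced the "cid is not null" filter, while the company tables got the ID-card check. After the fix, person tables are filtered with is_id_card and their card number wrapped in id_card_trim, and company tables only require the cid column to be non-null. A standalone sketch of the fragments the corrected match produces; is_id_card and id_card_trim are project UDFs, and the column names below are placeholders:

object VerifyFragmentsCheck {
  // Mirrors the corrected StringUtils.isEmpty(cid) match, outside the Spark job.
  def fragments(cid: String, card_num: String): (String, String) = {
    val cidEmpty = cid == null || cid.isEmpty
    val verify = if (cidEmpty) s"AND is_id_card($card_num)" else s"AND $cid is not null"
    val card_num_f = if (cidEmpty) s"id_card_trim($card_num) AS card_num" else s"$card_num AS card_num"
    (verify, card_num_f)
  }

  def main(args: Array[String]): Unit = {
    println(fragments(null, "card_num"))  // person tables:  (AND is_id_card(card_num), id_card_trim(card_num) AS card_num)
    println(fragments("cid", "card_num")) // company tables: (AND cid is not null, card_num AS card_num)
  }
}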
 
@@ -59,7 +64,7 @@ case class deadbeat_info_pre(s: SparkSession
          |$rowkey as rowkey
          |${cid_f}
          |,$name as name
-         |,$card_num as card_num
+         |,$card_num_f
          |,$publish_date as publish_date
          |,$deleted as deleted
          |""".stripMargin
@@ -67,7 +72,7 @@ case class deadbeat_info_pre(s: SparkSession
     def all(): Unit = {
       sql(
         s"""
-           |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
            |SELECT  $view_fields
            |FROM    (
            |            SELECT  *
@@ -90,7 +95,7 @@ case class deadbeat_info_pre(s: SparkSession
     def inc(last_ds: String): Unit = {
       sql(
         s"""
-           |INSERT OVERWRITE TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_table PARTITION(ds='$inc_ads_last_ds',tn='$org_tab')
            |SELECT  $view_fields
            |FROM    (
            |            SELECT  *
@@ -108,9 +113,11 @@ case class deadbeat_info_pre(s: SparkSession
 
     val ds_cols = sql(s"show partitions $target_table").collect()
       .map(_.getString(0))
-      .filter(_.contains(org_tab))
+      .filter(s=>{
+        s.split("/").filter(_.startsWith("tn")).max.split("=")(1).equals(org_tab)
+      })
       .flatMap(_.split("/"))
-      .filter(_.contains("ds"))
+      .filter(_.startsWith("ds"))
       .map(_.split("=")(1))
     if (ds_cols.isEmpty) {
       println("全量计算")
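The partition filter no longer relies on substring matching: the old .contains(org_tab) let tn=company_zxr also pick up partitions of tn=company_zxr_final_case and company_zxr_restrict, and .contains("ds") could match anywhere in the spec. The new code extracts the tn= key exactly and only keeps ds= entries. A standalone check using made-up partition specs in the usual ds=.../tn=... layout:

object PartitionFilterCheck {
  def main(args: Array[String]): Unit = {
    val org_tab = "company_zxr"
    // Hypothetical output of `show partitions` on the target table.
    val partitions = Seq(
      "ds=20210701/tn=company_zxr",
      "ds=20210702/tn=company_zxr_final_case" // passed the old .contains(org_tab) check
    )
    val ds_cols = partitions
      .filter(s => s.split("/").filter(_.startsWith("tn")).max.split("=")(1).equals(org_tab))
      .flatMap(_.split("/"))
      .filter(_.startsWith("ds"))
      .map(_.split("=")(1))
    println(ds_cols) // List(20210701)
  }
}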
@@ -122,3 +129,15 @@ case class deadbeat_info_pre(s: SparkSession
     }
   }
 }
+
+/*object deadbeat_info_pre{
+  def main (args: Array[String] ): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    deadbeat_info_pre(spark,"winhc_eci_dev","company_zxr_restrict","rowkey", "cid", "name", "identity_num", "case_create_time", "deleted").calc()
+    spark.stop()
+  }
+}*/
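The debug entry point left commented out above would not compile as written if re-enabled: it still needs the mutable and SparkUtils imports used in deadbeat_info. A self-contained version with a hypothetical object name, kept only as a sketch of the local debugging aid:

package com.winhc.bigdata.spark.jobs.deadbeat

import com.winhc.bigdata.spark.utils.SparkUtils
import org.apache.spark.sql.SparkSession

import scala.collection.mutable

// Mirrors the commented-out block above, with the imports it would need.
object deadbeat_info_pre_debug {
  def main(args: Array[String]): Unit = {
    val config = mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
    )
    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
    deadbeat_info_pre(spark, "winhc_eci_dev", "company_zxr_restrict", "rowkey", "cid", "name", "identity_num", "case_create_time", "deleted").calc()
    spark.stop()
  }
}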