
Rename the natural-person incremental sync job

晏永年 4 years ago
parent
commit
4ee5d44474

+ 29 - 21
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForPersonUtils.scala

@@ -1,9 +1,10 @@
-package com.winhc.bigdata.spark.utils
+package com.winhc.bigdata.spark.etl
 
 import java.util.Date
 
 import com.winhc.bigdata.spark.udf.CompanyMapping
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, MaxComputer2Phoenix, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -15,11 +16,12 @@ import scala.collection.mutable
  * @Date 2020/9/28
  * @Description incremental ods-to-ads sync for person records only
  */
-case class CompanyIncrForPersonUtils(s: SparkSession,
-                                     project: String, //project the table lives in
-                                     tableName: String, //table name (without prefix/suffix)
-                                     dupliCols: Seq[String], // dedup columns
-                                     updateCol: String = "update_time" //ORDER BY column for the ROW_NUMBER window function; defaults to update_time (optional)
+case class PersonIncrSync(s: SparkSession,
+                          project: String, //project the table lives in
+                          tableName: String, //table name (without prefix/suffix)
+                          idCardCol: String, //ID-card column
+                          dupliCols: Seq[String], // dedup columns
+                          updateCol: String = "update_time" //ORDER BY column for the ROW_NUMBER window function; defaults to update_time (optional)
                                               ) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
@@ -28,15 +30,15 @@ case class CompanyIncrForPersonUtils(s: SparkSession,
 
     prepareFunctions(spark)
 
-    val inc_ods_company = s"${project}.inc_ods_company" //daily incremental company base info
     val ads_company_tb = s"${project}.ads_${tableName}" //existing (stock) ads table
-    val ods_company_tb = s"${project}.ods_$tableName" //stock ods table
     val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //incremental ods table
     val inc_ads_company_tb = s"${project}.inc_ads_${tableName}_person" //incremental ads table
 
+    //latest partition of the ID-card completion table
+    val personIdDs = BaseUtil.getPartion("ads_person_idcard_cloze", spark)
+
     //latest partition of the stock ads table
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
-
     //last partition of the incremental ads table
     val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
 
@@ -87,10 +89,10 @@ case class CompanyIncrForPersonUtils(s: SparkSession,
          |        ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
          |FROM    (
          |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols(0)})),MD5(cleanup(${cols_md5.drop(1).mkString("")}))) AS rowkey
-         |                    ,flag
+         |                    ,a.flag
          |                    ,MD5(cleanup(${dupliCols(0)})) AS new_cid
          |                    ,null AS cid
-         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",")}
+         |                    ,${columns.filter(s=>{s!="cid" && s!="new_cid"}).mkString(",").replace(idCardCol, s"IF(a.$idCardCol IS NULL AND i.identity_num IS NOT NULL,i.identity_num,a.$idCardCol) AS $idCardCol")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
          |            FROM    (
          |                        SELECT  "1" AS flag
@@ -100,6 +102,13 @@ case class CompanyIncrForPersonUtils(s: SparkSession,
          |                        AND     cid IS NULL
          |                        AND     ${dupliCols.mkString(""," IS NOT NULL AND "," IS NOT NULL")}
          |                    ) a
+         |             LEFT JOIN
+         |                    (
+         |                        SELECT  name AS p_name,case_no AS p_case_no,identity_num
+         |                        FROM    ads_person_idcard_cloze
+         |                        WHERE   ds = '${personIdDs}'
+         |                    ) i
+         |             ON a.name=i.p_name AND a.case_no=i.p_case_no
          |        ) b
          |WHERE   num = 1
          |AND     CONCAT_WS('',${cols_md5.mkString(",")}) IS NOT NULL
@@ -122,35 +131,34 @@ case class CompanyIncrForPersonUtils(s: SparkSession,
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }
-object CompanyIncrForPersonUtils {
+object PersonIncrSync {
 
   def main(args: Array[String]): Unit = {
 
-    val Array(project, tableName, dupliCols, flag, _) = args
+    val Array(project, tableName, idCardCol, dupliCols, flag, _*) = args
     println(
       s"""
          |project: $project
          |tableName: $tableName
+         |idCardCol: $idCardCol
          |dupliCols: $dupliCols
          |flag: $flag
          |""".stripMargin)
-    if (args.length < 4) {
-      println("Please enter project, tableName (table), dupliCols (dedup columns), flag (marker), [updateCol (order-by column)]!!!")
+    if (args.length < 5) {
+      println("Please enter project, tableName (table), idCardCol (ID-card column), dupliCols (dedup columns), flag (marker), [updateCol (order-by column)]!!!")
       sys.exit(-1)
     }
     //ORDER BY column for the ROW_NUMBER window function; defaults to update_time (optional)
     var updateCol: String = "update_time"
-    if (args.length == 5 && !args(4).endsWith("}")) {
-      updateCol = args(4)
+    if (args.length == 6 && !args(5).endsWith("}")) {
+      updateCol = args(5)
     }
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    flag match {
-      case "cid" => CompanyIncrForPersonUtils(spark, project, tableName, (dupliCols.split(",").toSeq), updateCol).calc()
-    }
+    PersonIncrSync(spark, project, tableName, idCardCol, dupliCols.split(",").toSeq, updateCol).calc()
     spark.stop()
   }
 }
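
For context, the renamed entry point now takes the ID-card column as its third argument (project, tableName, idCardCol, dupliCols, flag, [updateCol]). Below is a minimal sketch of invoking the new class directly; the table name, ID-card column, and dedup columns are placeholder values for illustration, not taken from this commit:

    import com.winhc.bigdata.spark.etl.PersonIncrSync
    import com.winhc.bigdata.spark.utils.SparkUtils

    import scala.collection.mutable

    object PersonIncrSyncDemo {
      def main(args: Array[String]): Unit = {
        // Same ODPS config keys the job itself sets in its main().
        val config = mutable.Map(
          "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
          "spark.hadoop.odps.spark.local.partition.amt" -> "100"
        )
        val spark = SparkUtils.InitEnv("PersonIncrSyncDemo", config)
        // Placeholder table/columns; calc() adds the ads_/inc_ods_ prefixes itself.
        PersonIncrSync(
          spark,
          project = "winhc_eci_dev",
          tableName = "company_zxr",          // hypothetical table
          idCardCol = "identity_num",         // hypothetical ID-card column
          dupliCols = Seq("name", "case_no"), // dedup columns; dupliCols(0) feeds the rowkey MD5
          updateCol = "update_time"
        ).calc()
        spark.stop()
      }
    }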