许家凯 пре 4 година
родитељ
комит
45835c5c8d
1 измењених фајлова са 56 додато и 3 уклоњено
  1. 56 3
      src/main/scala/com/winhc/bigdata/spark/etl/PersonIncrSync.scala

+ 56 - 3
src/main/scala/com/winhc/bigdata/spark/etl/PersonIncrSync.scala

@@ -26,6 +26,60 @@ case class PersonIncrSync(s: SparkSession,
                                               ) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
+  def all(): Unit ={
+
+    prepareFunctions(spark)
+    val targetTab = s"$project.ads_${tableName}_person"
+    val orgTab = s"$project.ods_${tableName}"
+
+    val lastDs = getLastPartitionsOrElse(orgTab,"0")
+
+    val columns: Seq[String] = getColumns(targetTab).filter(s => {
+      !s.equals("ds") && !s.equals(s"new_$cidCol") && !s.equals("rowkey")
+    })
+
+    val cols_md5 = dupliCols.filter(!_.equals(s"new_$cidCol"))
+
+    val personIdDs =getLastPartitionsOrElse("ads_person_idcard_cloze","0")
+    val ods_company_tb = s"${project}.ods_$tableName" //增量ods表
+
+    val runDs = getLastPartitionsOrElse(ods_company_tb,"0")
+   sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${targetTab} PARTITION(ds=$lastDs)
+         |SELECT  rowkey
+         |        ,new_$cidCol
+         |        ,$cidCol
+         |        ,${columns.filter(s=>{s!=s"${cidCol}" && s!=s"new_${cidCol}"}).mkString(",")}
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})),MD5(cleanup(${cols_md5.drop(1).map(s => s"a.$s").mkString("")}))) AS rowkey
+         |                    ,MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})) AS new_$cidCol
+         |                    ,null AS $cidCol
+         |                    ,${columns.filter(s=>{s!=s"${cidCol}" && s!=s"new_${cidCol}"}).map(s => s"a.$s").mkString(",").replace(s"a.${idCardCol}",s"i.identity_num AS ${idCardCol}")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(s => s"a.$s").mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
+         |            FROM    (
+         |                        SELECT  ${columns.mkString(",")}
+         |                        FROM    ${ods_company_tb}
+         |                        WHERE   ds >= ${runDs}
+         |                        AND     $cidCol IS NULL
+         |                        AND     ${dupliCols.mkString(""," IS NOT NULL AND "," IS NOT NULL")}
+         |                    ) a
+         |             JOIN
+         |                    (
+         |                        SELECT *
+         |                        FROM ${project}.ads_person_idcard_cloze
+         |                        WHERE ds=${personIdDs}
+         |                    ) i
+         |             ON a.${dupliCols(0)}=i.name AND a.case_no=i.case_no
+         |        ) b
+         |WHERE   num = 1
+         |AND     CONCAT_WS('',${cols_md5.mkString(",")}) IS NOT NULL
+         |AND     trim(CONCAT_WS('',${cols_md5.mkString(",")})) <> ''
+         |""".stripMargin)
+  }
+
+
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
@@ -93,7 +147,7 @@ case class PersonIncrSync(s: SparkSession,
          |                    ,a.flag
          |                    ,MD5(cleanup(${dupliCols.map(s => s"a.$s").toVector(0)})) AS new_$cidCol
          |                    ,null AS $cidCol
-         |                    ,${columns.filter(s=>{s!=s"${cidCol}" && s!=s"new_${cidCol}"}).map(s => s"a.$s").mkString(",").replace("${idCardCol}","IF(a.${idCardCol} IS NULL AND c.identity_num IS NOT NULL,c.identity_num,a.${idCardCol}) AS ${idCardCol}")}
+         |                    ,${columns.filter(s=>{s!=s"${cidCol}" && s!=s"new_${cidCol}"}).map(s => s"a.$s").mkString(",").replace(s"a.${idCardCol}",s"i.identity_num ${idCardCol}")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(s => s"a.$s").mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
          |            FROM    (
          |                        SELECT  "1" AS flag
@@ -133,7 +187,6 @@ case class PersonIncrSync(s: SparkSession,
   }
 }
 object PersonIncrSync {
-
   def main(args: Array[String]): Unit = {
 
     val Array(project, tableName, idCardCol, dupliCols, _*) = args
@@ -144,7 +197,7 @@ object PersonIncrSync {
          |idCardCol: $idCardCol
          |dupliCols: $dupliCols
          |""".stripMargin)
-    if (args.length < 5) {
+    if (!(args.length == 4 || args.length ==6)) {
       println("请输入 project:项目, tableName:表名, idCardCol:身份证字段, dupliCols:去重字段, [cidCol:企业标识, [updateCol:排序列]]!!!")
       sys.exit(-1)
     }