
feat: incremental sync of natural persons

晏永年 4 years ago
parent
commit
f39ba79e7a

+ 156 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForPersonUtils.scala

@@ -0,0 +1,156 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/9/28
+ * @Description Incremental ods-to-ads sync restricted to natural-person records
+ */
+case class CompanyIncrForPersonUtils(s: SparkSession,
+                                     project: String, // project that hosts the tables
+                                     tableName: String, // table name (without prefix or suffix)
+                                     dupliCols: Seq[String], // deduplication columns
+                                     updateCol: String = "update_time" // ORDER BY column of the ROW_NUMBER window; optional, defaults to update_time
+                                    ) extends LoggingUtils with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
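+    // prepareFunctions comes from the CompanyMapping trait; the SQL below assumes it
+    // registers the cleanup UDF used in every dedup expression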
+
+    val inc_ods_company = s"${project}.inc_ods_company" // daily increment of basic company info
+    val ads_company_tb = s"${project}.ads_${tableName}" // stock ads table
+    val ods_company_tb = s"${project}.ods_$tableName" // stock ods table
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName" // incremental ods table
+    val inc_ads_company_tb = s"${project}.inc_ads_${tableName}_person" // incremental ads table for person records
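+    // naming convention implied above: ods_* / ads_* hold the full stock, inc_ods_* / inc_ads_* the daily increments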
+
+    // latest partition of the stock ads table
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    // last partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
+    // first partition of the incremental ods table
+    val firstDsIncOds = list.head
+    // last partition of the incremental ods table; also the partition written to
+    val lastDsIncOds = list.last
+    // partition to start the run from
+    var runDs = ""
+    // table columns, excluding ds, new_cid and rowkey
+    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey")
+    })
+    // first run: start from the first inc ods partition
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { // otherwise start from the day after the last inc ads partition
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
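+    // e.g. lastDsIncAds = "20200927" -> runDs = "20200928" (partition format assumed to be yyyyMMdd)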
+
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+
+    // last inc ods partition equals last inc ads partition: rerun (overwrite) that partition instead of exiting
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds, please delete the last ds !!!")
+      runDs = lastDsIncOds
+      //sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |cols_md5:$cols_md5
+         |remainDs:$remainDs
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
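+    // Sketch of the query below: take inc ods rows that have no cid (natural persons),
+    // keep the newest row per cleanup(dupliCols) group via ROW_NUMBER ... ORDER BY updateCol DESC,
+    // and derive rowkey = MD5(first dedup col) + '_' + MD5(remaining dedup cols), new_cid = MD5(first dedup col)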
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
+         |SELECT  rowkey
+         |        ,flag
+         |        ,new_cid
+         |        ,cid
+         |        ,${columns.filter(_ != "cid").mkString(",")}
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',MD5(cleanup(${dupliCols(0)})),MD5(cleanup(CONCAT_WS('',${cols_md5.drop(1).mkString(",")})))) AS rowkey
+         |                    ,flag
+         |                    ,MD5(cleanup(${dupliCols(0)})) AS new_cid
+         |                    ,null AS cid
+         |                    ,${columns.filter(_ != "cid").mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
+         |            FROM    (
+         |                        SELECT  "1" AS flag
+         |                                ,${columns.mkString(",")}
+         |                        FROM    ${inc_ods_company_tb}
+         |                        WHERE   ds >= ${runDs}
+         |                        AND     cid IS NULL
+         |                        AND     ${dupliCols.mkString(""," IS NOT NULL AND "," IS NOT NULL")}
+         |                    ) a
+         |        ) b
+         |WHERE   num = 1
+         |AND     CONCAT_WS('',${cols_md5.mkString(",")}) IS NOT NULL
+         |AND     trim(CONCAT_WS('',${cols_md5.mkString(",")})) <> ''
+         |""".stripMargin)
+
+    val colsTotal = columns ++ Seq("new_cid")
+
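+    // Sync the freshly written partition to Phoenix/HBase; the rowkey expression here
+    // must mirror the one assembled in the SQL above (new_cid is already MD5 of the first dedup column)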
+    MaxComputer2Phoenix(
+      spark,
+      colsTotal,
+      inc_ads_company_tb,
+      tableName + "_PERSON",
+      lastDsIncOds,
+      s"CONCAT_WS('_',new_cid,MD5(cleanup(CONCAT_WS('',${cols_md5.drop(1).mkString(",")}))))"
+    ).syn()
+
+//    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}
+object CompanyIncrForPersonUtils {
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length < 4) {
+      println("Usage: project (ODPS project), tableName (table name), dupliCols (dedup columns, comma-separated), flag, [updateCol (order-by column)]!!!")
+      sys.exit(-1)
+    }
+    // Array(a, b, c, d, _*) matches four or more arguments, so the 5th (updateCol) stays optional
+    val Array(project, tableName, dupliCols, flag, _*) = args
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |dupliCols: $dupliCols
+         |flag: $flag
+         |""".stripMargin)
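+    // Hypothetical invocation, with a made-up table and dedup columns for illustration only:
+    //   CompanyIncrForPersonUtils winhc_eci_dev company_some_dim person_name,case_no cid update_time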
+    // ORDER BY column of the ROW_NUMBER window; defaults to update_time when not supplied
+    var updateCol: String = "update_time"
+    // skip the 5th arg when it looks like an unresolved scheduler placeholder such as "${...}"
+    if (args.length == 5 && !args(4).endsWith("}")) {
+      updateCol = args(4)
+    }
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    flag match {
+      case "cid" => CompanyIncrForPersonUtils(spark, project, tableName, dupliCols.split(",").toSeq, updateCol).calc()
+      case _ => println(s"unsupported flag: $flag")
+    }
+    spark.stop()
+  }
+}