
Copy solution for incremental single-table (Type1) dimensions

yongnian, 5 years ago
commit 146b9fcc49

+ 211 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/CommonTableOps.scala

@@ -0,0 +1,211 @@
+package com.winhc.bigdata.spark.jobs.increment
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, PhoenixUtil}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+
+/**
+ * @Author: Yan Yongina
+ * @Date: 2020/6/17 14:04
+ * @Description: Data ETL
+ */
+
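+// Constructor parameters, as used below: sourceTable is the dimension table's base name
+// (without the ods_/ads_ layer prefix), dupCols are the columns passed to dropDuplicates,
+// and delCols are columns removed from the ods column list before processing.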
+case class CommonTableOps(s: SparkSession, sourceTable: String, dupCols: Seq[String] = Nil, delCols: Seq[String] = Nil) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  val companyMapping = "company_map_little"
+  val companyTable = "company"
+
+  import org.apache.spark.sql.functions._
+
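+  /**
+   * Path for dimension tables keyed by a single cid: read the latest ods partition,
+   * deduplicate on dupCols, remap cid through company_map_little (coalesce(new_cid, cid)),
+   * write the result into the ads partition and then into the Phoenix table of the same name.
+   */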
+  def ods2ads2phx(prefix: String): Unit = {
+    println(s"${this.getClass.getSimpleName} ods start! " + new Date().toString)
+    val odsTable = s"${prefix}ods_${sourceTable}"
+    val adsTable = s"${prefix}ads_${sourceTable}"
+    val phxTable = s"${sourceTable}".toUpperCase
+    val ds = BaseUtil.getPartion(odsTable, spark)
+    //    val ds = "20200608" // test data
+    //table columns
+    val odsColumns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds")) diff delCols
+    val adsColumns: Seq[String] = spark.table(adsTable).schema.map(_.name).filter(!_.equals("ds"))
+
+    sql(s"select ${odsColumns.mkString(",")} from $odsTable where ds = $ds and cid is not null")
+      //    sql(s"select * from $odsTable where ds = 20200608 and cid is not null")
+      //      .drop(delCols:_*)
+      .dropDuplicates(dupCols)
+      .createOrReplaceTempView("t1")
+
+    sql(s"CACHE TABLE t1")
+
+    //replace with the new cid
+    val df1 = sql(
+      s"""
+         |SELECT  CONCAT_WS("_",c.cid,c.id) AS rowkey,${adsColumns.filter(i => i != "cid" && i != "rowkey").mkString(",")}
+         |        ,coalesce(d.new_cid,c.cid) as ncid,c.cid
+         |FROM    t1 c
+         |LEFT JOIN $companyMapping d
+         |ON      c.cid = d.cid
+         |""".stripMargin).dropDuplicates(dupCols.:+("cid"))
+    //      .drop(delCols:_*)
+
+    // cast all columns to string, then swap in the remapped cid once
+    // (doing the drop/rename inside the fold would fail after the first iteration)
+    df1.columns.foldLeft(df1) {
+      (currentDF, column) =>
+        currentDF.withColumn(column, col(column).cast("string"))
+    }.drop("cid")
+      .withColumnRenamed("ncid", "cid")
+      .createOrReplaceTempView(s"t2")
+
+    df1.filter("cid!=ncid")
+      .join(
+        sql(s"select ${odsColumns.mkString(",")} from ods_$odsTable where ds = $ds and cid is not null"),
+        Seq("cid"),
+        "left"
+      )
+      .union(sql("select * from t2"))
+      .createOrReplaceTempView(s"t3")
+
+    //write to the ads table; use `into` when running locally, `overwrite` on the cluster
+    sql(s"insert into table ${adsTable} partition (ds=$ds) select ${adsColumns.mkString(",")} from t3")
+    println(s"${this.getClass.getSimpleName} ads end! " + new Date().toString)
+
+    // write to the Phoenix (PHX) table
+    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixJDBCUrl
+    val connProp = PhoenixUtil.getPhoenixProperties
+    df1.persist()
+    sql(s"""SELECT ${adsColumns.filter(!_.equals("ROWKEY")).mkString(",")},CONCAT_WS("_",c.cid,c.id) AS ROWKEY FROM t2""")
+      .save2PhoenixByJDBC(s"${phxTable}")
+
+    println(s"${this.getClass.getSimpleName} phx end! " + new Date().toString)
+  }
+
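+  /**
+   * Path for dimension tables that carry multiple company ids in a `cids` column:
+   * explode cids into one row per cid (remapped via company_map_little) for the _list
+   * table, re-collapse the remapped cids per id for the _detail table, then write both
+   * ads partitions and the corresponding Phoenix _LIST/_DETAIL tables.
+   */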
+  def ods2ads2phx4flat(prefix: String): Unit = {
+    println(s"${this.getClass.getSimpleName} ods start! " + new Date().toString)
+    val odsTable = s"${prefix}ods_${sourceTable}"
+    val adsListTable = s"${prefix}ads_${sourceTable}_list"
+    val adsDetailTable = s"${prefix}ads_${sourceTable}_detail"
+    val phxListTable = s"${sourceTable}_LIST".toUpperCase
+    val phxDetailTable = s"${sourceTable}_DETAIL".toUpperCase
+    val ds = BaseUtil.getPartion(odsTable, spark)
+    //table columns
+    val odsListColumns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds")) diff delCols
+    val adsListColumns: Seq[String] = spark.table(adsListTable).schema.map(_.name).filter(!_.equals("ds"))
+    val adsDetailColumns: Seq[String] = spark.table(adsDetailTable).schema.map(_.name).filter(!_.equals("ds"))
+
+    sql(s"select ${odsListColumns.mkString(",")} from $odsTable where ds = $ds and cids is not null")
+      //      .drop(delCols.mkString(","))
+      .dropDuplicates(dupCols)
+      .createOrReplaceTempView("t1")
+
+    sql(s"CACHE TABLE t1")
+
+    //flatten the new table (one row per exploded cid)
+    val df1 = sql(
+      s"""
+         |SELECT  CONCAT_WS("_",c.cid,c.id) AS rowkey,${adsListColumns.filter(i => i != "cid" && i != "rowkey").mkString(",")},${dupCols.mkString(",")}
+         |        ,coalesce(d.new_cid,c.cid) as cid
+         |FROM    (
+         |            SELECT  *
+         |            FROM    t1 a
+         |            LATERAL VIEW explode(split(cids, ';')) b AS cid
+         |        ) c
+         |LEFT JOIN $companyMapping d
+         |ON      c.cid = d.cid
+         |""".stripMargin).dropDuplicates(dupCols.:+("cid"))
+    df1.columns.foldLeft(df1) {
+      (currentDF, column) => currentDF.withColumn(column, col(column).cast("string"))
+    }.createOrReplaceTempView(s"t2")
+
+    //collapse the remapped cids back into a single cids string per id
+    val df2 = sql(
+      s"""
+         |SELECT t1.id,x.cids,${adsDetailColumns.filter(i => i != "id" && i != "cids").mkString(",")}
+         |FROM    t1
+         |LEFT JOIN (
+         |              SELECT  id
+         |                      ,concat_ws(';',collect_set(cid)) cids
+         |              FROM    t2
+         |              GROUP BY id
+         |          ) x
+         |ON      t1.id = x.id
+         |""".stripMargin)
+
+    df2.columns.foldLeft(df2) {
+      (currentDF, column) => currentDF.withColumn(column, col(column).cast("string"))
+    }.createOrReplaceTempView("t3")
+
+    //write to the ads tables; use `into` in local mode, switch to `overwrite` in cluster mode
+    sql(s"insert into table ${adsListTable} partition (ds=${ds}) select ${adsListColumns.mkString(",")} from t2")
+    sql(s"insert into table ${adsDetailTable} partition (ds=${ds}) select ${adsDetailColumns.mkString(",")} from t3")
+    println(s"${this.getClass.getSimpleName} ads end! " + new Date().toString)
+
+    // write to the Phoenix (PHX) LIST table
+    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixJDBCUrl
+    val connProp = PhoenixUtil.getPhoenixProperties
+    df1.persist()
+    sql(s"SELECT ${adsListColumns.mkString(",")} FROM t2")
+      .save2PhoenixByJDBC(s"${phxListTable}")
+
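+    // write to the Phoenix (PHX) DETAIL table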
+    sql(s"SELECT ${adsDetailColumns.filter(!_.equals("ROWKEY")).mkString(",")},id AS ROWKEY FROM t3")
+      .save2PhoenixByJDBC(s"${phxDetailTable}")
+    println(s"${this.getClass.getSimpleName} phx end! " + new Date().toString)
+  }
+
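+  /**
+   * Copy solution for Type1 dimensions (single table keyed by cid, no cids column):
+   * takes this batch's incremental rows with cid remapped via inc_ods_company.current_cid,
+   * unions in historical rows (from the previous ods partition) for companies whose cid
+   * changed but have no incremental record, and overwrites the ads partition.
+   */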
+  def copySolution2ads4Type1(prefix: String): Unit = {
+    println(s"${this.getClass.getSimpleName} type 1 ods start! " + new Date().toString)
+    val odsTable = s"${prefix}ods_${sourceTable}"
+    val dwdTable = s"${prefix}dwd_${sourceTable}"
+    val adsTable = s"${prefix}ads_${sourceTable}"
+    val phxTable = s"${sourceTable}".toUpperCase
+//    val dsNow = "20200608" // test data
+    val dsNow = BaseUtil.getPartion(odsTable, spark)
+    val dsOld = BaseUtil.getPartion(s"ods_${sourceTable}", spark)
+    //table columns
+    val odsColumns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds")) diff delCols
+    if (odsColumns.contains("cids")) {
+      println(s"${this.getClass.getSimpleName} type 1 action mismatch: the ods table contains cids! " + new Date().toString)
+      return
+    }
+    val adsColumns: Seq[String] = spark.table(adsTable).schema.map(_.name).filter(!_.equals("ds"))
+
+    sql(s"select cid,current_cid from inc_ods_${companyTable} where ds = $dsNow and cid is not null and current_cid is not null")
+      .dropDuplicates("cid")
+      .createOrReplaceTempView("t1")
+
+    //this batch's incremental data for the dimension
+    val df1 = sql(
+      s"""
+         |SELECT  CONCAT_WS("_",A.cid,A.id) AS rowkey,A.id,coalesce(B.current_cid,A.cid) as cid,${odsColumns.filter(i => i != "id" && i != "cid").mkString(",")}
+         |FROM    ${odsTable} A
+         |LEFT JOIN t1 B
+         |ON      A.cid = B.cid
+         |WHERE   A.ds = $dsNow AND A.cid IS NOT NULL
+         |""".stripMargin)
+      .union(sql(
+        s"""
+           |SELECT  CONCAT_WS("_",C.cid,D.id) AS rowkey,D.id,C.current_cid as cid,${odsColumns.filter(i => i != "id" && i != "cid").mkString(",")}
+           |FROM(
+           |    SELECT cid,current_cid
+           |    FROM t1 A
+           |    LEFT ANTI JOIN ${odsTable} B
+           |    ON      A.cid = B.cid
+           |    WHERE   cid IS NOT NULL
+           |    ) C
+           |LEFT JOIN ods_${sourceTable} D
+           |ON C.cid=D.cid
+           |WHERE D.ds=$dsOld
+           |""".stripMargin)
+      )
+      .dropDuplicates(dupCols.:+("cid"))
+      .createOrReplaceTempView(s"t2")
+
+    //write to the ads table; use `into` when running locally, `overwrite` on the cluster
+    sql(s"""insert overwrite table ${adsTable} partition (ds='$dsNow') select ${adsColumns.mkString(",")} from t2""")
+    println(s"${this.getClass.getSimpleName} ads end! " + new Date().toString)
+  }
+}

+ 31 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/IncDataCopySolution4Type1.scala

@@ -0,0 +1,31 @@
+package com.winhc.bigdata.spark.jobs.increment
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+/**
+ * @Author: Yan Yongina
+ * @Date: 2020/6/19 14:04
+ * @Description: Incremental data copy solution for Type1: single-table dimension operations without cids
+ */
+object IncDataCopySolution4Type1 {
+  def main(args:Array[String]):Unit={
+    if (args.length < 3) {
+      println("please enter args:")
+      println("1:source table")
+      println("2:duplicate cols")
+      println("3:delete cols")
+      return
+    }
+    val sourceTable=args(0)
+    val dupCols=args(1).split(",")
+    val delCols=args(2).split(",")
+    val spark = SparkUtils.InitEnv("company_env_punishment inc's data from ods to phoenix", null)
+    CommonTableOps(spark,
+      sourceTable,
+//      e.g. Array("source_url")
+      dupCols,
+//      e.g. Array("_update_cids","_update_id","_update_type","_update_createtime","_update_table","_update_updatefields","_update_primary_id")
+      delCols)
+      .copySolution2ads4Type1("inc_")
+    spark.stop()
+  }
+}

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/obsolete/inc_phx_company_bid_ods.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.spark.jobs.increment.obsolete
+
+import com.winhc.bigdata.spark.jobs.increment.CommonTableOps
+import com.winhc.bigdata.spark.utils.SparkUtils
+
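+/** Obsolete one-off job for company_bid: deduplicates on "link", drops the _update_* columns, and runs the flat (cids-exploding) ods-to-ads-to-Phoenix path. */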
+object inc_phx_company_bid_ods {
+  def main(args:Array[String]):Unit={
+    val spark = SparkUtils.InitEnv("company_bid inc's data from ods to phoenix", null)
+    CommonTableOps(spark,
+      "company_bid",
+      Array("link"),
+      Array("_update_cids","_update_id","_update_type","_update_createtime","_update_table","_update_updatefields","_update_primary_id"))
+      .ods2ads2phx4flat("inc_")
+    spark.stop()
+  }
+}

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/obsolete/inc_phx_company_env_punishment_ods.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.spark.jobs.increment.obsolete
+
+import com.winhc.bigdata.spark.jobs.increment.CommonTableOps
+import com.winhc.bigdata.spark.utils.SparkUtils
+
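+/** Obsolete one-off job for company_env_punishment: deduplicates on "source_url", drops the _update_* columns, and runs the single-cid ods-to-ads-to-Phoenix path. */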
+object inc_phx_company_env_punishment_ods {
+  def main(args:Array[String]):Unit={
+    val spark = SparkUtils.InitEnv("company_env_punishment inc's data from ods to phoenix", null)
+    CommonTableOps(spark,
+      "company_env_punishment",
+      Array("source_url"),
+      Array("_update_cids","_update_id","_update_type","_update_createtime","_update_table","_update_updatefields","_update_primary_id"))
+      .ods2ads2phx("inc_")
+    spark.stop()
+  }
+}