@@ -0,0 +1,79 @@
+package com.winhc.bigdata.spark.jobs.increment
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
+
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/7/15
+ * @Description Writes the latest partition of an incremental ads table (and its optional _list companion table) to Phoenix
+ */
+object inc_phx_cid_ads extends LoggingUtils {
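+  // ODPS (MaxCompute) project that holds the source tables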
+  val config = mutable.Map(
+    "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
+  )
+  val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+ def main(args: Array[String]): Unit = {
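+    // args(0): source table name; args(1) (optional): whether a companion _list table exists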
+    var hasList: Boolean = false
+    if (args.length < 1) {
+      println("Please input: 1) table name; 2) whether there is a list table")
+      sys.exit(-1)
+    } else if (args.length == 2) {
+      hasList = args(1).toBoolean
+    }
+    val sourceTable = args(0)
+    val adsTable = s"inc_ads_$sourceTable"
+ val phxTable = sourceTable.toUpperCase
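+    // All source columns except the partition column ds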
+    val adsColumns: Seq[String] = spark.table(adsTable).schema.map(_.name).filter(!_.equals("ds"))
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+    // Process the main table
+    // Latest partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(adsTable, spark)
+    val df1 = sql(s"""SELECT ${adsColumns.mkString(",")} FROM ${adsTable} WHERE ds=${lastDsIncAds}""")
+    if (hasList) {
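+      // Cast every column to String so Phoenix receives uniform VARCHAR values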
+      df1.columns.foldLeft(df1) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cids").drop("flag")
+        .withColumnRenamed("new_cids", "cids")
+        .createOrReplaceTempView(s"tmp")
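+      // Alias id as ROWKEY, which the Phoenix writer presumably uses as the row key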
+      sql(s"""SELECT id AS ROWKEY,* FROM tmp""").repartition(200).save2PhoenixByJDBC(s"${phxTable}")
+      println(s"${this.getClass.getSimpleName} phx main table written! " + new Date().toString)
+      // Process the list table
+      val adsListColumns: Seq[String] = spark.table(adsTable + "_list").schema.map(_.name).filter(!_.equals("ds"))
+      val lastDsIncAdsList = BaseUtil.getPartion(adsTable + "_list", spark)
+      val df2 = sql(s"""SELECT ${adsListColumns.mkString(",")} FROM ${adsTable}_list WHERE ds=${lastDsIncAdsList}""")
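+      // Same String cast for the list table before writing to Phoenix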
+      df2.columns.foldLeft(df2) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cid")
+        .withColumnRenamed("new_cid", "cid")
+        .createOrReplaceTempView(s"tmp2")
+      sql(s"""SELECT * FROM tmp2""").repartition(200).save2PhoenixByJDBC(s"${phxTable}_LIST")
+      println(s"${this.getClass.getSimpleName} phx list table written! " + new Date().toString)
+ } else {
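+      // No companion list table: write the main table only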
+      df1.columns.foldLeft(df1) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cid").drop("flag")
+        .withColumnRenamed("new_cid", "cid")
+        .createOrReplaceTempView(s"tmp")
+      sql(s"""SELECT * FROM tmp""").repartition(200).save2PhoenixByJDBC(s"${phxTable}")
+      println(s"${this.getClass.getSimpleName} phx single table written! " + new Date().toString)
+    }
+    spark.stop()
+  }
+}