Incremental combine changes

xufei 4 years ago
commit 47f40b9354

src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala  (+35 -7)

@@ -12,13 +12,32 @@ import scala.collection.mutable
  */
 object CompanyIncrCombineUtils {
   def main(args: Array[String]): Unit = {
-    val Array(project, source, target) = args
+    var project = ""
+    var source = ""
+    var target = ""
+    var flag = "0"
+    if (args.length == 4) {
+      val Array(project1, source1, target1, flag1) = args
+      project = project1
+      source = source1
+      target = target1
+      flag = flag1
+    } else if (args.length == 3) {
+      val Array(project1, source1, target1) = args
+      project = project1
+      source = source1
+      target = target1
+    } else {
+      println("please set project, source, target, flag")
+      sys.exit(-1)
+    }
 
     println(
       s"""
          |project:$project
          |source:$source
          |target:$target
+         |flag:$flag
          |""".stripMargin)
 
     val config = mutable.Map(
@@ -26,17 +45,26 @@ object CompanyIncrCombineUtils {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    CompanyIncrCombineUtils(spark, source, target).calc()
+    CompanyIncrCombineUtils(spark, source, target, flag).calc()
     spark.stop()
   }
 }
 
-case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String) extends LoggingUtils {
+case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String, flag: String = "0" // 0 => insert into the target table's partition, 1 => insert into the source table's partition
+                                  ) extends LoggingUtils {
   override protected val spark: SparkSession = s
 
   def calc(): Unit = {
 
-    val ds2 = BaseUtil.getPartion(s"$target", spark) // target table data
+    var ds2 = ""
+    if (flag.equals("1")) {
+      ds2 = BaseUtil.getPartion(s"$source", spark) // source table partition
+    } else {
+      ds2 = BaseUtil.getPartion(s"$target", spark) // target table data
+    }
+    if (StringUtils.isBlank(ds2)) {
+      ds2 = BaseUtil.getYesterday()
+    }
 
     val cols: Seq[String] = spark.table(target).schema.map(_.name).filter(s => {
       !s.equals("ds")
@@ -48,14 +76,14 @@ case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: Stri
          |select max(ds) max_ds from $target where id = -1 and ds > '0'
          |""".stripMargin).collect().toList.map(_.getAs[String]("max_ds"))
 
-    println(s"list: $list")
+    println(s"list: $list, ds: $ds2")
 
     sql(
       s"""
-         |INSERT into table $target PARTITION(ds=$ds2)
+         |INSERT ${if (flag.equals("0")) "INTO" else "OVERWRITE"} table $target PARTITION(ds=$ds2)
          |SELECT ${cols.mkString(",")} from
          |$source
          |where ds > '${if (StringUtils.isNotBlank(list.head)) s"${list.head}" else s"0"}'
-         |""".stripMargin)
+         |""".stripMargin).show(100)
   }
 }
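
For reference, a minimal sketch of how the updated entry point might be invoked under the two flag modes. The project and table names below are placeholders, not values from this repository, and running it still requires the ODPS/Spark environment that SparkUtils.InitEnv expects:

    // flag omitted (3 args): flag defaults to "0", the latest ds is read from the
    // target table and the job runs INSERT INTO that partition
    CompanyIncrCombineUtils.main(Array(
      "my_project",                     // placeholder project
      "my_project.source_table",        // placeholder source table
      "my_project.target_table"         // placeholder target table
    ))

    // flag = "1" (4 args): the latest ds is read from the source table and the job
    // runs INSERT OVERWRITE into the matching target partition
    CompanyIncrCombineUtils.main(Array(
      "my_project",
      "my_project.source_table",
      "my_project.target_table",
      "1"
    ))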