ソースを参照

移除id输出

xufei 4 年 前
コミット
b78a34dbea

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_back.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description:删除数据回流处理
+ * @author π
+ * @date 2021/3/15 10:59
+ */
+
+case class inc_human_relation_back(s: SparkSession,
+                                   project: String, //表所在工程名
+                                   ds: String //分区
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  override protected val spark: SparkSession = s
+
+  val inc_ods_company_human_relation_merge = "winhc_ng.inc_ods_company_human_relation_merge"
+  val inc_ads_company_human_relation_back_deleted = "winhc_ng.inc_ads_company_human_relation_back_deleted"
+
+  def calc() = {
+
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_human_relation_back_deleted PARTITION (ds='$ds')
+        |SELECT  person_name
+        |        ,person_id
+        |        ,create_time
+        |        ,update_time
+        |        ,deleted
+        |FROM    $inc_ods_company_human_relation_merge
+        |WHERE   ds = '$ds'
+        |AND     length(company_id) = 0   AND   length(person_id) = 33
+        |""".stripMargin)
+  }
+
+}
+
+object inc_human_relation_back {
+  def main(args: Array[String]): Unit = {
+    if (args.size != 2) {
+      println("please set project ds.")
+      sys.exit(-1)
+    }
+    val Array(project, ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val re = inc_human_relation_back(s = spark, project = project, ds = ds)
+    re.calc()
+    spark.stop()
+  }
+}