xufei 4 лет назад
Родитель
Commit
383b6c5477
1 измененных файлов с 63 добавлено и 0 удалено
  1. 63 0
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCidChange.scala

+ 63 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCidChange.scala

@@ -0,0 +1,63 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+/**
+ * Command-line entry point for the company-cid-change job.
+ *
+ * Creates a Spark session configured for the `winhc_eci_dev` project, runs
+ * [[CompanyCidChange.calc]] once, and always stops the session afterwards.
+ */
+object CompanyCidChange {
+  /**
+   * Runs the job once.
+   *
+   * @param args command-line arguments; not read by this method
+   */
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+
+    // The session's project name and the project passed to the job are the same value,
+    // so both are taken from `project` to keep them from drifting apart.
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    // Stop the session even when the job throws, so a failed run does not leave it open;
+    // the exception still propagates to the caller.
+    try {
+      CompanyCidChange(spark, project).calc
+    } finally {
+      spark.stop()
+    }
+  }
+
+}
+
+/**
+ * Writes company id changes into the `ads_company_cid_change` table.
+ *
+ * Rows are read from one partition of `<project>.inc_ods_company`; every row that has
+ * both `cid` and `current_cid` set yields an (old cid, new cid) pair.
+ *
+ * @param s       Spark session used to run the SQL
+ * @param project name of the project that holds the source table
+ */
+case class CompanyCidChange(s: SparkSession,
+                            project: String // name of the project that holds the tables
+                           ) extends LoggingUtils with CompanyMapping {
+  override protected val spark: SparkSession = s
+
+  /**
+   * Runs the INSERT statement that fills `ads_company_cid_change` for one partition.
+   *
+   * For each distinct (`cid`, `current_cid`) pair only the row with the smallest
+   * `update_time` is kept. `new_company_name` is always written as NULL and `STATUS`
+   * as 0; `DATE` is the first ten characters of `update_time`.
+   *
+   * @return whatever the inherited `sql` helper returns for the statement
+   */
+  def calc = {
+    val sourceTable = s"$project.inc_ods_company"
+
+    // NOTE(review): presumably the most recent `ds` partition of the source table —
+    // confirm against BaseUtil.getPartion.
+    val partition = BaseUtil.getPartion(sourceTable, spark)
+
+    val env = "prod"
+    // NOTE(review): `isWindows` looks like a local-run switch (append instead of
+    // overwrite) — confirm against BaseUtil.
+    val insertMode = if (isWindows) "INTO" else "OVERWRITE"
+    // `env` is fixed to "prod" above, so this currently always resolves to "winhc_eci".
+    val targetProject = if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"
+
+    sql(
+      s"""
+         |INSERT $insertMode TABLE $targetProject.ads_company_cid_change PARTITION(ds='$partition')
+         |SELECT  old_cid
+         |        ,old_company_name
+         |        ,new_cid
+         |        ,new_company_name
+         |        ,STATUS
+         |        ,DATE
+         |FROM    (
+         |            SELECT  cid AS old_cid
+         |                    ,name AS old_company_name
+         |                    ,current_cid AS new_cid
+         |                    ,NULL AS new_company_name
+         |                    ,0 AS STATUS
+         |                    ,SUBSTR(update_time,1,10) DATE
+         |                    ,ROW_NUMBER() OVER(PARTITION BY cid,current_cid ORDER BY update_time) num
+         |            FROM    $sourceTable
+         |            WHERE   ds = '$partition'
+         |            AND     cid IS NOT NULL
+         |            AND     current_cid IS NOT NULL
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin)
+  }
+}