فهرست منبع

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

许家凯 4 سال پیش
والد
کامیت
3cfbd0efe5
1 فایل‌های تغییر یافته به همراه 55 افزوده شده و 0 حذف شده
  1. 55 0
      src/main/scala/com/winhc/bigdata/spark/etl/dwd_company_bid_ods.scala

+ 55 - 0
src/main/scala/com/winhc/bigdata/spark/etl/dwd_company_bid_ods.scala

@@ -0,0 +1,55 @@
+package com.winhc.bigdata.spark.etl
+
+import com.winhc.bigdata.spark.jobs.CompanyInfoCalculator.prepare
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
object dwd_company_bid_ods {

  /**
   * ETL job: copies company bid records from the ODS layer into the DWD layer.
   *
   * Reads `ods_company_bid`, drops duplicate rows by the `link` column,
   * explodes the semicolon-separated `cids` column into one row per company
   * id, and appends the selected columns into `dwd_company_bid`.
   *
   * @param args exactly three values sizing the Spark executors:
   *             instances, cores, memory (e.g. "4 2 4g")
   */
  def main(args: Array[String]): Unit = {

    if (args.length != 3) {
      println("请配置计算资源: instances, cores, memory .")
      System.exit(-1)
    }

    val Array(instances, cores, memory) = args

    println(
      s"""
         |instances : $instances,
         |cores : $cores,
         |memory : $memory
         |""".stripMargin)

    // Executor sizing comes from the command line so the same jar can be
    // submitted with different resources per environment. Built once as a
    // val — no need for an empty map followed by reassignment.
    val config = mutable.Map(
      "spark.executor.instances" -> instances,
      "spark.executor.cores" -> cores,
      "spark.executor.memory" -> memory
    )

    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)

    // Brings `sql` (and implicits) from the session into scope.
    import spark._
    println("ETL start!   ")

    prepare(spark)

    val sourceTable = "ods_company_bid"
    val resultTable = "dwd_company_bid"

    // NOTE(review): plain literal — the original `.stripMargin` was a no-op
    // (the string contains no '|' margin characters) and has been dropped.
    val df = sql(s"SELECT  * FROM    ${sourceTable}")

    import org.apache.spark.sql.functions._
    // `cids` holds ";"-separated company ids; explode produces one output
    // row per id. Deduplication on `link` happens before the explode so a
    // duplicated source row is removed once, not once per id.
    df.dropDuplicates("link")
      .withColumn("cids", explode(split(col("cids"), ";")))
      .select("id", "cids", "title", "link", "intro", "abs", "publish_time",
        "purchaser", "proxy", "province", "base", "type", "items",
        "create_time", "update_time", "deleted")
      .write.mode("Append")
      .insertInto(resultTable)

    println("ETL end!   ")

    spark.stop()
  }

}