许家凯 4 years ago
parent
commit
da73f5b922

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/TestSparkSql.scala

@@ -27,7 +27,7 @@ object TestSparkSql {
     throw new RuntimeException("warehouse dir not exists")
   }
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv
+    val spark: SparkSession = SparkUtils.InitEnv("test")
 
     import spark._
     val tableName = "ods_company_all"
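
The only functional change above is threading an explicit application name into the session helper. SparkUtils itself is not part of this commit, so the following is only a minimal sketch of what InitEnv with these two arities might look like, assuming it wraps SparkSession.builder (Hive support, master URL, and any other options the real helper sets are omitted):

    import scala.collection.Map

    import org.apache.spark.sql.SparkSession

    // Hypothetical sketch of SparkUtils -- the real implementation is not
    // shown in this commit.
    object SparkUtils {
      // One-argument form used by TestSparkSql after this change.
      def InitEnv(appName: String): SparkSession =
        InitEnv(appName, Map.empty[String, String])

      // Two-argument form used by CompanySummaryBySingle below; typing the
      // parameter as scala.collection.Map also accepts the mutable map that
      // caller passes in.
      def InitEnv(appName: String, config: Map[String, String]): SparkSession = {
        val builder = SparkSession.builder().appName(appName)
        config.foreach { case (k, v) => builder.config(k, v) } // apply caller config
        builder.getOrCreate()
      }
    }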

+ 26 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala

@@ -1,7 +1,9 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.mongodb.spark.MongoSpark
+import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
 import com.winhc.bigdata.spark.utils.SparkUtils
+import org.bson.Document
 
 import scala.collection.mutable
 
@@ -24,20 +26,32 @@ object CompanySummaryBySingle {
     val spark = SparkUtils.InitEnv("CompanySummaryCalculator", map)
     import spark._
 
-    var df = sql("select * from ads_company_summary limit 100")
-    val lookup = Map(
-      "company_id" -> "_id"
-      , "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
-      , "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
-    )
+    /*  var df = sql("select * from ads_company_summary limit 100")
+      val lookup = Map(
+        "company_id" -> "_id"
+        , "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
+        , "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
+      )
+
+      for (elem <- lookup) {
+        if (df.columns.contains(elem._1)) {
+          df = df.withColumnRenamed(elem._1, elem._2)
+        }
+      }*/
+
+
+    val document1 = new Document()
+    document1.append("name", "sunshangxiang").append("age", 18).append("sex", "female")
+    val document2 = new Document()
+    document2.append("name", "diaochan").append("age", 24).append("sex", "female")
+    val document3 = new Document()
+    document3.append("name", "huangyueying").append("age", 23).append("sex", "female")
+
+    val seq = Seq(document1, document2, document3)
+    val df = spark.sparkContext.parallelize(seq)
 
-    for (elem <- lookup) {
-      if (df.columns.contains(elem._1)) {
-        df = df.withColumnRenamed(elem._1, elem._2)
-      }
-    }
 
-    MongoSpark.save(df.write)
+    MongoSpark.save(df)
     spark.stop()
   }
 }
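
The rewrite above switches from MongoSpark.save(df.write), which writes through a DataFrameWriter and can carry per-write options, to MongoSpark.save over an RDD[Document], which resolves its write configuration from the Spark config (spark.mongodb.output.uri). Below is a self-contained sketch of that save path, assuming mongo-spark-connector 2.x on the classpath; the URI, database, and collection names are placeholders, not values from this repo:

    import com.mongodb.spark.MongoSpark
    import org.apache.spark.sql.SparkSession
    import org.bson.Document

    object MongoSaveSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("MongoSaveSketch")
          .master("local[*]")
          // Placeholder URI: save(RDD[Document]) derives its WriteConfig from
          // spark.mongodb.output.uri when no explicit config is passed.
          .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.company_summary")
          .getOrCreate()

        val docs = Seq(
          new Document("name", "sunshangxiang").append("age", 18),
          new Document("name", "diaochan").append("age", 24)
        )

        // The connector's RDD overload inserts each Document into the
        // configured collection.
        MongoSpark.save(spark.sparkContext.parallelize(docs))
        spark.stop()
      }
    }

Note that the commit keeps the name df for what is now an RDD[Document], not a DataFrame; the call still compiles only because MongoSpark.save has a dedicated RDD overload alongside the Dataset and DataFrameWriter ones.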