
Update mongodb

许家凯 5 years ago
Current commit d83bc175d2

+ 20 - 29
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.mongodb.spark.MongoSpark
 import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
 import com.winhc.bigdata.spark.utils.SparkUtils
-import org.bson.Document
+import org.apache.commons.logging.LogFactory
 
 import scala.collection.mutable
 
@@ -13,43 +13,34 @@ import scala.collection.mutable
 * @Description: write summary from a single table
  */
 object CompanySummaryBySingle {
+  private val log = LogFactory.getLog(this.getClass)
+
   def main(args: Array[String]): Unit = {
-    /* if (args.length < 1) {
-       sys.exit(-1)
-     }
-     val tableName = args(0)*/
+    if (args.length < 1) {
+      log.error("请输入表名!")
+      sys.exit(-1)
+    }
+    val tableName = args(0)
     val map = mutable.Map[String, String](
       "spark.mongodb.output.uri" -> "mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw.company_summary?replicaSet=mgset-6501997"
       , "spark.hadoop.odps.cupid.vpc.domain.list" -> "{\"regionId\":\"cn-shanghai\",\"vpcs\":[{\"vpcId\":\"vpc-11hby9xee\",\"zones\":[{\"urls\":[{\"domain\":\"dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com\",\"port\":3717}]}]}]}"
     )
 
-    val spark = SparkUtils.InitEnv("CompanySummaryCalculator", map)
+    val spark = SparkUtils.InitEnv("CompanySummaryBySingle", map)
     import spark._
 
-    /*  var df = sql("select * from ads_company_summary limit 100")
-      val lookup = Map(
-        "company_id" -> "_id"
-        , "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
-        , "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
-      )
-
-      for (elem <- lookup) {
-        if (df.columns.contains(elem._1)) {
-          df = df.withColumnRenamed(elem._1, elem._2)
-        }
-      }*/
-
-
-    val document1 = new Document()
-    document1.append("name", "sunshangxiang").append("age", 18).append("sex", "female")
-    val document2 = new Document()
-    document2.append("name", "diaochan").append("age", 24).append("sex", "female")
-    val document3 = new Document()
-    document3.append("name", "huangyueying").append("age", 23).append("sex", "female")
-
-    val seq = Seq(document1, document2, document3)
-    val df = spark.sparkContext.parallelize(seq)
+    var df = sql(getSummarySql(tableName, "company_id"))
+    val lookup = Map(
+      "company_id" -> "_id"
+      , "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
+      , "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
+    )
 
+    for (elem <- lookup) {
+      if (df.columns.contains(elem._1)) {
+        df = df.withColumnRenamed(elem._1, elem._2)
+      }
+    }
 
     MongoSpark.save(df)
     spark.stop()
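
For orientation, the rename-then-save flow added above can be reproduced in isolation. The following is a minimal sketch, assuming a local SparkSession, inline test rows standing in for the real summary query, and a placeholder MongoDB URI; the production job gets all of this from SparkUtils.InitEnv and getSummarySql instead:

// Minimal sketch of the rename-then-save pattern from this commit.
// Assumptions: local master, placeholder URI, fake input rows.
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

object RenameAndSaveSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("RenameAndSaveSketch")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.company_summary") // placeholder, not the production URI
      .getOrCreate()
    import spark.implicits._

    // Stand-in for sql(getSummarySql(tableName, "company_id")).
    var df = Seq(("c1", 3L), ("c2", 7L))
      .toDF("company_id", "ads_judrisk_court_annc_list_num")

    // Same rename loop as the job: map source column names onto the
    // field names the MongoDB collection expects.
    val lookup = Map(
      "company_id" -> "_id",
      "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
    )
    for ((from, to) <- lookup if df.columns.contains(from)) {
      df = df.withColumnRenamed(from, to)
    }

    MongoSpark.save(df) // writes to the collection named in spark.mongodb.output.uri
    spark.stop()
  }
}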

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryCalculator.scala

@@ -1,8 +1,8 @@
 package com.winhc.bigdata.spark.jobs
 
+import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
 import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.spark.sql.DataFrame
-import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
 
 /**
  * @Author: XuJiakai
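
Note that SparkUtils.InitEnv itself is not part of this diff. Judging only from its call sites, it appears to build a SparkSession from an app name plus a config map; a purely hypothetical reconstruction, for orientation only:

// Hypothetical reconstruction of SparkUtils.InitEnv, inferred from the call
// sites in this diff; the real implementation lives elsewhere in the repo.
import org.apache.spark.sql.SparkSession
import scala.collection.mutable

object SparkUtilsSketch {
  def InitEnv(appName: String, config: mutable.Map[String, String]): SparkSession = {
    val builder = SparkSession.builder().appName(appName)
    // Apply each entry, e.g. the mongodb output URI and the VPC domain list.
    config.foreach { case (k, v) => builder.config(k, v) }
    builder.getOrCreate()
  }
}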