许家凯 4 years ago
parent
commit
9b18b48c04

+ 2 - 0
.gitignore

@@ -7,6 +7,8 @@
 *.iws
 *.pyc
 *.pyo
+*.log
+appstatus
 .idea/
 .idea_modules/
 .settings

+ 15 - 2
pom.xml

@@ -84,9 +84,14 @@
             <artifactId>odps-common-local</artifactId>
             <version>0.33.7-public</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.mongodb.spark/mongo-spark-connector -->
+        <dependency>
+            <groupId>org.mongodb.spark</groupId>
+            <artifactId>mongo-spark-connector_2.11</artifactId>
+            <version>2.3.3</version>
+        </dependency>
     </dependencies>
 
-
     <build>
         <plugins>
             <plugin>
@@ -103,9 +108,17 @@
                             <minimizeJar>false</minimizeJar>
                             <shadedArtifactAttached>true</shadedArtifactAttached>
                             <artifactSet>
+<!--                                <excludes>-->
+<!--                                    <exclude>jmock:*</exclude>-->
+<!--                                    <exclude>*:xml-apis</exclude>-->
+<!--                                    <exclude>org.apache.maven:lib:tests</exclude>-->
+<!--                                    <exclude>log4j:log4j:jar:</exclude>-->
+<!--                                </excludes>-->
                                 <includes>
-                                    <!--<include>*:*</include>-->
+<!--                                    <include>*:*</include>-->
                                     <include>cn.hutool:*</include>
+                                    <include>org.mongodb:*</include>
+                                    <include>org.mongodb.spark:*</include>
                                 </includes>
                             </artifactSet>
                             <filters>
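
Note: with maven-shade-plugin, an <includes> list inside <artifactSet> is exclusive, so only the listed artifacts land in the shaded jar. The two new entries are what actually bundle the Mongo Java driver (org.mongodb:*) and the connector (org.mongodb.spark:*) next to cn.hutool:*. A quick hedged smoke check, runnable from the shaded jar, that shading picked them up (MongoSpark is the class this commit imports; MongoClient ships with the driver):

    // fails fast with ClassNotFoundException if shading missed an artifact
    Seq("com.mongodb.spark.MongoSpark", "com.mongodb.MongoClient")
      .foreach(Class.forName)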

+ 55 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala

@@ -0,0 +1,55 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.mongodb.spark.MongoSpark
+import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.bson.Document
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/5/21 11:18
+ * @Description: write a single table's summary to MongoDB
+ */
+object CompanySummaryBySingle {
+  def main(args: Array[String]): Unit = {
+    /* if (args.length < 1) {
+       sys.exit(-1)
+     }
+     val tableName = args(0)*/
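+    // spark.mongodb.output.uri tells MongoSpark.save where to write (db itslaw,
+    // collection company_summary); the cupid vpc.domain.list entry whitelists the
+    // two MongoDB hosts on port 3717 so the job can reach them from the cluster.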
+    val map = Map(
+      "spark.mongodb.output.uri" -> "mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw.company_summary?replicaSet=mgset-6501997"
+      , "spark.hadoop.odps.cupid.vpc.domain.list" -> "{\"regionId\":\"cn-shanghai\",\"vpcs\":[{\"vpcId\":\"vpc-11hby9xee\",\"zones\":[{\"urls\":[{\"domain\":\"dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com\",\"port\":3717}]}]}]}"
+    )
+
+    val spark = SparkUtils.InitEnv("CompanySummaryBySingle", map)
+    import spark._
+
+  /*  var df = sql("select * from ads_company_summary limit 100")
+    val lookup = Map(
+      "company_id" -> "_id"
+      , "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num"
+      , "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
+    )
+
+    for (elem <- lookup) {
+      if (df.columns.contains(elem._1)) {
+        df = df.withColumnRenamed(elem._1, elem._2)
+      }
+    }*/
+
+
+    val document1 = new Document()
+    document1.append("name", "sunshangxiang").append("age", 18).append("sex", "female")
+    val document2 = new Document()
+    document2.append("name", "diaochan").append("age", 24).append("sex", "female")
+    val document3 = new Document()
+    document3.append("name", "huangyueying").append("age", 23).append("sex", "female")
+
+    val seq = Seq(document1, document2, document3)
+    // note: parallelize yields an RDD[Document], not a DataFrame
+    val rdd = spark.sparkContext.parallelize(seq)
+
+    MongoSpark.save(rdd) // writes to itslaw.company_summary via spark.mongodb.output.uri
+    spark.stop()
+  }
+}
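
The three hard-coded Document rows above are only a connectivity smoke test. A sketch of the single-table path the commented-out block is working toward, reusing its table and column names (assumption: ads_company_summary is readable from this project and the output URI above is in effect):

    var summaryDf = spark.sql("select * from ads_company_summary limit 100")
    // rename ODPS column names to the MongoDB field names from the lookup map
    val lookup = Map(
      "company_id" -> "_id",
      "ads_judrisk_court_annc_list_num" -> "judrisk_court_annc_list_num",
      "ods_company_jud_assist_list_num" -> "company_jud_assist_list_num"
    )
    for ((from, to) <- lookup if summaryDf.columns.contains(from)) {
      summaryDf = summaryDf.withColumnRenamed(from, to)
    }
    MongoSpark.save(summaryDf) // Dataset overload; same output URI as above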

+ 3 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryCalculator.scala

@@ -2,7 +2,7 @@ package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.spark.sql.DataFrame
-
+import com.winhc.bigdata.spark.utils.CompanySummaryUtils._
 
 /**
  * @Author: XuJiakai
@@ -15,22 +15,20 @@ object CompanySummaryCalculator {
     "ods_company_jud_assist_list" -> "company_id"
   )
 
-  def getSql(tableName: String, companyIdFieldName: String) = s"select $companyIdFieldName as company_id,count(1) as ${tableName}_num from $tableName where $companyIdFieldName <>0 group by $companyIdFieldName"
 
   def main(args: Array[String]): Unit = {
     val spark = SparkUtils.InitEnv("CompanySummaryCalculator")
     import spark._
     val outputTable = "ads_company_summary"
 
-
     var selectField = List("company_id")
     var df: DataFrame = null
     for (elem <- tableName2FieldName) {
       selectField = selectField :+ elem._1 + "_num"
       if (df == null) {
-        df = sql(getSql(elem._1, elem._2))
+        df = sql(getSummarySql(elem._1, elem._2))
       } else {
-        df = df.join(sql(getSql(elem._1, elem._2)), "company_id").select(selectField.head, selectField.tail: _*)
+        df = df.join(sql(getSummarySql(elem._1, elem._2)), "company_id").select(selectField.head, selectField.tail: _*)
       }
     }
     df.write.mode("overwrite").insertInto(outputTable)
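
Each pass through tableName2FieldName inner-joins one per-table count onto the running frame, so the final df carries company_id plus one <table>_num column per map entry; because the joins are inner, a company_id missing from any source table drops out entirely. A minimal fold-based equivalent of the loop (sketch only, same map and getSummarySql as above):

    val joined = tableName2FieldName
      .map { case (table, idField) => sql(getSummarySql(table, idField)) }
      .reduce(_.join(_, "company_id")) // inner join per step, same as the loop
    joined.write.mode("overwrite").insertInto(outputTable)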

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanySummaryUtils.scala

@@ -0,0 +1,10 @@
+package com.winhc.bigdata.spark.utils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/5/21 13:52
+ * @Description: SQL helpers for building per-table company summary counts
+ */
+object CompanySummaryUtils {
+  def getSummarySql(tableName: String, companyIdFieldName: String): String = s"select $companyIdFieldName as company_id, count(1) as ${tableName}_num from $tableName where $companyIdFieldName <> 0 group by $companyIdFieldName"
+}
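
For a concrete mapping, getSummarySql just expands to a per-company count. Using the ods_company_jud_assist_list entry from CompanySummaryCalculator:

    // getSummarySql("ods_company_jud_assist_list", "company_id") returns:
    //   select company_id as company_id, count(1) as ods_company_jud_assist_list_num
    //   from ods_company_jud_assist_list where company_id <> 0 group by company_id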

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -14,6 +14,7 @@ object SparkUtils {
       .appName(appName)
       .config("spark.sql.broadcastTimeout", 20 * 60)
       .config("spark.sql.crossJoin.enabled", true)
+      .config("spark.hadoop.odps.cupid.smartnat.enable", true)
       .config("odps.exec.dynamic.partition.mode", "nonstrict")
       .config("spark.hadoop.odps.project.name", "winhc_test_dev")
       .config("spark.hadoop.odps.access.id", "LTAI4G4n7pAW8tUbJVkkZQPD")