yongnian committed 4 years ago
commit 3b29b50a23

+ 12 - 1
pom.xml

@@ -84,9 +84,18 @@
             <artifactId>odps-common-local</artifactId>
             <version>0.33.7-public</version>
         </dependency>
+        <dependency>
+            <groupId>org.mongodb.spark</groupId>
+            <artifactId>mongo-spark-connector_2.11</artifactId>
+            <version>2.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>3.4.2</version>
+        </dependency>
     </dependencies>
 
-
     <build>
         <plugins>
             <plugin>
@@ -106,6 +115,8 @@
                                 <includes>
                                     <!--<include>*:*</include>-->
                                     <include>cn.hutool:*</include>
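+                                    <!-- bundle the MongoDB Spark connector and Java driver into the shaded jar -->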
+                                    <include>org.mongodb.*:*</include>
+                                    <include>org.mongodb:mongo-java-driver</include>
                                 </includes>
                             </artifactSet>
                             <filters>

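The shade plugin only bundles artifacts listed under <includes>, so it is worth confirming that the two new MongoDB artifacts actually end up in the fat jar. A minimal smoke check, assuming the shaded jar is on the classpath (this helper object is hypothetical and not part of the commit):

    object MongoClasspathCheck {
      def main(args: Array[String]): Unit = {
        // Each call throws ClassNotFoundException if the corresponding artifact was not shaded in.
        Class.forName("com.mongodb.spark.MongoSpark$") // from mongo-spark-connector_2.11
        Class.forName("com.mongodb.MongoClient")       // from mongo-java-driver
        println("MongoDB Spark connector and Java driver are on the classpath")
      }
    }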
+ 44 - 0
src/main/scala/com/winhc/bigdata/spark/summary/ProbidCalculator.scala

@@ -0,0 +1,44 @@
+package com.winhc.bigdata.spark.summary
+
+import com.mongodb.spark.MongoSpark
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.commons.logging.LogFactory
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+object ProbidCalculator {
+  private val LOG = LogFactory.getLog(this.getClass)
+
+  def main(args: Array[String]): Unit = {
+    val database = "itslaw"
+    val collection = "probid_commonpro"
+//    val host = "dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997"
+    val host = "dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997"
+    val outPutUri = s"mongodb://itslaw:itslaw_168@$host"
+    val spark: SparkSession = SparkUtils.InitEnvRaw("ProbidCalculator")
+      .config("spark.mongodb.input.uri", outPutUri)
+      .config("spark.mongodb.input.collection", collection)
+      .config("spark.mongodb.output.uri", outPutUri)
+      .config("spark.mongodb.output.database",database)
+      .config("spark.mongodb.output.collection",collection)
+      .config("spark.hadoop.odps.cupid.smartnat.enable",true)
+      .getOrCreate()
+
+    LOG.info("probid calc start!")
+    println("probid calc start!")
+
+    val srcTableName = "ods_probid_commonpro_winbidding_companylist"
+
+    val df = spark.sql(s"SELECT company_id, company_name, SUM(winbidding_id) AS winbidding_sum FROM ${srcTableName} GROUP BY company_id, company_name LIMIT 100")
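+    // Append the aggregated rows to the collection configured via spark.mongodb.output.*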
+    MongoSpark.save(
+      df
+        .write
+        .mode(SaveMode.Append)
+    )
+
+    LOG.info("probid calc end!")
+    println("probid calc end!")
+    spark.stop()
+  }
+}

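For completeness, a minimal read-back sketch assuming the same database, collection and connection string as ProbidCalculator (the object name and the masked credentials below are placeholders, not part of the commit). MongoSpark.load picks up the spark.mongodb.input.* settings from the active session:

    package com.winhc.bigdata.spark.summary

    import com.mongodb.spark.MongoSpark
    import com.winhc.bigdata.spark.utils.SparkUtils

    object ProbidReadCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkUtils.InitEnvRaw("ProbidReadCheck")
          .config("spark.mongodb.input.uri", "mongodb://itslaw:<password>@<mongo-host>:3717/itslaw")
          .config("spark.mongodb.input.collection", "probid_commonpro")
          .getOrCreate()

        // Load the collection as a DataFrame and print a quick summary.
        val df = MongoSpark.load(spark)
        df.printSchema()
        println(s"documents: ${df.count()}")
        spark.stop()
      }
    }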
+ 18 - 0
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -23,5 +23,23 @@ object SparkUtils {
     }
     spark.getOrCreate()
   }
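+  // Returns the raw SparkSession.Builder (rather than a built session) so callers
+  // can add job-specific settings, e.g. MongoDB URIs, before calling getOrCreate().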
+  def InitEnvRaw(appName: String): SparkSession.Builder = {
+    val spark = SparkSession
+      .builder()
+      .appName(appName)
+      .config("spark.sql.broadcastTimeout", 20 * 60)
+      .config("spark.sql.crossJoin.enabled", true)
+      .config("odps.exec.dynamic.partition.mode", "nonstrict")
+      .config("spark.hadoop.odps.project.name", "winhc_test_dev")
+      .config("spark.hadoop.odps.access.id", "LTAI4G4n7pAW8tUbJVkkZQPD")
+      .config("spark.hadoop.odps.access.key", "uNJOBskzcDqHq1TYG3m2rebR4c1009")
+      .config("spark.sql.catalogImplementation", "odps")
+      .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
+      .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
 
+    if (System.getProperty("os.name").contains("Windows")) {
+      spark.master("local[*]")
+    }
+    spark
+  }
 }