
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	pom.xml
许家凯 4 years ago
parent
commit
06d991e03c

+ 25 - 1
pom.xml

@@ -89,7 +89,27 @@
             <groupId>org.mongodb.spark</groupId>
             <artifactId>mongo-spark-connector_2.11</artifactId>
             <version>2.3.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>cupid-sdk</artifactId>
+            <version>${cupid.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
+            <version>${cupid.sdk.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.mongodb.spark</groupId>
+            <artifactId>mongo-spark-connector_2.11</artifactId>
+            <version>2.4.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.mongodb</groupId>
+            <artifactId>mongo-java-driver</artifactId>
+            <version>3.4.2</version>
+        </dependency>
     </dependencies>
 
     <build>
@@ -119,6 +139,10 @@
                                     <include>cn.hutool:*</include>
                                     <include>org.mongodb:*</include>
                                     <include>org.mongodb.spark:*</include>
+                                    <include>com.aliyun.odps:*</include>
+                                    <include>org.mongodb.*:*</include>
+                                    <include>org.mongodb:mongo-java-driver</include>
+<!--                                    <include>com.aliyun.odps:odps-spark-datasource_2.11:*</include>-->
                                 </includes>
                             </artifactSet>
                             <filters>
@@ -171,4 +195,4 @@
         </plugins>
     </build>
 
-</project>
+</project>

+ 106 - 83
src/main/java/com/winhc/bigdata/calc/DimScore.java

@@ -84,41 +84,45 @@ public class DimScore {
             return 6;
         }
         double score = 0;
-        switch (industry) {
-            case "批发和零售业":
-                score = 4;
-                break;
-            case "农、林、牧、渔业":
-                score = 5;
-                break;
-            case "居民服务、修理和其他服务业":
-            case "信息传输、软件和信息科技服务业":
-                score = 7;
-                break;
-            case "科学研究和技术服务业":
-            case "住宿和餐饮业":
-            case "教育":
-            case "金融业":
-            case "采矿业":
-            case "公共管理、社会保障和社会组织":
-                score = 8;
-                break;
-            case "文化、教育和娱乐业":
-            case "房地产业":
-                score = 9;
-                break;
-            case "制造业":
-            case "建筑业":
-            case "交通运输、仓储和邮政业":
-            case "水利、环境和公共设施管理业":
-            case "卫生和社会工作":
-            case "电力、热力、燃气及水生产和供应业":
-            case "国际组织":
-                score = 10;
-                break;
-            default:
-                score = 6;
-                break;
+        try {
+            switch (industry) {
+                case "批发和零售业":
+                    score = 4;
+                    break;
+                case "农、林、牧、渔业":
+                    score = 5;
+                    break;
+                case "居民服务、修理和其他服务业":
+                case "信息传输、软件和信息科技服务业":
+                    score = 7;
+                    break;
+                case "科学研究和技术服务业":
+                case "住宿和餐饮业":
+                case "教育":
+                case "金融业":
+                case "采矿业":
+                case "公共管理、社会保障和社会组织":
+                    score = 8;
+                    break;
+                case "文化、教育和娱乐业":
+                case "房地产业":
+                    score = 9;
+                    break;
+                case "制造业":
+                case "建筑业":
+                case "交通运输、仓储和邮政业":
+                case "水利、环境和公共设施管理业":
+                case "卫生和社会工作":
+                case "电力、热力、燃气及水生产和供应业":
+                case "国际组织":
+                    score = 10;
+                    break;
+                default:
+                    score = 6;
+                    break;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return score;
     }
@@ -133,42 +137,52 @@ public class DimScore {
 //        if ("1".equals(isOnStock) || (econKind.contains("非上市") && econKind.contains("上市"))) {
 //            extScore = 8;
 //        }
-        if (StrUtil.isBlank(econKind)) {
-            return 6;
-        } else if (StrUtil.containsAny(econKind, "普通合伙")) {
-            score = 6;
-        } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
-            score = 7;
-        } else if (econKind.contains("集体所有制")) {
-            score = 8;
-        } else if (StrUtil.containsAny(econKind, "独资企业")) {
-            score = 10;
-        } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
-            score = 12;
-        } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
-            score = 13;
-        } else if (econKind.contains("股份有限公司")) {
-            score = 15;
-        } else if (econKind.contains("有限责任公司")) {
-            score = 10;
-        } else {
-            return 9;
+        try {
+
+            if (StrUtil.isBlank(econKind)) {
+                return 6;
+            } else if (StrUtil.containsAny(econKind, "普通合伙")) {
+                score = 6;
+            } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
+                score = 7;
+            } else if (econKind.contains("集体所有制")) {
+                score = 8;
+            } else if (StrUtil.containsAny(econKind, "独资企业")) {
+                score = 10;
+            } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
+                score = 12;
+            } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
+                score = 13;
+            } else if (econKind.contains("股份有限公司")) {
+                score = 15;
+            } else if (econKind.contains("有限责任公司")) {
+                score = 10;
+            } else {
+                return 9;
+            }
+
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return score;
     }
 
     public static double companyStatusEvaluate(String status) {
-        if(StrUtil.isBlank(status)){
+        if (StrUtil.isBlank(status)) {
             return 10f;
         }
-        if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
-            return 25;
-        } else if (StrUtil.containsAny(status, "停业")) {
-            return 5;
-        } else if (StrUtil.containsAny(status, "吊销", "清算")) {
-            return 0;
-        } else if (StrUtil.equals(status, "注销")) {
-            return 0;
+        try {
+            if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
+                return 25;
+            } else if (StrUtil.containsAny(status, "停业")) {
+                return 5;
+            } else if (StrUtil.containsAny(status, "吊销", "清算")) {
+                return 0;
+            } else if (StrUtil.equals(status, "注销")) {
+                return 0;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return 10f;
     }
@@ -226,29 +240,38 @@ public class DimScore {
         if (StrUtil.isBlank(address)) {
             return 2d;
         }
-        if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
-                || address.contains("浙江")) {
-            return 7d;
-        } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
-                address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
-                address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
-            return 5d;
-        } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
-                address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
-                address.contains("海南")) {
-            return 4d;
-        } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
-            return 3d;
-        } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
-            return 1d;
+        try {
+            if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
+                    || address.contains("浙江")) {
+                return 7d;
+            } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
+                    address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
+                    address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
+                return 5d;
+            } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
+                    address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
+                    address.contains("海南")) {
+                return 4d;
+            } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
+                return 3d;
+            } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
+                return 1d;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return 2d;
     }
 
     public static void main(String[] args) {
-        System.out.println(DimScore.actualEvaluate("1", "3"));
-        System.out.println(DimScore.actualEvaluate("1", "0"));
-        System.out.println(DimScore.registEvaluate("400 "));
-        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+//        System.out.println(DimScore.actualEvaluate("1", "3"));
+//        System.out.println(DimScore.actualEvaluate("1", "0"));
+//        System.out.println(DimScore.registEvaluate("400 "));
+//        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+
+        System.out.println(new Date().toString());
+        System.out.println(DimScore.industryEvaluate(null));
+        System.out.println(DimScore.industryEvaluate(""));
+        System.out.println(DimScore.industryEvaluate("教育"));
     }
 }
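
Note: the scoring helpers in DimScore are static methods, so the Spark side can wrap them directly. A minimal sketch of registering one of them as a SQL UDF, assuming industryEvaluate is publicly accessible; the UDF name is an illustrative assumption, not something this commit adds:

    // Hypothetical sketch: exposing DimScore.industryEvaluate to Spark SQL.
    import org.apache.spark.sql.SparkSession
    import com.winhc.bigdata.calc.DimScore

    def registerScoreUdfs(spark: SparkSession): Unit = {
      // Blank or unrecognised industries fall back to the default score of 6.
      spark.udf.register("industry_score", (industry: String) => DimScore.industryEvaluate(industry))
    }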

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/TestSparkSql.scala

@@ -27,7 +27,7 @@ object TestSparkSql {
     throw new RuntimeException("warehouse dir not exists")
   }
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv("appName")
+    val spark: SparkSession = SparkUtils.InitEnv
 
     import spark._
     val tableName = "ods_company_all"

+ 24 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculator.scala

@@ -5,15 +5,37 @@ import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 object CompanyInfoCalculator extends CompanyMapping {
 
   private val LOG = LogFactory.getLog(this.getClass)
 
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv("CompanyInfoCalculator")
+
+    if (args.length != 3) {
+      println("请配置计算资源: instances, cores, memory .")
+      System.exit(-1)
+    }
+
+    var config = mutable.Map.empty[String, String]
+    val Array(instances, cores, memory) = args;
+
+    println(
+      s"""
+         |instances : $instances,
+         |cores : $cores,
+         |memory : $memory
+         |""".stripMargin)
+
+    config = mutable.Map("spark.executor.instances" -> instances,
+      "spark.executor.cores" -> cores,
+      "spark.executor.memory" -> memory
+    )
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
     import spark._
-    LOG.info("company calc start!   ")
     println("company calc start!   ")
 
     prepare(spark)
@@ -50,7 +72,6 @@ object CompanyInfoCalculator extends CompanyMapping {
 
     df.write.mode("overwrite").insertInto(resultTable)
 
-    LOG.info("company calc end!   ")
     println("company calc end!   ")
 
     spark.stop();
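
Note: the new argument handling amounts to building a small settings map before the session is created. Restated as a standalone snippet with made-up resource values (not taken from this commit):

    // Illustration only; the resource values are invented.
    import scala.collection.mutable
    val resourceArgs = Array("8", "4", "8g") // instances, cores, memory
    val conf = mutable.Map(
      "spark.executor.instances" -> resourceArgs(0),
      "spark.executor.cores"     -> resourceArgs(1),
      "spark.executor.memory"    -> resourceArgs(2)
    )
    // conf is then passed to SparkUtils.InitEnv(appName, conf), as in the job above.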

+ 42 - 0
src/main/scala/com/winhc/bigdata/spark/summary/ProbidCalculator.scala

@@ -0,0 +1,42 @@
+package com.winhc.bigdata.spark.summary
+
+import com.mongodb.spark.MongoSpark
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.commons.logging.LogFactory
+import org.apache.spark.sql.{SaveMode, SparkSession}
+
+object ProbidCalculator {
+  private val LOG = LogFactory.getLog(this.getClass)
+
+  def main(args: Array[String]): Unit = {
+    val database = "itslaw"
+    val collection = "probid_commonpro"
+//    val host = "dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997"
+    val host = "dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997"
+    val outPutUri = s"mongodb://itslaw:itslaw_168@$host"
+    val spark: SparkSession = SparkUtils.InitEnvRaw("ProbidCalculator")
+      .config("spark.mongodb.input.uri", outPutUri)
+      .config("spark.mongodb.input.collection", collection)
+      .config("spark.mongodb.output.uri", outPutUri)
+      .config("spark.mongodb.output.database",database)
+      .config("spark.mongodb.output.collection",collection)
+      .config("spark.hadoop.odps.cupid.smartnat.enable",true)
+      .getOrCreate()
+
+    LOG.info("probid calc start!")
+    println("probid calc start!")
+
+    val srcTableName = "ods_probid_commonpro_winbidding_companylist"
+
+    val df = spark.sql(s"SELECT  company_id,company_name,SUM(winbidding_id) FROM ${srcTableName} GROUP BY company_id,company_name LIMIT 100".stripMargin)
+    MongoSpark.save(
+      df
+        .write
+        .mode(SaveMode.Append)
+    )
+
+    LOG.info("probid calc end!")
+    println("probid calc end!")
+    spark.stop();
+  }
+}
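
Note: the spark.mongodb.input.* settings configured above also cover the read direction. A minimal sketch of loading the same collection back for a sanity check (not part of this commit):

    // Sketch: read probid_commonpro back via the session's input settings.
    import com.mongodb.spark.MongoSpark
    val probidDf = MongoSpark.load(spark) // uses spark.mongodb.input.uri / .collection
    probidDf.printSchema()
    println(probidDf.count())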

+ 22 - 1
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -2,13 +2,15 @@ package com.winhc.bigdata.spark.utils
 
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 object SparkUtils {
 
   def InitEnv(appName: String): SparkSession = {
     InitEnv(appName, null)
   }
 
-  def InitEnv(appName: String, config: Map[String, String]): SparkSession = {
+  def InitEnv(appName: String, config: mutable.Map[String, String]): SparkSession = {
     val spark = SparkSession
       .builder()
       .appName(appName)
@@ -22,6 +24,7 @@ object SparkUtils {
       .config("spark.sql.catalogImplementation", "odps")
       .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
       .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
+      .config("spark.hadoop.odps.cupid.vectorization.enable", false)
 
     if (System.getProperty("os.name").contains("Windows")) {
       spark.master("local[*]")
@@ -33,5 +36,23 @@ object SparkUtils {
     }
     spark.getOrCreate()
   }
+  def InitEnvRaw(appName: String) = {
+    val spark = SparkSession
+      .builder()
+      .appName(appName)
+      .config("spark.sql.broadcastTimeout", 20 * 60)
+      .config("spark.sql.crossJoin.enabled", true)
+      .config("odps.exec.dynamic.partition.mode", "nonstrict")
+      .config("spark.hadoop.odps.project.name", "winhc_test_dev")
+      .config("spark.hadoop.odps.access.id", "LTAI4G4n7pAW8tUbJVkkZQPD")
+      .config("spark.hadoop.odps.access.key", "uNJOBskzcDqHq1TYG3m2rebR4c1009")
+      .config("spark.sql.catalogImplementation", "odps")
+      .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
+      .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
 
+    if (System.getProperty("os.name").contains("Windows")) {
+      spark.master("local[*]")
+    }
+    spark
+  }
 }
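
Note: the diff only changes the signature of InitEnv to accept a mutable.Map; the code that consumes the map lies outside the shown hunks. One plausible way it could be applied to the builder, given purely as an assumption rather than the committed body:

  // Assumed pass-through of caller-supplied settings; InitEnv(appName) passes null, hence the guard.
  import scala.collection.mutable
  import org.apache.spark.sql.SparkSession

  def applyConfig(builder: SparkSession.Builder, config: mutable.Map[String, String]): SparkSession.Builder = {
    if (config != null) config.foreach { case (k, v) => builder.config(k, v) }
    builder
  }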