Browse Source

修改部分代码,增加计算资源配置参数

xufei 4 years ago
parent
commit
e1d2b7b9c1

+ 12 - 0
pom.xml

@@ -84,6 +84,16 @@
             <artifactId>odps-common-local</artifactId>
             <version>0.33.7-public</version>
         </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>cupid-sdk</artifactId>
+            <version>${cupid.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
+            <version>${cupid.sdk.version}</version>
+        </dependency>
     </dependencies>
 
 
@@ -106,6 +116,8 @@
                                 <includes>
                                     <!--<include>*:*</include>-->
                                     <include>cn.hutool:*</include>
+                                    <include>com.aliyun.odps:*</include>
+<!--                                    <include>com.aliyun.odps:odps-spark-datasource_2.11:*</include>-->
                                 </includes>
                             </artifactSet>
                             <filters>

+ 106 - 83
src/main/java/com/winhc/bigdata/calc/DimScore.java

@@ -84,41 +84,45 @@ public class DimScore {
             return 6;
         }
         double score = 0;
-        switch (industry) {
-            case "批发和零售业":
-                score = 4;
-                break;
-            case "农、林、牧、渔业":
-                score = 5;
-                break;
-            case "居民服务、修理和其他服务业":
-            case "信息传输、软件和信息科技服务业":
-                score = 7;
-                break;
-            case "科学研究和技术服务业":
-            case "住宿和餐饮业":
-            case "教育":
-            case "金融业":
-            case "采矿业":
-            case "公共管理、社会保障和社会组织":
-                score = 8;
-                break;
-            case "文化、教育和娱乐业":
-            case "房地产业":
-                score = 9;
-                break;
-            case "制造业":
-            case "建筑业":
-            case "交通运输、仓储和邮政业":
-            case "水利、环境和公共设置管理业":
-            case "卫生和社会工作":
-            case "电力、热力、燃气及谁生产和供应业":
-            case "国际组织":
-                score = 10;
-                break;
-            default:
-                score = 6;
-                break;
+        try {
+            switch (industry) {
+                case "批发和零售业":
+                    score = 4;
+                    break;
+                case "农、林、牧、渔业":
+                    score = 5;
+                    break;
+                case "居民服务、修理和其他服务业":
+                case "信息传输、软件和信息科技服务业":
+                    score = 7;
+                    break;
+                case "科学研究和技术服务业":
+                case "住宿和餐饮业":
+                case "教育":
+                case "金融业":
+                case "采矿业":
+                case "公共管理、社会保障和社会组织":
+                    score = 8;
+                    break;
+                case "文化、教育和娱乐业":
+                case "房地产业":
+                    score = 9;
+                    break;
+                case "制造业":
+                case "建筑业":
+                case "交通运输、仓储和邮政业":
+                case "水利、环境和公共设置管理业":
+                case "卫生和社会工作":
+                case "电力、热力、燃气及谁生产和供应业":
+                case "国际组织":
+                    score = 10;
+                    break;
+                default:
+                    score = 6;
+                    break;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return score;
     }
@@ -133,42 +137,52 @@ public class DimScore {
 //        if ("1".equals(isOnStock) || (econKind.contains("非上市") && econKind.contains("上市"))) {
 //            extScore = 8;
 //        }
-        if (StrUtil.isBlank(econKind)) {
-            return 6;
-        } else if (StrUtil.containsAny(econKind, "普通合伙")) {
-            score = 6;
-        } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
-            score = 7;
-        } else if (econKind.contains("集体所有制")) {
-            score = 8;
-        } else if (StrUtil.containsAny(econKind, "独资企业")) {
-            score = 10;
-        } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
-            score = 12;
-        } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
-            score = 13;
-        } else if (econKind.contains("股份有限公司")) {
-            score = 15;
-        } else if (econKind.contains("有限责任公司")) {
-            score = 10;
-        } else {
-            return 9;
+        try {
+
+            if (StrUtil.isBlank(econKind)) {
+                return 6;
+            } else if (StrUtil.containsAny(econKind, "普通合伙")) {
+                score = 6;
+            } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
+                score = 7;
+            } else if (econKind.contains("集体所有制")) {
+                score = 8;
+            } else if (StrUtil.containsAny(econKind, "独资企业")) {
+                score = 10;
+            } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
+                score = 12;
+            } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
+                score = 13;
+            } else if (econKind.contains("股份有限公司")) {
+                score = 15;
+            } else if (econKind.contains("有限责任公司")) {
+                score = 10;
+            } else {
+                return 9;
+            }
+
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return score;
     }
 
     public static double companyStatusEvaluate(String status) {
-        if(StrUtil.isBlank(status)){
+        if (StrUtil.isBlank(status)) {
             return 10f;
         }
-        if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
-            return 25;
-        } else if (StrUtil.containsAny(status, "停业")) {
-            return 5;
-        } else if (StrUtil.containsAny(status, "吊销", "清算")) {
-            return 0;
-        } else if (StrUtil.equals(status, "注销")) {
-            return 0;
+        try {
+            if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
+                return 25;
+            } else if (StrUtil.containsAny(status, "停业")) {
+                return 5;
+            } else if (StrUtil.containsAny(status, "吊销", "清算")) {
+                return 0;
+            } else if (StrUtil.equals(status, "注销")) {
+                return 0;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return 10f;
     }
@@ -226,29 +240,38 @@ public class DimScore {
         if (StrUtil.isBlank(address)) {
             return 2d;
         }
-        if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
-                || address.contains("浙江")) {
-            return 7d;
-        } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
-                address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
-                address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
-            return 5d;
-        } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
-                address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
-                address.contains("海南")) {
-            return 4d;
-        } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
-            return 3d;
-        } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
-            return 1d;
+        try {
+            if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
+                    || address.contains("浙江")) {
+                return 7d;
+            } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
+                    address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
+                    address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
+                return 5d;
+            } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
+                    address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
+                    address.contains("海南")) {
+                return 4d;
+            } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
+                return 3d;
+            } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
+                return 1d;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return 2d;
     }
 
     public static void main(String[] args) {
-        System.out.println(DimScore.actualEvaluate("1", "3"));
-        System.out.println(DimScore.actualEvaluate("1", "0"));
-        System.out.println(DimScore.registEvaluate("400 "));
-        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+//        System.out.println(DimScore.actualEvaluate("1", "3"));
+//        System.out.println(DimScore.actualEvaluate("1", "0"));
+//        System.out.println(DimScore.registEvaluate("400 "));
+//        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+
+        System.out.println(new Date().toString());
+        System.out.println(DimScore.industryEvaluate(null));
+        System.out.println(DimScore.industryEvaluate(""));
+        System.out.println(DimScore.industryEvaluate("教育"));
     }
 }

+ 24 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculator.scala

@@ -5,15 +5,37 @@ import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 object CompanyInfoCalculator extends CompanyMapping {
 
   private val LOG = LogFactory.getLog(this.getClass)
 
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv("CompanyInfoCalculator")
+
+    if (args.length != 3) {
+      println("请配置计算资源: instances, cores, memory .")
+      System.exit(-1)
+    }
+
+    var config = mutable.Map.empty[String, String]
+    val Array(instances, cores, memory) = args;
+
+    println(
+      s"""
+         |instances : $instances,
+         |cores : $cores,
+         |memory : $memory
+         |""".stripMargin)
+
+    config = mutable.Map("spark.executor.instances" -> instances,
+      "spark.executor.cores" -> cores,
+      "spark.executor.memory" -> memory
+    )
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
     import spark._
-    LOG.info("company calc start!   ")
     println("company calc start!   ")
 
     prepare(spark)
@@ -50,7 +72,6 @@ object CompanyInfoCalculator extends CompanyMapping {
 
     df.write.mode("overwrite").insertInto(resultTable)
 
-    LOG.info("company calc end!   ")
     println("company calc end!   ")
 
     spark.stop();

+ 4 - 1
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -2,13 +2,15 @@ package com.winhc.bigdata.spark.utils
 
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 object SparkUtils {
 
   def InitEnv(appName: String): SparkSession = {
     InitEnv(appName, null)
   }
 
-  def InitEnv(appName: String, config: Map[String, String]): SparkSession = {
+  def InitEnv(appName: String, config: mutable.Map[String, String]): SparkSession = {
     val spark = SparkSession
       .builder()
       .appName(appName)
@@ -21,6 +23,7 @@ object SparkUtils {
       .config("spark.sql.catalogImplementation", "odps")
       .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
       .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
+      .config("spark.hadoop.odps.cupid.vectorization.enable", false)
 
     if (System.getProperty("os.name").contains("Windows")) {
       spark.master("local[*]")