Bläddra i källkod

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	pom.xml
#	src/main/scala/com/winhc/bigdata/spark/TestSparkSql.scala
yongnian 4 år sedan
förälder
incheckning
0f413c10fc

+ 13 - 0
pom.xml

@@ -85,6 +85,16 @@
             <version>0.33.7-public</version>
         </dependency>
         <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>cupid-sdk</artifactId>
+            <version>${cupid.sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-spark-datasource_${scala.binary.version}</artifactId>
+            <version>${cupid.sdk.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.mongodb.spark</groupId>
             <artifactId>mongo-spark-connector_2.11</artifactId>
             <version>2.4.1</version>
@@ -96,6 +106,7 @@
         </dependency>
     </dependencies>
 
+
     <build>
         <plugins>
             <plugin>
@@ -115,8 +126,10 @@
                                 <includes>
                                     <!--<include>*:*</include>-->
                                     <include>cn.hutool:*</include>
+                                    <include>com.aliyun.odps:*</include>
                                     <include>org.mongodb.*:*</include>
                                     <include>org.mongodb:mongo-java-driver</include>
+<!--                                    <include>com.aliyun.odps:odps-spark-datasource_2.11:*</include>-->
                                 </includes>
                             </artifactSet>
                             <filters>

+ 106 - 83
src/main/java/com/winhc/bigdata/calc/DimScore.java

@@ -84,41 +84,45 @@ public class DimScore {
             return 6;
         }
         double score = 0;
-        switch (industry) {
-            case "批发和零售业":
-                score = 4;
-                break;
-            case "农、林、牧、渔业":
-                score = 5;
-                break;
-            case "居民服务、修理和其他服务业":
-            case "信息传输、软件和信息科技服务业":
-                score = 7;
-                break;
-            case "科学研究和技术服务业":
-            case "住宿和餐饮业":
-            case "教育":
-            case "金融业":
-            case "采矿业":
-            case "公共管理、社会保障和社会组织":
-                score = 8;
-                break;
-            case "文化、教育和娱乐业":
-            case "房地产业":
-                score = 9;
-                break;
-            case "制造业":
-            case "建筑业":
-            case "交通运输、仓储和邮政业":
-            case "水利、环境和公共设置管理业":
-            case "卫生和社会工作":
-            case "电力、热力、燃气及谁生产和供应业":
-            case "国际组织":
-                score = 10;
-                break;
-            default:
-                score = 6;
-                break;
+        try {
+            switch (industry) {
+                case "批发和零售业":
+                    score = 4;
+                    break;
+                case "农、林、牧、渔业":
+                    score = 5;
+                    break;
+                case "居民服务、修理和其他服务业":
+                case "信息传输、软件和信息科技服务业":
+                    score = 7;
+                    break;
+                case "科学研究和技术服务业":
+                case "住宿和餐饮业":
+                case "教育":
+                case "金融业":
+                case "采矿业":
+                case "公共管理、社会保障和社会组织":
+                    score = 8;
+                    break;
+                case "文化、教育和娱乐业":
+                case "房地产业":
+                    score = 9;
+                    break;
+                case "制造业":
+                case "建筑业":
+                case "交通运输、仓储和邮政业":
+                case "水利、环境和公共设置管理业":
+                case "卫生和社会工作":
+                case "电力、热力、燃气及谁生产和供应业":
+                case "国际组织":
+                    score = 10;
+                    break;
+                default:
+                    score = 6;
+                    break;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return score;
     }
@@ -133,42 +137,52 @@ public class DimScore {
 //        if ("1".equals(isOnStock) || (econKind.contains("非上市") && econKind.contains("上市"))) {
 //            extScore = 8;
 //        }
-        if (StrUtil.isBlank(econKind)) {
-            return 6;
-        } else if (StrUtil.containsAny(econKind, "普通合伙")) {
-            score = 6;
-        } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
-            score = 7;
-        } else if (econKind.contains("集体所有制")) {
-            score = 8;
-        } else if (StrUtil.containsAny(econKind, "独资企业")) {
-            score = 10;
-        } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
-            score = 12;
-        } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
-            score = 13;
-        } else if (econKind.contains("股份有限公司")) {
-            score = 15;
-        } else if (econKind.contains("有限责任公司")) {
-            score = 10;
-        } else {
-            return 9;
+        try {
+
+            if (StrUtil.isBlank(econKind)) {
+                return 6;
+            } else if (StrUtil.containsAny(econKind, "普通合伙")) {
+                score = 6;
+            } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
+                score = 7;
+            } else if (econKind.contains("集体所有制")) {
+                score = 8;
+            } else if (StrUtil.containsAny(econKind, "独资企业")) {
+                score = 10;
+            } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
+                score = 12;
+            } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
+                score = 13;
+            } else if (econKind.contains("股份有限公司")) {
+                score = 15;
+            } else if (econKind.contains("有限责任公司")) {
+                score = 10;
+            } else {
+                return 9;
+            }
+
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return score;
     }
 
     public static double companyStatusEvaluate(String status) {
-        if(StrUtil.isBlank(status)){
+        if (StrUtil.isBlank(status)) {
             return 10f;
         }
-        if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
-            return 25;
-        } else if (StrUtil.containsAny(status, "停业")) {
-            return 5;
-        } else if (StrUtil.containsAny(status, "吊销", "清算")) {
-            return 0;
-        } else if (StrUtil.equals(status, "注销")) {
-            return 0;
+        try {
+            if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
+                return 25;
+            } else if (StrUtil.containsAny(status, "停业")) {
+                return 5;
+            } else if (StrUtil.containsAny(status, "吊销", "清算")) {
+                return 0;
+            } else if (StrUtil.equals(status, "注销")) {
+                return 0;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return 10f;
     }
@@ -226,29 +240,38 @@ public class DimScore {
         if (StrUtil.isBlank(address)) {
             return 2d;
         }
-        if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
-                || address.contains("浙江")) {
-            return 7d;
-        } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
-                address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
-                address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
-            return 5d;
-        } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
-                address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
-                address.contains("海南")) {
-            return 4d;
-        } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
-            return 3d;
-        } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
-            return 1d;
+        try {
+            if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
+                    || address.contains("浙江")) {
+                return 7d;
+            } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
+                    address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
+                    address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
+                return 5d;
+            } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
+                    address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
+                    address.contains("海南")) {
+                return 4d;
+            } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
+                return 3d;
+            } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
+                return 1d;
+            }
+        } catch (Exception e) {
+            LOG.error(e.toString());
         }
         return 2d;
     }
 
     public static void main(String[] args) {
-        System.out.println(DimScore.actualEvaluate("1", "3"));
-        System.out.println(DimScore.actualEvaluate("1", "0"));
-        System.out.println(DimScore.registEvaluate("400 "));
-        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+//        System.out.println(DimScore.actualEvaluate("1", "3"));
+//        System.out.println(DimScore.actualEvaluate("1", "0"));
+//        System.out.println(DimScore.registEvaluate("400 "));
+//        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+
+        System.out.println(new Date().toString());
+        System.out.println(DimScore.industryEvaluate(null));
+        System.out.println(DimScore.industryEvaluate(""));
+        System.out.println(DimScore.industryEvaluate("教育"));
     }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/TestSparkSql.scala

@@ -27,7 +27,7 @@ object TestSparkSql {
     throw new RuntimeException("warehouse dir not exists")
   }
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv("spark.conf")
+    val spark: SparkSession = SparkUtils.InitEnv
 
     import spark._
     val tableName = "ods_company_all"

+ 24 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculator.scala

@@ -5,15 +5,37 @@ import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 object CompanyInfoCalculator extends CompanyMapping {
 
   private val LOG = LogFactory.getLog(this.getClass)
 
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv("CompanyInfoCalculator")
+
+    if (args.length != 3) {
+      println("请配置计算资源: instances, cores, memory .")
+      System.exit(-1)
+    }
+
+    var config = mutable.Map.empty[String, String]
+    val Array(instances, cores, memory) = args;
+
+    println(
+      s"""
+         |instances : $instances,
+         |cores : $cores,
+         |memory : $memory
+         |""".stripMargin)
+
+    config = mutable.Map("spark.executor.instances" -> instances,
+      "spark.executor.cores" -> cores,
+      "spark.executor.memory" -> memory
+    )
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
     import spark._
-    LOG.info("company calc start!   ")
     println("company calc start!   ")
 
     prepare(spark)
@@ -50,7 +72,6 @@ object CompanyInfoCalculator extends CompanyMapping {
 
     df.write.mode("overwrite").insertInto(resultTable)
 
-    LOG.info("company calc end!   ")
     println("company calc end!   ")
 
     spark.stop();

+ 39 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryCalculator.scala

@@ -0,0 +1,39 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.DataFrame
+
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/5/20 15:38
+ * @Description:
+ */
+object CompanySummaryCalculator {
+  val tableName2FieldName = Map(
+    "ads_judrisk_court_annc_list" -> "company_id",
+    "ods_company_jud_assist_list" -> "company_id"
+  )
+
+  def getSql(tableName: String, companyIdFieldName: String) = s"select $companyIdFieldName as company_id,count(1) as ${tableName}_num from $tableName where $companyIdFieldName <>0 group by $companyIdFieldName"
+
+  def main(args: Array[String]): Unit = {
+    val spark = SparkUtils.InitEnv("CompanySummaryCalculator")
+    import spark._
+    val outputTable = "ads_company_summary"
+
+
+    var selectField = List("company_id")
+    var df: DataFrame = null
+    for (elem <- tableName2FieldName) {
+      selectField = selectField :+ elem._1 + "_num"
+      if (df == null) {
+        df = sql(getSql(elem._1, elem._2))
+      } else {
+        df = df.join(sql(getSql(elem._1, elem._2)), "company_id").select(selectField.head, selectField.tail: _*)
+      }
+    }
+    df.write.mode("overwrite").insertInto(outputTable)
+    spark.stop()
+  }
+}

+ 13 - 1
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -2,9 +2,15 @@ package com.winhc.bigdata.spark.utils
 
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 object SparkUtils {
 
-  def InitEnv(appName: String) = {
+  def InitEnv(appName: String): SparkSession = {
+    InitEnv(appName, null)
+  }
+
+  def InitEnv(appName: String, config: mutable.Map[String, String]): SparkSession = {
     val spark = SparkSession
       .builder()
       .appName(appName)
@@ -17,10 +23,16 @@ object SparkUtils {
       .config("spark.sql.catalogImplementation", "odps")
       .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
       .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
+      .config("spark.hadoop.odps.cupid.vectorization.enable", false)
 
     if (System.getProperty("os.name").contains("Windows")) {
       spark.master("local[*]")
     }
+    if (config != null) {
+      for (e <- config) {
+        spark.config(e._1, e._2)
+      }
+    }
     spark.getOrCreate()
   }
   def InitEnvRaw(appName: String) = {