
init commit

xufei committed 4 years ago
0a9880c83f

+ 23 - 0
.gitignore

@@ -0,0 +1,23 @@
+*~
+*.#*
+*#*#
+*.swp
+*.ipr
+*.iml
+*.iws
+*.pyc
+*.pyo
+.idea/
+.idea_modules/
+.settings
+.cache
+target/
+temp/
+warehouse/
+.project
+.classpath
+.DS_Store
+metastore_db/
+derby.log
+log4j.properties
+dependency-reduced-pom.xml

+ 161 - 0
pom.xml

@@ -0,0 +1,161 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.aliyun.odps.myJava</groupId>
+    <artifactId>Spark_Max</artifactId>
+    <version>1.0</version>
+
+    <properties>
+        <spark.version>2.3.0</spark.version>
+        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
+        <scala.version>2.11.8</scala.version>
+        <scala.binary.version>2.11</scala.binary.version>
+        <sdk.version>0.33.7-public</sdk.version>
+        <maven.compiler.source>1.8</maven.compiler.source>
+        <maven.compiler.target>1.8</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+
+    <dependencies>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-udf</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-udf-local</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-mapred</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-mapred-local</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scalap</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>4.5.16</version>
+        </dependency>
+        <!-- https://mvnrepository.com/artifact/com.aliyun.odps/odps-common-local -->
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-common-local</artifactId>
+            <version>0.33.7-public</version>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.3</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <minimizeJar>false</minimizeJar>
+                            <shadedArtifactAttached>true</shadedArtifactAttached>
+                            <artifactSet>
+                                <includes>
+                                    <!--<include>*:*</include>-->
+                                    <include>cn.hutool:*</include>
+                                </includes>
+                            </artifactSet>
+                            <filters>
+                                <filter>
+                                    <artifact>*:*</artifact>
+                                    <excludes>
+                                        <exclude>META-INF/*.SF</exclude>
+                                        <exclude>META-INF/*.DSA</exclude>
+                                        <exclude>META-INF/*.RSA</exclude>
+                                        <exclude>**/log4j.properties</exclude>
+                                    </excludes>
+                                </filter>
+                            </filters>
+                            <transformers>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>reference.conf</resource>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
+                                    <resource>META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+                                    </resource>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.3.2</version>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile-first</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 254 - 0
src/main/java/com/winhc/bigdata/calc/DimScore.java

@@ -0,0 +1,254 @@
+package com.winhc.bigdata.calc;
+
+import cn.hutool.core.date.DateUtil;
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.StrUtil;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Date;
+
+public class DimScore {
+
+    private static final Log LOG = LogFactory.getLog(DimScore.class);
+
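+    /**
+     * Scores registered capital (in 万元) on a 5-25 scale by bracket;
+     * blank or unparseable input defaults to 5.
+     */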
+    public static double registEvaluate(String registCapiStr) {
+        //private static final Pattern p = Pattern.compile("[^0-9]");
+        //String registCapiStr = StringUtils.isNotBlank(resEx) ? p.matcher(resEx).replaceAll("").trim() : "";
+        if (StrUtil.isBlank(registCapiStr)) {
+            return 5;
+        }
+        double registCapi = 0d;
+        double registScore = 5;
+        try {
+            registCapi = MoneyConversion.convert(registCapiStr, MoneyConversion.MoneyUnit.万元);
+            if (registCapi < 50) {
+                registScore = 5;
+            } else if (registCapi >= 50 && registCapi < 500) {
+                registScore = 10;
+            } else if (registCapi >= 500 && registCapi < 1000) {
+                registScore = 15;
+            } else if (registCapi >= 1000 && registCapi < 5000) {
+                registScore = 20;
+            } else if (registCapi >= 5000) {
+                registScore = 25;
+            }
+
+        } catch (Exception e) {
+            LOG.error(e.toString());
+        }
+
+        return registScore;
+    }
+
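+    /**
+     * Scores the paid-in/registered capital ratio on a 1-5 scale;
+     * blank inputs score 3, a ratio above 1 or a failed parse scores 0.
+     */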
+    public static double actualEvaluate(String recCapStr, String registCapiStr) {
+
+        if (StrUtil.isBlank(recCapStr) || StrUtil.isBlank(registCapiStr)) {
+            return 3;
+        }
+        double registCapi = 0d;
+
+        double recCap = 0d;
+
+        try {
+            registCapi = MoneyConversion.convert(registCapiStr, MoneyConversion.MoneyUnit.万元);
+            recCap = MoneyConversion.convert(recCapStr, MoneyConversion.MoneyUnit.万元);
+
+            if (registCapi <= 0) {
+                return 1;
+            }
+            double rate = NumberUtil.div(recCap, registCapi, 4);
+            if (rate == 0) {
+                return 1;
+            } else if (rate > 0 && rate < NumberUtil.div(1, 5, 4)) {
+                return 2;
+            } else if (rate >= NumberUtil.div(1, 5, 4) && rate < NumberUtil.div(1, 3, 4)) {
+                return 3;
+            } else if (rate >= NumberUtil.div(1, 3, 4) && rate < NumberUtil.div(2, 3, 4)) {
+                return 4;
+            } else if (rate >= NumberUtil.div(2, 3, 4) && rate <= 1) {
+                return 5;
+            } else {
+                return 0;
+            }
+
+        } catch (Exception e) {
+            LOG.error(e.toString());
+        }
+        return 0;
+    }
+
+
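+    /**
+     * Scores the industry (by its Chinese category name) on a 4-10 scale;
+     * blank or unmatched industries score 6.
+     */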
+    public static double industryEvaluate(String industry) {
+        if (StrUtil.isBlank(industry)) {
+            return 6;
+        }
+        double score = 0;
+        switch (industry) {
+            case "批发和零售业":
+                score = 4;
+                break;
+            case "农、林、牧、渔业":
+                score = 5;
+                break;
+            case "居民服务、修理和其他服务业":
+            case "信息传输、软件和信息科技服务业":
+                score = 7;
+                break;
+            case "科学研究和技术服务业":
+            case "住宿和餐饮业":
+            case "教育":
+            case "金融业":
+            case "采矿业":
+            case "公共管理、社会保障和社会组织":
+                score = 8;
+                break;
+            case "文化、教育和娱乐业":
+            case "房地产业":
+                score = 9;
+                break;
+            case "制造业":
+            case "建筑业":
+            case "交通运输、仓储和邮政业":
+            case "水利、环境和公共设置管理业":
+            case "卫生和社会工作":
+            case "电力、热力、燃气及谁生产和供应业":
+            case "国际组织":
+                score = 10;
+                break;
+            default:
+                score = 6;
+                break;
+        }
+        return score;
+    }
+
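+    /**
+     * Scores the company type on a 6-15 scale; blank input scores 6,
+     * unmatched types score 9.
+     */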
+    public static double companyTypeEvaluate(String econKind) {
+        float score = 6;
+        // TODO: bonus points for listed companies are not yet considered
+//        float extScore = 0;
+//        if (StrUtil.containsAny(econKind,"国资委","国家机构", "国有资产")) {
+//            extScore = 15;
+//        }
+//        if ("1".equals(isOnStock) || (econKind.contains("非上市") && econKind.contains("上市"))) {
+//            extScore = 8;
+//        }
+        if (StrUtil.isBlank(econKind)) {
+            return 6;
+        } else if (StrUtil.containsAny(econKind, "普通合伙")) {
+            score = 6;
+        } else if (StrUtil.containsAny(econKind, "个体工商户", "有限合伙", "联营企业")) {
+            score = 7;
+        } else if (econKind.contains("集体所有制")) {
+            score = 8;
+        } else if (StrUtil.containsAny(econKind, "独资企业")) {
+            score = 10;
+        } else if (StrUtil.containsAny(econKind, "外商投资企业")) {
+            score = 12;
+        } else if (StrUtil.containsAny(econKind, "国有", "国企", "国有独资", "国有控股")) {
+            score = 13;
+        } else if (econKind.contains("股份有限公司")) {
+            score = 15;
+        } else if (econKind.contains("有限责任公司")) {
+            score = 10;
+        } else {
+            return 9;
+        }
+        return score;
+    }
+
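+    /**
+     * Scores registration status: 25 for active statuses, 5 for suspended,
+     * 0 for revoked/liquidated/deregistered, 10 otherwise.
+     */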
+    public static double companyStatusEvaluate(String status) {
+        if (StrUtil.isBlank(status)) {
+            return 10d;
+        }
+        if (StrUtil.containsAny(status, "在业", "存续", "迁入", "迁出", "在营", "开业")) {
+            return 25;
+        } else if (StrUtil.containsAny(status, "停业")) {
+            return 5;
+        } else if (StrUtil.containsAny(status, "吊销", "清算")) {
+            return 0;
+        } else if (StrUtil.equals(status, "注销")) {
+            return 0;
+        }
+        return 10d;
+    }
+
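+    /**
+     * Scores company age in whole years since the establishment date:
+     * 2/5/6/7/8 points for <1, <3, <6, <10, and 10+ years; blank or
+     * unparseable dates score 2.
+     */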
+    public static double startDateEvaluate(String startDate) {
+        if (StrUtil.isBlank(startDate)) {
+            return 2d;
+        }
+        try {
+            Date start = DateUtil.parse(startDate);
+            Date now = new Date();
+            long dis = DateUtil.betweenYear(start, now, true);
+            if (dis < 1) {
+                return 2d;
+            } else if (dis < 3) {
+                return 5d;
+            } else if (dis < 6) {
+                return 6d;
+            } else if (dis < 10) {
+                return 7d;
+            } else {
+                return 8d;
+            }
+        } catch (Exception e) {
+            LOG.error("", e);
+            return 2d;
+        }
+    }
+
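+    /**
+     * Scores the business term length from termStart to termEnd (now, if
+     * termEnd is blank): 2/4/5 points for <1, <5, and 5+ years.
+     */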
+    public static double endDateEvaluate(String termStart, String termEnd) {
+        if (StrUtil.isBlank(termStart)) {
+            return 2d;
+        }
+        try {
+            Date start = DateUtil.parse(termStart);
+            Date end = new Date();
+            if (StrUtil.isNotBlank(termEnd)) {
+                end = DateUtil.parse(termEnd);
+            }
+            long dis = DateUtil.betweenYear(start, end, true);
+            if (dis < 1) {
+                return 2d;
+            } else if (dis < 5) {
+                return 4d;
+            } else {
+                return 5d;
+            }
+        } catch (Exception e) {
+            LOG.error("", e);
+            return 2d;
+        }
+    }
+
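+    /**
+     * Scores the registered address by province tier (7/5/4/3/1 points);
+     * blank or unmatched addresses score 2.
+     */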
+    public static double addressEvaluate(String address) {
+        if (StrUtil.isBlank(address)) {
+            return 2d;
+        }
+        if (address.contains("北京") || address.contains("上海") || address.contains("广东") || address.contains("江苏")
+                || address.contains("浙江")) {
+            return 7d;
+        } else if (address.contains("山东") || address.contains("四川") || address.contains("河南") ||
+                address.contains("安徽") || address.contains("河北") || address.contains("重庆") || address.contains("湖南") ||
+                address.contains("湖北") || address.contains("天津") || address.contains("贵州")) {
+            return 5d;
+        } else if (address.contains("黑龙江") || address.contains("吉林") || address.contains("辽宁") || address.contains("宁夏") ||
+                address.contains("内蒙古") || address.contains("西藏") || address.contains("新疆") || address.contains("青海") ||
+                address.contains("海南")) {
+            return 4d;
+        } else if (address.contains("山西") || address.contains("陕西") || address.contains("云南") || address.contains("广西")) {
+            return 3d;
+        } else if (address.contains("香港") || address.contains("澳门") || address.contains("台湾")) {
+            return 1d;
+        }
+        return 2d;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(DimScore.actualEvaluate("1", "3"));
+        System.out.println(DimScore.actualEvaluate("1", "0"));
+        System.out.println(DimScore.registEvaluate("400 "));
+        System.out.println(DimScore.registEvaluate("60.16 万美元"));
+    }
+}

+ 112 - 0
src/main/java/com/winhc/bigdata/calc/MoneyConversion.java

@@ -0,0 +1,112 @@
+package com.winhc.bigdata.calc;
+
+import cn.hutool.core.util.NumberUtil;
+import cn.hutool.core.util.StrUtil;
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Currency conversion utility.
+ *
+ * @author yujie
+ * @date 2019/7/1 18:48
+ */
+public class MoneyConversion {
+
+    public static void main(String[] args) {
+        double i = convert("500万元人民币", MoneyUnit.元);
+        double divValue = NumberUtil.div(i, 14 * 10000, 2);
+        System.out.println(divValue);
+    }
+
+    /**
+     * Target unit for the converted amount.
+     */
+    public enum MoneyUnit {
+        元, 万元;
+    }
+    /**
+     * Converts a formatted money string, e.g. "100万元美元", into the given unit,
+     * applying a fixed exchange rate when a foreign currency name is present.
+     * @param money the raw money string
+     * @param unit  the target unit (元 or 万元)
+     * @return the converted amount, rounded down to two decimals; 0 if the input is blank or has no number
+     */
+    public static double convert(String money, MoneyUnit unit) {
+        if (StringUtils.isBlank(money)) {
+            return 0;
+        }
+        String number = getNumber(money);
+        if (StringUtils.isBlank(number)) {
+            return 0;
+        }
+        Number num = NumberUtil.parseNumber(number);
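+        // Normalize the amount to 万 (ten-thousands) based on the magnitude word (亿/千万/百万/万/元) in the string.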
+        if (money.contains("亿")) {
+            num = NumberUtil.mul(num, 10000);
+        } else if (money.contains("千万")) {
+            num = NumberUtil.mul(num, 1000);
+        } else if (money.contains("百万")) {
+            num = NumberUtil.mul(num, 100);
+        } else if (money.contains("万")) {
+            // do nothing
+        } else if (money.contains("元")) {
+            num = NumberUtil.div(num,10000,2);
+        }
+
+        if (unit == MoneyUnit.元) {
+            num = NumberUtil.mul(num,10000);
+        }
+
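+        // Convert foreign currencies to CNY using the fixed exchange-rate snapshots hard-coded below.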
+        if (money.contains("港元")) {
+            num = NumberUtil.mul(num,0.8840);
+        } else if (money.contains("澳元")) {
+            num = NumberUtil.mul(num, 4.7277);
+        } else if (money.contains("台币")) {
+            num = NumberUtil.mul(num, 0.2205);
+        } else if (money.contains("欧元")) {
+            num = NumberUtil.mul(num, 7.6202);
+        } else if (money.contains("美元")) {
+            num = NumberUtil.mul(num, 6.8698);
+        } else if (money.contains("英镑")) {
+            num = NumberUtil.mul(num, 8.8835);
+        } else if (money.contains("韩元")) {
+            num = NumberUtil.mul(num, 0.0058);
+        } else if (money.contains("日元")) {
+            num = NumberUtil.mul(num, 0.0630);
+        } else if (money.contains("卢布")) {
+            num = NumberUtil.mul(num, 0.1044);
+        } else if (money.contains("铢")) {
+            num = NumberUtil.mul(num, 0.2129);
+        }
+        num = NumberUtil.roundDown(num,2);
+        return num.doubleValue();
+    }
+
+    /**
+     * Extracts the numeric characters from a string (digits, at most one
+     * decimal point, and any minus signs); returns "0" when nothing numeric is found.
+     * @param numberStr the raw string
+     * @return the extracted number as a string, or "0"
+     */
+    public static String getNumber(String numberStr) {
+        if (StringUtils.isBlank(numberStr)) {
+            return "0";
+        }
+        StringBuilder numberBuilder = new StringBuilder();
+        for (int index = 0; index < numberStr.length(); index++) {
+            if (Character.isDigit(numberStr.charAt(index))) {
+                numberBuilder.append(numberStr.charAt(index));
+            } else if (numberStr.charAt(index) == '.'
+                    && StringUtils.isNotBlank(numberBuilder.toString())
+                    && !numberBuilder.toString().contains(".")) {
+                numberBuilder.append(".");
+            } else if (numberStr.charAt(index) == '-') {
+                numberBuilder.append(numberStr.charAt(index));
+            }
+        }
+        if (StrUtil.isNotBlank(numberBuilder.toString())) {
+            return numberBuilder.toString();
+        } else {
+            return "0";
+        }
+    }
+
+
+}

+ 43 - 0
src/main/scala/com/winhc/bigdata/spark/TestSparkSql.scala

@@ -0,0 +1,43 @@
+package com.winhc.bigdata.spark
+
+import java.io.File
+
+import com.aliyun.odps.Odps
+import com.aliyun.odps.account.AliyunAccount
+import com.aliyun.odps.local.common.WareHouse
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.SparkSession
+
+object TestSparkSql {
+  private val accessId = "LTAI4G4n7pAW8tUbJVkkZQPD"
+  private val accessKey = "uNJOBskzcDqHq1TYG3m2rebR4c1009"
+  private val endpoint = "http://service.cn.maxcompute.aliyun.com/api"
+  private val defaultProject = "example_project"
+  val account = new AliyunAccount(accessId, accessKey)
+  val od = new Odps(account)
+  od.setEndpoint(endpoint)
+  od.setDefaultProject(defaultProject)
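+  // Locate the local ODPS warehouse directory, checking "warehouse" and then "../warehouse".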
+  def initWarehouse(): WareHouse = {
+    var exampleProjectDir = new File("warehouse" + File.separator + defaultProject)
+    if (exampleProjectDir.exists) return WareHouse.getInstance("warehouse")
+    else {
+      exampleProjectDir = new File("../warehouse" + File.separator + defaultProject)
+      if (exampleProjectDir.exists) return WareHouse.getInstance("../warehouse")
+    }
+    throw new RuntimeException("warehouse dir not exists")
+  }
+  def main(args: Array[String]): Unit = {
+    val spark: SparkSession = SparkUtils.InitEnv
+
+    import spark._
+    val tableName = "ods_company_all"
+    val rdf = sql(s"select * from $tableName limit 10")
+//    rdf.foreach(println(_))
+    val wh = initWarehouse()
+    wh.setOdps(od)
+    wh.readData("winhc_test_dev", tableName, null, null, ',').toArray().foreach(println _)
+//    print(s"rdf size : ${rdf.count()}")
+  }
+
+}

+ 60 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculator.scala

@@ -0,0 +1,60 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CommpanyMapping
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.commons.logging.LogFactory
+import org.apache.spark.sql.SparkSession
+
+object CompanyInfoCalculator extends CommpanyMapping {
+
+  private val LOG = LogFactory.getLog(this.getClass)
+
+  def main(args: Array[String]): Unit = {
+    val spark: SparkSession = SparkUtils.InitEnv
+
+    import spark._
+    LOG.info("company calc start!   ")
+    println("company calc start!   ")
+
+    prepare(spark)
+
+    val tableName = "ads_company"
+    val resultTable = "ads_company_score"
+
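+    // Compute every scoring dimension in one pass via the UDFs registered in prepare().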
+    val df = sql(
+      s"""
+         |SELECT  id
+         |        ,company_name
+         |        ,legal_person_name
+         |        ,reg_capital
+         |        ,reg_capital_score(reg_capital) AS reg_capital_score
+         |        ,reg_capital_num
+         |        ,actual_capital
+         |        ,actual_capital_score(actual_capital,reg_capital) AS actual_capital_score
+         |        ,category_code
+         |        ,industry_score(category_code) AS industry_score
+         |        ,company_type
+         |        ,company_org_type
+         |        ,company_type_score(company_org_type) AS company_type_score
+         |        ,reg_status
+         |        ,company_status_score(reg_status) AS company_status_score
+         |        ,estiblish_time
+         |        ,start_date_score(CAST(estiblish_time AS STRING)) AS start_date_score
+         |        ,from_time
+         |        ,to_time
+         |        ,end_date_score(CAST(from_time AS STRING) ,CAST(to_time AS STRING)) AS end_date_score
+         |        ,reg_location
+         |        ,reg_location_score(reg_location) AS reg_location_score
+         |FROM    ${tableName}
+         |""".stripMargin)
+
+    df.write.mode("overwrite").insertInto(resultTable)
+
+    LOG.info("company calc end!   ")
+    println("company calc end!   ")
+
+    spark.stop();
+  }
+
+
+}

+ 63 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CommpanyMapping.scala

@@ -0,0 +1,63 @@
+package com.winhc.bigdata.spark.udf
+
+import com.winhc.bigdata.calc.DimScore
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.SparkSession
+
+trait CommpanyMapping {
+
+
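+  /**
+   * Registers the DimScore dimension calculators as Spark SQL UDFs so they
+   * can be referenced directly from the scoring query.
+   */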
+  def prepare(spark: SparkSession): Unit = {
+    import spark._
+    // Registered capital score
+    spark.udf.register("reg_capital_score", (capital: String) => {
+      DimScore.registEvaluate(capital)
+    })
+
+    // Paid-in capital score
+    spark.udf.register("actual_capital_score", (actual_capital: String, reg_capital: String) => {
+      DimScore.actualEvaluate(actual_capital, reg_capital)
+    })
+
+    // Industry score: map category_code to its industry name via a broadcast lookup
+    val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
+      """
+        |select category_code,category_str_big
+        |from const_company_category_code
+      """.stripMargin).collect().map(r => {
+      (r.getString(0), r.getString(1))
+    }).toMap)
+
+    spark.udf.register("industry_score", (code: String) => {
+      DimScore.industryEvaluate(code2Name.value.getOrElse(code, null))
+    })
+
+    // Company type score
+    spark.udf.register("company_type_score", (company_org_type: String) => {
+      DimScore.companyTypeEvaluate(company_org_type)
+    })
+
+    // Company status score
+    spark.udf.register("company_status_score", (reg_status: String) => {
+      DimScore.companyStatusEvaluate(reg_status)
+    })
+
+    // Establishment date score
+    spark.udf.register("start_date_score", (estiblish_time: String) => {
+      DimScore.startDateEvaluate(estiblish_time)
+    })
+
+    // Business term score
+    spark.udf.register("end_date_score", (from_time: String, to_time: String) => {
+      DimScore.endDateEvaluate(from_time, to_time)
+    })
+
+    // Registered address score
+    spark.udf.register("reg_location_score", (reg_location: String) => {
+      DimScore.addressEvaluate(reg_location)
+    })
+
+  }
+
+}

+ 27 - 0
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -0,0 +1,27 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.spark.sql.SparkSession
+
+object SparkUtils {
+
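+  /** Builds a SparkSession wired to MaxCompute (ODPS); falls back to local[*] when running on Windows. */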
+  def InitEnv: SparkSession = {
+    val spark = SparkSession
+      .builder()
+      .appName(this.getClass.getSimpleName)
+      .config("spark.sql.broadcastTimeout", 20 * 60)
+      .config("spark.sql.crossJoin.enabled", true)
+      .config("odps.exec.dynamic.partition.mode", "nonstrict")
+      .config("spark.hadoop.odps.project.name", "winhc_test_dev")
+      .config("spark.hadoop.odps.access.id", "LTAI4G4n7pAW8tUbJVkkZQPD")
+      .config("spark.hadoop.odps.access.key", "uNJOBskzcDqHq1TYG3m2rebR4c1009")
+      .config("spark.sql.catalogImplementation", "odps")
+      .config("spark.hadoop.odps.end.point", "http://service.cn.maxcompute.aliyun.com/api")
+      .config("spark.hadoop.odps.runtime.end.point", "http://service.cn.maxcompute.aliyun-inc.com/api")
+
+    if (System.getProperty("os.name").contains("Windows")) {
+      spark.master("local[*]")
+    }
+    spark.getOrCreate()
+  }
+
+}