|
@@ -1,14 +1,13 @@
|
|
|
package com.winhc.bigdata.spark.jobs
|
|
|
|
|
|
+import java.util
|
|
|
import java.util.Date
|
|
|
-
|
|
|
import com.winhc.bigdata.calc.{DimScore, DimScoreV2}
|
|
|
import com.winhc.bigdata.spark.utils.SparkUtils
|
|
|
import org.apache.commons.lang3.StringUtils
|
|
|
import org.apache.commons.logging.LogFactory
|
|
|
import org.apache.spark.broadcast.Broadcast
|
|
|
import org.apache.spark.sql.{Row, SparkSession}
|
|
|
-
|
|
|
import scala.collection.mutable
|
|
|
|
|
|
object CompanyInfoCalculatorV2 {
|
|
@@ -23,7 +22,7 @@ object CompanyInfoCalculatorV2 {
|
|
|
}
|
|
|
|
|
|
var config = mutable.Map.empty[String, String]
|
|
|
- val Array(instances, cores, memory) = args;
|
|
|
+ val Array(instances, cores, memory) = args
|
|
|
|
|
|
println(
|
|
|
s"""
|
|
@@ -44,28 +43,30 @@ object CompanyInfoCalculatorV2 {
|
|
|
|
|
|
println("company calc start! " + new Date().toString)
|
|
|
|
|
|
- val tableName = "new_ods_company"
|
|
|
- val resultTable = "ads_company_score_v3"
|
|
|
+ val ods_company = "new_ods_company"
|
|
|
+ val company_score = "ads_company_score_v3"
|
|
|
+ val company_category = "const_company_category_code"
|
|
|
+ val company_stock = "ods_company_stock"
|
|
|
|
|
|
//所属行业
|
|
|
val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
|
|
|
- """
|
|
|
+ s"""
|
|
|
|select category_code,category_str_big
|
|
|
- |from const_company_category_code
|
|
|
+ |from $company_category
|
|
|
""".stripMargin).collect().map(r => {
|
|
|
(r.getString(0), r.getString(1))
|
|
|
}).toMap)
|
|
|
|
|
|
//上市信息
|
|
|
- val stock: Broadcast[Map[Long, String]] = spark.sparkContext.broadcast(sql(
|
|
|
- """
|
|
|
+ val stock: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
|
|
|
+ s"""
|
|
|
|select cid,name
|
|
|
- |from ods_company_stock
|
|
|
+ |from $company_stock
|
|
|
""".stripMargin).collect().map(r => {
|
|
|
- (r.getLong(0), "1")
|
|
|
+ (r.getLong(0).toString, "1")
|
|
|
}).toMap)
|
|
|
|
|
|
- val df = sql(
|
|
|
+ sql(
|
|
|
s"""
|
|
|
|SELECT id,cid,name
|
|
|
| ,reg_capital
|
|
@@ -81,25 +82,24 @@ object CompanyInfoCalculatorV2 {
|
|
|
| ,CAST(from_time AS STRING) from_time
|
|
|
| ,CAST(to_time AS STRING) to_time
|
|
|
| ,reg_location
|
|
|
- |FROM ${tableName}
|
|
|
+ |FROM ${ods_company}
|
|
|
+ |where cid is not null
|
|
|
|""".stripMargin).flatMap(r => {
|
|
|
trans(stock, code2Name, r)
|
|
|
- }).toDF("id", "cid", "name", "kind", "project", "type", "score", "total", "extraScore")
|
|
|
+ }).toDF("id", "cid", "name", "kind", "kind_code", "project", "project_code", "type", "score", "total", "extraScore")
|
|
|
+ .createOrReplaceTempView(s"${ods_company}_tmp_view")
|
|
|
|
|
|
- // 写 分区表
|
|
|
- df.createOrReplaceTempView(s"${tableName}_tmp_view")
|
|
|
- sql(s"insert overwrite table ${resultTable} select * from ${tableName}_tmp_view")
|
|
|
- // df.show(100)
|
|
|
+ sql(s"insert overwrite table ${company_score} select * from ${ods_company}_tmp_view")
|
|
|
println("company calc end! " + new Date().toString)
|
|
|
|
|
|
spark.stop();
|
|
|
}
|
|
|
|
|
|
|
|
|
- private def trans(stock: Broadcast[Map[Long, String]], code2Name: Broadcast[Map[String, String]], r: Row) = {
|
|
|
+ private def trans(stock: Broadcast[Map[String, String]], code2Name: Broadcast[Map[String, String]], r: Row) = {
|
|
|
|
|
|
val id = r.getAs[Long]("id")
|
|
|
- val cid = r.getAs[Long]("cid")
|
|
|
+ val cid = r.getAs[Long]("cid").toString
|
|
|
val name = r.getAs[String]("name")
|
|
|
val reg_capital = r.getAs[String]("reg_capital")
|
|
|
val actual_capital_amount = r.getAs[Long]("actual_capital_amount")
|
|
@@ -112,7 +112,7 @@ object CompanyInfoCalculatorV2 {
|
|
|
val reg_location = r.getAs[String]("reg_location")
|
|
|
|
|
|
var actual_capital = ""
|
|
|
- //实缴资本转换
|
|
|
+ //实缴资本单位 分转换元
|
|
|
if (actual_capital_amount > 0 && StringUtils.isNotBlank(actual_capital_currency)) {
|
|
|
actual_capital = actual_capital_amount / 100 + "元" + actual_capital_currency
|
|
|
}
|
|
@@ -126,15 +126,8 @@ object CompanyInfoCalculatorV2 {
|
|
|
val r7 = DimScoreV2.bean2Map(DimScoreV2.termScore(to_time))
|
|
|
val r8 = DimScoreV2.bean2Map(DimScoreV2.addressScore(reg_location))
|
|
|
|
|
|
- Seq(
|
|
|
- (id, cid, name, r1.get("kind"), r1.get("project"), r1.get("type"), r1.get("score"), r1.get("total"), r1.get("extraScore")),
|
|
|
- (id, cid, name, r2.get("kind"), r2.get("project"), r2.get("type"), r2.get("score"), r2.get("total"), r2.get("extraScore")),
|
|
|
- (id, cid, name, r3.get("kind"), r3.get("project"), r3.get("type"), r3.get("score"), r3.get("total"), r3.get("extraScore")),
|
|
|
- (id, cid, name, r4.get("kind"), r4.get("project"), r4.get("type"), r4.get("score"), r4.get("total"), r4.get("extraScore")),
|
|
|
- (id, cid, name, r5.get("kind"), r5.get("project"), r5.get("type"), r5.get("score"), r5.get("total"), r5.get("extraScore")),
|
|
|
- (id, cid, name, r6.get("kind"), r6.get("project"), r6.get("type"), r6.get("score"), r6.get("total"), r6.get("extraScore")),
|
|
|
- (id, cid, name, r7.get("kind"), r7.get("project"), r7.get("type"), r7.get("score"), r7.get("total"), r7.get("extraScore")),
|
|
|
- (id, cid, name, r8.get("kind"), r8.get("project"), r8.get("type"), r8.get("score"), r8.get("total"), r8.get("extraScore"))
|
|
|
- )
|
|
|
+ Seq(r1, r2, r3, r4, r5, r6, r7, r8)
|
|
|
+ .map(m => (id, cid, name, m.get("kind"), m.get("kind_code"), m.get("project"), m.get("project_code"),
|
|
|
+ m.get("type"), m.get("score").toFloat, m.get("total").toFloat, m.get("extraScore").toFloat))
|
|
|
}
|
|
|
}
|