فهرست منبع

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

许家凯 4 سال پیش
والد
کامیت
c56a8ccbc4
1 فایلهای تغییر یافته به همراه 44 افزوده شده و 29 حذف شده
  1. 44 29
      src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculatorV2.scala

+ 44 - 29
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyInfoCalculatorV2.scala

@@ -4,6 +4,7 @@ import java.util.Date
 
 import com.winhc.bigdata.calc.{DimScore, DimScoreV2}
 import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Row, SparkSession}
@@ -43,9 +44,8 @@ object CompanyInfoCalculatorV2 {
 
     println("company calc start! " + new Date().toString)
 
-    val tableName = "ads_company"
-    val resultTable = "ads_company_score_v2"
-    val ds = "20200526"
+    val tableName = "new_ods_company"
+    val resultTable = "ads_company_score_v3"
 
     //所属行业
     val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
@@ -56,45 +56,54 @@ object CompanyInfoCalculatorV2 {
       (r.getString(0), r.getString(1))
     }).toMap)
 
+    //上市信息
+    val stock: Broadcast[Map[Long, String]] = spark.sparkContext.broadcast(sql(
+      """
+        |select cid,name
+        |from ods_company_stock
+      """.stripMargin).collect().map(r => {
+      (r.getLong(0), "1")
+    }).toMap)
+
     val df = sql(
       s"""
-         |SELECT  id
-         |        ,company_name
-         |        ,legal_person_name
+         |SELECT  id,cid,name
          |        ,reg_capital
-         |        ,reg_capital_num
-         |        ,actual_capital
+         |        ,reg_capital_amount
+         |        ,reg_capital_currency
+         |        ,actual_capital_amount
+         |        ,actual_capital_currency
          |        ,category_code
          |        ,company_type
          |        ,company_org_type
          |        ,reg_status
-         |        ,cast(estiblish_time as string) estiblish_time
-         |        ,cast(from_time as string) from_time
-         |        ,cast(to_time as string) to_time
+         |        ,CAST(estiblish_time AS STRING) estiblish_time
+         |        ,CAST(from_time AS STRING) from_time
+         |        ,CAST(to_time AS STRING) to_time
          |        ,reg_location
          |FROM    ${tableName}
          |""".stripMargin).flatMap(r => {
-      trans(code2Name, r)
-    }).toDF("id", "company_name", "kind", "project", "type", "score", "total", "extraScore")
+      trans(stock, code2Name, r)
+    }).toDF("id", "cid", "name", "kind", "project", "type", "score", "total", "extraScore")
 
     // 写 分区表
     df.createOrReplaceTempView(s"${tableName}_tmp_view")
-    sql(s"insert overwrite table ${resultTable} partition (ds=$ds) select * from ${tableName}_tmp_view")
-
-    df.printSchema()
-    df.show(100)
+    sql(s"insert overwrite table ${resultTable}  select * from ${tableName}_tmp_view")
+    //    df.show(100)
     println("company calc end! " + new Date().toString)
 
     spark.stop();
   }
 
 
-  private def trans(code2Name: Broadcast[Map[String, String]], r: Row) = {
+  private def trans(stock: Broadcast[Map[Long, String]], code2Name: Broadcast[Map[String, String]], r: Row) = {
 
     val id = r.getAs[Long]("id")
-    val company_name = r.getAs[String]("company_name")
+    val cid = r.getAs[Long]("cid")
+    val name = r.getAs[String]("name")
     val reg_capital = r.getAs[String]("reg_capital")
-    val actual_capital = r.getAs[String]("actual_capital")
+    val actual_capital_amount = r.getAs[Long]("actual_capital_amount")
+    val actual_capital_currency = r.getAs[String]("actual_capital_currency")
     val category_code = r.getAs[String]("category_code")
     val company_org_type = r.getAs[String]("company_org_type")
     val reg_status = r.getAs[String]("reg_status")
@@ -102,24 +111,30 @@ object CompanyInfoCalculatorV2 {
     val to_time = r.getAs[String]("to_time")
     val reg_location = r.getAs[String]("reg_location")
 
+    var actual_capital = ""
+    //实缴资本转换
+    if (actual_capital_amount > 0 && StringUtils.isNotBlank(actual_capital_currency)) {
+      actual_capital = actual_capital_amount / 100 + "元" + actual_capital_currency
+    }
+
     val r1 = DimScoreV2.bean2Map(DimScoreV2.registCapiScore(reg_capital))
     val r2 = DimScoreV2.bean2Map(DimScoreV2.recCapScore(actual_capital, reg_capital))
     val r3 = DimScoreV2.bean2Map(DimScoreV2.industryScore(code2Name.value.getOrElse(category_code, null)))
-    val r4 = DimScoreV2.bean2Map(DimScoreV2.econKindScore(company_org_type, "")) //注意解决是否上市
+    val r4 = DimScoreV2.bean2Map(DimScoreV2.econKindScore(company_org_type, stock.value.getOrElse(cid, null))) //注意解决是否上市
     val r5 = DimScoreV2.bean2Map(DimScoreV2.eciStatusScore(reg_status))
     val r6 = DimScoreV2.bean2Map(DimScoreV2.startDateScore(estiblish_time))
     val r7 = DimScoreV2.bean2Map(DimScoreV2.termScore(to_time))
     val r8 = DimScoreV2.bean2Map(DimScoreV2.addressScore(reg_location))
 
     Seq(
-      (id, company_name, r1.get("kind"), r1.get("project"), r1.get("type"), r1.get("score"), r1.get("total"), r1.get("extraScore")),
-      (id, company_name, r2.get("kind"), r2.get("project"), r2.get("type"), r2.get("score"), r2.get("total"), r2.get("extraScore")),
-      (id, company_name, r3.get("kind"), r3.get("project"), r3.get("type"), r3.get("score"), r3.get("total"), r3.get("extraScore")),
-      (id, company_name, r4.get("kind"), r4.get("project"), r4.get("type"), r4.get("score"), r4.get("total"), r4.get("extraScore")),
-      (id, company_name, r5.get("kind"), r5.get("project"), r5.get("type"), r5.get("score"), r5.get("total"), r5.get("extraScore")),
-      (id, company_name, r6.get("kind"), r6.get("project"), r6.get("type"), r6.get("score"), r6.get("total"), r6.get("extraScore")),
-      (id, company_name, r7.get("kind"), r7.get("project"), r7.get("type"), r7.get("score"), r7.get("total"), r7.get("extraScore")),
-      (id, company_name, r8.get("kind"), r8.get("project"), r8.get("type"), r8.get("score"), r8.get("total"), r8.get("extraScore"))
+      (id, cid, name, r1.get("kind"), r1.get("project"), r1.get("type"), r1.get("score"), r1.get("total"), r1.get("extraScore")),
+      (id, cid, name, r2.get("kind"), r2.get("project"), r2.get("type"), r2.get("score"), r2.get("total"), r2.get("extraScore")),
+      (id, cid, name, r3.get("kind"), r3.get("project"), r3.get("type"), r3.get("score"), r3.get("total"), r3.get("extraScore")),
+      (id, cid, name, r4.get("kind"), r4.get("project"), r4.get("type"), r4.get("score"), r4.get("total"), r4.get("extraScore")),
+      (id, cid, name, r5.get("kind"), r5.get("project"), r5.get("type"), r5.get("score"), r5.get("total"), r5.get("extraScore")),
+      (id, cid, name, r6.get("kind"), r6.get("project"), r6.get("type"), r6.get("score"), r6.get("total"), r6.get("extraScore")),
+      (id, cid, name, r7.get("kind"), r7.get("project"), r7.get("type"), r7.get("score"), r7.get("total"), r7.get("extraScore")),
+      (id, cid, name, r8.get("kind"), r8.get("project"), r8.get("type"), r8.get("score"), r8.get("total"), r8.get("extraScore"))
     )
   }
 }