
Adjust output for the company basic information index

许家凯 4 years ago
parent
commit
b3216d1694

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
 import com.winhc.bigdata.spark.const.BaseConst
+import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -68,10 +69,11 @@ object CompanyIncCompany2Es {
     , "DELETED"
   )
 
-  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
+  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
 
     def calc() {
+      val code = code2Name()
       val partition = bizDate.replaceAll("\\-", "")
       if (partition.length != 8) {
         println("biz date is error!")
@@ -119,8 +121,6 @@ object CompanyIncCompany2Es {
            |FROM
            |    tmp_company_inc
            |""".stripMargin)
-
-      import spark.implicits._
      // write out to HBase
       import org.apache.spark.sql.functions.col
       val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY")
@@ -140,7 +140,7 @@ object CompanyIncCompany2Es {
 
      // write out to ES
       import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
-      stringDf.companyIndexSave2Es()
+      stringDf.companyIndexSave2Es(code._1, code._2)
 
     }
   }
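
The net change in this file: the job now mixes in BaseFunc so it can broadcast the category- and area-code dictionaries once via code2Name(), then passes both broadcasts to the ES writer. A minimal caller-side sketch of the new pattern (not part of the commit; it assumes LoggingUtils supplies the sql() helper used elsewhere in this job, and DemoCompany2Es is a hypothetical name):

    package com.winhc.bigdata.spark.jobs

    import com.winhc.bigdata.spark.udf.BaseFunc
    import com.winhc.bigdata.spark.utils.LoggingUtils
    import org.apache.spark.sql.SparkSession

    import scala.annotation.meta.getter

    // Hypothetical demo job, not from the repository.
    case class DemoCompany2Es(s: SparkSession) extends LoggingUtils with BaseFunc {
      @(transient @getter) val spark: SparkSession = s

      def calc(): Unit = {
        // Destructure the tuple instead of passing code._1 / code._2.
        val (categoryCode2Name, areaCode2Name) = code2Name()

        val df = sql("SELECT * FROM tmp_company_inc") // stand-in for the real query

        import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
        df.companyIndexSave2Es(categoryCode2Name, areaCode2Name)
      }
    }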

+ 48 - 3
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -1,13 +1,15 @@
 package com.winhc.bigdata.spark.udf
 
+import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
 import com.winhc.bigdata.spark.utils.BaseUtil
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
-
-import scala.annotation.meta.getter
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
+import scala.annotation.meta.getter
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/10 13:49
@@ -18,7 +20,50 @@ trait BaseFunc {
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
 
 
-  def cleanup(): Unit ={
+  def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
+    val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
+      s"""
+         |select category_code,
+         |       cate_first,
+         |       cate_second,
+         |       cate_third
+         |from winhc_eci_dev.ods_category_code
+         |where ds = '20200604'
+      """.stripMargin).collect().map(r => {
+      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
+    }).toMap)
+
+    val areaCode2Name = spark.sparkContext.broadcast(spark.sql(
+      s"""
+         |select area_code,province,city,district
+         |from winhc_eci_dev.ods_area_code where ds = '20200604'
+      """.stripMargin).collect().map(r => {
+      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
+    }).toMap)
+
+    spark.udf.register("get_category_first", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 0)
+    })
+    spark.udf.register("get_category_second", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 1)
+    })
+    spark.udf.register("get_category_third", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 2)
+    })
+
+    spark.udf.register("get_province_name", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 0)
+    })
+    spark.udf.register("get_city_name", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 1)
+    })
+    spark.udf.register("get_county_name", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 2)
+    })
+    (categoryCode2Name, areaCode2Name)
+  }
+
+  def cleanup(): Unit = {
    // clean up special characters
     spark.udf.register("cleanup", (col: String) => {
       BaseUtil.cleanup(col)
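
All six UDFs delegate to CompanyIndexSave2EsHelper.get_seq_by_index, whose body is not part of this diff. A plausible reconstruction, assuming it resolves the code against the broadcast dictionary and returns the name at the requested level (or null for an unknown code):

    import org.apache.spark.broadcast.Broadcast

    // Hypothetical sketch; the real helper lives in
    // CompanyIndexSave2EsHelper and is not shown in this commit.
    def get_seq_by_index(bc: Broadcast[Map[String, Seq[String]]],
                         code: String,
                         index: Int): String =
      bc.value.get(code).map(_(index)).orNull

Once code2Name() has registered the UDFs, the lookups can be used from plain Spark SQL; the table and column names below are illustrative only:

    spark.sql(
      """
        |select get_category_first(category_code) as cate_first,
        |       get_province_name(area_code)      as province,
        |       get_city_name(area_code)          as city
        |from company_src -- hypothetical table
      """.stripMargin).show()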