فهرست منبع

add:测试索引生成

许家凯 4 سال پیش
والد
کامیت
682a593d13
1فایلهای تغییر یافته به همراه278 افزوده شده و 0 حذف شده
  1. 278 0
      src/main/scala/com/winhc/bigdata/spark/jobs/ReIndex.scala

+ 278 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/ReIndex.scala

@@ -0,0 +1,278 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{CompanyIndexUtils, DateUtils, LoggingUtils, RegCapitalAmount, SparkUtils, category}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/11/23 10:19
+ */
+case class ReIndex(s: SparkSession,
+                   project: String //表所在工程名
+                  ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  reg_urd()
+
+  case class holder(id: String, `type`: String, name: String)
+
+  def reg_urd(): Unit = {
+    cleanup()
+    code2Name()
+    area_code()
+
+    import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+    def history_name(cname: String, history_names: String): String = CompanyIndexUtils.getHistoryName(cname, history_names).toJson()
+
+    spark.udf.register("get_history_name", history_name _)
+
+    def company(name: String): String = CompanyIndexUtils.getCompanyName(name).toJson()
+
+    spark.udf.register("get_company", company _)
+
+    def get_phones_emails(str: String): String = {
+      if (StringUtils.isEmpty(str)) {
+        null
+      } else
+        str.split("\t;\t").map(_.replaceAll("\t;", "")).mkString(",")
+    }
+
+    def tmp(str: String): String = DateUtils.toMillisTimestamp(str)
+
+    spark.udf.register("get_phones_emails", get_phones_emails _)
+    spark.udf.register("to_millis_timestamp", tmp _)
+
+    def hu(id: String, name: String): String = CompanyIndexUtils.getHuman(id, name).toJson()
+
+    spark.udf.register("get_human", hu _)
+
+    def hol(id: String, `type`: String, name: String): String = holder(id, `type`, name).toJson()
+
+    spark.udf.register("get_holder", hol _)
+
+    import com.winhc.bigdata.spark.utils.CompanyIndexUtils.company_score_weight
+
+    spark.udf.register("get_company_score_weight", company_score_weight _)
+
+    def get_category(code: String, index: String): String = {
+      val c = category.category_code.getOrElse(code, category(null, null, null, null, null, null))
+      index match {
+        case "1" => c.cate_first_code
+        case "2" => c.cate_second_code
+        case "3" => c.cate_third_code
+        case _ => null
+      }
+    }
+
+    spark.udf.register("get_category", get_category _)
+    spark.udf.register("get_amount", RegCapitalAmount.getAmount _)
+  }
+
+  private val target_tab = "winhc_eci_dev.tmp_xjk_v8_index"
+
+  def reindex(): Unit = {
+
+    sql(
+      s"""
+         |DROP TABLE IF EXISTS $target_tab
+         |""".stripMargin)
+
+
+    sql(
+      s"""
+         |--- CREATE TABLE IF NOT EXISTS $target_tab as
+         |SELECT  t3.id
+         |        ,t3.name as cname
+         |        ,t3.new_cid
+         |---        ,t3.name_alias
+         |        ,t3.history_name
+         |        ,cast(t3.legal_entity_id as string) legal_entity_id
+         |        ,cast(t3.legal_entity_type as string) legal_entity_type
+         |        ,t3.legal_entity_name
+         |        ,t3.holder
+         |        ,t4.staff
+         |        ,t3.province_code
+         |---        ,t3.province_name
+         |        ,t3.city_code
+         |---        ,t3.city_name
+         |        ,t3.county_code
+         |---        ,t3.county_name
+         |        ,t3.estiblish_time
+         |---        ,t3.category_code
+         |        ,get_category(t3.category_code,'1') as category_first_code
+         |        ,get_category(t3.category_code,'2') as category_second_code
+         |        ,get_category(t3.category_code,'3') as category_third_code
+         |        ,t3.reg_status
+         |        ,t3.reg_status_std
+         |        ,t3.company_type
+         |        ,t3.credit_code
+         |        ,t3.reg_capital
+         |        ,get_amount(t3.reg_capital) as reg_capital_amount
+         |        ,t3.reg_location
+         |        ,t3.phones
+         |        ,t3.emails
+         |        ,t3.geo
+         |        ,t3.logo
+         |        ,t3.reg_number
+         |        ,t3.company_score_weight
+         |FROM    (
+         |            SELECT  t1.id
+         |                    ,t1.name
+         |                    ,t1.new_cid
+         |                    ,t1.name_alias
+         |                    ,t1.history_name
+         |                    ,t1.legal_entity_id
+         |                    ,t1.legal_entity_type
+         |                    ,t1.legal_entity_name
+         |                    ,t2.holder
+         |                    ,t1.province_code
+         |                    ,t1.province_name
+         |                    ,t1.city_code
+         |                    ,t1.city_name
+         |                    ,t1.county_code
+         |                    ,t1.county_name
+         |                    ,t1.estiblish_time
+         |                    ,t1.category_code
+         |                    ,t1.category_first
+         |                    ,t1.category_second
+         |                    ,t1.category_third
+         |                    ,t1.reg_status
+         |                    ,t1.reg_status_std
+         |                    ,t1.company_type
+         |                    ,t1.credit_code
+         |                    ,t1.reg_capital
+         |                    ,t1.reg_capital_amount
+         |                    ,t1.reg_location
+         |                    ,t1.phones
+         |                    ,t1.emails
+         |                    ,t1.geo
+         |                    ,t1.logo
+         |                    ,t1.reg_number
+         |                    ,t1.company_score_weight
+         |            FROM    (
+         |                        SELECT  companny_id AS id
+         |                                ,get_company(name) AS name
+         |                                ,cid as new_cid
+         |                                ,name_alias
+         |                                ,get_history_name(name,history_names) AS history_name
+         |                                ,legal_entity_id
+         |                                ,legal_entity_type
+         |                                ,legal_entity_name
+         |                                ,get_province_code(area_code) AS province_code
+         |                                ,get_province_name_pro(SUBSTRING(area_code,0,2)) AS province_name
+         |                                ,get_city_code(area_code) AS city_code
+         |                                ,get_city_name(area_code) AS city_name
+         |                                ,get_county_code(area_code) AS county_code
+         |                                ,get_county_name(area_code) AS county_name
+         |                                ,to_millis_timestamp(estiblish_time) AS estiblish_time
+         |                                ,category_code
+         |                                ,get_category_first(CAST(category_code AS STRING)) category_first
+         |                                ,get_category_second(CAST(category_code AS STRING)) category_second
+         |                                ,get_category_third(CAST(category_code AS STRING)) category_third
+         |                                ,reg_status
+         |                                ,reg_status_std
+         |                                ,company_type
+         |                                ,credit_code
+         |                                ,reg_capital
+         |                                ,reg_capital_amount
+         |                                ,reg_location
+         |                                ,get_phones_emails(phones) AS phones
+         |                                ,get_phones_emails(emails) AS emails
+         |                                ,CONCAT_WS(',',lat2,lng2) AS geo
+         |                                ,logo
+         |                                ,reg_number
+         |                                ,get_company_score_weight(reg_status,name,reg_capital_amount,company_type) as company_score_weight
+         |                        FROM    winhc_eci_dev.tmp_xf_company_id_mapping_3
+         |                        WHERE   current_cid is Null
+         |                    ) AS t1
+         |            LEFT JOIN (
+         |                          SELECT  companny_id
+         |                                  ,concat(
+         |                                      '['
+         |                                      ,concat_ws(',',collect_set(get_holder(holder_id,holder_type,holder_name)))
+         |                                      ,']'
+         |                                  ) AS holder
+         |                          FROM    winhc_eci_dev.tmp_xf_company_holder
+         |                          GROUP BY companny_id
+         |                      ) AS t2
+         |            ON      t1.id = t2.companny_id
+         |        ) AS t3
+         |LEFT JOIN (
+         |              SELECT  companny_id
+         |                      ,concat(
+         |                          '['
+         |                          ,concat_ws(',',collect_set(get_human(hid,staff_name)))
+         |                          ,']'
+         |                      ) AS staff
+         |              FROM    winhc_eci_dev.tmp_xf_company_staff
+         |              GROUP BY companny_id
+         |          ) AS t4
+         |ON      t3.id = t4.companny_id
+         |""".stripMargin)
+      .createOrReplaceTempView("xjk_tmp_all")
+
+    var tab = "xjk_tmp_all"
+
+    for (t <- Seq(
+      ("company_tm", "companny_id", "tm_name", "company_tm")
+      , ("company_app_info", "companny_id", "filter_name", "app_info")
+      , ("company_icp", "companny_id", "web_name", "icp")
+    )) {
+      getTab(t._1, t._3)
+      tab = addField(tab, t._1, t._2, t._3, t._4)
+    }
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab as
+         |select * from $tab
+         |""".stripMargin)
+  }
+
+  def getTab(tab: String, name: String): Unit = {
+    sql(
+      s"""
+         | SELECT  companny_id
+         |         ,concat_ws(',',collect_set($name)) AS $name
+         | FROM    winhc_eci_dev.tmp_xf_$tab
+         | where $name is not null and trim($name) <> ''
+         | GROUP BY companny_id
+         |""".stripMargin)
+      .createTempView("xjk_tmp_" + tab + name)
+  }
+
+  def addField(org_tab: String, add_tab: String, con_field: String, add_field: String, alias_name: String): String = {
+    sql(
+      s"""
+         |select t1.*,t2.$add_field as $alias_name from
+         |(select * from $org_tab) as t1
+         |left join
+         |(select * from xjk_tmp_$add_tab$add_field) as t2
+         |on
+         |t1.id = t2.$con_field
+         |""".stripMargin)
+      .createTempView(s"${org_tab}_add_$add_field")
+    s"${org_tab}_add_$add_field"
+  }
+}
+
+object ReIndex {
+
+
+  def main(args: Array[String]): Unit = {
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    ReIndex(spark, "winhc_eci_dev").reindex()
+    spark.stop()
+  }
+
+}