Quellcode durchsuchen

feat: 新空间下全量增量v8索引

许家凯 vor 4 Jahren
Ursprung
Commit
50f0d7475d
1 geänderte Dateien mit 445 neuen und 0 gelöschten Zeilen
  1. 445 0
      src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

+ 445 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

@@ -0,0 +1,445 @@
+package com.winhc.bigdata.spark.ng.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils._
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/12/10 10:07
+ */
+case class CompanyIndexJob(s: SparkSession,
+                           project: String //表所在工程名
+                          ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+
+  reg_urd()
+
+  def reg_urd(): Unit = {
+    cleanup()
+    //    code2Name()
+    area_code()
+
+    import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+    def history_name(cname: String, history_names: String): String = CompanyIndexUtils.getHistoryName(cname, history_names).toJson()
+
+    spark.udf.register("get_history_name", history_name _)
+
+    def company(name: String): String = CompanyIndexUtils.getCompanyName(name).toJson()
+
+    spark.udf.register("get_company", company _)
+
+    def get_phones_emails(str: String): String = {
+      if (StringUtils.isEmpty(str)) {
+        null
+      } else
+        str.split("\t;\t").map(_.replaceAll("\t;", "")).mkString(",")
+    }
+
+    def tmp(str: String): String = DateUtils.toMillisTimestamp(str)
+
+    spark.udf.register("get_phones_emails", get_phones_emails _)
+    spark.udf.register("to_millis_timestamp", tmp _)
+
+    def hu(id: String, name: String): String = CompanyIndexUtils.getHuman(id, name).toJson()
+
+    spark.udf.register("get_human", hu _)
+
+    spark.udf.register("get_holder", CompanyIndexUtils.get_holder _)
+
+    import com.winhc.bigdata.spark.utils.CompanyIndexUtils.company_score_weight
+
+    spark.udf.register("get_company_score_weight", company_score_weight _)
+
+    /*  def get_category(code: String, index: String): String = {
+        val c = category.category_code.getOrElse(code, category(null, null, null, null, null, null))
+        index match {
+          case "1" => c.cate_first_code
+          case "2" => c.cate_second_code
+          case "3" => c.cate_third_code
+          case _ => null
+        }
+      }
+
+      spark.udf.register("get_category", get_category _)*/
+    spark.udf.register("get_amount", RegCapitalAmount.getAmount _)
+  }
+
+  private val target_tab = "winhc_ng.out_company_v8_index"
+  private val target_tab_simp = "winhc_ng.out_company_v8_index_simp"
+  val target_tab_simp_cols = Seq("id"
+    , "cname"
+    , "history_name"
+    , "new_cid"
+    , "company_tm"
+    , "app_info"
+    , "company_score_weight")
+
+  private val org_prefix = "ods"
+
+  def run(): Unit = {
+    if (!spark.catalog.tableExists(target_tab) || getLastPartitionsOrElse(target_tab, null) == null)
+      all()
+    else
+      inc()
+  }
+
+  private def all(): Unit = {
+
+    val c = get_row_number("company", Seq("company_id"))
+    val h = get_row_number("company_holder", Seq("rowkey"))
+    val s = get_row_number("company_staff", Seq("rowkey"))
+    val t = get_row_number("company_tm", Seq("rowkey"))
+    val a = get_row_number("company_app_info", Seq("rowkey"))
+    val i = get_row_number("company_icp", Seq("rowkey"))
+
+    val default_ds = getLastPartitionsOrElse(s"winhc_ng.${org_prefix}_company", "0")
+    val target_ds = getLastPartitionsOrElse(s"winhc_ng.inc_${org_prefix}_company", default_ds)
+
+    calc(c, h, s, t, a, i, target_ds)
+  }
+
+  private def inc(): Unit = {
+    val target_ds = getLastPartitionsOrElse(target_tab, null)
+    val insert_ds = getLastPartitionsOrElse(s"winhc_ng.inc_${org_prefix}_company", null)
+
+    if (target_ds == null) {
+      print("target tab is not exists !!!")
+      sys.exit(-1)
+    }
+
+    sql(
+      s"""
+         |SELECT  DISTINCT company_id
+         |FROM    (
+         |            SELECT  DISTINCT company_id
+         |            FROM    winhc_ng.inc_${org_prefix}_company
+         |            WHERE   ds > '$target_ds'
+         |            UNION ALL
+         |            SELECT  DISTINCT company_id
+         |            FROM    winhc_ng.inc_${org_prefix}_company_holder
+         |            WHERE   ds > '$target_ds'
+         |            UNION ALL
+         |            SELECT  DISTINCT company_id
+         |            FROM    winhc_ng.inc_${org_prefix}_company_staff
+         |            WHERE   ds > '$target_ds'
+         |            UNION ALL
+         |            SELECT  DISTINCT company_id
+         |            FROM    winhc_ng.inc_${org_prefix}_company_tm
+         |            WHERE   ds > '$target_ds'
+         |            UNION ALL
+         |            SELECT  DISTINCT company_id
+         |            FROM    winhc_ng.inc_${org_prefix}_company_icp
+         |            WHERE   ds > '$target_ds'
+         |            UNION ALL
+         |            SELECT  DISTINCT company_id
+         |            FROM    winhc_ng.inc_${org_prefix}_company_app_info
+         |            WHERE   ds > '$target_ds'
+         |        )
+         |""".stripMargin)
+      .cache()
+      .createTempView("alt_all_company_id")
+
+
+    def get_alt_tab(org_t: String, partition_by: Seq[String]): String = {
+      val t1 = get_row_number(org_t, partition_by)
+      sql(
+        s"""
+           |SELECT  t1.*
+           |FROM    $t1 AS t1
+           |JOIN    alt_all_company_id AS t2
+           |ON      t1.company_id = t2.company_id
+           |""".stripMargin)
+        .createTempView(s"alter_t1_$org_t")
+      s"alter_t1_$org_t"
+    }
+
+    val c = get_alt_tab("company", Seq("company_id"))
+    val h = get_alt_tab("company_holder", Seq("rowkey"))
+    val s = get_alt_tab("company_staff", Seq("rowkey"))
+    val t = get_alt_tab("company_tm", Seq("rowkey"))
+    val a = get_alt_tab("company_app_info", Seq("rowkey"))
+    val i = get_alt_tab("company_icp", Seq("rowkey"))
+
+    calc(c, h, s, t, a, i, insert_ds)
+  }
+
+  private def calc(company_tab: String
+                   , holder_tab: String
+                   , staff_tab: String
+                   , company_tm_tab: String
+                   , company_app_info_tab: String
+                   , company_icp_tab: String
+                   , target_ds: String
+                  ): Unit = {
+
+    val base_company = get_company_tab(company_tab)
+    val base_holder = get_company_holder_tab(holder_tab)
+    val base_staff = get_staff_tab(staff_tab)
+
+    val base_tm = getTab(company_tm_tab, "tm_name", "company_tm")
+    val base_app_info = getTab(company_app_info_tab, "filter_name", "app_info")
+    val base_icp = getTab(company_icp_tab, "web_name", "icp")
+
+    var all_tab = addField(base_company, base_holder, "company_id", "holder", "holder")
+
+    all_tab = addField(all_tab, base_staff, "company_id", "staff", "staff")
+
+    all_tab = addField(all_tab, base_tm, "company_id", "company_tm", "company_tm")
+
+    all_tab = addField(all_tab, base_app_info, "company_id", "app_info", "app_info")
+
+    all_tab = addField(all_tab, base_icp, "company_id", "icp", "icp")
+
+    val df = sql(
+      s"""
+         |SELECT  id
+         |        ,name as cname
+         |        ,new_cid
+         |        ,history_name
+         |        ,legal_entity_id
+         |        ,legal_entity_type
+         |        ,legal_entity_name
+         |        ,holder
+         |        ,staff
+         |        ,province_code
+         |        ,city_code
+         |        ,county_code
+         |        ,estiblish_time
+         |        ,cate_first_code
+         |        ,cate_second_code
+         |        ,cate_third_code
+         |        ,reg_status
+         |        ,reg_status_std
+         |        ,company_type
+         |        ,credit_code
+         |        ,reg_capital
+         |        ,reg_capital_amount
+         |        ,reg_location
+         |        ,company_tm
+         |        ,icp
+         |        ,app_info
+         |        ,phones
+         |        ,emails
+         |        ,geo
+         |        ,logo
+         |        ,reg_number
+         |        ,company_score_weight
+         |        ,deleted
+         |from    $all_tab
+         |""".stripMargin)
+
+    val out_f = df.schema.map(_.name).seq
+    df.createTempView("out_company_index")
+
+    tab_verify(out_f, target_tab)
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$target_ds')
+         |SELECT ${getColumns(target_tab).diff(Seq("ds")).mkString(",")}
+         |FROM
+         |    out_company_index
+         |""".stripMargin)
+
+
+    tab_verify(target_tab_simp_cols, target_tab_simp)
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab_simp PARTITION(ds='$target_ds')
+         |SELECT ${getColumns(target_tab_simp).diff(Seq("ds")).mkString(",")}
+         |FROM
+         |    out_company_index
+         |""".stripMargin)
+
+  }
+
+  private def tab_verify(out_f: Seq[String], tab: String, ignore_f: Seq[String] = Seq("ds")): Unit = {
+    val f = out_f.diff(ignore_f)
+    if (!spark.catalog.tableExists(tab)) {
+      create_target_tab(tab, f)
+    }
+
+    val tab_f = getColumns(tab).diff(ignore_f)
+    if (tab_f.diff(f).nonEmpty || f.diff(tab_f).nonEmpty) {
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS $tab
+           |""".stripMargin)
+      create_target_tab(tab, f)
+    }
+  }
+
+  private def get_company_tab(company_tab: String): String = {
+    val alias_tab = "alias_base_" + company_tab
+    sql(
+      s"""
+         |SELECT  company_id AS id
+         |        ,get_company(name) AS name
+         |        ,new_cid
+         |        ,name_alias
+         |        ,get_history_name(name,history_names) AS history_name
+         |        ,cast(legal_entity_id as string) as legal_entity_id
+         |        ,cast(legal_entity_type as string) as legal_entity_type
+         |        ,legal_entity_name
+         |        ,province_code
+         |        ,city_code
+         |        ,county_code
+         |        ,to_millis_timestamp(estiblish_time) AS estiblish_time
+         |        ,cate_first_code
+         |        ,cate_second_code
+         |        ,cate_third_code
+         |        ,reg_status
+         |        ,reg_status_std
+         |        ,company_type
+         |        ,credit_code
+         |        ,reg_capital
+         |        ,get_amount(reg_capital) as reg_capital_amount
+         |        ,reg_location
+         |        ,get_phones_emails(phones) AS phones
+         |        ,get_phones_emails(emails) AS emails
+         |        ,CONCAT_WS(',',lat,lng) AS geo
+         |        ,logo
+         |        ,reg_number
+         |        ,get_company_score_weight(reg_status,name,reg_capital_amount,company_type) as company_score_weight
+         |        ,deleted
+         |FROM    $company_tab
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
+  private def get_company_holder_tab(holder_tab: String): String = {
+    val alias_tab = "alias_base_" + holder_tab
+    sql(
+      s"""
+         |SELECT  company_id
+         |        ,concat(
+         |            '['
+         |            ,concat_ws(',',collect_set(get_holder(holder_id,holder_type,holder_name)))
+         |            ,']'
+         |        ) AS holder
+         |FROM    (
+         |        select  *
+         |        from    $holder_tab
+         |        where   deleted = 0
+         |)
+         |GROUP BY company_id
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
+  private def get_staff_tab(staff_tab: String): String = {
+    val alias_tab = "alias_base_" + staff_tab
+    sql(
+      s"""
+         |SELECT  company_id
+         |        ,concat(
+         |            '['
+         |            ,concat_ws(',',collect_set(get_human(hid,staff_name)))
+         |            ,']'
+         |        ) AS staff
+         |FROM    (
+         |        select *
+         |        from   $staff_tab
+         |        where  deleted = 0
+         |)
+         |GROUP BY company_id
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
+  private def getTab(tab: String, name: String, alias_name: String): String = {
+    val alias_tab = "alias_base_" + tab
+    sql(
+      s"""
+         | SELECT  company_id
+         |         ,concat_ws(',',collect_set($name)) AS $alias_name
+         | FROM    $tab
+         | where $name is not null and trim($name) <> '' and deleted = 0
+         | GROUP BY company_id
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
+
+  private def addField(org_tab: String, add_tab: String, con_field: String, add_field: String, alias_name: String): String = {
+    sql(
+      s"""
+         |select t1.*,t2.$add_field as $alias_name from
+         |(select * from $org_tab) as t1
+         |left join
+         |(select * from $add_tab) as t2
+         |on
+         |t1.id = t2.$con_field
+         |""".stripMargin)
+      .createTempView(s"${org_tab}_add_$add_field")
+    s"${org_tab}_add_$add_field"
+  }
+
+  private def create_target_tab(tab: String, f: Seq[String]): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $tab
+         |(
+         |    ${f.map(f => s"$f  STRING COMMENT ''").mkString(",")}
+         |)
+         |COMMENT 'v8版本索引输出表'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |""".stripMargin)
+  }
+
+
+  private def get_row_number(tab: String, partition_by: Seq[String], order_by: String = "ds"): String = {
+    val org_tab = s"winhc_ng.${org_prefix}_$tab"
+    val inc_org_tab = s"winhc_ng.inc_${org_prefix}_$tab"
+    val cols = getColumns(org_tab).intersect(getColumns(inc_org_tab)).map(f => s"cast($f as string) as $f").mkString(",")
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY ${partition_by.mkString(",")} ORDER BY $order_by DESC ) AS num
+         |            FROM    (
+         |                        SELECT  ${cols}
+         |                        FROM    $org_tab
+         |                        WHERE   ds > 0
+         |                        UNION ALL
+         |                        SELECT  ${cols}
+         |                        FROM    $inc_org_tab
+         |                        WHERE   ds > 0
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin)
+      .createTempView(tab)
+    tab
+  }
+}
+
+
+object CompanyIndexJob {
+
+  def main(args: Array[String]): Unit = {
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyIndexJob(spark, "winhc_ng").run()
+    spark.stop()
+  }
+
+}