|
@@ -0,0 +1,127 @@
|
|
|
+package com.winhc.bigdata.spark.utils
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.udf.BaseFunc
|
|
|
+import org.apache.spark.internal.Logging
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author: XuJiakai
|
|
|
+ * @date: 2020/10/22 10:25
|
|
|
+ */
|
|
|
+case class CompanyCidAndNameUtils(s: SparkSession
|
|
|
+ ) extends LoggingUtils with Logging with BaseFunc {
|
|
|
+ @(transient@getter) val spark: SparkSession = s
|
|
|
+
|
|
|
+ init()
|
|
|
+
|
|
|
+ private def init(): Unit = {
|
|
|
+ println("init。。。")
|
|
|
+ val last_ds = getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping", "0")
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT *
|
|
|
+ |FROM winhc_eci_dev.base_company_mapping
|
|
|
+ |WHERE ds = '$last_ds'
|
|
|
+ |""".stripMargin)
|
|
|
+ .repartition(500)
|
|
|
+ .cache()
|
|
|
+ .createOrReplaceTempView("all_company_tmp")
|
|
|
+ }
|
|
|
+
|
|
|
+ def addNewNameByCid(org_table_name: String, cidField: String, addName: String): String = {
|
|
|
+
|
|
|
+ val new_tab = replaceNewCidByCid(org_table_name, cidField, cidField)
|
|
|
+ val cols = getColumns(new_tab)
|
|
|
+
|
|
|
+ val res_tab = s"${new_tab}_add_name"
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT ${cols.map("t1."+_).mkString(",")},t2.cname as $addName
|
|
|
+ |FROM (
|
|
|
+ | SELECT *
|
|
|
+ | FROM $new_tab
|
|
|
+ | WHERE $cidField is not null
|
|
|
+ | ) AS t1
|
|
|
+ |LEFT JOIN (
|
|
|
+ | SELECT *
|
|
|
+ | FROM all_company_tmp
|
|
|
+ | ) AS t2
|
|
|
+ |ON t1.$cidField = t2.cid
|
|
|
+ |UNION ALL
|
|
|
+ |SELECT ${cols.mkString(",")},null as $addName
|
|
|
+ |FROM $org_table_name
|
|
|
+ |WHERE $cidField IS NULL
|
|
|
+ |""".stripMargin)
|
|
|
+ .createOrReplaceTempView(res_tab)
|
|
|
+ res_tab
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ def replaceNewCidByCid(org_table_name: String, cidField: String, replaceCidField: String): String = {
|
|
|
+ val res_tab = s"${org_table_name}_replace_cid"
|
|
|
+ val cols = getColumns(org_table_name)
|
|
|
+ val f = cols.map(s => {
|
|
|
+ if (s.equals(replaceCidField)) {
|
|
|
+ s"t2.new_cid as $s"
|
|
|
+ } else {
|
|
|
+ s"t1.$s"
|
|
|
+ }
|
|
|
+ })
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT ${f.mkString(",")}
|
|
|
+ |FROM (
|
|
|
+ | SELECT *
|
|
|
+ | FROM $org_table_name
|
|
|
+ | WHERE $cidField IS NOT NULL
|
|
|
+ | ) AS t1
|
|
|
+ |LEFT JOIN (
|
|
|
+ | SELECT *
|
|
|
+ | FROM all_company_tmp
|
|
|
+ | ) AS t2
|
|
|
+ |ON t1.$cidField = t2.cid
|
|
|
+ |UNION ALL
|
|
|
+ |SELECT ${cols.mkString(",")}
|
|
|
+ |FROM $org_table_name
|
|
|
+ |WHERE $cidField IS NULL
|
|
|
+ |""".stripMargin)
|
|
|
+ .createOrReplaceTempView(res_tab)
|
|
|
+ res_tab
|
|
|
+ }
|
|
|
+
|
|
|
+ def replaceNewNameByCid(org_table_name: String, cidField: String, replaceName: String): String ={
|
|
|
+ val new_tab = replaceNewCidByCid(org_table_name, cidField, cidField)
|
|
|
+ val cols = getColumns(new_tab)
|
|
|
+ val f = cols.map(s => {
|
|
|
+ if (s.equals(replaceName)) {
|
|
|
+ s"t2.cname as $s"
|
|
|
+ } else {
|
|
|
+ s"t1.$s"
|
|
|
+ }
|
|
|
+ })
|
|
|
+ val res_tab = s"${new_tab}_new_name"
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT ${f.mkString(",")}
|
|
|
+ |FROM (
|
|
|
+ | SELECT *
|
|
|
+ | FROM $org_table_name
|
|
|
+ | WHERE $cidField IS NOT NULL
|
|
|
+ | ) AS t1
|
|
|
+ |LEFT JOIN (
|
|
|
+ | SELECT *
|
|
|
+ | FROM all_company_tmp
|
|
|
+ | ) AS t2
|
|
|
+ |ON t1.$cidField = t2.cid
|
|
|
+ |UNION ALL
|
|
|
+ |SELECT ${cols.mkString(",")}
|
|
|
+ |FROM $org_table_name
|
|
|
+ |WHERE $cidField IS NULL
|
|
|
+ |""".stripMargin)
|
|
|
+ .createOrReplaceTempView(res_tab)
|
|
|
+ res_tab
|
|
|
+ }
|
|
|
+}
|