Browse Source

feat: base_company_mapping加字段

许家凯 4 years ago
parent
commit
cd660bbd57

+ 172 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyBaseMapping.scala

@@ -0,0 +1,172 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/10/30 10:08
+ */
+case class CompanyBaseMapping(s: SparkSession,
+                              project: String
+                             ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val last_tab = s"$project.base_company_mapping_v1"
+  private val target_tab = s"$project.base_company_mapping"
+
+  def init(): Unit = {
+    if (!spark.catalog.tableExists(target_tab)) {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS $target_tab (
+           |  `cid` STRING COMMENT 'cid',
+           |  `cname` STRING COMMENT 'cname',
+           |  `new_cid` STRING COMMENT 'new_cid',
+           |  `update_time` TIMESTAMP COMMENT '更新时间',
+           |  `company_type` STRING COMMENT 'company_type',
+           |  `deleted` STRING COMMENT 'deleted'
+           |)
+           | COMMENT '公司全量数据cid到最新new_cid表,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+           |PARTITIONED BY (
+           |  `ds` STRING COMMENT '分区')
+           |LIFECYCLE 15
+           |""".stripMargin)
+      val last_tab_ds = getLastPartitionsOrElse(last_tab, null)
+      if (last_tab_ds == null) {
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='20200604')
+             |SELECT  cid
+             |        ,name AS cname
+             |        ,coalesce(current_cid,cid) AS new_cid
+             |        ,now() as update_time
+             |FROM    winhc_eci_dev.ads_company
+             |WHERE   ds = '20200604'
+             |""".stripMargin)
+      } else {
+        val cols = getColumns(target_tab).diff(Seq("ds"))
+        val last_tab_cols_set = getColumns(last_tab).toSet
+
+        val company_cols = getColumns("winhc_eci_dev.ads_company").intersect(getColumns("winhc_eci_dev.inc_ads_company"))
+        val ads_com_ds = getLastPartitionsOrElse("winhc_eci_dev.ads_company", "0")
+        sql(
+          s"""
+             |SELECT  *
+             |FROM    (
+             |            SELECT  *
+             |                    ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY ds DESC ) AS num
+             |            FROM    (
+             |                        SELECT  ${company_cols.mkString(",")}
+             |                        FROM    winhc_eci_dev.ads_company
+             |                        WHERE   ds = '$ads_com_ds'
+             |                        UNION ALL
+             |                        SELECT  ${company_cols.mkString(",")}
+             |                        FROM    winhc_eci_dev.inc_ads_company
+             |                        WHERE   ds > '$ads_com_ds'
+             |                    ) AS t1
+             |        ) AS t2
+             |WHERE   t2.num = 1
+             |""".stripMargin)
+          .createTempView("tmp_all_company")
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='$last_tab_ds')
+             |SELECT  ${cols.map(f => if (last_tab_cols_set.contains(f)) s"t1.$f as $f" else s"t2.$f as $f").mkString(",")}
+             |FROM    (
+             |            SELECT  ${cols.map(f => if (last_tab_cols_set.contains(f)) s"$f" else s"null as $f").mkString(",")}
+             |            FROM    $last_tab
+             |            WHERE   ds = '$last_tab_ds'
+             |        ) AS t1
+             |LEFT JOIN (
+             |              SELECT  *
+             |              FROM    tmp_all_company
+             |          ) AS t2
+             |ON      t1.cid = t2.cid
+             |""".stripMargin)
+      }
+    }
+  }
+
+  def inc(ds: String): Unit = {
+    val other_cols = getColumns(target_tab).diff(Seq("ds","cid","cname","new_cid","update_time"))
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='$ds')
+         |SELECT  t1.cid AS cid
+         |        ,t1.cname AS cname
+         |        ,COALESCE(t2.new_cid,t1.new_cid) AS new_cid
+         |        ,COALESCE(t2.update_time,t1.update_time) AS update_time
+         |        ,${other_cols.map(f=>s"t2.$f").mkString(",")}
+         |FROM    (
+         |            SELECT  ${getColumns(target_tab).diff(Seq("ds")).mkString(",")}
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC) AS c
+         |                        FROM    (
+         |                                    SELECT  cid,cname,new_cid,update_time,${other_cols.mkString(",")}
+         |                                    FROM    $target_tab
+         |                                    WHERE   ds = '${BaseUtil.atDaysAfter(-1, ds)}'
+         |                                    UNION ALL
+         |                                    SELECT  cid
+         |                                            ,name AS cname
+         |                                            ,coalesce(current_cid,cid) AS new_cid
+         |                                            ,now() as update_time
+         |                                            ,${other_cols.mkString(",")}
+         |                                    FROM    winhc_eci_dev.inc_ods_company
+         |                                    WHERE   ds = '$ds'
+         |                                    AND     cid IS NOT NULL
+         |                                )
+         |                    ) AS all_mapping
+         |            WHERE   all_mapping.c = 1
+         |        ) AS t1
+         |LEFT JOIN (
+         |              SELECT  cid
+         |                      ,current_cid AS new_cid
+         |                      ,now() AS update_time
+         |                      ,${other_cols.mkString(",")}
+         |              FROM    winhc_eci_dev.inc_ods_company
+         |              WHERE   ds = '$ds'
+         |              AND     cid IS NOT NULL
+         |              AND     current_cid IS NOT NULL
+         |              group by cid,current_cid
+         |          ) AS t2
+         |ON      t1.new_cid = t2.cid
+         |""".stripMargin)
+  }
+
+  def calc(): Unit = {
+    init()
+    val lastDs = getLastPartitionsOrElse(target_tab, null)
+    val dss = getPartitions("winhc_eci_dev.inc_ods_company").filter(_ > lastDs)
+
+    println("计算分区:" + dss.mkString(","))
+
+    for (ds <- dss) {
+      inc(ds)
+    }
+  }
+}
+
+object CompanyBaseMapping {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("inc_company_mapping", config)
+    CompanyBaseMapping(s=spark,"winhc_eci_dev").calc
+
+    spark.stop()
+  }
+
+}

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_mapping.scala

@@ -15,6 +15,7 @@ import scala.collection.mutable
  * @Date: 2020/7/30 10:34
  * @Description:
  */
+@deprecated
 object inc_company_mapping {
 
   case class IncCompanyMappingUtil(s: SparkSession,