@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -24,9 +25,8 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
   val ads_company_tb = s"${project}.ads_$mainTableName" // existing ads main-table data
   val ads_company_tb_list = s"${project}.ads_$sublistTableName" // existing ads sub-table data, used to read the table columns
   val inc_ods_company_tb = s"${project}.inc_ods_$mainTableName" // incremental ods main table
-  val target_inc_ads_company_tb = s"${project}.inc_ads_$mainTableName" // incremental ads main table
-  val target_inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" // incremental ads sub-table
-
+  val inc_ads_company_tb = s"${project}.inc_ads_$mainTableName" // incremental ads main table
+  val inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" // incremental ads sub-table
 
   val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
     !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
@@ -34,7 +34,39 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
-    val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark)
+    // latest partition of the existing ads table
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    // last partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company").collect.toList.map(_.getString(0).split("=")(1))
+    // first partition of the incremental ods table
+    val firstDsIncOds = list.head
+    // last partition of the incremental ods table; also the partition written to
+    val lastDsIncOds = list.last
+    // partition the run starts from
+    var runDs = ""
+    // first run
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { // not the first run: last incremental ads partition + 1 day
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
+
+    println(
+      s"""
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    // if the last incremental ods and incremental ads partitions are equal, there is nothing new: exit
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("last incremental ods and incremental ads partitions are equal, exiting")
+      sys.exit(-1)
+    }
 
     // table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
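Review note: the runDs selection above collapses to a single expression. A minimal sketch, not part of the patch, assuming BaseUtil.getPartion returns a blank string when inc_ads has never been written and that ds values are yyyyMMdd strings (resolveRunDs is a hypothetical helper; atDaysAfter is used exactly as in the patch):

    // Hypothetical helper, sketch only.
    import org.apache.commons.lang3.StringUtils

    def resolveRunDs(firstDsIncOds: String, lastDsIncAds: String): String =
      if (StringUtils.isBlank(lastDsIncAds)) firstDsIncOds // first run: start at the earliest ods partition
      else BaseUtil.atDaysAfter(1, lastDsIncAds)           // later runs: the day after the last ads partition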
@@ -46,7 +78,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       s"""
          |SELECT cid,current_cid as new_cid
          |FROM ${inc_ods_company}
-         |WHERE ds >= ${firstDs}
+         |WHERE ds >= ${runDs}
          |AND cid IS NOT NULL
          |AND current_cid IS NOT NULL
          |GROUP BY cid,current_cid
@@ -58,7 +90,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |SELECT *
          |FROM ${inc_ods_company_tb} a
          |LATERAL VIEW explode(split(cids,';')) b AS cid
-         |WHERE ds >= ${firstDs}
+         |WHERE ds >= ${runDs}
          |AND cids IS NOT NULL
          |AND trim(cids) <> ''
          |""".stripMargin).createOrReplaceTempView("incr_tb")
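Review note: the LATERAL VIEW above fans each main-table row out into one row per ';'-separated cid. A self-contained toy illustration (the table t and its sample data are hypothetical):

    import org.apache.spark.sql.SparkSession

    object ExplodeDemo extends App {
      val spark = SparkSession.builder.master("local[*]").appName("explode-demo").getOrCreate()
      import spark.implicits._

      Seq(("a", "1;2;3")).toDF("id", "cids").createOrReplaceTempView("t")
      // Prints three rows: (a,1), (a,2), (a,3)
      spark.sql("SELECT id, cid FROM t LATERAL VIEW explode(split(cids, ';')) b AS cid").show()
    }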
@@ -67,7 +99,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     // replace cid, dedupe, and copy over the old data
     val df1 = sql(
       s"""
-         |INSERT OVERWRITE TABLE $target_inc_ads_company_tb_list PARTITION(ds='$firstDs')
+         |INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
          |SELECT CONCAT_WS('_',new_cid,id) AS rowkey
          |       ,"0" as flag
          |       ,CAST(new_cid as string) AS new_cid
@@ -99,13 +131,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |    JOIN (
          |        SELECT new_cid AS cid
          |               ,${columns.mkString(",")}
-         |        FROM ${target_inc_ads_company_tb_list}
-         |        WHERE ds >= ${firstDs}
+         |        FROM ${inc_ads_company_tb_list}
+         |        WHERE ds >= ${runDs}
          |        UNION ALL
          |        SELECT new_cid AS cid
          |               ,${columns.mkString(",")}
          |        FROM ${ads_company_tb_list}
-         |        WHERE ds >= ${firstDs}
+         |        WHERE ds >= ${remainDs}
          |    ) b
          |    ON a.cid = b.cid
          |) c
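Review note: subquery b above stitches the sub-table history together from two sources: rows already landed in inc_ads since runDs, plus the existing ads snapshot at remainDs. A rough DataFrame equivalent, sketch only, assuming the vals defined earlier are in scope:

    // Same union the SQL builds, before the join back on cid.
    val history = spark.table(inc_ads_company_tb_list).where(s"ds >= $runDs")
      .unionByName(spark.table(ads_company_tb_list).where(s"ds >= $remainDs"))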
@@ -115,13 +147,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     // dedupe the main table by id and write it out
     sql(
       s"""
-         |INSERT OVERWRITE TABLE $target_inc_ads_company_tb PARTITION(ds='$firstDs')
+         |INSERT OVERWRITE TABLE $inc_ads_company_tb PARTITION(ds='$lastDsIncOds')
          |SELECT cids,${columns.mkString(",")}
          |FROM (
          |    SELECT cids,${columns.mkString(",")}
          |           ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
          |    FROM ${inc_ods_company_tb}
-         |    WHERE ds >= ${firstDs}
+         |    WHERE ds >= ${runDs}
          |    AND cids IS NOT NULL
          |    AND trim(cids) <> ''
          |) a
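Review note: the ROW_NUMBER window above keeps only the newest row per id. The same dedupe in DataFrame form, sketch only, assuming a spark session and the inc_ods_company_tb val from above are in scope:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Newest row per id by update_time, mirroring the SQL above.
    val w = Window.partitionBy("id").orderBy(col("update_time").desc)
    val deduped = spark.table(inc_ods_company_tb)
      .where(s"ds >= $runDs and cids is not null and trim(cids) <> ''")
      .withColumn("num", row_number().over(w))
      .where(col("num") === 1)
      .drop("num")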
@@ -133,9 +165,9 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     MaxComputer2Phoenix(
       spark,
       colsList,
-      target_inc_ads_company_tb_list,
+      inc_ads_company_tb_list,
       sublistTableName,
-      firstDs,
+      lastDsIncOds,
       Seq("new_cid","id")
     ).syn()
 
@@ -144,9 +176,9 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     MaxComputer2Phoenix(
       spark,
       cols,
-      target_inc_ads_company_tb,
+      inc_ads_company_tb,
       mainTableName,
-      firstDs,
+      lastDsIncOds,
       Seq("id")
     ).syn()
 