|
@@ -59,6 +59,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|
|
|
|
val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
|
|
val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
|
|
|
|
|
|
|
|
+ //增量ods和增量ads最后一个分区相等,跳出
|
|
|
|
+ if (lastDsIncOds.equals(lastDsIncAds)) {
|
|
|
|
+ println("inc_ods equals inc_ads ds ,please delete last ds !!!")
|
|
|
|
+ runDs = lastDsIncOds
|
|
|
|
+ //sys.exit(-1)
|
|
|
|
+ }
|
|
|
|
+
|
|
println(
|
|
println(
|
|
s"""
|
|
s"""
|
|
|cols_md5:$cols_md5
|
|
|cols_md5:$cols_md5
|
|
@@ -69,12 +76,6 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
|firstDsIncOds:$firstDsIncOds
|
|
|firstDsIncOds:$firstDsIncOds
|
|
|""".stripMargin)
|
|
|""".stripMargin)
|
|
|
|
|
|
- //增量ods和增量ads最后一个分区相等,跳出
|
|
|
|
- if (lastDsIncOds.equals(lastDsIncAds)) {
|
|
|
|
- println("inc_ods equals inc_ads ds ,please delete last ds !!!")
|
|
|
|
- sys.exit(-1)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
//table字段
|
|
//table字段
|
|
val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
|
|
val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
|
|
!s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
|
|
!s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
|
|
@@ -143,7 +144,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
|
|
| SELECT new_cid AS cid
|
|
| SELECT new_cid AS cid
|
|
| ,${sublistTableFieldName.mkString(",")}
|
|
| ,${sublistTableFieldName.mkString(",")}
|
|
| FROM ${inc_ads_company_tb_list}
|
|
| FROM ${inc_ads_company_tb_list}
|
|
- | WHERE ds >= ${runDs}
|
|
|
|
|
|
+ | WHERE ds > ${remainDs}
|
|
| UNION ALL
|
|
| UNION ALL
|
|
| SELECT new_cid AS cid
|
|
| SELECT new_cid AS cid
|
|
| ,${sublistTableFieldName.mkString(",")}
|
|
| ,${sublistTableFieldName.mkString(",")}
|