|
@@ -17,7 +17,8 @@ import scala.collection.mutable
|
|
|
case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
|
|
|
project: String, //表所在工程名
|
|
|
tableName: String, //表名(不加前后辍)
|
|
|
- dupliCols: Seq[String] // 去重列
|
|
|
+ dupliCols: Seq[String], // 去重列
|
|
|
+ updateCol: String = "update_time" //ROW_NUMBER窗口函数的ORDER BY字段,默认(可以不传参数)为update_time
|
|
|
) extends LoggingUtils with CompanyMapping{
|
|
|
@(transient@getter) val spark: SparkSession = s
|
|
|
|
|
@@ -100,7 +101,7 @@ case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
|
|
|
| ,new_cid
|
|
|
| ,cid
|
|
|
| ,${columns.mkString(",")}
|
|
|
- | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
|
|
|
+ | ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY NVL($updateCol,update_time) DESC ) num
|
|
|
| FROM (
|
|
|
| SELECT "0" AS flag
|
|
|
| ,a.new_cid
|
|
@@ -154,7 +155,7 @@ object CompanyIncrForCidWithoutMD5Utils {
|
|
|
|
|
|
def main(args: Array[String]): Unit = {
|
|
|
|
|
|
- val Array(project, tableName, dupliCols, flag) = args
|
|
|
+ val Array(project, tableName, dupliCols, flag, _*) = args
|
|
|
println(
|
|
|
s"""
|
|
|
|project: $project
|
|
@@ -162,17 +163,22 @@ object CompanyIncrForCidWithoutMD5Utils {
|
|
|
|dupliCols: $dupliCols
|
|
|
|flag: $flag
|
|
|
|""".stripMargin)
|
|
|
- if (args.length != 4) {
|
|
|
- println("请输入 project:项目, tableName:表名, dupliCols:去重字段, flag:标识 !!!")
|
|
|
+ if (args.length < 4) {
|
|
|
+ println("请输入 project:项目, tableName:表名, dupliCols:去重字段, flag:标识, [updateCol:排序列]!!!")
|
|
|
sys.exit(-1)
|
|
|
}
|
|
|
+ //ROW_NUMBER窗口函数的ORDER BY字段,默认(可以不传参数)为update_time
|
|
|
+ var updateCol: String = "update_time"
|
|
|
+ if(args.length == 5){
|
|
|
+ updateCol = args(4)
|
|
|
+ }
|
|
|
val config = mutable.Map(
|
|
|
"spark.hadoop.odps.project.name" -> "winhc_eci_dev",
|
|
|
"spark.hadoop.odps.spark.local.partition.amt" -> "1"
|
|
|
)
|
|
|
val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
|
|
|
flag match {
|
|
|
- case "cid" => CompanyIncrForCidWithoutMD5Utils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
|
|
|
+ case "cid" => CompanyIncrForCidWithoutMD5Utils(spark, project, tableName, (dupliCols.split(",").toSeq), updateCol).calc()
|
|
|
}
|
|
|
spark.stop()
|
|
|
}
|