@@ -24,6 +24,9 @@ case class CompanyIncSummary(s: SparkSession,
@(transient@getter) val spark: SparkSession = s
private val f_bytes: Array[Byte] = Bytes.toBytes("F")
private val name_bytes: Array[Byte] = Bytes.toBytes(tableName.toUpperCase)
+ val updateTimeMapping = Map(
+   "wenshu_detail_combine" -> "update_date" // sort-time column for wenshu (court document) records
+ )

def calc(): Unit = {
val ads_table = s"${project}.ads_$tableName" // existing (full) ads table
@@ -57,7 +60,7 @@ case class CompanyIncSummary(s: SparkSession,
|SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
|FROM (
|    SELECT tmp.*
- |        ,ROW_NUMBER() OVER(PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) c
+ |        ,ROW_NUMBER() OVER(PARTITION BY ${dupliCols.mkString(",")} ORDER BY ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC ) c
|    FROM (
|        SELECT ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
|        FROM (