Ver código fonte

fix: 摘要增强版增量计算问题

许家凯 4 anos atrás
pai
commit
0d33d7814f

+ 12 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanySummaryPro.scala

@@ -19,6 +19,7 @@ case class CompanySummaryPro(s: SparkSession,
                              project: String, //表所在工程名
                              tableName: String, //表名(不加前缀)
                              cidField: String, // 公司id fieldName,例如:split(rowkey,'_')
+                             distinctField: String = "rowkey", //去重主键
                              groupByInfo: GroupByInfo = null, //group的其它条件
                              where: String = "", //where条件,例如:deleted = 0
                              sortField: String = "ds"
@@ -61,6 +62,7 @@ case class CompanySummaryPro(s: SparkSession,
 
   def calc(is_inc: Boolean = true, target_tab: String = ""): Unit = {
     val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
+    val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
     val wh = StringUtils.isEmpty(where) match {
       case true => s""
       case false => s"AND   $where"
@@ -75,10 +77,16 @@ case class CompanySummaryPro(s: SparkSession,
              |FROM    (
              |            SELECT  DISTINCT $cidField as xjk_cid
              |            FROM    $inc_ads_table
-             |            WHERE   ds > $ads_last_ds
+             |            WHERE   ds = $inc_ads_last_ds
              |        ) id_table
              |JOIN (
-             |              SELECT  *
+             |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+             |                      ,$cidField as xjk_cid
+             |              FROM    $inc_ads_table
+             |              WHERE   ds > '$ads_last_ds'
+             |              AND     ds < '$inc_ads_last_ds'
+             |              UNION ALL
+             |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
              |                      ,$cidField as xjk_cid
              |              FROM    $ads_table
              |              WHERE   ds = '$ads_last_ds'
@@ -87,7 +95,7 @@ case class CompanySummaryPro(s: SparkSession,
              |UNION ALL
              |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
              |FROM    $inc_ads_table
-             |WHERE   ds > $ads_last_ds
+             |WHERE   ds = $inc_ads_last_ds
              |""".stripMargin)
           .createOrReplaceTempView(tmp_tab)
       }
@@ -113,7 +121,7 @@ case class CompanySummaryPro(s: SparkSession,
          |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
          |FROM    (
          |            SELECT  tmp.*
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $sortField DESC ) c
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
          |            FROM    $tmp_tab AS tmp
          |        ) tmp2
          |WHERE   tmp2.c = 1