Prechádzať zdrojové kódy

fix: 通用摘要计算程序兼容inc不存在

许家凯 4 rokov pred
rodič
commit
32fcd5667c

+ 32 - 21
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryBySingle.scala

@@ -39,26 +39,37 @@ object CompanySummaryBySingle extends Logging {
       }
 
       val cols = Seq("rowkey", "ds")
-      val df = sql(
-        s"""
-           |SELECT  split(rowkey,'_')[0] AS cid
-           |        ,COUNT(1) as ${tableName}
-           |FROM    (
-           |            SELECT  t1.*
-           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
-           |            FROM    (
-           |                        SELECT  ${cols.mkString(",")}
-           |                        FROM    winhc_eci_dev.ads_company_change
-           |                        WHERE   ds = ${lastDs}
-           |                        UNION ALL
-           |                        SELECT  ${cols.mkString(",")}
-           |                        FROM    winhc_eci_dev.inc_ads_company_change
-           |                        WHERE   ds > $lastDs
-           |                    ) AS t1
-           |        ) AS t2
-           |WHERE   t2.num = 1
-           |GROUP BY split(rowkey,'_')[0]
-           |""".stripMargin)
+      val df =
+
+        sql(
+          s"""
+             |SELECT  split(rowkey,'_')[0] AS cid
+             |        ,COUNT(1) as ${tableName}
+             |FROM    (
+             |            SELECT  t1.*
+             |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+             |            FROM    (
+             |                        SELECT  ${cols.mkString(",")}
+             |                        FROM    winhc_eci_dev.ads_${tableName}
+             |                        WHERE   ds = ${lastDs}
+             |                        ${
+            spark.catalog.tableExists(s"winhc_eci_dev.inc_ads_${tableName}") match {
+              case true => {
+                s"""
+                   |                        UNION ALL
+                   |                        SELECT  ${cols.mkString(",")}
+                   |                        FROM    winhc_eci_dev.inc_ads_${tableName}
+                   |                        WHERE   ds > $lastDs
+                   |""".stripMargin
+              }
+              case _ => ""
+            }
+          }
+             |                    ) AS t1
+             |        ) AS t2
+             |WHERE   t2.num = 1
+             |GROUP BY split(rowkey,'_')[0]
+             |""".stripMargin)
 
       if (out != null) {
         df.createTempView("xjk_tmp_summary_test")
@@ -95,7 +106,7 @@ object CompanySummaryBySingle extends Logging {
 
     val spark = SparkUtils.InitEnv("CompanySummaryBySingle", config)
 
-    CompanySummaryBySingleUtil(spark, "winhc_eci_dev").all(tableName,out = "xjk_tmp_summ")
+    CompanySummaryBySingleUtil(spark, "winhc_eci_dev").all(tableName, out = "xjk_tmp_summ")
 
     spark.stop()
   }