
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
许家凯 committed 4 years ago · 84776f7d24

+ 21 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -20,11 +20,21 @@ object BaseUtil {
   }
 
   def getPartion(t: String, @transient spark: SparkSession) = {
-    getPartitions(t, spark).last
+    val ps = getPartitions(t, spark)
+    if (ps.nonEmpty) {
+      ps.last
+    } else {
+      ""
+    }
   }
 
   def getFirstPartion(t: String, @transient spark: SparkSession) = {
-    getPartitions(t, spark).head
+    val ps = getPartitions(t, spark)
+    if (ps.nonEmpty) {
+      ps.head
+    } else {
+      ""
+    }
   }
 
   def atMonthsBefore(n: Int, pattern: String = "yyyy-MM-dd"): String = {
@@ -33,4 +43,13 @@ object BaseUtil {
     c.add(Calendar.MONTH, -1 * n)
     DateFormatUtils.format(c.getTime.getTime, pattern)
   }
+
+  def atDaysAfter(n: Int, time: String, pattern: String = "yyyyMMdd"): String = {
+    import java.text.SimpleDateFormat
+    val newtime: Date = new SimpleDateFormat("yyyyMMdd").parse(time)
+    val c = Calendar.getInstance(Locale.CHINA)
+    c.setTimeInMillis(newtime.getTime)
+    c.add(Calendar.DATE, n)
+    DateFormatUtils.format(c.getTime.getTime, pattern)
+  }
 }
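
Note on the BaseUtil changes: getPartion/getFirstPartion now return an empty string instead of throwing on a table with no partitions, and atDaysAfter shifts a yyyyMMdd date string by n days. A minimal standalone sketch of the expected behavior (the demo object is illustrative only, and uses SimpleDateFormat instead of the repo's DateFormatUtils to stay dependency-free):

    import java.text.SimpleDateFormat
    import java.util.{Calendar, Locale}

    object AtDaysAfterDemo {
      // Mirrors the new BaseUtil.atDaysAfter: parse a yyyyMMdd string,
      // shift it by n days, and format with the requested pattern.
      def atDaysAfter(n: Int, time: String, pattern: String = "yyyyMMdd"): String = {
        val parsed = new SimpleDateFormat("yyyyMMdd").parse(time)
        val c = Calendar.getInstance(Locale.CHINA)
        c.setTimeInMillis(parsed.getTime)
        c.add(Calendar.DATE, n)
        new SimpleDateFormat(pattern).format(c.getTime)
      }

      def main(args: Array[String]): Unit = {
        println(atDaysAfter(1, "20200630"))  // 20200701 -- month rollover handled by Calendar
        println(atDaysAfter(-1, "20200301")) // 20200229 -- 2020 is a leap year
      }
    }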

+ 46 - 14
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -19,15 +20,46 @@ case class CompanyIncrForCidUtils(s: SparkSession,
 
   def calc(): Unit = {
    val inc_ods_company = s"${project}.inc_ods_company" // daily incremental company base info
-    val ads_company_tb = s"${project}.ads_${tableName}"
-    val inc_ods_company_tb = s"${project}.inc_ods_$tableName"
-    val target_inc_ods_company_tb = s"${project}.inc_ads_$tableName"
+    val ads_company_tb = s"${project}.ads_${tableName}" // full (existing) ads table
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName" // incremental ods table
+    val inc_ads_company_tb = s"${project}.inc_ads_$tableName" // incremental ads table
 
 
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
-    //val lastDs = BaseUtil.getPartion("winhc_eci_dev.ads_company_icp", spark)
-    val firstDs = BaseUtil.getFirstPartion(inc_ods_company, spark)
+    // latest partition of the full ads table
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    // last partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company").collect.toList.map(_.getString(0).split("=")(1))
+    // first partition of the incremental ods table
+    val firstDsIncOds = list.head
+    // last partition of the incremental ods table, also the output partition
+    val lastDsIncOds = list.last
+    // partition to process on this run
+    var runDs = ""
+    // first run
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { // not the first run: last ads partition + 1 day
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
+
+    println(
+      s"""
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    // if the last incremental ods and ads partitions are equal, there is nothing new: bail out
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("last incremental ods and ads partitions are equal, exiting")
+      sys.exit(-1)
+    }
 
    // table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
@@ -38,7 +70,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       s"""
          |SELECT  cid,current_cid as new_cid
          |FROM    ${inc_ods_company}
-         |WHERE   ds >= ${firstDs}
+         |WHERE   ds >= ${runDs}
          |AND     cid IS NOT NULL
          |AND     current_cid IS NOT NULL
          |GROUP BY cid,current_cid
@@ -46,7 +78,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
 
     sql(
       s"""
-         |INSERT OVERWRITE TABLE ${target_inc_ods_company_tb} PARTITION(ds=$firstDs)
+         |INSERT OVERWRITE TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
          |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
          |        ,flag
          |        ,new_cid
@@ -63,12 +95,12 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                        SELECT  new_cid AS cid
          |                                ,${columns.mkString(",")}
          |                        FROM    ${ads_company_tb}
-         |                        WHERE   ds >= ${firstDs}
+         |                        WHERE   ds >= ${remainDs}
          |                        UNION ALL
          |                        SELECT  new_cid AS cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${target_inc_ods_company_tb}
-         |                        WHERE   ds >= ${firstDs}
+         |                        FROM    ${inc_ads_company_tb}
+         |                        WHERE   ds >= ${runDs}
          |                    ) b
          |            ON      a.cid = b.cid
          |            UNION ALL
@@ -80,7 +112,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |            FROM    ${inc_ods_company_tb} a
          |            LEFT JOIN mapping b
          |            ON      a.cid = b.cid
-         |            WHERE   a.ds >= ${firstDs}
+         |            WHERE   a.ds >= ${runDs}
          |            AND     a.cid IS NOT NULL
          |        ) d
          |WHERE   num = 1
@@ -91,10 +123,10 @@ case class CompanyIncrForCidUtils(s: SparkSession,
     MaxComputer2Phoenix(
       spark,
       colsTotal,
-      target_inc_ods_company_tb,
+      inc_ads_company_tb,
       tableName,
-      firstDs,
-      Seq("new_cid","id")
+      lastDsIncOds,
+      Seq("new_cid", "id")
     ).syn()
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)

+ 48 - 16
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -24,9 +25,8 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
    val ads_company_tb = s"${project}.ads_$mainTableName" // full ads main-table data
    val ads_company_tb_list = s"${project}.ads_$sublistTableName" // full sub-table data, used to read table columns
    val inc_ods_company_tb = s"${project}.inc_ods_$mainTableName" // incremental ods main table
-    val target_inc_ads_company_tb = s"${project}.inc_ads_$mainTableName" // incremental ads main table
-    val target_inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" // incremental ads sub-table
-
+    val inc_ads_company_tb = s"${project}.inc_ads_$mainTableName" // incremental ads main table
+    val inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" // incremental ads sub-table
 
     val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
       !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
@@ -34,7 +34,39 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
-    val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark)
+    // latest partition of the full ads table
+    val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
+
+    // last partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company").collect.toList.map(_.getString(0).split("=")(1))
+    // first partition of the incremental ods table
+    val firstDsIncOds = list.head
+    // last partition of the incremental ods table, also the output partition
+    val lastDsIncOds = list.last
+    // partition to process on this run
+    var runDs = ""
+    // first run
+    if (StringUtils.isBlank(lastDsIncAds)) {
+      runDs = firstDsIncOds
+    } else { // not the first run: last ads partition + 1 day
+      runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
+    }
+
+    println(
+      s"""
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    // if the last incremental ods and ads partitions are equal, there is nothing new: bail out
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("last incremental ods and ads partitions are equal, exiting")
+      sys.exit(-1)
+    }
 
    // table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
@@ -46,7 +78,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       s"""
          |SELECT  cid,current_cid as new_cid
          |FROM    ${inc_ods_company}
-         |WHERE   ds >= ${firstDs}
+         |WHERE   ds >= ${runDs}
          |AND     cid IS NOT NULL
          |AND     current_cid IS NOT NULL
          |GROUP BY cid,current_cid
@@ -58,7 +90,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |SELECT  *
          |FROM    ${inc_ods_company_tb} a
          |LATERAL VIEW explode(split(cids,';')) b AS cid
-         |WHERE   ds >= ${firstDs}
+         |WHERE   ds >= ${runDs}
          |AND     cids IS NOT NULL
          |AND     trim(cids) <> ''
          |""".stripMargin).createOrReplaceTempView("incr_tb")
@@ -67,7 +99,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
    // replace cid, dedupe, and copy old data
     val df1 = sql(
       s"""
-         |INSERT OVERWRITE TABLE  $target_inc_ads_company_tb_list PARTITION(ds='$firstDs')
+         |INSERT OVERWRITE TABLE  $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
          |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
          |        ,"0" as flag
          |        ,CAST(new_cid as string) AS new_cid
@@ -99,13 +131,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |            JOIN    (
          |                        SELECT  new_cid AS cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${target_inc_ads_company_tb_list}
-         |                        WHERE   ds >= ${firstDs}
+         |                        FROM    ${inc_ads_company_tb_list}
+         |                        WHERE   ds >= ${runDs}
          |                        UNION ALL
          |                        SELECT  new_cid AS cid
          |                                ,${columns.mkString(",")}
          |                        FROM    ${ads_company_tb_list}
-         |                        WHERE   ds >= ${firstDs}
+         |                        WHERE   ds >= ${remainDs}
          |                    ) b
          |            ON      a.cid = b.cid
          |        ) c
@@ -115,13 +147,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
    // dedupe the main table by id and write it out
     sql(
       s"""
-         |INSERT OVERWRITE TABLE  $target_inc_ads_company_tb PARTITION(ds='$firstDs')
+         |INSERT OVERWRITE TABLE  $inc_ads_company_tb PARTITION(ds='$lastDsIncOds')
          |SELECT  cids,${columns.mkString(",")}
          |FROM    (
          |            SELECT  cids,${columns.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
          |            FROM    ${inc_ods_company_tb}
-         |            WHERE   ds >= ${firstDs}
+         |            WHERE   ds >= ${runDs}
          |            AND     cids IS NOT NULL
          |            AND     trim(cids) <> ''
          |        ) a
@@ -133,9 +165,9 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     MaxComputer2Phoenix(
       spark,
       colsList,
-      target_inc_ads_company_tb_list,
+      inc_ads_company_tb_list,
       sublistTableName,
-      firstDs,
+      lastDsIncOds,
       Seq("new_cid","id")
     ).syn()
 
@@ -144,9 +176,9 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     MaxComputer2Phoenix(
       spark,
       cols,
-      target_inc_ads_company_tb,
+      inc_ads_company_tb,
       mainTableName,
-      firstDs,
+      lastDsIncOds,
       Seq("id")
     ).syn()
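
Both INSERT OVERWRITE statements above dedupe through the same window trick: number each id's rows by update_time descending and keep num = 1. A self-contained Spark sketch of that pattern (table and column data made up for the demo):

    import org.apache.spark.sql.SparkSession

    object DedupeDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("dedupe-demo").getOrCreate()
        import spark.implicits._
        Seq((1, "old", "2020-01-01"), (1, "new", "2020-06-01"), (2, "only", "2020-03-01"))
          .toDF("id", "payload", "update_time")
          .createOrReplaceTempView("t")
        // Same ROW_NUMBER pattern as the diffs above: newest row per id wins.
        spark.sql(
          """
            |SELECT id, payload, update_time
            |FROM (
            |  SELECT *, ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC) num
            |  FROM t
            |) a
            |WHERE num = 1
            |""".stripMargin).show()
        spark.stop()
      }
    }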