
Incremental data combined with scheduling

许家凯 4 years ago
parent
commit
6db783b08c

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -27,7 +27,7 @@ object CompanyForCid {
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyForCidUtils(spark, space, sourceTable, cols).calc()
+//    CompanyForCidUtils(spark, space, sourceTable, cols).calc()
     spark.stop()
   }
 

+ 14 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCid.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.jobs
 
-import com.winhc.bigdata.spark.utils.{CompanyForCidUtils, CompanyIncrForCidUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{CompanyIncrForCidUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
 import scala.collection.mutable
@@ -23,14 +23,24 @@ object CompanyIncrForCid {
       "ods_company_mortgage_info" -> ("reg_date,reg_num,amount,new_cid", "", "", "", "") //
     )
 
+
+  // example args: winhc_eci_dev company_icp liscense,domain,new_cid
   def main(args: Array[String]): Unit = {
-    val (cols, t1, t2, t3, t4) = valid(args)
-    var config = mutable.Map(
+    //    val (cols, t1, t2, t3, t4) = valid(args)
+    val Array(project, tableName, dupliCols) = args
+    println(
+      s"""
+         |$project
+         |$tableName
+         |$dupliCols
+         |""".stripMargin)
+    val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyIncrForCidUtils(spark, t1, t2, t3, t4, cols).calc()
+    CompanyIncrForCidUtils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
+    //    CompanyIncrForCidUtils(spark, t1, t2, t3, t4, cols).calc()
     spark.stop()
   }
 

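Note: the job now takes three positional arguments (project, table name, comma-separated dedup columns) instead of the old tuple of table names. A minimal local-run sketch using the example arguments from the comment above; the driver object itself is hypothetical:

package com.winhc.bigdata.spark.test

import com.winhc.bigdata.spark.jobs.CompanyIncrForCid

// Hypothetical driver that feeds the refactored job the example arguments
// from the comment above (ODPS project, table name without prefix, dedup columns).
object CompanyIncrForCidLocalRun {
  def main(args: Array[String]): Unit = {
    CompanyIncrForCid.main(Array(
      "winhc_eci_dev",           // project
      "company_icp",             // table name (ads_/inc_ods_/inc_ads_ prefixes are added inside)
      "liscense,domain,new_cid"  // comma-separated dedup columns
    ))
  }
}
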
+ 7 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCids.scala

@@ -18,14 +18,19 @@ import scala.collection.mutable
 // winhc_eci_dev.inc_ads_company_copyright_reg_list
 object CompanyIncrForCids {
 
+  // example args: winhc_eci_dev company_copyright_reg company_copyright_reg_list reg_num,full_name,cat_num,new_cid
   def main(args: Array[String]): Unit = {
-    val (cols, t1, t2, t3, t4, t5) = valid(args)
+
+    val Array(project, mainTableName, sublistTableName, dupliCols) = args
+
+//    val (cols, t1, t2, t3, t4, t5) = valid(args)
     var config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyIncrForCidsUtils(spark, t1, t2, t3, t4, t5, cols).calc()
+    CompanyIncrForCidsUtils(spark, project, mainTableName, sublistTableName, dupliCols.split(",").seq).calc()
+//    CompanyIncrForCidsUtils(spark, t1, t2, t3, t4, t5, cols).calc()
     spark.stop()
   }
 

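Note: CompanyIncrForCidsUtils now derives all ODPS table names from project, mainTableName and sublistTableName, so the job only passes those plus the dedup columns. A sketch of calling the utility directly with the example arguments from the comment above; the object name and local config are assumptions:

package com.winhc.bigdata.spark.test

import com.winhc.bigdata.spark.utils.{CompanyIncrForCidsUtils, SparkUtils}

import scala.collection.mutable

// Hypothetical driver for the copyright_reg dimension, using the example
// arguments from the comment above.
object CompanyIncrForCidsLocalRun {
  def main(args: Array[String]): Unit = {
    val config = mutable.Map("spark.hadoop.odps.project.name" -> "winhc_eci_dev")
    val spark = SparkUtils.InitEnv("CompanyIncrForCidsLocalRun", config)
    CompanyIncrForCidsUtils(
      spark,
      "winhc_eci_dev",                                  // project
      "company_copyright_reg",                          // main table (no prefix)
      "company_copyright_reg_list",                     // sublist table (no prefix)
      Seq("reg_num", "full_name", "cat_num", "new_cid") // dedup columns
    ).calc()
    spark.stop()
  }
}
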
+ 21 - 0
src/main/scala/com/winhc/bigdata/spark/test/TestFlow.scala

@@ -0,0 +1,21 @@
+package com.winhc.bigdata.spark.test
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:32
+ * @Description:
+ */
+object TestFlow {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkUtils.InitEnv("test touch flow")
+
+    val Array(test1, test2, test3) = args
+
+    println(test1, test2, test3)
+    val df1 = spark.createDataFrame(Seq((test1, test2, test3))).toDF("col0", "col1", "col2")
+    df1.show
+    spark.stop()
+  }
+}

+ 13 - 9
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -10,20 +10,24 @@ import scala.annotation.meta.getter
  * π
  * incremental cid conversion
  */
-
 case class CompanyIncrForCidUtils(s: SparkSession,
-                                  inc_ods_company: String,
-                                  ads_company_tb: String,
-                                  inc_ods_company_tb: String,
-                                  target_inc_ods_company_tb: String,
-                                  cols: Seq[String]) extends LoggingUtils {
+                                  project: String, // project the table lives in
+                                  tableName: String, // table name (without prefix/suffix)
+                                  dupliCols: Seq[String] // dedup columns
+                                 ) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
+    val inc_ods_company = s"${project}.inc_ods_company" // daily incremental company base info
+    val ads_company_tb = s"${project}.ads_${tableName}"
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName"
+    val target_inc_ods_company_tb = s"${project}.inc_ads_$tableName"
+
+
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     //val lastDs = BaseUtil.getPartion("winhc_eci_dev.ads_company_icp", spark)
-    val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark)
+    val firstDs = BaseUtil.getFirstPartion(inc_ods_company, spark)
 
     //table fields
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
@@ -53,7 +57,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                    ,a.new_cid
          |                    ,b.cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
          |            FROM    mapping a
          |            JOIN    (
          |                        SELECT  new_cid AS cid
@@ -72,7 +76,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |                    ,coalesce(b.new_cid,a.cid) new_cid
          |                    ,a.cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
          |            FROM    ${inc_ods_company_tb} a
          |            LEFT JOIN mapping b
          |            ON      a.cid = b.cid

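For reference, with the example arguments used above (project = winhc_eci_dev, tableName = company_icp) calc() now resolves its tables as follows; this is just the string interpolation from the diff spelled out:

// inc_ods_company            -> winhc_eci_dev.inc_ods_company       (daily incremental company base info)
// ads_company_tb             -> winhc_eci_dev.ads_company_icp       (existing dimension data)
// inc_ods_company_tb         -> winhc_eci_dev.inc_ods_company_icp   (incremental ODS data)
// target_inc_ods_company_tb  -> winhc_eci_dev.inc_ads_company_icp   (target incremental ADS table)
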
+ 22 - 10
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -12,15 +12,27 @@ import scala.annotation.meta.getter
  */
 
 case class CompanyIncrForCidsUtils(s: SparkSession,
-                                   inc_ods_company: String,// daily incremental company base info
-                                   ads_company_tb: String,// existing dimension data
-                                   inc_ods_company_tb: String,// incremental dimension ODS data
-                                   target_inc_ads_company_tb: String,// dimension main-table data
-                                   target_inc_ads_company_tb_list: String,// dimension list data
-                                   cols: Seq[String]) extends LoggingUtils {
+                                   project: String, // project the table lives in
+                                   mainTableName: String, // main table name (without prefix)
+                                   sublistTableName: String, // sublist table name (without prefix)
+                                   dupliCols: Seq[String] // dedup columns
+                                  ) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
+    val inc_ods_company = s"${project}.inc_ods_company" // daily incremental company base info
+    val ads_company_tb = s"${project}.ads_$mainTableName" // existing ADS main-table data
+    val ads_company_tb_list = s"${project}.ads_$sublistTableName" // existing sublist data, used to read table fields
+    val inc_ods_company_tb = s"${project}.inc_ods_$mainTableName" // incremental ODS main table
+    val target_inc_ads_company_tb = s"${project}.inc_ads_$mainTableName" // incremental ADS main table
+    val target_inc_ads_company_tb_list = s"${project}.inc_ads_$sublistTableName" // incremental ADS sublist table
+
+
+    val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey")
+    }).seq
+
+
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark)
@@ -60,11 +72,11 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
          |        ,"0" as flag
          |        ,CAST(new_cid as string) AS new_cid
-         |        ,${columns.mkString(",")}
+         |        ,${sublistTableFieldName.mkString(",")}
          |FROM    (
          |        SELECT
          |                *
-         |                ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
+         |                ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
          |        FROM    (
          |                SELECT
          |                        c.*
@@ -79,11 +91,11 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
          |        ,"1" as flag
          |        ,CAST(new_cid as string) AS new_cid
-         |        ,${columns.mkString(",")}
+         |        ,${sublistTableFieldName.mkString(",")}
          |FROM    (
          |            SELECT  a.new_cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
          |            FROM    mapping a
          |            JOIN    (
          |                        SELECT  new_cid AS cid

+ 50 - 0
src/main/scala/com/winhc/bigdata/spark/utils/Odps2PhoenixUtils.scala

@@ -0,0 +1,50 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 16:33
+ * @Description:
+ */
+case class Odps2PhoenixUtils(s: SparkSession,
+                             orgTableName: String,
+                             phoenixTableName: String,
+                             partitions: String,
+                             ignoreCols: Seq[String],
+                             renameCols: Map[String, String]
+                            ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  def save(): Unit = {
+    println(
+      s"""
+         |tableName : $orgTableName
+         |phoenix tableName : $phoenixTableName
+         |partitions : $partitions
+         |ignore cols: $ignoreCols
+         |renameCols : $renameCols
+         |""".stripMargin)
+
+
+    // no `return` inside the lambda: a non-local return would exit save() before the SQL below runs
+    val fields = spark.table(orgTableName).schema.map(schemaType => {
+      val name = schemaType.name
+
+      if (ignoreCols.contains(name))
+        null
+      else
+        s"CAST($name as string) as ${renameCols.getOrElse(name, name)}"
+    }).filter(_ != null).seq
+
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+
+    sql(
+      s"""
+         |select ${fields.mkString(",")}
+         |from $orgTableName
+         |where ds = '$partitions'
+         |""".stripMargin).save2PhoenixByJDBC(phoenixTableName)
+  }
+}
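
A usage sketch for the new export utility; the Phoenix table name, partition value and column mapping below are illustrative assumptions, and save2PhoenixByJDBC is the helper imported from com.winhc.bigdata.spark.implicits.PhoenixHelper inside save():

package com.winhc.bigdata.spark.test

import com.winhc.bigdata.spark.utils.{Odps2PhoenixUtils, SparkUtils}

// Hypothetical export of an incremental ADS table to Phoenix; every literal
// below (tables, partition, renames) is an example value, not from the commit.
object Odps2PhoenixExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.InitEnv("Odps2PhoenixExample")
    Odps2PhoenixUtils(
      spark,
      orgTableName = "winhc_eci_dev.inc_ads_company_icp", // source ODPS table
      phoenixTableName = "COMPANY_ICP",                   // target Phoenix table (assumed)
      partitions = "20200622",                            // ds partition to export (assumed)
      ignoreCols = Seq("ds"),                             // columns to drop
      renameCols = Map("new_cid" -> "cid")                // column renames (assumed)
    ).save()
    spark.stop()
  }
}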