Browse Source

增量cids生成

xufei 4 years ago
parent
commit
95d3848b79

+ 37 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCids.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{CompanyIncrForCidUtils, CompanyIncrForCidsUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * Incremental data update: command-line entry point that parses six arguments, starts a Spark session and runs CompanyIncrForCidsUtils.calc().
+ * π
+ */
+
+// Sample arguments, presumably in the order valid() destructures them (TODO confirm) - cols: reg_num,full_name,cat_num,new_cid
+// t1: winhc_eci_dev.inc_ods_company
+// t2: winhc_eci_dev.ads_company_copyright_reg_list
+// t3: winhc_eci_dev.inc_ods_company_copyright_reg
+// t4: winhc_eci_dev.inc_ads_company_copyright_reg
+// t5: winhc_eci_dev.inc_ads_company_copyright_reg_list
+object CompanyIncrForCids {
+
+  def main(args: Array[String]): Unit = { // Parses args, initialises Spark, runs the incremental cids job, then stops Spark.
+    val (cols, t1, t2, t3, t4, t5) = valid(args)
+    var config = mutable.Map( // NOTE(review): config is never reassigned in this method; a val would suffice.
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyIncrForCidsUtils(spark, t1, t2, t3, t4, t5, cols).calc() // t1..t5 are passed positionally as the five table-name parameters, cols last.
+    spark.stop()
+  }
+
+  def valid(args: Array[String]): (Seq[String], String, String, String, String, String) = { // Echoes the raw args and splits them into (cols, t1..t5).
+    println(args.toSeq.mkString(" "))
+    val Array(cols, t1, t2, t3, t4, t5) = args // Throws scala.MatchError unless exactly six arguments are supplied; no usage message is printed.
+    (cols.split(",").toSeq, t1, t2, t3, t4, t5) // The first argument is a comma-separated column list.
+  }
+}

+ 13 - 12
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -31,7 +31,7 @@ object CompanyIntellectualsScore {
     val (sourceTable, flag, time, kind, project) = valid(args)
 
     val config = mutable.Map.empty[String, String]
-    config.+=("spark.hadoop.odps.project.name"->"winhc_eci_dev")
+    config.+=("spark.hadoop.odps.project.name" -> "winhc_eci_dev")
     println(config)
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
@@ -57,9 +57,9 @@ object CompanyIntellectualsScore {
       sys.exit(-1)
     }
     val Array(sourceTable) = args
-    println(sourceTable.substring(sourceTable.indexOf(".")+1))
+    println(sourceTable.substring(sourceTable.indexOf(".") + 1))
 
-    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable.substring(sourceTable.indexOf(".")+1), ("", "", "", ""))
+    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable.substring(sourceTable.indexOf(".") + 1), ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
       println("输入表不存在!!!   ")
       sys.exit(-1)
@@ -77,7 +77,8 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
   import spark.implicits._
 
   def calc(): Unit = {
-    val targetTable = "ads_company_total_score"
+    //val targetTable = "ads_company_total_score"
+    val targetTable = sourceTable + "_score"
     val ds = BaseUtil.getPartion(sourceTable, spark)
 
     var appsql = ""
@@ -111,15 +112,15 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
       "score", "total", "extraScore")
       .createOrReplaceTempView(s"t1_view")
 
-//    logger.info(
-//      s"""
-//         |- - - - - - - - - - - - - - - - - - - - - - - - -
-//         |${showString(sql(s"select * from t1_view"))}
-//         |- - - - - - - - - - - - - - - - - - - - - - - - -
-//       """.stripMargin)
+    //    logger.info(
+    //      s"""
+    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
+    //         |${showString(sql(s"select * from t1_view"))}
+    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
+    //       """.stripMargin)
 
-    sql(s"insert overwrite table ${targetTable} " +
-      s"partition (tb='${sourceTable.substring(sourceTable.indexOf(".")+5)}${apptab}' , ds='${ds}')  select * from t1_view")
+    sql(s"insert overwrite table ${targetTable}${apptab} " +
+      s"partition (ds='${ds}')  select * from t1_view")
   }
 
   def trans(r: Row, flag: String, kind: String, prpject: String) = {

+ 27 - 15
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -11,15 +11,15 @@ import scala.annotation.meta.getter
  * cid转换
  */
 
-case class CompanyForCidUtils(s: SparkSession, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
+case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
-    val odsTable = s"ods_$sourceTable"
-    val adsTable = s"ads_$sourceTable"
-    val companyMapping = "company_name_mapping_pro"
+    val odsTable = s"${space}.$sourceTable"
+    val adsTable = s"${space}.ads_${sourceTable.substring(4)}"
+    val companyMapping = s"${space}.company_map"
     val ds = BaseUtil.getPartion(odsTable, spark)
     //table字段
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
@@ -28,19 +28,31 @@ case class CompanyForCidUtils(s: SparkSession, sourceTable: String, cols: Seq[St
     //替换字段
     sql(
       s"""
-         |SELECT  ${columns.mkString(",a.")},
-         |     coalesce(b.new_cid,a.cid) AS new_cid
-         |FROM $odsTable a
-         |LEFT JOIN $companyMapping b
-         |ON   a.cid = b.cid
-         |WHERE a.ds = $ds and a.cid is not null
-         |""".stripMargin).dropDuplicates(disCol.:+("new_cid"))
-      .createOrReplaceTempView(s"t2")
-
-    sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
+         |INSERT OVERWRITE TABLE ${adsTable} PARTITION(ds=${ds})
+         |SELECT  rowkey,new_cid,${columns.mkString(",")}
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY id DESC ) num
+         |                ,CONCAT_WS('_',new_cid,id) AS rowkey
+         |        FROM    (
+         |                SELECT
+         |                        a.*
+         |                        ,coalesce(b.new_cid,a.cid) AS new_cid
+         |                FROM    $odsTable a
+         |                LEFT JOIN $companyMapping b
+         |                ON      a.cid = b.cid
+         |                WHERE   a.ds = $ds AND a.cid IS NOT NULL
+         |                ) c
+         |        ) d
+         |WHERE   num =1
+         |""".stripMargin)
+//      .createOrReplaceTempView(s"t2")
+
+//    sql(s"select rowkey,new_cid,${columns.mkString(",")} from t2").show(10)
 
     //写表
-    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cid from t2")
+//    sql(s"insert into table ${adsTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }

+ 28 - 16
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidsUtils.scala

@@ -29,7 +29,7 @@ case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[S
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
     val disCol = cols
 
-    sql(s"select * from $odsTable where ds = $ds and cids is not null")
+    sql(s"select * from $odsTable where ds = $ds and cids is not null and trim(cids) <> '' ")
       .dropDuplicates(disCol)
       .createOrReplaceTempView("t1")
 
@@ -38,24 +38,36 @@ case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[S
     //拆平新表
     sql(
       s"""
-         |SELECT  c.*
-         |        ,coalesce(d.new_cid,c.cid) as new_cid
+         |SELECT
+         |        rowkey,new_cid,${columns.mkString(",")}
          |FROM    (
-         |            SELECT  *
-         |                    ,cid
-         |            FROM    t1 a
-         |            LATERAL VIEW explode(split(cids, ';')) b AS cid
-         |        ) c
-         |LEFT JOIN $companyMapping d
-         |ON      c.cid = d.cid
-         |""".stripMargin).dropDuplicates(disCol.:+("new_cid"))
+         |        SELECT
+         |                *
+         |                ,ROW_NUMBER() OVER (PARTITION BY id,new_cid ORDER BY - ABS(CAST(new_cid AS BIGINT )- CAST(cid AS BIGINT )) DESC ) num
+         |                ,CONCAT_WS('_',new_cid,id) AS rowkey
+         |        FROM    (
+         |                SELECT
+         |                        c.*
+         |                        ,coalesce(d.new_cid,c.cid) AS new_cid
+         |                FROM    (
+         |                        SELECT
+         |                                *
+         |                        FROM    t1 a
+         |                        LATERAL VIEW explode(split(cids,';')) b AS cid
+         |                        ) c
+         |                LEFT JOIN $companyMapping d
+         |                ON      c.cid = d.cid
+         |                ) e
+         |        ) f
+         |WHERE   num =1
+         |""".stripMargin)
       .createOrReplaceTempView(s"t2")
 
     //聚合新cids
     val df1 = sql(
       s"""
          |SELECT
-         |${columns.mkString(",")},x.new_cids
+         |x.new_cids,${columns.mkString(",")}
          |FROM    t1
          |LEFT JOIN (
          |              SELECT  id as new_id
@@ -68,12 +80,12 @@ case class CompanyForCidsUtils(s: SparkSession, sourceTable: String, cols: Seq[S
 
     df1.createOrReplaceTempView("t3")
 
-    //sql(s"select ${columns.mkString(",")},new_cid from t2").show(10)
-    //sql(s"select ${columns.mkString(",")},new_cids from t3").show(10)
+    sql(s"select rowkey,new_cid,${columns.mkString(",")} from t2").show(10)
+    sql(s"select new_cids,${columns.mkString(",")} from t3").show(10)
 
     //写表
-    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cid from t2")
-    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select ${columns.mkString(",")},new_cids from t3")
+    sql(s"insert overwrite table ${adsListTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
+    sql(s"insert overwrite table ${adsTable} partition (ds=${ds}) select new_cids,${columns.mkString(",")} from t3")
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }

+ 123 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -0,0 +1,123 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * π
+ * Incremental cids conversion: calc() issues SQL that rebuilds the "list" target table (one row per exploded cid, cid replaced via a mapping) and the "main" target table (latest row per id).
+ */
+
+case class CompanyIncrForCidsUtils(s: SparkSession,
+                                   inc_ods_company: String,// daily incremental company basic info
+                                   ads_company_tb: String,// existing (stock) dimension data
+                                   inc_ods_company_tb: String,// incremental dimension ODS data
+                                   target_inc_ads_company_tb: String,// dimension main-table output
+                                   target_inc_ads_company_tb_list: String,// dimension list-table output
+                                   cols: Seq[String]) extends LoggingUtils { // cols: columns used in PARTITION BY when de-duplicating list rows
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = { // Runs the whole incremental pipeline; prints start/end timestamps.
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark) // NOTE(review): table name is hard-coded here although inc_ods_company is a constructor parameter - confirm intended.
+
+    // table columns: every column of ads_company_tb except ds, cid, new_cid, rowkey and cids
+    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids")
+    })
+
+    // mapping relation: distinct (cid, current_cid AS new_cid) pairs from inc_ods_company with ds >= firstDs, cached and registered as temp view "mapping"
+    sql(
+      s"""
+         |SELECT  cid,current_cid as new_cid
+         |FROM    ${inc_ods_company}
+         |WHERE   ds >= ${firstDs}
+         |AND     cid IS NOT NULL
+         |AND     current_cid IS NOT NULL
+         |GROUP BY cid,current_cid
+         |""".stripMargin).cache().createOrReplaceTempView("mapping")
+
+    // flatten the increment: split the non-blank cids column on ';' into one row per cid, registered as temp view "incr_tb"
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    ${inc_ods_company_tb} a
+         |LATERAL VIEW explode(split(cids,';')) b AS cid
+         |WHERE   ds >= ${firstDs}
+         |AND     cids IS NOT NULL
+         |AND     trim(cids) <> ''
+         |""".stripMargin).createOrReplaceTempView("incr_tb")
+
+
+    // replace cid, de-duplicate, copy old data: flag "0" = incremental rows with cid mapped to new_cid; flag "1" = existing rows whose new_cid appears as a cid in "mapping". NOTE(review): the second branch reads the same target table that this statement overwrites - confirm that is intended.
+    val df1 = sql( // NOTE(review): df1 is not referenced again within this method.
+      s"""
+         |INSERT OVERWRITE TABLE  $target_inc_ads_company_tb_list PARTITION(ds='$firstDs')
+         |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
+         |        ,"0" as flag
+         |        ,CAST(new_cid as string) AS new_cid
+         |        ,${columns.mkString(",")}
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
+         |        FROM    (
+         |                SELECT
+         |                        c.*
+         |                        ,coalesce(d.new_cid,c.cid) AS new_cid
+         |                FROM    incr_tb c
+         |                LEFT JOIN mapping d
+         |                ON      c.cid = d.cid
+         |                ) e
+         |        ) f
+         |WHERE   num =1
+         |UNION ALL
+         |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
+         |        ,"1" as flag
+         |        ,CAST(new_cid as string) AS new_cid
+         |        ,${columns.mkString(",")}
+         |FROM    (
+         |            SELECT  a.new_cid
+         |                    ,${columns.mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
+         |            FROM    mapping a
+         |            JOIN    (
+         |                        SELECT  new_cid AS cid
+         |                                ,${columns.mkString(",")}
+         |                        FROM    ${target_inc_ads_company_tb_list}
+         |                        WHERE   ds >= ${firstDs}
+         |                        UNION ALL
+         |                        SELECT  new_cid AS cid
+         |                                ,${columns.mkString(",")}
+         |                        FROM    ${ads_company_tb}
+         |                        WHERE   ds >= ${firstDs}
+         |                    ) b
+         |            ON      a.cid = b.cid
+         |        ) c
+         |WHERE   num = 1
+         |""".stripMargin)
+
+
+    // main table: keep the row with the latest update_time per id and write it to the ds=firstDs partition
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE  $target_inc_ads_company_tb PARTITION(ds='$firstDs')
+         |SELECT  ${columns.mkString(",")}
+         |FROM    (
+         |            SELECT  ${columns.mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
+         |            FROM    ${inc_ods_company_tb}
+         |            WHERE   ds >= ${firstDs}
+         |            AND     cids IS NOT NULL
+         |            AND     trim(cids) <> ''
+         |        ) a
+         |WHERE   num = 1
+         |""".stripMargin)
+
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+}