Browse Source

Merge remote-tracking branch 'origin/master'

xufei 3 years ago
parent
commit 59f9e2d52c

+ 8 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -4,13 +4,19 @@ package com.winhc.bigdata.spark.ng.jobs
  * @author: XuJiakai
  * @date: 2021/1/14 19:21
  */
-case class args_company_job(tableName: String, md5_fields: Seq[String], is_super_filter: Boolean = true)
+case class args_company_job(tableName: String
+                            , md5_fields: Seq[String] //core fields of this dimension
+                            , rowkey_udf: String = null //custom rowkey expression; overrides the default rowkey built from md5_fields
+                            , is_super_filter: Boolean = true //whether every key field must be non-null
+                            , where: String = "" // filter condition applied to ODS-layer data
+                            , id_user_defined_rowkey: Boolean = false //whether to read the user-defined rowkey from the ODS layer
+                           )
 
 object args_company_job {
   val tab_md5_fields = Seq(
     args_company_job("company_app_info", Seq("name"))
     , args_company_job("company_staff", Seq("staff_name"))
-    , args_company_job("company_holder", Seq("holder_name"))
+    , args_company_job("company_holder", Seq("holder_name"), rowkey_udf = "concat_ws('_',company_id,md5(cleanup(concat_ws('',holder_switch_rowkey(holder_type,holder_id,holder_name)))))", where = " holder_name is not null and trim(holder_name) <> '' AND ( not (holder_type = 2 AND length(holder_id) <> 32)) ")
     , args_company_job("company_icp", Seq("liscense", "domain"))
     , args_company_job("company_tm", Seq("reg_no"))
   )
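
The company_holder entry above now carries a custom rowkey_udf and a where filter: the registered holder_switch_rowkey UDF keys the rowkey on holder_id instead of holder_name when holder_type = 2. As a rough, self-contained illustration of the rowkey that expression produces (not the committed code; cleanup() is approximated here with trim, which is an assumption):

import java.security.MessageDigest

// Illustrative only: mirrors concat_ws('_', company_id, md5(cleanup(holder_switch_rowkey(...))))
def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString

def holderRowkey(companyId: String, holderType: Long, holderId: String, holderName: String): String = {
  val key = if (holderType == 2) holderId else holderName // holder_switch_rowkey
  s"${companyId}_${md5Hex(key.trim)}"                      // md5(cleanup(...)), cleanup approximated by trim
}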

+ 64 - 16
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -4,6 +4,7 @@ import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -16,11 +17,16 @@ import scala.collection.mutable
  */
 case class general_handler(s: SparkSession,
                           project: String, //name of the project that owns the table
-                           md5_fields: Seq[String],
-                           tn: String, is_super_filter: Boolean = true //enable strict filtering
+                           job_args: args_company_job
                           ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
+  val md5_fields: Seq[String] = job_args.md5_fields
+  val tn: String = job_args.tableName
+  val is_super_filter: Boolean = job_args.is_super_filter
+  val where: String = job_args.where
+
+
   val ods_tab = s"$project.ods_$tn"
   val inc_ods_tab = s"$project.inc_ods_$tn"
 
@@ -40,21 +46,50 @@ case class general_handler(s: SparkSession,
 
   private def reg_udf(): Unit = {
     cleanup()
-  }
 
-  val rowkey_f = md5_fields.isEmpty match {
-    case true => s"company_id"
-    case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+    def replace_rowkey(old_rowkey: String, new_rowkey: String): String = {
+      if (StringUtils.isEmpty(old_rowkey)) {
+        new_rowkey
+      } else {
+        old_rowkey
+      }
+    }
+
+    def company_holder_rowkey(holder_type: Long, holder_id: String, holder_name: String): String = {
+      if (2 == holder_type) {
+        holder_id
+      } else {
+        holder_name
+      }
+    }
+
+    spark.udf.register("holder_switch_rowkey", company_holder_rowkey _)
+
   }
-  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
+  private def get_rowkey_udf(): String = {
+    if (StringUtils.isNotEmpty(job_args.rowkey_udf)) {
+      return job_args.rowkey_udf
+    }
 
-  val clean_up =
+    md5_fields.isEmpty match {
+      case true => s"company_id"
+      case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+    }
+  }
+
+  private def get_clean_up(): String = {
     s"""
        |company_id <> '0'
        |AND company_id is not null
        |AND trim(company_id) <> ''
        |${
+      StringUtils.isEmpty(where) match {
+        case true => ""
+        case false => s"AND ($where)"
+      }
+    }
+       |${
       is_super_filter match {
        //every dedup field must be non-null
         case true => s"AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}"
@@ -63,13 +98,26 @@ case class general_handler(s: SparkSession,
     }
        |AND trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
        |""".stripMargin
+  }
 
-  val up = inter_cols.contains("update_time") match {
-    case true => " DESC,update_time"
-    case false => ""
+  private def get_partition_order_by(): String = {
+    inter_cols.contains("update_time") match {
+      case true => " ds DESC,update_time DESC "
+      case false => " ds DESC "
+    }
   }
 
 
+  val rowkey_f = get_rowkey_udf()
+  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+
+
+  val clean_up = get_clean_up()
+
+
+  val up = get_partition_order_by()
+
+
   def all(): Unit = {
     val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
 
@@ -92,7 +140,7 @@ case class general_handler(s: SparkSession,
       }
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up) AS num
          |            FROM    (
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
@@ -157,7 +205,7 @@ case class general_handler(s: SparkSession,
       }
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ${up} ) AS num
          |            FROM    (
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
@@ -184,12 +232,12 @@ case class general_handler(s: SparkSession,
 
 object general_handler {
 
-  def run(s: SparkSession, project: String, tn: String, md5_fields: Seq[String] = null): Unit = {
+  def run(s: SparkSession, project: String, tn: String): Unit = {
     val args_job = args_company_job.get_args_company_job(tn)
 
     tn match {
       case _ => {
-        general_handler(s = s, project = project, tn = tn, md5_fields = args_job.md5_fields, is_super_filter = args_job.is_super_filter).calc()
+        general_handler(s = s, project = project, args_job).calc()
       }
     }
   }
@@ -203,7 +251,7 @@ object general_handler {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":" + tn, config)
-    general_handler.run(spark, project, tn, null)
+    general_handler.run(spark, project, tn)
     spark.stop()
 
   }
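
general_handler.run now takes only the table name and resolves all job arguments through args_company_job.get_args_company_job(tn), whose implementation is not part of this diff. A minimal sketch of such a lookup, assuming it simply searches tab_md5_fields by tableName:

  def get_args_company_job(tn: String): args_company_job =
    tab_md5_fields.find(_.tableName == tn)
      .getOrElse(throw new IllegalArgumentException(s"No args_company_job registered for table: $tn"))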