ソースを参照

feat: 使用可插拔的强过滤模式

许家凯 3 年 前
コミット
12b6a691c5

+ 21 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -0,0 +1,21 @@
package com.winhc.bigdata.spark.ng.jobs

/**
 * Per-table arguments for the generic company-data jobs.
 *
 * @param tableName       source table name used as the lookup key
 * @param md5_fields      columns combined to build the dedup md5 key
 * @param is_super_filter "strong filter" mode: when true, every md5 field must be non-null
 * @author: XuJiakai
 * @date: 2021/1/14 19:21
 */
case class args_company_job(tableName: String, md5_fields: Seq[String], is_super_filter: Boolean = true)

object args_company_job {
  // Registry of per-table dedup configurations.
  // NOTE(review): "liscense" looks misspelled, but it presumably mirrors the real
  // column name in company_icp — confirm against the table schema before renaming.
  val tab_md5_fields = Seq(
    args_company_job("company_app_info", Seq("name"))
    , args_company_job("company_staff", Seq("staff_name"))
    , args_company_job("company_holder", Seq("holder_name"))
    , args_company_job("company_icp", Seq("liscense", "domain"))
    , args_company_job("company_tm", Seq("reg_no"))
  )

  /**
   * Looks up the job arguments registered for table `tn`.
   *
   * @param tn table name to resolve
   * @return the matching [[args_company_job]]
   * @throws NullPointerException if `tn` is not registered (kept for caller
   *         compatibility; a NoSuchElementException would be more idiomatic)
   */
  def get_args_company_job(tn: String): args_company_job = {
    // Fixed message typo ("fount" -> "found") and include the offending key
    // so failures are diagnosable from the log alone.
    tab_md5_fields.find(p => tn.equals(p.tableName))
      .getOrElse(throw new NullPointerException(s"$tn is not found"))
  }
}

+ 11 - 20
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -17,7 +17,7 @@ import scala.collection.mutable
 case class general_handler(s: SparkSession,
                            project: String, //表所在工程名
                            md5_fields: Seq[String],
-                           tn: String
+                           tn: String, is_super_filter: Boolean = true //开启强过滤模式
                           ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
@@ -54,7 +54,13 @@ case class general_handler(s: SparkSession,
        |company_id <> '0'
        |AND company_id is not null
        |AND trim(company_id) <> ''
-       |AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}
+       |${
+      is_super_filter match {
+        //每一个去重字段都不允许为空
+        case true => s"AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}"
+        case false => ""
+      }
+    }
        |AND trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
        |""".stripMargin
 
@@ -178,27 +184,12 @@ case class general_handler(s: SparkSession,
 
 object general_handler {
 
-  val tab_md5_fields_map = Map(
-    "company_app_info" -> Seq("name")
-    , "company_staff" -> Seq("staff_name")
-    , "company_holder" -> Seq("holder_name")
-    , "company_icp" -> Seq("liscense", "domain")
-    , "company_tm" -> Seq("reg_no")
-  )
-
-
   def run(s: SparkSession, project: String, tn: String, md5_fields: Seq[String] = null): Unit = {
-    var f = tab_md5_fields_map.getOrElse(tn, null)
-    if (md5_fields != null) {
-      f = md5_fields
-    }
-    if (f == null) {
-      println("md5 fields is empty !!!")
-      sys.exit(-2)
-    }
+    val args_job = args_company_job.get_args_company_job(tn)
+
     tn match {
       case _ => {
-        general_handler(s = s, project = project, tn = tn, md5_fields = f).calc()
+        general_handler(s = s, project = project, tn = tn, md5_fields = args_job.md5_fields, is_super_filter = args_job.is_super_filter).calc()
       }
     }
   }