|
@@ -17,7 +17,7 @@ import scala.collection.mutable
|
|
|
case class general_handler(s: SparkSession,
|
|
|
project: String, //表所在工程名
|
|
|
md5_fields: Seq[String],
|
|
|
- tn: String
|
|
|
+ tn: String, is_super_filter: Boolean = true //开启强过滤模式
|
|
|
) extends LoggingUtils with BaseFunc {
|
|
|
@(transient@getter) val spark: SparkSession = s
|
|
|
|
|
@@ -54,7 +54,13 @@ case class general_handler(s: SparkSession,
|
|
|
|company_id <> '0'
|
|
|
|AND company_id is not null
|
|
|
|AND trim(company_id) <> ''
|
|
|
- |AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}
|
|
|
+ |${
|
|
|
+ is_super_filter match {
|
|
|
+ //每一个去重字段都不允许为空
|
|
|
+ case true => s"AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}"
|
|
|
+ case false => ""
|
|
|
+ }
|
|
|
+ }
|
|
|
|AND trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
|
|
|
|""".stripMargin
|
|
|
|
|
@@ -178,27 +184,12 @@ case class general_handler(s: SparkSession,
|
|
|
|
|
|
object general_handler {
|
|
|
|
|
|
- val tab_md5_fields_map = Map(
|
|
|
- "company_app_info" -> Seq("name")
|
|
|
- , "company_staff" -> Seq("staff_name")
|
|
|
- , "company_holder" -> Seq("holder_name")
|
|
|
- , "company_icp" -> Seq("liscense", "domain")
|
|
|
- , "company_tm" -> Seq("reg_no")
|
|
|
- )
|
|
|
-
|
|
|
-
|
|
|
def run(s: SparkSession, project: String, tn: String, md5_fields: Seq[String] = null): Unit = {
|
|
|
- var f = tab_md5_fields_map.getOrElse(tn, null)
|
|
|
- if (md5_fields != null) {
|
|
|
- f = md5_fields
|
|
|
- }
|
|
|
- if (f == null) {
|
|
|
- println("md5 fields is empty !!!")
|
|
|
- sys.exit(-2)
|
|
|
- }
|
|
|
+ val args_job = args_company_job.get_args_company_job(tn)
|
|
|
+
|
|
|
tn match {
|
|
|
case _ => {
|
|
|
- general_handler(s = s, project = project, tn = tn, md5_fields = f).calc()
|
|
|
+ general_handler(s = s, project = project, tn = tn, md5_fields = args_job.md5_fields, is_super_filter = args_job.is_super_filter).calc()
|
|
|
}
|
|
|
}
|
|
|
}
|