@@ -4,6 +4,7 @@ import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -16,11 +17,16 @@ import scala.collection.mutable
   */
 case class general_handler(s: SparkSession,
                            project: String, // name of the project the tables live in
-                           md5_fields: Seq[String],
-                           tn: String, is_super_filter: Boolean = true // enable strict-filter mode
+                           job_args: args_company_job
                           ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
+  val md5_fields: Seq[String] = job_args.md5_fields
+  val tn: String = job_args.tableName
+  val is_super_filter: Boolean = job_args.is_super_filter
+  val where: String = job_args.where
+
+
   val ods_tab = s"$project.ods_$tn"
   val inc_ods_tab = s"$project.inc_ods_$tn"
 
@@ -40,21 +46,50 @@ case class general_handler(s: SparkSession,
 
   private def reg_udf(): Unit = {
     cleanup()
-  }
 
-  val rowkey_f = md5_fields.isEmpty match {
-    case true => s"company_id"
-    case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+    def replace_rowkey(old_rowkey: String, new_rowkey: String): String = {
+      if (StringUtils.isEmpty(old_rowkey)) {
+        new_rowkey
+      } else {
+        old_rowkey
+      }
+    }
+
+    def company_holder_rowkey(holder_type: Long, holder_id: String, holder_name: String): String = {
+      if (2 == holder_type) {
+        holder_id
+      } else {
+        holder_name
+      }
+    }
+
+    spark.udf.register("holder_switch_rowkey", company_holder_rowkey _)
+
   }
 
-  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+  private def get_rowkey_udf(): String = {
+    if (StringUtils.isNotEmpty(job_args.rowkey_udf)) {
+      return job_args.rowkey_udf
+    }
 
-  val clean_up =
+    md5_fields.isEmpty match {
+      case true => s"company_id"
+      case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+    }
+  }
+
+  private def get_clean_up(): String = {
     s"""
        |company_id <> '0'
        |AND company_id is not null
        |AND trim(company_id) <> ''
        |${
+      StringUtils.isEmpty(where) match {
+        case true => ""
+        case false => s"AND ($where)"
+      }
+    }
+    |${
       is_super_filter match {
         // none of the deduplication (md5) fields may be null
         case true => s"AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}"
@@ -63,13 +98,26 @@ case class general_handler(s: SparkSession,
       }
        |AND trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
        |""".stripMargin
+  }
 
-  val up = inter_cols.contains("update_time") match {
-    case true => " DESC,update_time"
-    case false => ""
+  private def get_partition_order_by(): String = {
+    inter_cols.contains("update_time") match {
+      case true => " ds DESC,update_time DESC "
+      case false => " ds DESC "
+    }
   }
 
 
+  val rowkey_f = get_rowkey_udf()
+  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+
+
+  val clean_up = get_clean_up()
+
+
+  val up = get_partition_order_by()
+
+
   def all(): Unit = {
     val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
 
@@ -92,7 +140,7 @@ case class general_handler(s: SparkSession,
          }
          |FROM (
          |    SELECT *
-         |        ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
+         |        ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up) AS num
          |    FROM (
          |        SELECT $rowkey_f as rowkey
          |               ,${inter_cols.mkString(",")}
@@ -157,7 +205,7 @@ case class general_handler(s: SparkSession,
          }
          |FROM (
          |    SELECT *
-         |        ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds${up} DESC) AS num
+         |        ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ${up} ) AS num
          |    FROM (
          |        SELECT $rowkey_f as rowkey
          |               ,${inter_cols.mkString(",")}
@@ -184,12 +232,12 @@ case class general_handler(s: SparkSession,
 
 object general_handler {
 
-  def run(s: SparkSession, project: String, tn: String, md5_fields: Seq[String] = null): Unit = {
+  def run(s: SparkSession, project: String, tn: String): Unit = {
     val args_job = args_company_job.get_args_company_job(tn)
 
     tn match {
       case _ => {
-        general_handler(s = s, project = project, tn = tn, md5_fields = args_job.md5_fields, is_super_filter = args_job.is_super_filter).calc()
+        general_handler(s = s, project = project, job_args = args_job).calc()
       }
     }
   }
@@ -203,7 +251,7 @@ object general_handler {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":" + tn, config)
-    general_handler.run(spark, project, tn, null)
+    general_handler.run(spark, project, tn)
     spark.stop()
 
   }
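
For context, a minimal sketch of the args_company_job shape this change relies on. It is an assumption inferred only from the fields the diff reads (md5_fields, tableName, is_super_filter, where, rowkey_udf) and from the get_args_company_job(tn) lookup; the real definition lives elsewhere in the repo and may differ.

// Hypothetical sketch only: inferred from how job_args is used above, not the repo's actual definition.
case class args_company_job(tableName: String, // suffix of the ods_/inc_ods_ tables, e.g. "company_holder"
                            md5_fields: Seq[String], // columns hashed into the rowkey
                            is_super_filter: Boolean = true, // strict mode: every md5 field must be non-null
                            where: String = "", // extra predicate appended to the cleanup filter
                            rowkey_udf: String = "" // full rowkey expression; overrides md5_fields when non-empty
                           )

object args_company_job {
  // Hypothetical registry keyed by table name; the real lookup may be driven by config instead.
  private val jobs: Map[String, args_company_job] = Map(
    "company_holder" -> args_company_job(tableName = "company_holder", md5_fields = Seq("holder_id"))
  )

  def get_args_company_job(tn: String): args_company_job = jobs(tn)
}

With a shape like this, per-table behavior (extra where filters, a custom rowkey expression) moves out of the general_handler constructor and into a single per-table entry, which is what lets run() drop its md5_fields parameter.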