@@ -0,0 +1,163 @@
+package com.winhc.bigdata.spark.ng.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/12/21 10:13
+ * General-purpose handler for company data tables in the ng namespace
+ */
+case class general_handler(s: SparkSession,
+                           project: String, // name of the project that holds the tables
+                           md5_fields: Seq[String],
+                           tn: String
+                          ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  // source (ODS) and target (ADS) tables, full and incremental, derived from the table name
+  val ods_tab = s"$project.ods_$tn"
+  val inc_ods_tab = s"$project.inc_ods_$tn"
+
+  val ads_tab = s"$project.ads_$tn"
+  val inc_ads_tab = s"$project.inc_ads_$tn"
+
+  verify()
+  reg_udf()
+
+  // abort early if any of the four tables is missing
+  private def verify(): Unit = {
+    val catalog = spark.catalog
+    if (!(catalog.tableExists(ods_tab) && catalog.tableExists(inc_ods_tab) && catalog.tableExists(inc_ads_tab) && catalog.tableExists(ads_tab))) {
+      println("table does not exist !!!")
+      sys.exit(-1)
+    }
+  }
+
+  private def reg_udf(): Unit = {
+    // registers the cleanup() SQL UDF referenced by rowkey_f
+    cleanup()
+  }
+
+  // SQL expression for the row key: company_id alone, or company_id plus an md5 over the configured fields
+  val rowkey_f = if (md5_fields.isEmpty) {
+    "company_id"
+  } else {
+    s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+  }
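+  // e.g. for tn = "company_staff" (md5_fields = Seq("staff_name")) this expands to:
+  //   concat_ws('_',company_id,md5(cleanup(concat_ws('',staff_name))))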
+
+  def all(): Unit = {
+    // latest inc_ods partition, falling back to the latest ods partition
+    val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
+
+    if (inc_ods_ds == null) {
+      println("ds is null !!!")
+      return
+    }
+    // columns shared by ods and inc_ods, excluding the generated rowkey
+    val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+
+    // full rebuild: union ods and inc_ods, keep only the newest row per rowkey
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$inc_ods_ds')
+         |SELECT ${getColumns(ads_tab).diff(Seq("ds")).mkString(",")}
+         |FROM (
+         |    SELECT *
+         |         ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+         |    FROM (
+         |        SELECT $rowkey_f as rowkey
+         |             ,${inter_cols.mkString(",")}
+         |        FROM $ods_tab
+         |        WHERE ds > 0
+         |        UNION ALL
+         |        SELECT $rowkey_f as rowkey
+         |             ,${inter_cols.mkString(",")}
+         |        FROM $inc_ods_tab
+         |        WHERE ds > 0
+         |    ) AS t1
+         |) AS t2
+         |WHERE t2.num = 1
+         |""".stripMargin)
+  }
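+  // e.g. if the same rowkey appears in ods with ds=20201201 and in inc_ods with ds=20201220
+  // (illustrative partition values), only the ds=20201220 row survives into ads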
+
+  def inc(): Unit = {
+    val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, getLastPartitionsOrElse(ods_tab, null))
+    var ads_ds = getLastPartitionsOrElse(ads_tab, null)
+    // if ads is already caught up with inc_ods, diff against the previous ads partition;
+    // without one, fall back to a full rebuild
+    if (ads_ds.equals(inc_ods_ds)) {
+      ads_ds = getSecondLastPartitionOrElse(ads_tab, null)
+      if (ads_ds == null) {
+        all()
+        return
+      }
+    }
+
+    val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+
+    // pick up only partitions newer than the last consumed ads partition
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_ds')
+         |SELECT ${getColumns(inc_ads_tab).diff(Seq("ds")).mkString(",")}
+         |FROM (
+         |    SELECT *
+         |         ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC) AS num
+         |    FROM (
+         |        SELECT $rowkey_f as rowkey
+         |             ,${inter_cols.mkString(",")}
+         |        FROM $inc_ods_tab
+         |        WHERE ds > $ads_ds
+         |    ) AS t1
+         |) AS t2
+         |WHERE t2.num = 1
+         |""".stripMargin)
+  }
+
+  // entry point: full rebuild when ads has no partition yet, incremental otherwise
+  def calc(): Unit = {
+    val ads_ds = getLastPartitionsOrElse(ads_tab, null)
+    if (ads_ds == null)
+      all()
+    else
+      inc()
+  }
+}
+
+object general_handler {
+
+  // default md5 fields per table name
+  val tab_md5_fields_map = Map(
+    "company_app_info" -> Seq("name")
+    , "company_staff" -> Seq("staff_name")
+    , "company_holder" -> Seq("holder_name")
+    , "company_icp" -> Seq("liscense", "domain")
+    , "company_tm" -> Seq("reg_no")
+  )
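+  // tables not listed above must pass md5_fields to run() explicitly,
+  // otherwise run() aborts on the null check below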
+
+  def run(s: SparkSession, project: String, tn: String, md5_fields: Seq[String] = null): Unit = {
+    // an explicit md5_fields argument overrides the per-table defaults
+    var f = tab_md5_fields_map.getOrElse(tn, null)
+    if (md5_fields != null) {
+      f = md5_fields
+    }
+    if (f == null) {
+      println("md5 fields are empty !!!")
+      sys.exit(-2)
+    }
+    // placeholder for table-specific handling; everything currently takes the general path
+    tn match {
+      case _ => {
+        general_handler(s = s, project = project, tn = tn, md5_fields = f).calc()
+      }
+    }
+  }
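+  // usage sketches: rely on the per-table defaults,
+  //   general_handler.run(spark, "winhc_ng", "company_holder")
+  // or supply fields for an unlisted table (hypothetical name and field):
+  //   general_handler.run(spark, "winhc_ng", "company_xxx", Seq("xxx_name"))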
+
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_ng"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    general_handler.run(spark, project, "company_app_info", Seq("name"))
+    spark.stop()
+  }
+}