
feat: full-volume data change extraction

许家凯 3 years ago
parent
commit
1d620a141b

+ 151 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractAll.scala

@@ -0,0 +1,151 @@
+package com.winhc.bigdata.spark.ng.change
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.{getYesterday, isWindows}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, LoggingUtils, ReflectUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/30 15:02
+ */
+case class NgChangeExtractAll(s: SparkSession,
+                              project: String, // project the table belongs to
+                              tableName: String, // table name (without prefix or suffix)
+                              primaryKey: String, // primary key of this dimension
+                              primaryFields: Seq[String], // key fields; if any one of them differs, the row counts as changed
+                              newlyRegister: Boolean = false
+                             ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
+
+  val target_tab = "bds_change_extract_all"
+
+  init()
+
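+  // create the target partitioned table if it does not already exist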
+  private def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS `$project`.`$target_tab` (
+         |  `rowkey` STRING COMMENT 'primary key of the row',
+         |  `company_id` STRING COMMENT 'company id',
+         |  `table_name` STRING COMMENT 'hbase table name',
+         |  `update_type` STRING COMMENT 'change type at the display level: insert, update, deleted, other',
+         |  `old_data` MAP<STRING,STRING> COMMENT 'original data',
+         |  `new_data` MAP<STRING,STRING> COMMENT 'new data',
+         |  `change_fields` STRING COMMENT 'which fields changed',
+         |  `biz_date` STRING COMMENT 'time the data changed',
+         |  `update_time` STRING COMMENT 'time of the current computation')
+         | COMMENT 'change feed (full volume)'
+         |PARTITIONED BY (
+         |  `ds` STRING COMMENT 'date partition',
+         |  `tn` STRING COMMENT 'table-name partition')
+         |""".stripMargin)
+  }
+
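+  // Full-volume extraction: every row of the source table is pushed through the
+  // per-table change handler with no previous data, so each surviving row is
+  // classified as an insert and written to the target partition.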
+  def all(): Unit = {
+
+    val cols = primaryFields.filter(_ != primaryKey)
+
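+    // instantiate the per-table change handler via reflection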
+    val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName", cols, false)
+
+    val sql_str = generateAllTabSql(tableName, project)
+    val rdd = sql(sql_str._1)
+      .select(sql_str._2.map(column => col(column).cast("string")): _*)
+      .rdd
+      .map(r => {
+        sql_str._2.map(f => (f, r.getAs[String](f))).toMap
+      })
+      .map(x => {
+        val rowkey = x(sql_str._3)
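+        // no old data in a full-volume run, so the handler treats every row as new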
+        val res = handle.handle(rowkey, null, x)
+        if (res == null) {
+          null
+        } else {
+          val rowkey = res._1
+          val company_id = res._2
+          val update_type = res._3
+          val old_map = res._4
+          val new_map = res._5
+          val change_fields = res._6
+          val biz_date = res._7
+          val update_time = BaseUtil.nowDate()
+          Row(rowkey, company_id, tableName, update_type, old_map, new_map, change_fields, biz_date, update_time)
+        }
+      }).filter(_ != null)
+    val schema = StructType(Array(
+      StructField("rowkey", StringType), //表数据主建
+      StructField("company_id", StringType), //公司id
+      StructField("table_name", StringType), //表名
+      StructField("update_type", StringType), // 变更类型 insert update
+      StructField("old_data", MapType(StringType, StringType)), //变更前数据
+      StructField("new_data", MapType(StringType, StringType)), //变更后数据
+      StructField("change_fields", StringType), //如果是更新 则显示更新字段
+      StructField("biz_date", StringType), //业务时间
+      StructField("update_time", StringType) //处理时间
+    ))
+
+    spark.createDataFrame(rdd, schema)
+      .createOrReplaceTempView(s"tmp_change_extract_view_$tableName")
+
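+    // write yesterday's partition, keeping only rows classified as inserts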
+    val ds = getYesterday()
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName')
+         |SELECT *
+         |FROM
+         |    tmp_change_extract_view_$tableName
+         |WHERE
+         |    update_type = 'insert'
+         |""".stripMargin)
+  }
+}
+
+object NgChangeExtractAll {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
+      println("please set tableName.")
+      sys.exit(-1)
+    }
+    val Array(tableName) = args
+    println(s"table : $tableName")
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("NgChangeExtractAll", config)
+
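+    // dimensions excluded from the full-volume extraction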
+    val seq = Seq("company", "company_holder", "company_staff").toSet
+
+    var start = NgChangeExtractArgs.startArgs.filter(r => !seq.contains(r.tableName))
+    if (!tableName.equals("all")) {
+      val set = tableName.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
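+    // one async task per dimension, each running the full extraction for its table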
+    val a = start.map(e => (e.tableName, () => {
+      val primaryFields: Seq[String] = if (StringUtils.isNotEmpty(e.primaryFields)) e.primaryFields.split(",") else Seq.empty[String]
+      NgChangeExtractAll(spark, e.project, e.tableName, e.primaryKey, primaryFields, newlyRegister = e.newlyRegister).all()
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+
+    spark.stop()
+  }
+}