@@ -0,0 +1,137 @@
+package com.winhc.bigdata.spark.jobs.chance
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.{cleanup, isWindows}
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils.getCurrentMap
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/7 11:25
+ * @Description: Extracts the specific changes in the data
+ */
+object ChangeExtract {
+
+  case class ChangeExtractUtils(s: SparkSession,
+                                project: String, // project the table lives in
+                                tableName: String, // table name (without prefix or suffix)
+                                primaryKey: String, // primary key of this dimension
+                                inc_ds: String, // partition (ds) to compute
+                                primaryFields: Seq[String], // key fields; a change in any one of them marks the record as changed
+                                label: (Map[String, String], Map[String, String]) => String // dedup column(s)
+                               ) extends LoggingUtils {
+    @(transient@getter) val spark: SparkSession = s
+
+    def calc(): Unit = {
+      // fields to compare, excluding the primary key itself
+      val cols = primaryFields.filter(!_.equals(primaryKey)).seq
+
+      val ds = inc_ds.replace("-", "")
+
+      val all_cols = primaryKey +: cols :+ "change_flag"
+
+      // latest partition of the full (ads) table, or "0" if none exists
+      val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
+
+      // columns present in both the full table and the incremental table
+      val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
+
+      // rows in the target partition get change_flag '0'; for each of those keys, the most recent earlier row gets '1'
+      val rdd = sql(
+        s"""
+           |SELECT $primaryKey,${cols.mkString(",")},'0' as change_flag
+           |FROM $project.inc_ads_$tableName
+           |WHERE ds = $ds
+           |UNION ALL
+           |SELECT t2.$primaryKey,${cols.map("t2." + _).mkString(",")},'1' as change_flag
+           |FROM (
+           |    SELECT DISTINCT ${primaryKey}
+           |    FROM $project.inc_ads_$tableName
+           |    WHERE ds = $ds
+           |) AS t1
+           |JOIN (
+           |    SELECT tmp.*
+           |    FROM (
+           |        SELECT a.*
+           |             ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
+           |        FROM (
+           |            SELECT ${intersectCols.mkString(",")}
+           |            FROM $project.ads_$tableName
+           |            WHERE ds = $lastDs_ads_all
+           |            UNION ALL
+           |            SELECT ${intersectCols.mkString(",")}
+           |            FROM $project.inc_ads_$tableName
+           |            WHERE ds > $lastDs_ads_all AND ds < $ds
+           |        ) AS a
+           |    ) AS tmp
+           |    WHERE tmp.c = 1
+           |) AS t2
+           |ON t1.${primaryKey} = t2.${primaryKey}
+           |""".stripMargin)
+        .select(all_cols.map(column => col(column).cast("string")): _*)
+        .rdd.map(r => {
+          (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
+        }).groupByKey()
+        .map(x => {
+          val rowkey = x._1
+          val map_list = x._2
+          if (map_list.size == 1) {
+            Row(rowkey, "insert", map_list.head, "新增") // label "新增" = newly added record
+          } else {
+            if (map_list.size > 2) {
+              logger.error("list.size greater than 2! rowkey:" + rowkey)
+            }
+            val m = getCurrentMap(map_list)
+
+            val new_map = m._1
+            val old_map = m._2
+            // compare each field after cleanup; true means the two values are equal
+            val tmp = cols.map(f => {
+              (f, cleanup(new_map(f)).equals(cleanup(old_map(f))))
+            })
+            val eq = tmp.map(_._2).reduce((a1, a2) => a1 && a2)
+
+            if (eq) {
+              null
+            } else {
+              Row(rowkey, "update", new_map, s"更新字段:${tmp.filter(!_._2).map(_._1).mkString(",")}") // label "更新字段:" = changed fields
+            }
+          }
+        }).filter(_ != null)
+
+      val schema = StructType(Array(
+        StructField("rowkey", StringType),
+        StructField("type", StringType),
+        StructField("data", MapType(StringType, StringType)),
+        StructField("label", StringType)))
+
+      val df = spark.createDataFrame(rdd, schema)
+
+      df.write
+        .mode(if (isWindows) "append" else "overwrite")
+        .insertInto(s"${project}.tmp_xjk_icp_change")
+    }
+  }
+
+
+ // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
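+  // The line above shows example arguments for main(), mapping positionally to:
+  // project, tableName, primaryKey (rowkey), inc_ds, primaryFields (comma-separated).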
+  def main(args: Array[String]): Unit = {
+    val Array(project, tableName, rowkey, inc_ds, pf) = args
+
+    // label callback stub: currently returns an empty label
+    def label(m1: Map[String, String], m2: Map[String, String]): String = {
+      ""
+    }
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+
+    val spark = SparkUtils.InitEnv("ChangeExtract", config)
+
+    ChangeExtractUtils(spark, project, tableName, rowkey, inc_ds, pf.split(","), label).calc()
+    spark.stop()
+  }
+
+}
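
Note: getCurrentMap is imported from com.winhc.bigdata.spark.utils.ChangeExtractUtils and its implementation is not part of this diff. From the way it is consumed above (the returned tuple is read as (new_map, old_map), and the SQL flags the target-partition row with change_flag = '0' and the previous snapshot with '1'), a minimal sketch of the assumed contract could look like the following; the real utility may differ.

    // Sketch only: infer the (newer, older) pair from the change_flag set by the SQL above.
    def getCurrentMap(maps: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
      val newMap = maps.find(_.get("change_flag").contains("0")).getOrElse(maps.head)
      val oldMap = maps.find(_.get("change_flag").contains("1")).getOrElse(maps.last)
      (newMap, oldMap)
    }

Under that assumption, the "更新字段:" label produced for update rows lists exactly the primaryFields whose cleaned values differ between the two snapshots.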