@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.ng.change

 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.ng.change.NgChangeExtract.getDoubleDataMap
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
@@ -14,181 +15,171 @@ import scala.collection.mutable
 /**
  * @Description: extract the concrete field-level changes in the data
  */
-object NgChangeExtract {
-
-  // split a key's grouped rows by change_flag; returns (data with flag "0", data with flag "1"), null for a missing side
-  def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
-    val map = iterable.map(m => (m("change_flag"), m)).toMap
-    (map.getOrElse("0", null), map.getOrElse("1", null))
+case class NgChangeExtract(s: SparkSession,
+                           project: String, // project that owns the table
+                           tableName1: String, // table name (without prefix/suffix)
+                           primaryKey: String, // primary key of this dimension
+                           inc_ds: String, // partition to compute
+                           primaryFields: Seq[String] // key fields; a difference in any one of them counts as a change
+                          ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
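+  // @transient keeps the SparkSession out of serialized task closures; only the driver uses it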
+
+  val target_tab = "bds_change_extract"
+
+  def init() {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS `$project`.`$target_tab` (
+         |    `rowkey` STRING COMMENT 'primary key of the row',
+         |    `company_id` STRING COMMENT 'company id',
+         |    `table_name` STRING COMMENT 'HBase table name',
+         |    `update_type` STRING COMMENT 'presentation-level change type: insert, update, deleted, other',
+         |    `old_data` MAP<STRING,STRING> COMMENT 'data before the change',
+         |    `new_data` MAP<STRING,STRING> COMMENT 'data after the change',
+         |    `change_fields` STRING COMMENT 'which fields changed',
+         |    `biz_date` STRING COMMENT 'business time of the change',
+         |    `update_time` STRING COMMENT 'time this record was computed')
+         |COMMENT 'change events'
+         |PARTITIONED BY (
+         |    `ds` STRING COMMENT 'date partition',
+         |    `tn` STRING COMMENT 'table name partition')
+         |""".stripMargin)
   }

-  case class ChangeExtractHandle(s: SparkSession,
-                                 project: String, // project that owns the table
-                                 tableName1: String, // table name (without prefix/suffix)
-                                 primaryKey: String, // primary key of this dimension
-                                 inc_ds: String, // partition to compute
-                                 primaryFields: Seq[String] // key fields; a difference in any one of them counts as a change
-                                ) extends LoggingUtils with Logging {
-    @(transient@getter) val spark: SparkSession = s
-
-    val target_tab = "bds_change_extract"
-
-    def init() {
-      sql(
-        s"""
-           |CREATE TABLE IF NOT EXISTS `$project`.`$target_tab` (
-           |    `rowkey` STRING COMMENT 'primary key of the row',
-           |    `company_id` STRING COMMENT 'company id',
-           |    `table_name` STRING COMMENT 'HBase table name',
-           |    `update_type` STRING COMMENT 'presentation-level change type: insert, update, deleted, other',
-           |    `old_data` MAP<STRING,STRING> COMMENT 'data before the change',
-           |    `new_data` MAP<STRING,STRING> COMMENT 'data after the change',
-           |    `change_fields` STRING COMMENT 'which fields changed',
-           |    `biz_date` STRING COMMENT 'business time of the change',
-           |    `update_time` STRING COMMENT 'time this record was computed')
-           |COMMENT 'change events'
-           |PARTITIONED BY (
-           |    `ds` STRING COMMENT 'date partition',
-           |    `tn` STRING COMMENT 'table name partition')
-           |""".stripMargin)
-    }
-
-    val updateTimeMapping = Map(
-      "wenshu_detail_combine" -> "update_date", // ordering column for judgment documents
-      "company_equity_info_list" -> "reg_date" // ordering column (equity registration date)
+  val updateTimeMapping = Map(
+    "wenshu_detail_combine" -> "update_date", // ordering column for judgment documents
+    "company_equity_info_list" -> "reg_date" // ordering column (equity registration date)
+  )
+  // maps a logical table name to its backing table
+  val tabMapping =
+    Map("company_holder_v2" -> "company_holder" // company_holder_v2 is backed by company_holder
     )
-    // maps a logical table name to its backing table
-    val tabMapping =
-      Map("company_holder_v2" -> "company_holder" // company_holder_v2 is backed by company_holder
-      )
-
-    // resolve the backing table name for a logical name
-    def trans(s: String): String = {
-      var res = s
-      if (tabMapping.contains(s)) {
-        res = tabMapping(s)
-      }
-      res
-    }
-
-    def calc(): Unit = {
-      val tableName = trans(tableName1)
-      val cols = primaryFields.filter(!_.equals(primaryKey)).seq
-
-      val ds = inc_ds.replace("-", "")
-
-      val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
-
-      val otherAllCols = intersectCols.filter(!primaryKey.equals(_)).toSeq
-      val all_cols = primaryKey +: otherAllCols :+ "change_flag"
-
-      val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
-
-      val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName1", cols)
-
-
-      val df = sql(
-        s"""
-           |SELECT $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
-           |FROM $project.inc_ads_$tableName
-           |WHERE ds = $ds
-           |UNION ALL
-           |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
-           |FROM (
-           |    SELECT DISTINCT ${primaryKey}
-           |    FROM $project.inc_ads_$tableName
-           |    WHERE ds = $ds
-           |    ) AS t1
-           |JOIN (
-           |    SELECT tmp.*
-           |    FROM (
-           |        SELECT a.*
-           |             ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ds DESC, ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
-           |        FROM (
-           |            SELECT ${intersectCols.mkString(",")},ds
-           |            FROM $project.ads_$tableName
-           |            WHERE ds = $lastDs_ads_all
-           |            UNION ALL
-           |            SELECT ${intersectCols.mkString(",")},ds
-           |            FROM $project.inc_ads_$tableName
-           |            WHERE ds > $lastDs_ads_all and ds < $ds
-           |            ) AS a
-           |        ) AS tmp
-           |    WHERE tmp.c = 1
-           |    ) AS t2
-           |ON t1.${primaryKey} = t2.${primaryKey}
-           |""".stripMargin)
-
-
-      val rdd =
-        df.select(all_cols.map(column => col(column).cast("string")): _*)
-          .rdd.map(r => {
-          (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
-        }).groupByKey()
-          .map(x => {
-            val rowkey = x._1
-            val map_list = x._2
-            val m = getDoubleDataMap(map_list)
-
-            val new_map = m._1
-            val old_map = m._2
-            val res = handle.handle(rowkey, old_map, new_map)
-            if (res == null) {
-              null
-            } else {
-              val rowkey = res._1
-              val company_id = res._2
-              val update_type = res._3
-              val old_map = res._4
-              val new_map = res._5
-              val change_fields = res._6
-              val biz_date = res._7
-              val update_time = BaseUtil.nowDate()
-
-              Row(rowkey, company_id, tableName, update_type, old_map, new_map, change_fields, biz_date, update_time)
-            }
-          }).filter(_ != null)
-
-      val schema = StructType(Array(
-        StructField("rowkey", StringType), // row primary key
-        StructField("company_id", StringType), // company id
-        StructField("table_name", StringType), // table name
-        StructField("update_type", StringType), // change type: insert, update, ...
-        StructField("old_data", MapType(StringType, StringType)), // data before the change
-        StructField("new_data", MapType(StringType, StringType)), // data after the change
-        StructField("change_fields", StringType), // for updates, the fields that changed
-        StructField("biz_date", StringType), // business time
-        StructField("update_time", StringType) // processing time
-      ))
-
-      spark.createDataFrame(rdd, schema)
-        .createOrReplaceTempView(s"tmp_change_extract_view_$tableName1")
-
-      sql(
-        s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName1')
-           |SELECT *
-           |FROM
-           |    tmp_change_extract_view_$tableName1
-           |""".stripMargin)
+  // resolve the backing table name for a logical name
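+  // (equivalent, for reference: tabMapping.getOrElse(s, s))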
+  def trans(s: String): String = {
+    var res = s
+    if (tabMapping.contains(s)) {
+      res = tabMapping(s)
     }
+    res
   }

+  def calc(): Unit = {
+    val tableName = trans(tableName1)
+    val cols = primaryFields.filter(!_.equals(primaryKey)).seq
+
+    val ds = inc_ds.replace("-", "")
+
+    val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
+
+    val otherAllCols = intersectCols.filter(!primaryKey.equals(_)).toSeq
+    val all_cols = primaryKey +: otherAllCols :+ "change_flag"
+
+    val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
+
+    val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName1", cols)
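+    // note: the handler class is looked up by the original name (tableName1), not the mapped table name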
+
+
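+    // Pair each key's current row with its latest prior row in one pass:
+    // rows from today's partition (ds = $ds) carry change_flag '0' (the new side).
+    // For the same keys, the most recent earlier row carries change_flag '1' (the old
+    // side): the last full snapshot unioned with later increments, deduplicated per
+    // key by row_number ordered by ds and the table's update-time column.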
+    val df = sql(
+      s"""
+         |SELECT $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
+         |FROM $project.inc_ads_$tableName
+         |WHERE ds = $ds
+         |UNION ALL
+         |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
+         |FROM (
+         |    SELECT DISTINCT ${primaryKey}
+         |    FROM $project.inc_ads_$tableName
+         |    WHERE ds = $ds
+         |    ) AS t1
+         |JOIN (
+         |    SELECT tmp.*
+         |    FROM (
+         |        SELECT a.*
+         |             ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ds DESC, ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
+         |        FROM (
+         |            SELECT ${intersectCols.mkString(",")},ds
+         |            FROM $project.ads_$tableName
+         |            WHERE ds = $lastDs_ads_all
+         |            UNION ALL
+         |            SELECT ${intersectCols.mkString(",")},ds
+         |            FROM $project.inc_ads_$tableName
+         |            WHERE ds > $lastDs_ads_all and ds < $ds
+         |            ) AS a
+         |        ) AS tmp
+         |    WHERE tmp.c = 1
+         |    ) AS t2
+         |ON t1.${primaryKey} = t2.${primaryKey}
+         |""".stripMargin)
+
+
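+    // Group the two flag variants of each key, split them back into (new, old) via
+    // getDoubleDataMap, and let the table-specific handler decide whether the row
+    // changed and how to describe it (a null result means no change event is emitted).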
+    val rdd =
+      df.select(all_cols.map(column => col(column).cast("string")): _*)
+        .rdd.map(r => {
+        (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
+      }).groupByKey()
+        .map(x => {
+          val rowkey = x._1
+          val map_list = x._2
+          val m = getDoubleDataMap(map_list)
+
+          val new_map = m._1
+          val old_map = m._2
+          val res = handle.handle(rowkey, old_map, new_map)
+          if (res == null) {
+            null
+          } else {
+            val rowkey = res._1
+            val company_id = res._2
+            val update_type = res._3
+            val old_map = res._4
+            val new_map = res._5
+            val change_fields = res._6
+            val biz_date = res._7
+            val update_time = BaseUtil.nowDate()
+
+            Row(rowkey, company_id, tableName, update_type, old_map, new_map, change_fields, biz_date, update_time)
+          }
+        }).filter(_ != null)
+
+    val schema = StructType(Array(
+      StructField("rowkey", StringType), // row primary key
+      StructField("company_id", StringType), // company id
+      StructField("table_name", StringType), // table name
+      StructField("update_type", StringType), // change type: insert, update, ...
+      StructField("old_data", MapType(StringType, StringType)), // data before the change
+      StructField("new_data", MapType(StringType, StringType)), // data after the change
+      StructField("change_fields", StringType), // for updates, the fields that changed
+      StructField("biz_date", StringType), // business time
+      StructField("update_time", StringType) // processing time
+    ))
+
+    spark.createDataFrame(rdd, schema)
+      .createOrReplaceTempView(s"tmp_change_extract_view_$tableName1")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName1')
+         |SELECT *
+         |FROM
+         |    tmp_change_extract_view_$tableName1
+         |""".stripMargin)
+  }
+}

-  private val startArgs = Seq(
-    Args(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted")
-    , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
-    , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,deleted")
-    , Args(tableName = "company_tm", primaryFields = "status")
-    , Args(tableName = "company_icp", primaryFields = "domain")
-  )
-
-  private case class Args(project: String = "winhc_ng"
-                          , tableName: String
-                          , primaryKey: String = "rowkey"
-                          , primaryFields: String)
+
+object NgChangeExtract {
+
+  // split a key's grouped rows by change_flag; returns (new data: flag "0", old data: flag "1"), with null for a missing side
+  def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
+    val map = iterable.map(m => (m("change_flag"), m)).toMap
+    (map.getOrElse("0", null), map.getOrElse("1", null))
+  }
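+  // e.g. (hypothetical rows for one rowkey):
+  //   getDoubleDataMap(Seq(Map("change_flag" -> "0", "status" -> "1"), Map("change_flag" -> "1", "status" -> "0")))
+  //   == (Map("change_flag" -> "0", "status" -> "1"), Map("change_flag" -> "1", "status" -> "0"))
+  // a key seen only in today's partition yields (newMap, null), i.e. an insert candidate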
+

   def main(args: Array[String]): Unit = {
@@ -204,14 +195,14 @@ object NgChangeExtract {
     )
     val spark = SparkUtils.InitEnv("NgChangeExtract", config)

-    var start = startArgs
+    var start = NgChangeExtractArgs.startArgs
     if (!tableName.equals("all")) {
       val set = tableName.split(",").toSet
       start = start.filter(a => set.contains(a.tableName))
     }

     val a = start.map(e => (e.tableName, () => {
-      ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc()
+      NgChangeExtract(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc()
       true
     }))
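Note: the removed startArgs/Args above move out of this file; main now reads
NgChangeExtractArgs.startArgs, but the new NgChangeExtractArgs source is not part
of this diff. A minimal sketch of what that object plausibly looks like,
reconstructed from the removed lines (object and file layout assumed, not
confirmed by this diff):

object NgChangeExtractArgs {

  // one entry per extracted dimension; defaults mirror the removed Args case class
  case class Args(project: String = "winhc_ng"
                  , tableName: String
                  , primaryKey: String = "rowkey"
                  , primaryFields: String)

  // same table list as the removed startArgs; Args and startArgs must now be
  // visible to NgChangeExtract.main, so they are no longer private
  val startArgs: Seq[Args] = Seq(
    Args(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted")
    , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
    , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,deleted")
    , Args(tableName = "company_tm", primaryFields = "status")
    , Args(tableName = "company_icp", primaryFields = "domain")
  )
}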