|
@@ -2,12 +2,12 @@ package com.winhc.bigdata.spark.jobs.chance
|
|
|
|
|
|
import com.winhc.bigdata.spark.config.EsConfig
|
|
|
import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
|
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
|
|
|
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, ReflectUtils, SparkUtils}
|
|
|
import org.apache.commons.lang3.StringUtils
|
|
|
import org.apache.spark.internal.Logging
|
|
|
import org.apache.spark.sql.functions.col
|
|
|
import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
|
|
|
-import org.apache.spark.sql.{Row, SparkSession}
|
|
|
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
|
|
|
|
|
|
import scala.annotation.meta.getter
|
|
|
import scala.collection.mutable
|
|
@@ -48,7 +48,7 @@ object ChangeExtract {
|
|
|
|
|
|
val target_eci_change_extract = "ads_change_extract"
|
|
|
|
|
|
- def calc(): Unit = {
|
|
|
+ def calc(isCopy: Boolean = true): Unit = {
|
|
|
val cols = primaryFields.filter(!_.equals(primaryKey)).seq
|
|
|
|
|
|
val ds = inc_ds.replace("-", "")
|
|
@@ -60,96 +60,136 @@ object ChangeExtract {
|
|
|
|
|
|
val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
|
|
|
|
|
|
- val handle = getHandleClazz(tableName, cols)
|
|
|
+ val handle = ReflectUtils.getClazz[CompanyChangeHandle](s"com.winhc.bigdata.spark.jobs.chance.table.$tableName", cols)
|
|
|
+ // val handle = getHandleClazz(tableName, cols)
|
|
|
|
|
|
val update_time = BaseUtil.nowDate()
|
|
|
|
|
|
- sql(
|
|
|
- s"""
|
|
|
- |SELECT cid,current_cid as new_cid
|
|
|
- |FROM ${project}.inc_ods_company
|
|
|
- |WHERE ds > $lastDs_ads_all and ds < $ds
|
|
|
- |AND cid IS NOT NULL
|
|
|
- |AND current_cid IS NOT NULL
|
|
|
- |GROUP BY cid,current_cid
|
|
|
- |""".stripMargin).createOrReplaceTempView("mapping")
|
|
|
-
|
|
|
- val cid = getColumns(s"$project.ads_$tableName").filter(f => f.equals("cid") || f.equals("new_cid")).max
|
|
|
+ var df: DataFrame = null
|
|
|
+ isCopy match {
|
|
|
+ case true => {
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT cid,current_cid as new_cid
|
|
|
+ |FROM ${project}.inc_ods_company
|
|
|
+ |WHERE ds > $lastDs_ads_all and ds < $ds
|
|
|
+ |AND cid IS NOT NULL
|
|
|
+ |AND current_cid IS NOT NULL
|
|
|
+ |GROUP BY cid,current_cid
|
|
|
+ |""".stripMargin).createOrReplaceTempView("mapping")
|
|
|
+
|
|
|
+ val cid = getColumns(s"$project.ads_$tableName").filter(f => f.equals("cid") || f.equals("new_cid")).max
|
|
|
+ df = sql(
|
|
|
+ s"""
|
|
|
+ |SELECT $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
|
|
|
+ |FROM $project.inc_ads_$tableName
|
|
|
+ |WHERE ds = $ds
|
|
|
+ |UNION ALL
|
|
|
+ |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
|
|
|
+ |FROM (
|
|
|
+ | SELECT DISTINCT ${primaryKey}
|
|
|
+ | FROM $project.inc_ads_$tableName
|
|
|
+ | WHERE ds = $ds
|
|
|
+ | ) AS t1
|
|
|
+ |JOIN (
|
|
|
+ | SELECT concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
|
|
|
+        |          ,${intersectCols.diff(Seq("rowkey", "cid", "new_cid")).mkString(",")}
|
|
|
+ | ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
|
|
|
+ | ,tmp.$cid as cid
|
|
|
+ | ,c
|
|
|
+ | FROM (
|
|
|
+ | SELECT a.*
|
|
|
+ | ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
|
|
|
+ | FROM (
|
|
|
+ | SELECT ${intersectCols.mkString(",")}
|
|
|
+ | FROM $project.ads_$tableName
|
|
|
+ | WHERE ds = $lastDs_ads_all
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT ${intersectCols.mkString(",")}
|
|
|
+ | FROM $project.inc_ads_$tableName
|
|
|
+ | WHERE ds > $lastDs_ads_all and ds < $ds
|
|
|
+ | ) AS a
|
|
|
+ | ) AS tmp
|
|
|
+ | LEFT JOIN mapping mm
|
|
|
+ | ON tmp.$cid = mm.cid
|
|
|
+ | WHERE tmp.c = 1
|
|
|
+ | ) AS t2
|
|
|
+ |ON t1.${primaryKey} = t2.${primaryKey}
|
|
|
+ |""".stripMargin)
|
|
|
+ }
|
|
|
+ case false => {
|
|
|
+ df = sql(
|
|
|
+ s"""
|
|
|
+ |SELECT $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
|
|
|
+ |FROM $project.inc_ads_$tableName
|
|
|
+ |WHERE ds = $ds
|
|
|
+ |UNION ALL
|
|
|
+ |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
|
|
|
+ |FROM (
|
|
|
+ | SELECT DISTINCT ${primaryKey}
|
|
|
+ | FROM $project.inc_ads_$tableName
|
|
|
+ | WHERE ds = $ds
|
|
|
+ | ) AS t1
|
|
|
+ |JOIN (
|
|
|
+ | SELECT tmp.*
|
|
|
+ | FROM (
|
|
|
+ | SELECT a.*
|
|
|
+ | ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
|
|
|
+ | FROM (
|
|
|
+ | SELECT ${intersectCols.mkString(",")}
|
|
|
+ | FROM $project.ads_$tableName
|
|
|
+ | WHERE ds = $lastDs_ads_all
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT ${intersectCols.mkString(",")}
|
|
|
+ | FROM $project.inc_ads_$tableName
|
|
|
+ | WHERE ds > $lastDs_ads_all and ds < $ds
|
|
|
+ | ) AS a
|
|
|
+ | ) AS tmp
|
|
|
+ | WHERE tmp.c = 1
|
|
|
+ | ) AS t2
|
|
|
+ |ON t1.${primaryKey} = t2.${primaryKey}
|
|
|
+ |""".stripMargin)
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- val rdd = sql(
|
|
|
- s"""
|
|
|
- |SELECT $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
|
|
|
- |FROM $project.inc_ads_$tableName
|
|
|
- |WHERE ds = $ds
|
|
|
- |UNION ALL
|
|
|
- |SELECT t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
|
|
|
- |FROM (
|
|
|
- | SELECT DISTINCT ${primaryKey}
|
|
|
- | FROM $project.inc_ads_$tableName
|
|
|
- | WHERE ds = $ds
|
|
|
- | ) AS t1
|
|
|
- |JOIN (
|
|
|
- | SELECT concat_ws('_',coalesce(mm.new_cid,tmp.$cid),split(rowkey, '_')[1]) AS rowkey
|
|
|
- | ,${intersectCols.diff(Seq("rowkey", "cid", "new_cid")).mkString(",")}
|
|
|
- | ,coalesce(mm.new_cid,tmp.$cid) AS new_cid
|
|
|
- | ,tmp.$cid as cid
|
|
|
- | ,c
|
|
|
- | FROM (
|
|
|
- | SELECT a.*
|
|
|
- | ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY update_time DESC) c
|
|
|
- | FROM (
|
|
|
- | SELECT ${intersectCols.mkString(",")}
|
|
|
- | FROM $project.ads_$tableName
|
|
|
- | WHERE ds = $lastDs_ads_all
|
|
|
- | UNION ALL
|
|
|
- | SELECT ${intersectCols.mkString(",")}
|
|
|
- | FROM $project.inc_ads_$tableName
|
|
|
- | WHERE ds > $lastDs_ads_all and ds < $ds
|
|
|
- | ) AS a
|
|
|
- | ) AS tmp
|
|
|
- | LEFT JOIN mapping mm
|
|
|
- | ON tmp.$cid = mm.cid
|
|
|
- | WHERE tmp.c = 1
|
|
|
- | ) AS t2
|
|
|
- |ON t1.${primaryKey} = t2.${primaryKey}
|
|
|
- |""".stripMargin)
|
|
|
- .select(all_cols.map(column => col(column).cast("string")): _*)
|
|
|
- .rdd.map(r => {
|
|
|
- (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
|
|
|
- }).groupByKey()
|
|
|
- .map(x => {
|
|
|
- val rowkey = x._1
|
|
|
- val map_list = x._2
|
|
|
- // try {
|
|
|
- if (map_list.size == 1) {
|
|
|
- val res = handle.handle(rowkey, null, map_list.head)
|
|
|
- Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
|
|
|
- } else {
|
|
|
- if (map_list.size > 2) {
|
|
|
- logInfo("list.size greater than 2! rowkey:" + rowkey)
|
|
|
- }
|
|
|
- val m = getDoubleDataMap(map_list)
|
|
|
|
|
|
- val new_map = m._1
|
|
|
- val old_map = m._2
|
|
|
- val res = handle.handle(rowkey, old_map, new_map)
|
|
|
- if (res == null) {
|
|
|
- null
|
|
|
- } else {
|
|
|
+ val rdd =
|
|
|
+ df.select(all_cols.map(column => col(column).cast("string")): _*)
|
|
|
+ .rdd.map(r => {
|
|
|
+ (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
|
|
|
+ }).groupByKey()
|
|
|
+ .map(x => {
|
|
|
+ val rowkey = x._1
|
|
|
+ val map_list = x._2
|
|
|
+ // try {
|
|
|
+ if (map_list.size == 1) {
|
|
|
+ val res = handle.handle(rowkey, null, map_list.head)
|
|
|
Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
|
|
|
+ } else {
|
|
|
+ if (map_list.size > 2) {
|
|
|
+ logInfo("list.size > 2! rowkey:" + rowkey)
|
|
|
+ }
|
|
|
+ val m = getDoubleDataMap(map_list)
|
|
|
+
|
|
|
+ val new_map = m._1
|
|
|
+ val old_map = m._2
|
|
|
+ val res = handle.handle(rowkey, old_map, new_map)
|
|
|
+ if (res == null) {
|
|
|
+ null
|
|
|
+ } else {
|
|
|
+ Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- /* } catch {
|
|
|
- case e: Exception => {
|
|
|
- logError(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
|
|
|
- logError(e.getMessage, e)
|
|
|
- println(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
|
|
|
- }
|
|
|
- null
|
|
|
- }*/
|
|
|
- }).filter(_ != null)
|
|
|
-
|
|
|
- // (123_abc,insert,{a->b},all,新增某土地公示,1(1.一般变更,2.风险变更))
|
|
|
+ /* } catch {
|
|
|
+ case e: Exception => {
|
|
|
+ logError(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
|
|
|
+ logError(e.getMessage, e)
|
|
|
+ println(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
|
|
|
+ }
|
|
|
+ null
|
|
|
+ }*/
|
|
|
+ }).filter(_ != null)
|
|
|
+
|
|
|
val schema = StructType(Array(
|
|
|
StructField("rowkey", StringType), //表数据主建
|
|
|
StructField("cid", StringType), //公司id
|
|
@@ -195,10 +235,13 @@ object ChangeExtract {
|
|
|
// winhc_eci_dev company_own_tax rowkey 20200729 tax_balance,tax_category,tax_num
|
|
|
|
|
|
|
|
|
+ //winhc_eci_dev company_equity_info id 20200730 reg_number false
|
|
|
+
|
|
|
+
|
|
|
// winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
|
|
|
def main(args: Array[String]): Unit = {
|
|
|
- if (args.length == 5) {
|
|
|
- val Array(project, tableName, rowkey, inc_ds, pf) = args
|
|
|
+ if (args.length >= 5 && args.length <= 6) {
|
|
|
+ val Array(project, tableName, rowkey, inc_ds, pf, isCopy) = if (args.length == 6) args else args :+ "true"
|
|
|
val config = EsConfig.getEsConfigMap ++ mutable.Map(
|
|
|
"spark.hadoop.odps.project.name" -> project,
|
|
|
"spark.hadoop.odps.spark.local.partition.amt" -> "10"
|
|
@@ -206,7 +249,7 @@ object ChangeExtract {
|
|
|
val spark = SparkUtils.InitEnv("ChangeExtract", config)
|
|
|
|
|
|
|
|
|
- ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
|
|
|
+ ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
|
|
|
spark.stop()
|
|
|
} else {
|
|
|
val ds = args(0)
|
|
@@ -231,8 +274,9 @@ object ChangeExtract {
|
|
|
|""".stripMargin.replace("20200717", ds)
|
|
|
for (r <- rows.split("\r\n")) {
|
|
|
if (StringUtils.isNotEmpty(r)) {
|
|
|
- val Array(tmp, tableName, rowkey, inc_ds, pf) = r.split(" ")
|
|
|
- ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
|
|
|
+ val as = r.split(" ")
|
|
|
+ val Array(tmp, tableName, rowkey, inc_ds, pf, isCopy) = if (as.length == 6) as else as :+ "true"
|
|
|
+ ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
|
|
|
}
|
|
|
}
|
|
|
spark.stop()
|