@@ -2,8 +2,8 @@ package com.winhc.bigdata.spark.jobs.chance

 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils.getDoubleDataMap
-import com.winhc.bigdata.spark.utils.{BaseUtil, ChangeExtractUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SparkSession}
@@ -18,15 +18,35 @@ import scala.collection.mutable
  */
 object ChangeExtract {
+  // Check whether two maps are equal on the specified keys; if not, return the fields that differ
+  def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
+    val map = iterable.map(m => (m("change_flag"), m)).toMap
+    (map("0"), map("1"))
+  }
+
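+  // Reflectively load the per-table handler class (com.winhc.bigdata.spark.jobs.chance.<tableName>) and expose its handle method through a structural type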
+  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String)} = {
+    val clazz = s"com.winhc.bigdata.spark.jobs.chance.$tableName"
+    val foo = Class.forName(clazz)
+      .getConstructors.head.newInstance(equCols)
+      .asInstanceOf[ {
+      def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String)
+    }]
+    foo
+  }
+
+
   case class ChangeExtractHandle(s: SparkSession,
                                  project: String, //project that the table belongs to
                                  tableName: String, //table name (without prefix or suffix)
                                  primaryKey: String, //primary key of this dimension
                                  inc_ds: String, //partition to compute
                                  primaryFields: Seq[String] //primary fields; if any of them differs, the record is considered changed
-                                ) extends LoggingUtils {
+                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
+
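+    // target table for the extracted change records; one ds/tn partition is written per run (see the INSERT below)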
+    val target_eci_change_extract = "ads_change_extract"
+
     def calc(): Unit = {
       val cols = primaryFields.filter(!_.equals(primaryKey)).seq
@@ -39,7 +59,7 @@ object ChangeExtract {

       val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")

-      val handle = ChangeExtractUtils.getHandleClazz(tableName, cols)
+      val handle = getHandleClazz(tableName, cols)

       val update_time = BaseUtil.nowDate()
       val rdd = sql(
@@ -80,9 +100,10 @@ object ChangeExtract {
         .map(x => {
           val rowkey = x._1
           val map_list = x._2
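+          // map_list holds a single map for a newly inserted record, or an old/new pair (tagged by change_flag) for an update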
+          // try {
           if (map_list.size == 1) {
             val res = handle.handle(rowkey, null, map_list.head)
-            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6, res._7, update_time)
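+            // column order must match the schema defined below (rowkey, cid, table_name, type, data, ..., update_time)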
+            Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time)
           } else {
             if (map_list.size > 2) {
               logger.error("list.size greater than 2! rowkey:" + rowkey)
@@ -92,13 +113,26 @@ object ChangeExtract {
             val new_map = m._1
             val old_map = m._2
             val res = handle.handle(rowkey, old_map, new_map)
-            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6, res._7, update_time)
+            if (res == null) {
+              null
+            } else {
+              Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time)
+            }
           }
+          /* } catch {
+            case e: Exception => {
+              logError(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
+              logError(e.getMessage, e)
+              println(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
+            }
+            null
+          }*/
         }).filter(_ != null)

       // (123_abc,insert,{a->b},all,e.g. a new land-use notice,1(1=general change, 2=risk change))
       val schema = StructType(Array(
         StructField("rowkey", StringType), //primary key of the table data
+        StructField("cid", StringType), //company id
         StructField("table_name", StringType), //table name
         StructField("type", StringType), //change type: insert or update
         StructField("data", MapType(StringType, StringType)), //data after the change
@@ -109,15 +143,24 @@ object ChangeExtract {
         StructField("update_time", StringType) //processing time
       ))

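+      // register the DataFrame as a temp view and write it into the partitioned target table via SQL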
-      val df = spark.createDataFrame(rdd, schema) //
+      spark.createDataFrame(rdd, schema)
+        .createOrReplaceTempView("tmp_change_extract_view") //

-      df.write
-        .mode(if (isWindows) "append" else "overwrite")
-        .insertInto(s"${project}.tmp_xjk_icp_change_v2")
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName')
+           |SELECT *
+           |FROM
+           |    tmp_change_extract_view
+           |""".stripMargin)
     }
   }


+  // winhc_eci_dev company_tm rowkey 20200707 rowkey,status_new
+  // winhc_eci_dev company_patent_list rowkey 20200707 rowkey,status_new
+
+
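+  // args: project tableName primaryKey inc_ds primaryFields (see the example invocations above)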
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
   def main(args: Array[String]): Unit = {
     val Array(project, tableName, rowkey, inc_ds, pf) = args