
Extend the change-record fields

许家凯 5 years ago
commit a1a02793ba

+ 15 - 16
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs.chance
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ChangeExtractUtils.getDoubleDataMap
-import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, ChangeExtractUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SparkSession}
@@ -23,8 +23,7 @@ object ChangeExtract {
                                 tableName: String, //table name (without prefix/suffix)
                                 primaryKey: String, //primary key of this dimension
                                 inc_ds: String, //partition (ds) to compute
-                                 primaryFields: Seq[String], //key fields: if any one of them differs, the record is considered changed
-                                 label: (Map[String, String], Map[String, String]) => String // dedup columns
+                                 primaryFields: Seq[String] //key fields: if any one of them differs, the record is considered changed
                                 ) extends LoggingUtils {
     @(transient@getter) val spark: SparkSession = s
 
@@ -33,21 +32,23 @@ object ChangeExtract {
 
       val ds = inc_ds.replace("-", "")
 
-      val all_cols = primaryKey +: cols :+ "change_flag"
+      val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
 
-      val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
+      val otherAllCols = intersectCols.filter(!primaryKey.equals(_)).toSeq
+      val all_cols = primaryKey +: otherAllCols :+ "change_flag"
 
-      val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
+      val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
 
       val handle = ChangeExtractUtils.getHandleClazz(tableName, cols)
 
+      val update_time = BaseUtil.nowDate()
       val rdd = sql(
         s"""
-           |SELECT  $primaryKey,${cols.mkString(",")},'0' as change_flag
+           |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
            |FROM    $project.inc_ads_$tableName
            |WHERE   ds = $ds
            |UNION ALL
-           |SELECT  t2.$primaryKey,${cols.map("t2." + _).mkString(",")},'1' as change_flag
+           |SELECT  t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
            |FROM    (
            |            SELECT  DISTINCT ${primaryKey}
            |            FROM    $project.inc_ads_$tableName
@@ -81,7 +82,7 @@ object ChangeExtract {
           val map_list = x._2
           if (map_list.size == 1) {
             val res = handle.handle(rowkey, null, map_list.head)
-            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6)
+            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6, res._7, update_time)
           } else {
             if (map_list.size > 2) {
               logger.error("list.size greater than 2! rowkey:" + rowkey)
@@ -91,7 +92,7 @@ object ChangeExtract {
             val new_map = m._1
             val old_map = m._2
             val res = handle.handle(rowkey, old_map, new_map)
-            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6)
+            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6, res._7, update_time)
           }
         }).filter(_ != null)
 
@@ -103,7 +104,9 @@ object ChangeExtract {
         StructField("data", MapType(StringType, StringType)),
         StructField("fields", StringType),
         StructField("title", StringType),
-        StructField("label", StringType)
+        StructField("label", StringType),
+        StructField("biz_time", StringType),
+        StructField("update_time", StringType)
       ))
 
       val df = spark.createDataFrame(rdd, schema) //
@@ -119,10 +122,6 @@ object ChangeExtract {
   def main(args: Array[String]): Unit = {
     val Array(project, tableName, rowkey, inc_ds, pf) = args
 
-    def label(m1: Map[String, String], m2: Map[String, String]): String = {
-      ""
-    }
-
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
@@ -130,7 +129,7 @@ object ChangeExtract {
 
     val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
-    ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(","), label).calc
+    ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
     spark.stop()
   }
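
With the label callback removed, a run is now parameterized only by project, table name, primary key, partition, and the comma-separated primary fields. A minimal invocation sketch, assuming the same package as ChangeExtract; the project name, partition, and field list below are placeholders, only the argument order comes from main():

    // Hypothetical invocation of the reworked entry point (same package assumed).
    // Project, partition and field list are illustrative placeholders.
    object ChangeExtractRunExample {
      def main(args: Array[String]): Unit = {
        ChangeExtract.main(Array(
          "winhc_project",          // project (placeholder)
          "company_land_publicity", // tableName: a handle case class with this name must exist
          "rowkey",                 // primaryKey of this dimension
          "2020-06-28",             // inc_ds: partition to compute
          "title,location"          // primaryFields, comma separated (illustrative)
        ))
      }
    }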
 

+ 15 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.jobs.chance
 
+import com.winhc.bigdata.spark.utils.BaseUtil
 import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
 
 import scala.annotation.meta.{getter, setter}
@@ -15,7 +16,14 @@ trait CompanyChangeHandle extends Serializable {
   @setter
   protected val equCols: Seq[String]
 
-  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String)
+  /**
+   *
+   * @param rowkey
+   * @param oldMap
+   * @param newMap
+   * @return  rowkey, change type [insert or update], new data, changed fields, change title, change label [1. general change, 2. risk change ...], business time
+   */
+  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String, String)
 
   def getEquAndFields(oldMap: Map[String, String], newMap: Map[String, String]): (Boolean, String) = {
     val tmp = equCols.map(f => {
@@ -31,9 +39,9 @@ trait CompanyChangeHandle extends Serializable {
 }
 
 case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
-  override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String) = {
+  override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String, String) = {
     if (oldMap == null) {
-      (rowkey, "insert", newMap, "", s"新增某地块公示", "1")
+      (rowkey, "insert", newMap, "", s"新增某地块公示", "1", "业务时间")
     } else {
       val t = getEquAndFields(oldMap, newMap)
       if (t._1) {
@@ -41,7 +49,7 @@ case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHan
       } else {
         (rowkey, "update", newMap,
           t._2
-          , s"更新某地块公示", "1")
+          , s"更新某地块公示", "1", "业务时间")
       }
     }
   }
@@ -49,9 +57,9 @@ case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHan
 
 
 case class company(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
-  override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String) = {
+  override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String, String) = {
     if (oldMap == null) {
-      (rowkey, "insert", newMap, "", s"新增一家公司", "1")
+      (rowkey, "insert", newMap, "", s"新增一家公司", "1", "业务时间")
     } else {
       val t = getEquAndFields(oldMap, newMap)
       if (t._1) {
@@ -59,7 +67,7 @@ case class company(equCols: Seq[String]) extends CompanyChangeHandle with Serial
       } else {
         (rowkey, "update", newMap,
           t._2
-          , s"更新一家公司", "1")
+          , s"更新一家公司", "1", "业务时间")
       }
     }
   }
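
The handle contract now returns a seventh element, the business time; the implementations in this commit still fill it with the literal placeholder "业务时间". A sketch of what a concrete handle could look like once that field is wired up, assuming a hypothetical table whose rows carry a biz_date column and that unchanged rows are dropped by returning null (that branch is elided in this diff):

    // Hypothetical handle under the extended 7-tuple contract (same package and
    // imports as CompanyChangeHandle.scala assumed). company_example and the
    // biz_date field are illustrative, not from this commit.
    case class company_example(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
      override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String])
      : (String, String, Map[String, String], String, String, String, String) = {
        // business time taken from the row itself, falling back to today
        val bizTime = newMap.getOrElse("biz_date", BaseUtil.nowDate())
        if (oldMap == null) {
          (rowkey, "insert", newMap, "", "example record added", "1", bizTime)
        } else {
          val t = getEquAndFields(oldMap, newMap)
          if (t._1) {
            null // assumed: unchanged rows are dropped by the caller's .filter(_ != null)
          } else {
            (rowkey, "update", newMap, t._2, "example record updated", "1", bizTime)
          }
        }
      }
    }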

+ 5 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
+import java.text.SimpleDateFormat
 import java.util.{Calendar, Date, Locale}
 
 import org.apache.commons.lang3.StringUtils
@@ -50,6 +51,10 @@ object BaseUtil {
     }
   }
 
+  def nowDate(pattern: String = "yyyy-MM-dd"): String = {
+    new SimpleDateFormat(pattern).format(new Date)
+  }
+
   def atMonthsBefore(n: Int, pattern: String = "yyyy-MM-dd"): String = {
     val c = Calendar.getInstance(Locale.CHINA)
     c.setTimeInMillis(new Date().getTime)
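
For reference, the new helper simply formats the current date with SimpleDateFormat; sample outputs (illustrative, for a run on 2020-06-28):

    BaseUtil.nowDate()            // "2020-06-28"  (default pattern yyyy-MM-dd)
    BaseUtil.nowDate("yyyyMMdd")  // "20200628"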

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -9,18 +9,20 @@ package com.winhc.bigdata.spark.utils
 
 object ChangeExtractUtils {
 
+
+
   //determine whether two maps are equal on the specified keys; if they are not, return the fields that differ
   def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
     val map = iterable.map(m => (m("change_flag"), m)).toMap
     (map("0"), map("1"))
   }
 
-  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String)} = {
+  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String, String)} = {
     val clazz = s"com.winhc.bigdata.spark.jobs.chance.$tableName"
     val foo = Class.forName(clazz)
       .getConstructors.head.newInstance(Seq("a"))
       .asInstanceOf[ {
-      def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String)
+      def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String, String)
     }]
     foo
   }
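
For context, getDoubleDataMap pairs a rowkey's two rows by change_flag: '0' is the row from the current partition (new data), '1' the latest historical row (old data). An illustrative call with made-up values:

    // Illustrative pairing of new/old rows by change_flag (values are made up).
    val rows = Seq(
      Map("change_flag" -> "0", "name" -> "new name"), // current partition (inc_ads)
      Map("change_flag" -> "1", "name" -> "old name")  // latest historical row (ads)
    )
    val (newMap, oldMap) = ChangeExtractUtils.getDoubleDataMap(rows)
    // newMap("name") == "new name", oldMap("name") == "old name"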