
Extract change records

许家凯 4 years ago
parent
commit
60c7004d74

+ 26 - 26
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -1,9 +1,9 @@
 package com.winhc.bigdata.spark.jobs.chance
 
 import com.winhc.bigdata.spark.config.EsConfig
-import com.winhc.bigdata.spark.utils.BaseUtil.{cleanup, isWindows}
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils.getCurrentMap
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils.getDoubleDataMap
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
 import org.apache.spark.sql.{Row, SparkSession}
@@ -18,14 +18,14 @@ import scala.collection.mutable
  */
 object ChangeExtract {
 
-  case class ChangeExtractUtils(s: SparkSession,
-                                project: String, // project the table belongs to
-                                tableName: String, // table name (without prefix/suffix)
-                                primaryKey: String, // primary key of this dimension
-                                inc_ds: String, // partition to compute
-                                primaryFields: Seq[String], // key fields; a change in any one of them marks the record as changed
-                                label: (Map[String, String], Map[String, String]) => String // dedup columns
-                               ) extends LoggingUtils {
+  case class ChangeExtractHandle(s: SparkSession,
+                                 project: String, // project the table belongs to
+                                 tableName: String, // table name (without prefix/suffix)
+                                 primaryKey: String, // primary key of this dimension
+                                 inc_ds: String, // partition to compute
+                                 primaryFields: Seq[String], // key fields; a change in any one of them marks the record as changed
+                                 label: (Map[String, String], Map[String, String]) => String // dedup columns
+                                ) extends LoggingUtils {
     @(transient@getter) val spark: SparkSession = s
 
     def calc(): Unit = {
@@ -39,6 +39,8 @@ object ChangeExtract {
 
       val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
 
+      val handle = ChangeExtractUtils.getHandleClazz(tableName, cols)
+
       val rdd = sql(
         s"""
            |SELECT  $primaryKey,${cols.mkString(",")},'0' as change_flag
@@ -78,39 +80,37 @@ object ChangeExtract {
           val rowkey = x._1
           val map_list = x._2
           if (map_list.size == 1) {
-            Row(rowkey, "insert", map_list.head, "新增")
+            val res = handle.handle(rowkey, null, map_list.head)
+            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6)
           } else {
             if (map_list.size > 2) {
               logger.error("list.size greater than 2! rowkey:" + rowkey)
             }
-            val m = getCurrentMap(map_list)
+            val m = getDoubleDataMap(map_list)
 
             val new_map = m._1
             val old_map = m._2
-            val tmp = cols.map(f => {
-              (f, cleanup(new_map(f)).equals(cleanup(old_map(f))))
-            })
-            val eq = tmp.map(_._2).reduce((a1, a2) => a1 && a2)
-
-            if (eq) {
-              null
-            } else {
-              Row(rowkey, "update", new_map, s"更新字段:${tmp.filter(!_._2).map(_._1).mkString(",")}")
-            }
+            val res = handle.handle(rowkey, old_map, new_map)
+            Row(res._1, tableName, res._2, res._3, res._4, res._5, res._6)
           }
         }).filter(_ != null)
 
+      // e.g. (123_abc, insert, {a->b}, all, 新增某土地公示, 1); label: 1 = general change, 2 = risk change
       val schema = StructType(Array(
         StructField("rowkey", StringType),
+        StructField("table_name", StringType),
         StructField("type", StringType),
         StructField("data", MapType(StringType, StringType)),
-        StructField("label", StringType)))
+        StructField("fields", StringType),
+        StructField("title", StringType),
+        StructField("label", StringType)
+      ))
 
      val df = spark.createDataFrame(rdd, schema)
 
       df.write
         .mode(if (isWindows) "append" else "overwrite")
-        .insertInto(s"${project}.tmp_xjk_icp_change")
+        .insertInto(s"${project}.tmp_xjk_icp_change_v2")
     }
   }
 
@@ -130,7 +130,7 @@ object ChangeExtract {
 
     val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
-    ChangeExtractUtils(spark, project, tableName, rowkey, inc_ds, pf.split(","), label).calc
+    ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(","), label).calc
     spark.stop()
   }
 
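For reference, a minimal sketch of how the refactored mapper builds a Row for the new seven-column schema; the sample values below are illustrative only, not real data:

import org.apache.spark.sql.Row

// A handler returns (rowkey, type, data, changed fields, title, label),
// or null when nothing changed (dropped by the filter(_ != null) above).
val res: (String, String, Map[String, String], String, String, String) =
  ("123_abc", "update", Map("a" -> "b"), "a", "更新一家公司", "1")

// tableName is injected as the second column, giving:
// rowkey, table_name, type, data, fields, title, label
val row = Row(res._1, "company", res._2, res._3, res._4, res._5, res._6)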

+ 66 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -0,0 +1,66 @@
+package com.winhc.bigdata.spark.jobs.chance
+
+import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
+
+import scala.annotation.meta.{getter, setter}
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/9 16:44
+ * @Description:
+ */
+
+trait CompanyChangeHandle extends Serializable {
+  @getter
+  @setter
+  protected val equCols: Seq[String]
+
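+  // Returns (rowkey, type, data, changed fields, title, label); null means nothing changed.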
+  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String)
+
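+  // Compare the two maps on equCols; when they differ, also return the comma-joined names of the differing fields.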
+  def getEquAndFields(oldMap: Map[String, String], newMap: Map[String, String]): (Boolean, String) = {
+    val tmp = equCols.map(f => {
+      (f, cleanup(newMap(f)).equals(cleanup(oldMap(f))))
+    })
+    val eq = tmp.map(_._2).reduce((a1, a2) => a1 && a2)
+    if (eq) {
+      (true, null)
+    } else {
+      (eq, tmp.filter(!_._2).map(_._1).mkString(","))
+    }
+  }
+}
+
+case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+  override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String) = {
+    if (oldMap == null) {
+      (rowkey, "insert", newMap, "", s"新增某地块公示", "1")
+    } else {
+      val t = getEquAndFields(oldMap, newMap)
+      if (t._1) {
+        null
+      } else {
+        (rowkey, "update", newMap,
+          t._2
+          , s"更新某地块公示", "1")
+      }
+    }
+  }
+}
+
+
+case class company(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
+  override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String) = {
+    if (oldMap == null) {
+      (rowkey, "insert", newMap, "", s"新增一家公司", "1")
+    } else {
+      val t = getEquAndFields(oldMap, newMap)
+      if (t._1) {
+        null
+      } else {
+        (rowkey, "update", newMap,
+          t._2
+          , s"更新一家公司", "1")
+      }
+    }
+  }
+}
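
A minimal usage sketch of the new handlers; the column names and values are made up, and in the job the two maps come from the grouped new/old partitions:

val handler = company(Seq("name", "reg_capital"))

// No previous record: oldMap is null, so an insert tuple is produced.
handler.handle("123_abc", null, Map("name" -> "a", "reg_capital" -> "1"))
// => ("123_abc", "insert", Map(...), "", "新增一家公司", "1")

// "name" differs between the two maps, so it is reported in the fields element.
handler.handle("123_abc",
  Map("name" -> "a", "reg_capital" -> "1"),
  Map("name" -> "b", "reg_capital" -> "1"))
// => ("123_abc", "update", Map(...), "name", "更新一家公司", "1")

// When every column in equCols matches, handle returns null.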

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2Phoenix2.scala

@@ -28,14 +28,14 @@ object TestSpark2Phoenix2 {
 
     val colsList = sublistTableFieldName ++ Seq("new_cid")
     println(colsList)
-    MaxComputer2Phoenix(
+/*    MaxComputer2Phoenix(
       sparkSession,
       colsList,
       odpsTable,
       phoenixTable,
       "20200621",
       Seq("new_cid","id")
-    ).syn()
+    ).syn()*/
 
 
     sparkSession.stop()

+ 15 - 20
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -1,36 +1,31 @@
 package com.winhc.bigdata.spark.utils
 
-import org.apache.commons.lang3.StringUtils
-
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/7 13:59
  * @Description:
  */
+
+
 object ChangeExtractUtils {
-  def getCurrentMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
+
+  // Split the grouped rows into the (new, old) map pair, keyed by change_flag ('0' = new, '1' = old)
+  def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
     val map = iterable.map(m => (m("change_flag"), m)).toMap
     (map("0"), map("1"))
   }
 
-  def main(args: Array[String]): Unit = {
-  /*  val m1 = Map("a" -> "0"
-      , "b" -> "0"
-      , "change_flag" -> "0"
-    )
-    val m2 = Map("a" -> "1"
-      , "b" -> "1"
-      , "change_flag" -> "1"
-    )
-    val m3 = Map("a" -> "03"
-      , "b" -> "1"
-      , "change_flag" -> "0"
-    )
-    println(getCurrentMap(Iterable(m1, m2, m3)))*/
-
+  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String)} = {
+    val clazz = s"com.winhc.bigdata.spark.jobs.chance.$tableName"
+    val foo = Class.forName(clazz)
+      .getConstructors.head.newInstance(equCols)
+      .asInstanceOf[ {
+      def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String)
+    }]
+    foo
+  }
 
-   println(BaseUtil.cleanup("218.000000万人民币              "))
-//    println(StringUtils.isNoneBlank(null))
+  def main(args: Array[String]): Unit = {
   }
 
 }
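
A minimal sketch of how getHandleClazz ties the pieces together, assuming the handler case classes above; the table name doubles as the class name in the jobs.chance package:

// Resolves com.winhc.bigdata.spark.jobs.chance.company via reflection and
// instantiates it with the columns to compare.
val handle = ChangeExtractUtils.getHandleClazz("company", Seq("name"))

// Method calls on the structural type are dispatched reflectively at runtime.
val res = handle.handle("123_abc", null, Map("name" -> "a"))
// => ("123_abc", "insert", Map("name" -> "a"), "", "新增一家公司", "1")

Because structural-type calls go through reflection on every invocation, returning the shared CompanyChangeHandle trait from getHandleClazz would avoid that overhead; the structural type mainly spares ChangeExtractUtils a compile-time dependency on the jobs.chance handlers.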