Pārlūkot izejas kodu

feat: 数据变更单独抽取

许家凯 4 gadi atpakaļ
vecāks
revīzija
1531853f0f

+ 4 - 9
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -32,6 +32,7 @@ object NgChangeExtract {
     @(transient@getter) val spark: SparkSession = s
 
     val target_tab = "bds_change_extract"
+    init()
 
     def init() {
       sql(
@@ -40,12 +41,10 @@ object NgChangeExtract {
            |  `rowkey` STRING COMMENT '该行数据主键',
            |  `company_id` STRING '公司id',
            |  `table_name` STRING 'hbase表名',
-           |  `update_type` STRING comment '变更类型',
+           |  `update_type` STRING comment '数据展示层面的变更类型,insert、update、deleted、other',
            |  `old_data` MAP<STRING,STRING> COMMENT '原数据'),
            |  `new_data` MAP<STRING,STRING> COMMENT '新数据',
            |  `change_fields` STRING comment '哪些字段发生变更',
-           |  `title` STRING comment '用于展示的标题',
-           |  `label` STRING comment '展示的标签',
            |  `biz_date` STRING comment '数据变更的时间',
            |  `update_time` STRING comment '当前计算时间'
            | COMMENT '变更动态'
@@ -145,12 +144,10 @@ object NgChangeExtract {
               val old_map = res._4
               val new_map = res._5
               val change_fields = res._6
-              val title = res._7
-              val label = res._8
-              val biz_date = res._9
+              val biz_date = res._7
               val update_time = BaseUtil.nowDate()
 
-              Row(rowkey, company_id, tableName, update_type, old_map, new_map, change_fields, title, label, biz_date, update_time)
+              Row(rowkey, company_id, tableName, update_type, old_map, new_map, change_fields, biz_date, update_time)
             }
           }).filter(_ != null)
 
@@ -162,8 +159,6 @@ object NgChangeExtract {
         StructField("old_data", MapType(StringType, StringType)), //变更前数据
         StructField("new_data", MapType(StringType, StringType)), //变更后数据
         StructField("change_fields", StringType), //如果是更新 则显示更新字段
-        StructField("title", StringType), // 动态数据展示 ps. 新增某土地公示
-        StructField("label", StringType), // 数据标签
         StructField("biz_date", StringType), //业务时间
         StructField("update_time", StringType) //处理时间
       ))

+ 20 - 57
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyChangeHandle.scala

@@ -25,40 +25,35 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
    * @param newMap
    * @return rowkey,company_id,类型【insert or update or deleted or other】,老数据,新数据,更新字段,业务时间
    */
-  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], Map[String, String], String, String, String, String) = {
-    var update_type: NgCompanyUpdateType.UpdateType = null
+  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], Map[String, String], String, String) = {
+    var dynamic_type: NgCompanyUpdateType.UpdateType = NgCompanyUpdateType.Other
 
     val company_id = getCompanyId(rowkey, newMap)
     var update_fields: String = null
 
-    var title: String = null
-    val label = getLabel(oldMap, newMap)
     val biz_time = getBizDate(newMap)
 
     if (oldMap == null) {
       newMap.getOrElse("deleted", "0") match {
         case "0" => {
-          update_type = NgCompanyUpdateType.Insert
-          title = getInsertTitle(newMap)
+          dynamic_type = NgCompanyUpdateType.Insert
         }
         case _ => null
       }
     } else {
       val new_deleted = newMap.getOrElse("deleted", "0")
       val old_deleted = oldMap.getOrElse("deleted", "0")
-
+      update_fields = getNotEquFields(oldMap, newMap)
+      if (update_fields == null) {
+        //没有发生字段变化则直接过滤
+        return null
+      }
       s"$old_deleted$new_deleted" match {
         case "00" => {
-          val t = getEquAndFields(oldMap, newMap)
-          if (!t._1) {
-            update_type = NgCompanyUpdateType.Update
-            title = getUpdateTitle(newMap)
-            update_fields = t._2
-          }
+          dynamic_type = NgCompanyUpdateType.Update
         }
         case "01" => {
-          update_type = NgCompanyUpdateType.Deleted
-          title = getDeletedTitle(newMap)
+          dynamic_type = NgCompanyUpdateType.Deleted
         }
         case "10" | "90" | "09" | "19" => null
 
@@ -66,12 +61,9 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
       }
     }
 
-    if (update_type == null || title == null || biz_time == null) {
-      return null
-    }
-    (rowkey, company_id, update_type.toString, oldMap, newMap,
+    (rowkey, company_id, dynamic_type.toString, oldMap, newMap,
       update_fields
-      , title, label, biz_time)
+      , biz_time)
   }
 
   /**
@@ -84,39 +76,6 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
   protected def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey.split("_")(0)
 
   /**
-   * 如果数据发生更新,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  protected def getUpdateTitle(newMap: Map[String, String]): String
-
-  /**
-   * 如果数据发生新增,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  protected def getInsertTitle(newMap: Map[String, String]): String
-
-  /**
-   * 如果数据发生移除,则显示该标题,deleted:x->9 删除不算
-   *
-   * @param newMap
-   * @return
-   */
-  protected def getDeletedTitle(newMap: Map[String, String]): String
-
-  /**
-   * 用于展示的标签
-   *
-   * @param oldMap
-   * @param newMap
-   * @return
-   */
-  protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String
-
-  /**
    * 获取变更的业务时间
    *
    * @param newMap
@@ -131,13 +90,17 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
    * @param newMap
    * @return 所有比较字段均相同时返回null,否则返回以逗号分隔的差异字段名
    */
-  protected def getEquAndFields(oldMap: Map[String, String], newMap: Map[String, String]): (Boolean, String) = {
-    val tmp = equCols.map(f => (f, cleanup(newMap(f)).equals(cleanup(oldMap(f)))))
+  protected def getNotEquFields(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    var cols: Seq[String] = equCols
+    if (oldMap.contains("deleted") && newMap.contains("deleted")) {
+      cols = equCols :+ "deleted"
+    }
+    val tmp = cols.distinct.map(f => (f, cleanup(newMap(f)).equals(cleanup(oldMap(f)))))
     val eq = tmp.map(_._2).reduce((a1, a2) => a1 && a2)
     if (eq) {
-      (true, null)
+      null
     } else {
-      (eq, tmp.filter(!_._2).map(_._1).mkString(","))
+      tmp.filter(!_._2).map(_._1).mkString(",")
     }
   }
 

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyUpdateType.scala

@@ -9,6 +9,7 @@ object NgCompanyUpdateType extends Enumeration {
   val Update = Value("update")
   val Deleted = Value("deleted")
   val Insert = Value("insert")
+  val Other = Value("other")
 
   def checkExists(update_type: String) = this.values.exists(_.toString == update_type) //检测是否存在此枚举值
 

+ 0 - 32
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company.scala

@@ -20,38 +20,6 @@ case class company(equCols: Seq[String]) extends NgCompanyChangeHandle with Seri
    */
   override protected def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey
 
-  /**
-   * 如果数据发生更新,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getUpdateTitle(newMap: Map[String, String]): String = "公司基本信息发生变化"
-
-  /**
-   * 如果数据发生新增,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getInsertTitle(newMap: Map[String, String]): String = null
-
-  /**
-   * 如果数据发生移除,则显示该标题,deleted:x->9 删除不算
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getDeletedTitle(newMap: Map[String, String]): String = null
-
-  /**
-   * 用于展示的标签
-   *
-   * @param oldMap
-   * @param newMap
-   * @return
-   */
-  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = null
 
   /**
    * 获取变更的业务时间

+ 0 - 33
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_holder.scala

@@ -12,39 +12,6 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 case class company_holder(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
 
   /**
-   * 如果数据发生更新,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getUpdateTitle(newMap: Map[String, String]): String = newMap("holder_name") + "股东出资发生变化"
-
-  /**
-   * 如果数据发生新增,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getInsertTitle(newMap: Map[String, String]): String = "新增股东:" + newMap("holder_name")
-
-  /**
-   * 如果数据发生移除,则显示该标题,deleted:x->9 删除不算
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getDeletedTitle(newMap: Map[String, String]): String = newMap("holder_name") + "股东退出"
-
-  /**
-   * 用于展示的标签
-   *
-   * @param oldMap
-   * @param newMap
-   * @return
-   */
-  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
-
-  /**
    * 获取变更的业务时间
    *
    * @param newMap

+ 0 - 42
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_icp.scala

@@ -11,48 +11,6 @@ import com.winhc.bigdata.spark.utils.ChangeExtractUtils
  */
 //网站
 case class company_icp(equCols: Seq[String]) extends NgCompanyChangeHandle {
-  /*
-    override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"${newMap("domain")}网站备案发生变更")
-
-    override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"新增${newMap("domain")}网站备案")
-
-    override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("网站备案", newMap("domain"), newMap("examine_date"), newMap("liscense"))
-  */
-
-  //  override def getBizTime(newMap: Map[String, String]): String = newMap("examine_date")
-  /**
-   * 如果数据发生更新,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getUpdateTitle(newMap: Map[String, String]): String = null
-
-  /**
-   * 如果数据发生新增,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"新增${newMap("domain")}网站备案")
-
-  /**
-   * 如果数据发生移除,则显示该标题,deleted:x->9 删除不算
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getDeletedTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"移除${newMap("domain")}网站备案")
-
-  /**
-   * 用于展示的标签
-   *
-   * @param oldMap
-   * @param newMap
-   * @return
-   */
-  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("网站备案", newMap("domain"), newMap("examine_date"), newMap("liscense"))
-
   /**
    * 获取变更的业务时间
    *

+ 0 - 33
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_staff.scala

@@ -12,39 +12,6 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
 case class company_staff(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
 
   /**
-   * 如果数据发生更新,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getUpdateTitle(newMap: Map[String, String]): String = null
-
-  /**
-   * 如果数据发生新增,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getInsertTitle(newMap: Map[String, String]): String = "新增主要成员:" + newMap("staff_name")
-
-  /**
-   * 如果数据发生移除,则显示该标题,deleted:x->9 删除不算
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getDeletedTitle(newMap: Map[String, String]): String = newMap("staff_name") + "主要成员退出"
-
-  /**
-   * 用于展示的标签
-   *
-   * @param oldMap
-   * @param newMap
-   * @return
-   */
-  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
-
-  /**
    * 获取变更的业务时间
    *
    * @param newMap

+ 1 - 35
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_tm.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.ng.change.table
 
 import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
-import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
+import com.winhc.bigdata.spark.utils.DateUtils
 
 /**
  * @Author: XuJiakai
@@ -11,39 +11,6 @@ import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
  */
 //商标
 case class company_tm(equCols: Seq[String]) extends NgCompanyChangeHandle {
-  /**
-   * 如果数据发生更新,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("tm_name"), s"${newMap("tm_name")}商标发生变更")
-
-  /**
-   * 如果数据发生新增,则显示该标题
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("tm_name"), s"新增${newMap("tm_name")}商标")
-
-  /**
-   * 如果数据发生移除,则显示该标题,deleted:x->9 删除不算
-   *
-   * @param newMap
-   * @return
-   */
-  override protected def getDeletedTitle(newMap: Map[String, String]): String = null
-
-  /**
-   * 用于展示的标签
-   *
-   * @param oldMap
-   * @param newMap
-   * @return
-   */
-  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("商标", newMap("tm_name"), newMap("app_date"), newMap("reg_no"))
-
 
   /**
    * 获取变更的业务时间
@@ -52,5 +19,4 @@ case class company_tm(equCols: Seq[String]) extends NgCompanyChangeHandle {
    * @return
    */
   override protected def getBizDate(newMap: Map[String, String]): String = DateUtils.getBizDate(newMap("app_date"), newMap("update_time"))
-
 }