Parcourir la source

feat: ng空间下企业数据动态

许家凯 il y a 4 ans
Parent
commit
24dbd13b57

+ 48 - 29
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -6,7 +6,7 @@ import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -31,7 +31,30 @@ object NgChangeExtract {
                                 ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
-    val target_eci_change_extract = "ads_change_extract"
+    val target_tab = "bds_change_extract"
+
+    def init(): Unit = { // Creates the partitioned target table when it does not exist yet (CREATE TABLE IF NOT EXISTS).
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `$project`.`$target_tab` (
+           |  `rowkey` STRING COMMENT '该行数据主键',
+           |  `company_id` STRING COMMENT '公司id',
+           |  `table_name` STRING COMMENT 'hbase表名',
+           |  `update_type` STRING comment '变更类型',
+           |  `old_data` MAP<STRING,STRING> COMMENT '原数据',
+           |  `new_data` MAP<STRING,STRING> COMMENT '新数据',
+           |  `change_fields` STRING comment '哪些字段发生变更',
+           |  `title` STRING comment '用于展示的标题',
+           |  `label` STRING comment '展示的标签',
+           |  `biz_date` STRING comment '数据变更的时间',
+           |  `update_time` STRING comment '当前计算时间'
+           |) COMMENT '变更动态'
+           |PARTITIONED BY (
+           |  `ds` STRING COMMENT '时间分区',
+           |  `tn` STRING COMMENT '表名分区')
+           |""".stripMargin)
+    }
+
 
     val updateTimeMapping = Map(
       "wenshu_detail_combine" -> "update_date", //文书排序时间
@@ -66,7 +89,6 @@ object NgChangeExtract {
 
       val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName1", cols)
 
-      val update_time = BaseUtil.nowDate()
 
       val df = sql(
         s"""
@@ -113,24 +135,22 @@ object NgChangeExtract {
 
             val new_map = m._1
             val old_map = m._2
-            if (new_map == null && old_map == null) {
-              null
-            } else if (old_map == null) {
-              val res = handle.handle(rowkey, null, map_list.head)
-              if (res == null) {
-                null
-              } else {
-                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
-              }
-            } else if (new_map == null) {
+            val res = handle.handle(rowkey, old_map, new_map)
+            if (res == null) {
               null
             } else {
-              val res = handle.handle(rowkey, old_map, new_map)
-              if (res == null) {
-                null
-              } else {
-                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
-              }
+              val rowkey = res._1
+              val company_id = res._2
+              val update_type = res._3
+              val old_map = res._4
+              val new_map = res._5
+              val change_fields = res._6
+              val title = res._7
+              val label = res._8
+              val biz_date = res._9
+              val update_time = BaseUtil.nowDate()
+
+              Row(rowkey, company_id, tableName, update_type, old_map, new_map, change_fields, title, label, biz_date, update_time)
             }
           }).filter(_ != null)
 
@@ -138,14 +158,14 @@ object NgChangeExtract {
         StructField("rowkey", StringType), //表数据主建
         StructField("company_id", StringType), //公司id
         StructField("table_name", StringType), //表名
-        StructField("type", StringType), // 变更类型 insert update
-        StructField("data", MapType(StringType, StringType)), //变更后数据
-        StructField("fields", StringType), //如果是更新 则显示更新字段
+        StructField("update_type", StringType), // 变更类型 insert update
+        StructField("old_data", MapType(StringType, StringType)), //变更前数据
+        StructField("new_data", MapType(StringType, StringType)), //变更后数据
+        StructField("change_fields", StringType), //如果是更新 则显示更新字段
         StructField("title", StringType), // 动态数据展示 ps. 新增某土地公示
-        StructField("label", StringType), // 1.一般变更,2.风险变更
-        StructField("biz_time", StringType), //业务时间
-        StructField("update_time", StringType), //处理时间
-        StructField("old_data", MapType(StringType, StringType)) //变更前数据
+        StructField("label", StringType), // 数据标签
+        StructField("biz_date", StringType), //业务时间
+        StructField("update_time", StringType) //处理时间
       ))
 
       spark.createDataFrame(rdd, schema)
@@ -153,7 +173,7 @@ object NgChangeExtract {
 
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName1')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName1')
            |SELECT *
            |FROM
            |    tmp_change_extract_view_$tableName1
@@ -165,7 +185,7 @@ object NgChangeExtract {
   private val startArgs = Seq(
     Args(tableName = "company_holder", primaryFields = "percent,deleted")
     , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
-    , Args(tableName = "company", primaryKey ="company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount")
+    , Args(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount") // the only entry here that passes primaryKey explicitly
   )
 
 
@@ -199,5 +219,4 @@ object NgChangeExtract {
 
     spark.stop()
   }
-
 }

+ 99 - 32
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyChangeHandle.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.ng.change
 
 import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 
 import scala.annotation.meta.{getter, setter}
@@ -18,44 +17,121 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
   protected val equCols: Seq[String]
 
   /**
+   * Main entry point: classifies the difference between oldMap and newMap as insert, update or deleted.
    *
    * @param rowkey
    * @param oldMap
    * @param newMap
-   * @return rowkey,cid,类型【insert or update】,新数据,更新字段,更新标题,变更标签【1.一般变更,2.风险变更 ...】,业务时间
+   * @return (rowkey, company_id, update type [insert | update | deleted], old data, new data, changed fields, title, label, business date); null when newMap is null or no reportable change was found
    */
-  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String]) = {
-    if(getBizTime(newMap)==null){
-      return null
-    }
+  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], Map[String, String], String, String, String, String) = {
+    var update_type: NgCompanyUpdateType.UpdateType = null
+    if (newMap == null) return null // no new snapshot to describe; the newMap.getOrElse calls below would otherwise throw a NullPointerException
+    val company_id = getCompanyId(rowkey, newMap)
+    var update_fields: String = null
+
+    var title: String = null
+    val label = getLabel(oldMap, newMap)
+    val biz_time = getBizDate(newMap)
+
     if (oldMap == null) {
-      (rowkey, getCompanyId(rowkey, newMap), "insert", newMap, null, getInsertTitle(newMap), getLabel(oldMap, newMap), getBizTime(newMap), null)
+      newMap.getOrElse("deleted", "0") match { // a missing `deleted` entry is treated as "0"
+        case "0" => {
+          update_type = NgCompanyUpdateType.Insert
+          title = getInsertTitle(newMap)
+        }
+        case _ => () // no previous version and the record is not in state "0": not reported as an insert
+      }
     } else {
-      val t = getEquAndFields(oldMap, newMap)
-      if (t._1) {
-        null
-      } else {
-        (rowkey, getCompanyId(rowkey, newMap), "update", newMap,
-          t._2
-          , getUpdateTitle(newMap), getLabel(oldMap, newMap), getBizTime(newMap), oldMap)
+      val new_deleted = newMap.getOrElse("deleted", "0")
+      val old_deleted = oldMap.getOrElse("deleted", "0")
+
+      s"$old_deleted$new_deleted" match { // key = old state followed by new state
+        case "00" => {
+          val t = getEquAndFields(oldMap, newMap)
+          if (!t._1) {
+            update_type = NgCompanyUpdateType.Update
+            title = getUpdateTitle(newMap)
+            update_fields = t._2
+          }
+        }
+        case "01" => {
+          update_type = NgCompanyUpdateType.Deleted
+          title = getDeletedTitle(newMap)
+        }
+        case "10" | "90" | "09" | "19" => () // listed explicitly; these transitions are not reported
+
+        case _ => () // every other combination is ignored as well
       }
     }
+
+    if (update_type == null || title == null || biz_time == null) { // nothing classified, or the handler supplied no title / business date
+      return null
+    }
+    (rowkey, company_id, update_type.toString, oldMap, newMap,
+      update_fields
+      , title, label, biz_time)
   }
 
-  def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey.split("_")(0)
+  /**
+   * Returns the company_id; by default the part of the rowkey before the first underscore.
+   *
+   * @param rowkey row key of the record; a null rowkey still raises a NullPointerException
+   * @param newMap new version of the record (not read by this default implementation)
+   * @return the rowkey prefix, or "" when the rowkey starts with "_"; the limit of 2 keeps split from returning an empty array for keys such as "_"
+   */
+  protected def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey.split("_", 2)(0)
+
+  /**
+   * Title shown when a record has been updated; handle() drops the change when this returns null.
+   *
+   * @param newMap new version of the record
+   * @return the title text
+   */
+  protected def getUpdateTitle(newMap: Map[String, String]): String
 
-  def getUpdateTitle(newMap: Map[String, String]): String
+  /**
+   * Title shown when a record has been newly inserted; handle() drops the change when this returns null.
+   *
+   * @param newMap new version of the record
+   * @return the title text
+   */
+  protected def getInsertTitle(newMap: Map[String, String]): String
 
-  def getInsertTitle(newMap: Map[String, String]): String
+  /**
+   * Title shown when a record has been removed; a change of `deleted` from any value to 9 does not count as a removal.
+   *
+   * @param newMap new version of the record
+   * @return the title text
+   */
+  protected def getDeletedTitle(newMap: Map[String, String]): String
 
-  def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String
+  /**
+   * Label used for display.
+   *
+   * @param oldMap previous version of the record; handle() passes null when there is none
+   * @param newMap new version of the record
+   * @return the label text
+   */
+  protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String
 
-  def getBizTime(newMap: Map[String, String]): String
+  /**
+   * Returns the business time of the change; handle() drops the change when this returns null.
+   *
+   * @param newMap new version of the record
+   * @return the business time as a string
+   */
+  protected def getBizDate(newMap: Map[String, String]): String
 
-  def getEquAndFields(oldMap: Map[String, String], newMap: Map[String, String]): (Boolean, String) = {
-    val tmp = equCols.map(f => {
-      (f, cleanup(newMap(f)).equals(cleanup(oldMap(f))))
-    })
+  /**
+   * 判断哪些字段不相同
+   *
+   * @param oldMap
+   * @param newMap
+   * @return true(相同) 差异字段
+   */
+  protected def getEquAndFields(oldMap: Map[String, String], newMap: Map[String, String]): (Boolean, String) = {
+    val tmp = equCols.map(f => (f, cleanup(newMap(f)).equals(cleanup(oldMap(f)))))
     val eq = tmp.map(_._2).reduce((a1, a2) => a1 && a2)
     if (eq) {
       (true, null)
@@ -63,13 +139,4 @@ trait NgCompanyChangeHandle extends Serializable with Logging {
       (eq, tmp.filter(!_._2).map(_._1).mkString(","))
     }
   }
-
-
-  protected def getValueOrNull(value: String, callBack: String): String = {
-    if (StringUtils.isNotBlank(value)) {
-      callBack
-    } else {
-      null
-    }
-  }
 }

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyUpdateType.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.spark.ng.change
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/1/21 17:00
+ */
+object NgCompanyUpdateType extends Enumeration {
+  type UpdateType = Value // public alias for this enumeration's value type
+  val Update = Value("update")
+  val Deleted = Value("deleted")
+  val Insert = Value("insert")
+
+  def checkExists(update_type: String): Boolean = values.exists(v => v.toString == update_type) // true when some value has this name
+
+  def showAll: Unit = for (v <- values) println(v) // prints every enumeration value, one per line
+}

+ 51 - 14
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company.scala

@@ -9,18 +9,55 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
  * @Description:公司基本信息
  */
 
-case class company(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable  {
-  override def getUpdateTitle(newMap: Map[String, String]): String = ""
-
-  override def getInsertTitle(newMap: Map[String, String]): String = ""
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
-
-  override def getBizTime(newMap: Map[String, String]): String = {
-    newMap("update_time")
-  }
-
-  override def getCompanyId(rowkey: String, newMap: Map[String, String]): String ={
-    rowkey
-  }
+case class company(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
+
+  /**
+   * Returns the company_id; this override uses the rowkey itself.
+   *
+   * @param rowkey row key of the record
+   * @param newMap new version of the record (not read here)
+   * @return the rowkey, unchanged
+   */
+  override protected def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey
+
+  /**
+   * Title shown when the record was updated; currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getUpdateTitle(newMap: Map[String, String]): String = ""
+
+  /**
+   * Title shown when the record was newly inserted; currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getInsertTitle(newMap: Map[String, String]): String = ""
+
+  /**
+   * Title shown when the record was removed (a change of `deleted` to 9 does not count); currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getDeletedTitle(newMap: Map[String, String]): String = ""
+
+  /**
+   * Label used for display; currently an empty string.
+   *
+   * @param oldMap previous version of the record (not read here)
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
+
+  /**
+   * Returns the business time of the change, read from the `update_time` entry of newMap.
+   *
+   * @param newMap new version of the record; must contain the key `update_time`
+   * @return the value stored under `update_time`
+   */
+  override protected def getBizDate(newMap: Map[String, String]): String = newMap("update_time")
 }

+ 39 - 9
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_holder.scala

@@ -1,9 +1,7 @@
 
 package com.winhc.bigdata.spark.ng.change.table
 
-import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
 import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
-import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 
 /**
  * @Author: π
@@ -11,14 +9,46 @@ import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
  * @Description:股东
  */
 
-case class company_holder(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable  {
-  override def getUpdateTitle(newMap: Map[String, String]): String = ""
+case class company_holder(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
 
-  override def getInsertTitle(newMap: Map[String, String]): String = ""
+  /**
+   * Title shown when the record was updated; currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getUpdateTitle(newMap: Map[String, String]): String = ""
 
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
+  /**
+   * Title shown when the record was newly inserted; currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getInsertTitle(newMap: Map[String, String]): String = ""
 
-  override def getBizTime(newMap: Map[String, String]): String = {
-    newMap("update_time")
-  }
+  /**
+   * Title shown when the record was removed (a change of `deleted` to 9 does not count); currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getDeletedTitle(newMap: Map[String, String]): String = ""
+
+  /**
+   * Label used for display; currently an empty string.
+   *
+   * @param oldMap previous version of the record (not read here)
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
+
+  /**
+   * Returns the business time of the change, read from the `update_time` entry of newMap.
+   *
+   * @param newMap new version of the record; must contain the key `update_time`
+   * @return the value stored under `update_time`
+   */
+  override protected def getBizDate(newMap: Map[String, String]): String = newMap("update_time")
 }

+ 39 - 7
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_staff.scala

@@ -9,14 +9,46 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
  * @Description:主要成员
  */
 
-case class company_staff(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable  {
-  override def getUpdateTitle(newMap: Map[String, String]): String = ""
+case class company_staff(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
 
-  override def getInsertTitle(newMap: Map[String, String]): String = ""
+  /**
+   * Title shown when the record was updated; currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getUpdateTitle(newMap: Map[String, String]): String = ""
 
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
+  /**
+   * Title shown when the record was newly inserted; currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getInsertTitle(newMap: Map[String, String]): String = ""
 
-  override def getBizTime(newMap: Map[String, String]): String = {
-    newMap("update_time")
-  }
+  /**
+   * Title shown when the record was removed (a change of `deleted` to 9 does not count); currently an empty string.
+   *
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getDeletedTitle(newMap: Map[String, String]): String = ""
+
+  /**
+   * Label used for display; currently an empty string.
+   *
+   * @param oldMap previous version of the record (not read here)
+   * @param newMap new version of the record (not read here)
+   * @return ""
+   */
+  override protected def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ""
+
+  /**
+   * Returns the business time of the change, read from the `update_time` entry of newMap.
+   *
+   * @param newMap new version of the record; must contain the key `update_time`
+   * @return the value stored under `update_time`
+   */
+  override protected def getBizDate(newMap: Map[String, String]): String = newMap("update_time")
 }