Browse Source

feat: 提变更默认只计新增

许家凯 4 years ago
parent
commit
24f6bef3cf

+ 3 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -4,6 +4,7 @@ import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.ng.change.NgChangeExtract.getDoubleDataMap
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils._
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
@@ -241,7 +242,8 @@ object NgChangeExtract {
     }
 
     val a = start.map(e => (e.tableName, () => {
-      NgChangeExtract(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(","), newlyRegister = e.newlyRegister).calc()
+      val primaryFields: Seq[String] = if (StringUtils.isNotEmpty(e.primaryFields)) e.primaryFields.split(",") else Seq.empty[String]
+      NgChangeExtract(spark, e.project, e.tableName, e.primaryKey, inc_ds, primaryFields, newlyRegister = e.newlyRegister).calc()
       true
     }))
 

+ 9 - 5
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractArgs.scala

@@ -7,17 +7,21 @@ package com.winhc.bigdata.spark.ng.change
 case class NgChangeExtractArgs(project: String = "winhc_ng"
                                , tableName: String
                                , primaryKey: String = "rowkey"
-                               , primaryFields: String
+                               , primaryFields: String = null
                                , newlyRegister: Boolean = false
                               )
 
 
 object NgChangeExtractArgs {
-  val startArgs = Seq(
-    NgChangeExtractArgs(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted",newlyRegister = true)
-    , NgChangeExtractArgs(tableName = "company_staff", primaryFields = "staff_type,deleted",newlyRegister = true)
+  val startArgs: Seq[NgChangeExtractArgs] = Seq(
+    NgChangeExtractArgs(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted", newlyRegister = true)
+    , NgChangeExtractArgs(tableName = "company_staff", primaryFields = "staff_type,deleted", newlyRegister = true)
     , NgChangeExtractArgs(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,reg_capital,reg_location,business_scope,reg_status_std")
     , NgChangeExtractArgs(tableName = "company_tm", primaryFields = "status")
     , NgChangeExtractArgs(tableName = "company_icp", primaryFields = "domain")
-  )
+  ) ++ Seq(
+    "company_abnormal_info"
+    ,"company_app_info"
+    ,"auction_tracking"
+  ).map(r => NgChangeExtractArgs(tableName = r))
 }

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company.scala

@@ -10,7 +10,6 @@ import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
  */
 
 case class company(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable {
-
   /**
    * 获取公司company_id,默认为rowkey前半段
    *