浏览代码

feat: 企业提变更加入新成立公司初始化信息的识别

许家凯 4 年之前
父节点
当前提交
d2eb288c44

+ 51 - 12
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -22,10 +22,12 @@ case class NgChangeExtract(s: SparkSession,
                            primaryKey: String, //此维度主键
                            inc_ds: String, //需要计算的分区
                            primaryFields: Seq[String] //主要字段,该字段任意一个不同 则认为发生变化
+                           , newlyRegister: Boolean = false
                           ) extends LoggingUtils with Logging {
   @(transient@getter) val spark: SparkSession = s
 
-  val target_tab = "bds_change_extract"
+  val target_tab = "bds_change_extract_test"
+  init()
 
   def init() {
     sql(
@@ -45,6 +47,11 @@ case class NgChangeExtract(s: SparkSession,
          |  `ds` STRING COMMENT '时间分区',
          |  `tn` STRING COMMENT '表名分区')
          |""".stripMargin)
+
+    def convert_update_type(update_type: String, company_id: String): String = if (company_id == null) update_type else "create"
+
+    spark.udf.register("convert_update_type", convert_update_type _)
+
   }
 
 
@@ -159,19 +166,52 @@ case class NgChangeExtract(s: SparkSession,
     spark.createDataFrame(rdd, schema)
       .createOrReplaceTempView(s"tmp_change_extract_view_$tableName1")
 
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName1')
-         |SELECT *
-         |FROM
-         |    tmp_change_extract_view_$tableName1
-         |""".stripMargin)
+    if ("company".equals(tableName1) || !newlyRegister) {
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName1')
+           |SELECT *
+           |FROM
+           |    tmp_change_extract_view_$tableName1
+           |""".stripMargin)
+    } else {
+      def wait: Boolean = {
+        val last_ds = getLastPartitionsOrElse(s"${project}.$target_tab", null, expression = "tn=company")
+        !ds.equals(last_ds)
+      }
+      //等待公司基本信息完成识别
+      while (wait) {
+        logInfo("wait company tab 。。。")
+        println("wait company tab 。。。")
+        Thread.sleep(100000)
+      }
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_tab PARTITION(ds='$ds',tn='$tableName1')
+           |SELECT  t1.rowkey
+           |        ,t1.company_id
+           |        ,t1.table_name
+           |        ,convert_update_type(t1.update_type,t2.company_id) as update_type
+           |        ,t1.old_data
+           |        ,t1.new_data
+           |        ,t1.change_fields
+           |        ,t1.biz_date
+           |        ,t1.update_time
+           |FROM    tmp_change_extract_view_$tableName1 AS t1
+           |LEFT JOIN (
+           |              SELECT  *
+           |              FROM    winhc_ng.bds_change_extract
+           |              WHERE   ds = '$ds'
+           |              AND     tn = 'company'
+           |              AND     update_type = 'insert'
+           |          ) AS t2
+           |ON      t1.company_id = t2.company_id
+           |""".stripMargin)
+    }
   }
 }
 
 
-
-
 object NgChangeExtract {
 
   //判断两个map在指定key上是否相等,如不等则返回不相等的字段
@@ -181,7 +221,6 @@ object NgChangeExtract {
   }
 
 
-
   def main(args: Array[String]): Unit = {
     val Array(tableName, inc_ds) = args
     if (args.size != 2) {
@@ -202,7 +241,7 @@ object NgChangeExtract {
     }
 
     val a = start.map(e => (e.tableName, () => {
-      NgChangeExtract(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc()
+      NgChangeExtract(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(","), newlyRegister = e.newlyRegister).calc()
       true
     }))
 

+ 6 - 4
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractArgs.scala

@@ -7,14 +7,16 @@ package com.winhc.bigdata.spark.ng.change
 case class NgChangeExtractArgs(project: String = "winhc_ng"
                                , tableName: String
                                , primaryKey: String = "rowkey"
-                               , primaryFields: String)
+                               , primaryFields: String
+                               , newlyRegister: Boolean = false
+                              )
 
 
 object NgChangeExtractArgs {
   val startArgs = Seq(
-    NgChangeExtractArgs(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted")
-    , NgChangeExtractArgs(tableName = "company_staff", primaryFields = "staff_type,deleted")
-    , NgChangeExtractArgs(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,deleted")
+    NgChangeExtractArgs(tableName = "company_holder", primaryFields = "holder_id,percent,amount,deleted",newlyRegister = true)
+    , NgChangeExtractArgs(tableName = "company_staff", primaryFields = "staff_type,deleted",newlyRegister = true)
+    , NgChangeExtractArgs(tableName = "company", primaryKey = "company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount,legal_entity_name,legal_entity_id,reg_capital,reg_location,business_scope,reg_status_std")
     , NgChangeExtractArgs(tableName = "company_tm", primaryFields = "status")
     , NgChangeExtractArgs(tableName = "company_icp", primaryFields = "domain")
   )

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyUpdateType.scala

@@ -6,6 +6,7 @@ package com.winhc.bigdata.spark.ng.change
  */
 object NgCompanyUpdateType extends Enumeration {
   type UpdateType = Value //声明枚举对外暴露的变量类型
+  val Create = Value("create") //公司新成立时的初始化信息
   val Update = Value("update")
   val Deleted = Value("deleted")
   val Insert = Value("insert")