Parcourir la source

fix: 调整动态

许家凯 il y a 3 ans
Parent
commit
4729d5529e

+ 165 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/DynamicUpdateId.scala

@@ -0,0 +1,165 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+import com.winhc.bigdata.spark.ng.dynamic.utils.DynamicAssociationEntity
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, ReflectUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/8/7 15:02
+ */
+
+case class DynamicUpdateId(s: SparkSession
+                       ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val org_tab = "winhc_ng.out_company_dynamic"
+  private val target_tab = "winhc_ng.out_dynamic_update"
+
+
+  private val map: Map[String, DynamicAssociationEntity] = ReflectUtils.subObject[DynamicAssociationEntity](classOf[DynamicAssociationEntity], "com.winhc.bigdata.spark.ng.dynamic.handle", true).map(r => (r.getClass.getSimpleName, r)).toMap
+
+  init()
+
+  def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    content  STRING COMMENT 'json数据'
+         |)
+         |COMMENT '企业动态添加Id表'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |LIFECYCLE 30
+         |""".stripMargin)
+  }
+
+
+  def calc(ds: String): Unit = {
+    val tns = Seq("company_court_announcement"
+      , "company_court_open_announcement"
+      , "company_court_register"
+      , "company_dishonest_info"
+      , "company_equity_info"
+      , "company_equity_pledge_holder"
+      , "company_judicial_assistance"
+      , "company_land_transfer"
+      , "company_lawsuit"
+      , "company_send_announcement"
+      , "company_zxr"
+      , "company_zxr_final_case"
+      , "company_zxr_restrict"
+      , "restrictions_on_exit"
+      , "zxr_evaluate_results")
+
+    sql(
+      s"""
+         |select to_json(MAP('rowkey',rowkey,"tn",tn,'flag','9')) content
+         |from winhc_ng.bds_change_extract
+         |where ds = '$ds'
+         |and update_type = 'remove'
+         |and tn <> 'company'
+         |and tn <> 'company_staff'
+         |and tn <> 'company_holder'
+         |UNION ALL
+         |select to_json(MAP('rowkey',rowkey,"tn",tn,'flag','8','company_id',new_data['company_id'],'relation_id',new_data['hid'])) content
+         |from winhc_ng.bds_change_extract
+         |where ds = '$ds'
+         |and update_type = 'remove'
+         |and tn  = 'company_staff'
+         |and new_data['hid'] is not null
+         |UNION ALL
+         |select to_json(MAP('rowkey',rowkey,"tn",tn,'flag','8','company_id',new_data['company_id'],'relation_id',new_data['holder_id'])) content
+         |from winhc_ng.bds_change_extract
+         |where ds = '$ds'
+         |and update_type = 'remove'
+         |and tn  = 'company_holder'
+         |and new_data['holder_id'] is not null
+         |""".stripMargin)
+      .createTempView("company_dynamic_out_tab_del")
+
+
+    val rdd = sql(
+      s"""
+         |select rowkey,
+         |       company_id,
+         |       table_name,
+         |       update_type,
+         |       old_data,
+         |       new_data,
+         |       change_fields,
+         |       biz_date,
+         |       update_time,
+         |       tn
+         |from winhc_ng.bds_change_extract
+         |where ds = '$ds'
+         |and update_type = 'update'
+         |and ${tns.map(r => s" tn = '$r' ").mkString("(", "or", ")")}
+         |""".stripMargin)
+      .rdd.map(r => {
+      val rowkey = r.getAs[String]("rowkey")
+      val company_id = r.getAs[String]("company_id")
+      val table_name = r.getAs[String]("table_name")
+      val update_type = r.getAs[String]("update_type")
+      val old_data = r.getAs[Map[String, String]]("old_data")
+      val new_data = r.getAs[Map[String, String]]("new_data")
+      val change_fields = r.getAs[String]("change_fields")
+      val biz_date = r.getAs[String]("biz_date")
+      val update_time = r.getAs[String]("update_time")
+      val tn = r.getAs[String]("tn")
+      val handle = map(tn)
+      val old_entity = handle.getAssociationEntityInfo(old_data)
+      val new_entity = handle.getAssociationEntityInfo(new_data)
+      val old_name = old_entity.filter(e => StringUtils.isEmpty(e.keyno)).map(_.name).toSet
+      val entity: Seq[AssociationEntityInfo] = new_entity.filter(e => old_name.contains(e.name)).filter(e => StringUtils.isNotEmpty(e.keyno))
+      if (entity.isEmpty) {
+        null
+      } else {
+        Row(Map(
+          "flag" -> "0"
+          , "tn" -> tn
+          , "rowkey" -> rowkey
+          , "entity" -> entity).toJson()
+          , null
+        )
+      }
+    }).filter(_ != null)
+
+    spark.createDataFrame(rdd, spark.table(target_tab).schema)
+      .createTempView("company_dynamic_out_tab")
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+         |SELECT ${getColumns(target_tab).diff(Seq("ds")).mkString(",")}
+         |FROM   company_dynamic_out_tab
+         |UNION ALL
+         |select ${getColumns(target_tab).diff(Seq("ds")).mkString(",")}
+         |from company_dynamic_out_tab_del
+         |""".stripMargin)
+
+
+  }
+}
+
+object DynamicUpdateId {
+
+  def main(args: Array[String]): Unit = {
+    val Array(ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    DynamicUpdateId(spark).calc(ds)
+    spark.stop()
+  }
+}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -169,7 +169,7 @@ case class NgCompanyDynamic(s: SparkSession,
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}_test')
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
          |SELECT $cols
          |FROM   company_dynamic_out_tab
          |""".stripMargin)

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/BusinessInfo.scala

@@ -58,8 +58,8 @@ case class BusinessInfo() extends AcrossTabAggHandle {
     })
     val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
     val str = BusinessInfoDynamicInfoMap(label = label, content = contents).toJson()
-
-    Seq(seq(0).customCopy(dynamic_code = get_new_code(seq), dynamic_info = str.toAnyMap()))
+    val id = seq.map(_.id).sorted.mkString(",")
+    Seq(seq(0).customCopy(id = id, dynamic_code = get_new_code(seq), dynamic_info = str.toAnyMap()))
   }
 
 

+ 11 - 6
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -25,8 +25,8 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
   private val dynamic_code_map = Map(
     "109003,109004" -> "109003"
     , "101101,101102" -> "101101"
-  ).flatMap(r=>{
-    r._1.split(",").map((_,r._2))
+  ).flatMap(r => {
+    r._1.split(",").map((_, r._2))
   })
 
 
@@ -81,7 +81,13 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
       if (bool == 0)
         return Seq.empty
 
-      if (0 == RegCapitalAmount.getAmount(before).toDouble || RegCapitalAmount.getAmount(after).toDouble == 0)
+      val before_d = RegCapitalAmount.getAmount(before).toDouble
+      val after_d = RegCapitalAmount.getAmount(after).toDouble
+
+      if (0 == before_d || after_d == 0)
+        return Seq.empty
+
+      if (before_d / after_d == 10000 || after_d / before_d == 10000)
         return Seq.empty
 
       val flag = (bool > 0) ? "1" | "0"
@@ -273,8 +279,8 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     })
     val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
     val str = BusinessInfoDynamicInfoMap(label = label, content = contents).toJson()
-
-    Seq(seq(0).customCopy(dynamic_code = get_new_code(seq), dynamic_info = str.toAnyMap()))
+    val id = seq.map(_.id).sorted.mkString(",")
+    Seq(seq(0).customCopy(id = id, dynamic_code = get_new_code(seq), dynamic_info = str.toAnyMap()))
   }
 
 
@@ -290,7 +296,6 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
   }
 
 
-
   def getRealCapital(str: String): String = {
     if (StringUtils.isEmpty(str))
       return null

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_staff.scala

@@ -257,7 +257,8 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     })
     val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
     val str = BusinessInfoDynamicInfoMap(label = label, content = contents)
-    Seq(seq(0).customCopy(dynamic_code = get_new_code(seq), dynamic_info = str.toMap()))
+    val id = seq.map(_.id).sorted.mkString(",")
+    Seq(seq(0).customCopy(id = id,dynamic_code = get_new_code(seq), dynamic_info = str.toMap()))
   }