Browse Source

feat: 企业动态框架代码

许家凯 4 years ago
parent
commit
609919c2f0

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/AcrossTabAggHandle.scala

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import org.apache.spark.internal.Logging
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/23 13:53
+ */
/**
 * Contract for aggregating dynamic records across several source tables.
 *
 * Implementations in this package are discovered by reflection
 * (see NgCompanyDynamicArgs.getAggArgs) and are shipped to Spark
 * executors, hence Serializable.
 */
trait AcrossTabAggHandle extends Serializable with Logging {

  /** Names of the tables whose records participate in this cross-table aggregation. */
  def getTables: Seq[String]

  /** Maps a record to the key used to group records before aggregation. */
  def group_by_key: CompanyDynamicRecord => String

  /** Collapses one group of records sharing a key into the aggregated output records. */
  def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord]
}

+ 73 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -0,0 +1,73 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import org.apache.spark.sql.Row
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/22 13:49
+ */
/**
 * One change event read from the change-extract table
 * (winhc_ng.bds_change_extract*).
 *
 * @param rowkey        row key of the changed record
 * @param company_id    id of the company the record belongs to
 * @param company_name  company name (left null by the reader in NgCompanyDynamic)
 * @param tn            source table name
 * @param update_type   kind of change: insert / update / deleted / remove / other
 * @param old_data      previous version of the record's fields
 * @param new_data      current version of the record's fields
 * @param change_fields names of the fields that changed (empty when none listed)
 * @param biz_date      business date of the change
 * @param update_time   time the source record was updated
 */
case class ChangeExtract(rowkey: String,
                         company_id: String,
                         company_name: String,
                         tn: String,
                         update_type: String,
                         old_data: Map[String, String],
                         new_data: Map[String, String],
                         change_fields: Seq[String],
                         biz_date: String,
                         update_time: String)
+
+
/**
 * An entity associated with a dynamic record.
 *
 * @param keyno      identifier of the associated entity
 * @param name       display name of the associated entity
 * @param risk_level risk level of the change (see NgCompanyRiskLevelType)
 * @param rta_info   description text
 */
case class AssociationEntityInfo(keyno: String,
                                 name: String,
                                 risk_level: NgCompanyRiskLevelType.RiskLevelType,
                                 rta_info: String)
+
/** A (table name, row key) pair identifying one source record of an aggregated dynamic. */
case class RowkeyInfo(rowkey: String, tn: String) {

  /** Serializes as "tn@@rowkey", the format stored in the agg_detail_rowkey column. */
  def toStr(): String = tn + "@@" + rowkey
}
+
/**
 * One output row of the company-dynamic pipeline; to_row() lays the fields
 * out in the column order of winhc_ng.out_company_dynamic.
 */
case class CompanyDynamicRecord(id: String,
                                association_entity_info: Seq[AssociationEntityInfo],
                                rowkey: String,
                                tn: String,
                                update_type: String,
                                dynamic_info: Map[String, String],
                                agg_detail_text: String,
                                agg_detail_rowkey: Seq[RowkeyInfo],
                                old_record: Map[String, String],
                                new_record: Map[String, String],
                                change_time: String,
                                update_time: String,
                                create_time: String) {

  /**
   * Converts this record into a Spark Row matching the target table's schema.
   * The trailing null fills the 16th slot, which lines up with the ds
   * partition column of spark.table(target_tab).schema in NgCompanyDynamic.
   */
  def to_row(): Row = {
    // Distinct risk levels across all associated entities, e.g. "1,2".
    val riskLevels = association_entity_info.map(_.risk_level).distinct.mkString(",")
    // Per-entity "keyno@@risk_level" detail pairs.
    val riskLevelDetail = association_entity_info
      .map(e => s"${e.keyno}@@${e.risk_level}")
      .distinct
      .mkString(",")
    // agg_detail_rowkey is legitimately null for non-aggregated records.
    val aggRowkeys: String = Option(agg_detail_rowkey)
      .map(_.map(_.toStr()).mkString(","))
      .orNull
    // NOTE(review): association_entity_info is assumed non-null here (unlike
    // agg_detail_rowkey) — confirm every handler populates it.
    Row(
      id,
      association_entity_info.toJson(),
      rowkey,
      tn,
      update_type,
      riskLevels,
      riskLevelDetail,
      dynamic_info.toJson(),
      agg_detail_text,
      aggRowkeys,
      old_record.toJson(),
      new_record.toJson(),
      change_time,
      update_time,
      create_time,
      null
    )
  }
}
+
+

+ 167 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -0,0 +1,167 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/22 09:13
+ */
/**
 * Company-dynamic pipeline: reads change-extract events, runs each table's
 * handler (filter / flat_map / optional in-table group-by), applies
 * cross-table aggregations, and writes the merged result into
 * winhc_ng.out_company_dynamic.
 *
 * @param s    the Spark session
 * @param args per-table handler configuration (one entry per source table)
 * @param agg  cross-table aggregation configuration
 */
case class NgCompanyDynamic(s: SparkSession,
                            args: Seq[NgCompanyDynamicArgs]
                            , agg: Seq[NgAcrossTabAggArgs]
                           ) extends LoggingUtils with Logging {
  @(transient@getter) val spark: SparkSession = s

  private val target_tab = "winhc_ng.out_company_dynamic"

  // Lookup of handler configuration by table name.
  private val args_map: Map[String, NgCompanyDynamicArgs] = args.map(r => (r.tn, r)).toMap

  /** Creates the output table if it does not yet exist. */
  def init(): Unit = {
    sql(
      s"""
         |CREATE TABLE IF NOT EXISTS $target_tab
         |(
         |    id STRING COMMENT '企业动态主键'
         |    ,association_entity_info STRING COMMENT '关联主体信息,json格式。[{keyno:xxx,name:xxx}]'
         |    ,rowkey STRING COMMENT '企业动态涉及到的rowkey,如果是聚合类型的数据此项为空'
         |    ,tn STRING COMMENT 'rowkey字段对应的tn,如果是聚合类型的数据此项可能为空'
         |    ,update_type STRING COMMENT '动态类型:insert,update,deleted,remove'
         |    ,risk_level STRING COMMENT '变更风险等级'
         |    ,risk_level_detail STRING COMMENT '变更风险等级详情'
         |    ,dynamic_info STRING COMMENT '动态展示层的相关数据,json格式'
         |    ,agg_detail_text STRING COMMENT '聚合类型的json数据,规范优先'
         |    ,agg_detail_rowkey STRING COMMENT '聚合类型rowkey的多项rowkey字段,结构:tn@@rowkey,tn@@rowkey'
         |    ,old_record STRING COMMENT '上一个版本数据json格式'
         |    ,new_record STRING COMMENT '当前版本数据json格式'
         |    ,change_time string COMMENT '变更时间(业务展示 yyyy-MM-dd)'
         |    ,update_time STRING  COMMENT  '更新时间'
         |    ,create_time STRING COMMENT '创建时间'
         |)
         |COMMENT '企业动态输出表'
         |PARTITIONED BY
         |(
         |    ds STRING COMMENT '分区'
         |)
         |""".stripMargin)
  }

  /** Runs the pipeline end to end and writes one output partition. */
  def calc(): Unit = {
    // TODO(review): this reads the *_test table; the production table
    // winhc_ng.bds_change_extract is commented out below — confirm before release.
    val rdd: RDD[CompanyDynamicRecord] = sql(
      s"""
         |SELECT  *
         |--- FROM    winhc_ng.bds_change_extract
         |FROM    winhc_ng.bds_change_extract_test
         |WHERE   ds > 0
         |""".stripMargin).rdd.map(r => {
      val value = r.getAs[String]("change_fields")
      val change_fields: Seq[String] = if (StringUtils.isEmpty(value)) Seq.empty else value.split(",")
      ChangeExtract(rowkey = r.getAs("rowkey")
        , company_id = r.getAs("company_id")
        , company_name = null
        , tn = r.getAs("table_name")
        , update_type = r.getAs("update_type")
        , old_data = r.getAs("old_data")
        , new_data = r.getAs("new_data")
        , change_fields = change_fields
        , biz_date = r.getAs("biz_date")
        , update_time = r.getAs("update_time")
      )
    }).filter(r => {
      // Apply the per-table filter when configured; keep everything otherwise.
      val filter = args_map(r.tn).filter
      filter == null || filter.apply(r.update_type, r.biz_date, r.change_fields, r.old_data, r.new_data)
    }).flatMap(r => args_map(r.tn).flat_map.apply(r))
      .filter(_ != null)
      .cache() // reused once per table below

    var rdd_map: mutable.Map[String, RDD[CompanyDynamicRecord]] = mutable.Map.empty

    // Per-table stage: optional group-by aggregation within a single table.
    for (elem <- args) {
      var tmp_rdd: RDD[CompanyDynamicRecord] = null
      if (elem.group_by_key == null) {
        tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
      } else {
        tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
          .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
          .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
      }
      rdd_map = rdd_map + (elem.tn -> tmp_rdd)
    }

    // Cross-table stage: union the participating tables' RDDs, then group them.
    for (elem <- agg) {
      var tmp_rdd: RDD[CompanyDynamicRecord] = null
      for (tn <- elem.tabs) {
        if (tmp_rdd == null) {
          // BUG FIX: the first table's RDD was looked up but never assigned,
          // dropping that table's records and leaving tmp_rdd null (NPE at
          // groupBy below).
          tmp_rdd = rdd_map(tn)
        } else {
          tmp_rdd = tmp_rdd.union(rdd_map(tn))
        }
        // Consumed tables must not also be written out individually.
        rdd_map = rdd_map - tn
      }
      tmp_rdd = tmp_rdd.groupBy(elem.group_by_key).flatMap(r => elem.group_by_flat_map.apply(r._2.toSeq))
      rdd_map = rdd_map + (elem.tabs.mkString("_") -> tmp_rdd)
    }

    // Merge every remaining per-table / aggregated RDD into one output RDD.
    var out_rdd: RDD[CompanyDynamicRecord] = null
    for ((_, value) <- rdd_map) {
      out_rdd = if (out_rdd == null) value else out_rdd.union(value)
    }
    // Fail fast with a clear message instead of an NPE when nothing was produced.
    require(out_rdd != null, s"no dynamic records produced: args is empty or all tables were consumed by agg")

    spark.createDataFrame(out_rdd.map(_.to_row()), spark.table(target_tab).schema)
      .createTempView("company_dynamic_out_tab")

    sql(
      s"""
         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
         |SELECT id
         |       ,association_entity_info
         |       ,rowkey
         |       ,tn
         |       ,update_type
         |       ,risk_level
         |       ,risk_level_detail
         |       ,dynamic_info
         |       ,agg_detail_text
         |       ,agg_detail_rowkey
         |       ,old_record
         |       ,new_record
         |       ,change_time
         |       ,update_time
         |       ,create_time
         |FROM   company_dynamic_out_tab
         |""".stripMargin)
  }
}
+
object NgCompanyDynamic {

  /** Entry point: builds the Spark session, runs the pipeline, shuts down. */
  def main(args: Array[String]): Unit = {
    val config = mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_ng",
      "spark.debug.maxToStringFields" -> "200",
      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
    )
    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
    val job = NgCompanyDynamic(spark, NgCompanyDynamicArgs.getStartArgs, NgCompanyDynamicArgs.getAggArgs)
    job.calc()
    spark.stop()
  }
}

+ 47 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicArgs.scala

@@ -0,0 +1,47 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
+import com.winhc.bigdata.spark.utils.ReflectUtils
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/22 13:58
+ */
/**
 * Per-table handler configuration for the company-dynamic pipeline.
 *
 * @param tn                table name this handler serves
 * @param flat_map          turns one change event into zero or more dynamic records
 * @param filter            pre-filter over (update_type, biz_date, change_fields, old_data, new_data)
 * @param group_by_key      optional in-table grouping key; null disables aggregation
 * @param group_by_flat_map collapses one key's records; used only with group_by_key
 */
case class NgCompanyDynamicArgs(tn: String,
                                flat_map: ChangeExtract => Seq[CompanyDynamicRecord],
                                filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = CompanyDynamicUtils.default_filter,
                                group_by_key: CompanyDynamicRecord => String = null,
                                group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = null) {
  override def toString: String = s"tn=$tn $flat_map,$filter,$group_by_key,$group_by_flat_map"
}
+
/**
 * Cross-table aggregation configuration: which tables to union,
 * how to key the union, and how to collapse each keyed group.
 */
case class NgAcrossTabAggArgs(tabs: Seq[String],
                              group_by_key: CompanyDynamicRecord => String,
                              group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord])
+
object NgCompanyDynamicArgs {

  /**
   * Discovers every NgCompanyDynamicHandle in this package by reflection and
   * wraps each one as handler args. The handler's simple class name doubles
   * as the table name it serves.
   */
  def getStartArgs: Seq[NgCompanyDynamicArgs] =
    ReflectUtils
      .subObject[NgCompanyDynamicHandle](classOf[NgCompanyDynamicHandle], this.getClass.getPackage.getName)
      .map { handle =>
        NgCompanyDynamicArgs(
          tn = handle.getClass.getSimpleName
          , flat_map = handle.flat_map
          , filter = handle.filter
          , group_by_key = handle.group_by_key
          , group_by_flat_map = handle.group_by_flat_map)
      }

  /** Discovers every AcrossTabAggHandle in this package and wraps it as aggregation args. */
  def getAggArgs: Seq[NgAcrossTabAggArgs] =
    ReflectUtils
      .subObject[AcrossTabAggHandle](classOf[AcrossTabAggHandle], this.getClass.getPackage.getName)
      .map(h => NgAcrossTabAggArgs(tabs = h.getTables, group_by_key = h.group_by_key, group_by_flat_map = h.group_by_flat_map))

  /** Ad-hoc smoke check: prints the discovered handler args. */
  def main(args: Array[String]): Unit = println(getStartArgs)
}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
+import org.apache.spark.internal.Logging
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/23 10:35
+ */
/**
 * Contract for a single-table dynamic handler. Implementations in this
 * package are discovered by reflection (NgCompanyDynamicArgs.getStartArgs);
 * the implementing class's simple name is used as the table name.
 */
trait NgCompanyDynamicHandle extends Serializable with Logging {

  /** Pre-filter over (update_type, biz_date, change_fields, old_data, new_data); defaults to the shared filter. */
  def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = CompanyDynamicUtils.default_filter

  /** Turns one change event into zero or more dynamic records. */
  def flat_map: ChangeExtract => Seq[CompanyDynamicRecord]

  /** Optional in-table grouping key; null (the default) disables aggregation for this table. */
  def group_by_key: CompanyDynamicRecord => String = null

  /** Collapses one key's records; only consulted when group_by_key is non-null. */
  def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = null
}

+ 15 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyRiskLevelType.scala

@@ -0,0 +1,15 @@
+package com.winhc.bigdata.spark.ng.dynamic
+
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/22 15:52
+ */
/**
 * Risk level of a company change.
 *
 * The string value of each member is what gets persisted into the
 * risk_level / risk_level_detail columns, so the values must stay stable.
 */
object NgCompanyRiskLevelType extends Enumeration {
  // Type alias exposed to callers (e.g. AssociationEntityInfo.risk_level).
  type RiskLevelType = Value
  val Positive = Value("0") // favorable information
  val Prompt = Value("1") // prompt / notice information
  // BUG FIX: the value was "2 " (trailing space), which would leak a stray
  // space into every persisted risk-level string and break equality checks.
  val Caution = Value("2") // warning information

  /** Prints every enum value; debugging aid. */
  def showAll: Unit = this.values.foreach(println)
}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/test.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.ng.dynamic.agg
+
+import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/23 13:57
+ */
/**
 * Placeholder cross-table aggregation handle (framework skeleton): the table
 * list holds empty names, the key is constant, and the flat-map is identity.
 */
case class test() extends AcrossTabAggHandle {

  override def getTables: Seq[String] = Seq("", "")

  override def group_by_key: CompanyDynamicRecord => String = _ => ""

  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = identity
}

+ 153 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company.scala

@@ -0,0 +1,153 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.utils.RegCapitalAmount
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/22 16:04
+ */
/**
 * Dynamic handler for the company base table. Emits one record when a company
 * is newly registered ("insert") and one record per watched field that changed
 * ("update"); all other update types produce nothing.
 */
case class company() extends NgCompanyDynamicHandle {

  /**
   * Builds one output record from a change event.
   *
   * @param change_extract          the source change event
   * @param dynamic_info            display-layer payload (code + description)
   * @param update_field            the changed field, used to salt the record id (may be null)
   * @param association_entity_info entities associated with this dynamic
   */
  private def getCompanyDynamicRecord(change_extract: ChangeExtract
                                      , dynamic_info: Map[String, String]
                                      , update_field: String
                                      , association_entity_info: Seq[AssociationEntityInfo]
                                     ): CompanyDynamicRecord = {
    val tn = change_extract.tn
    val update_type = change_extract.update_type
    val old_data = change_extract.old_data
    val new_data = change_extract.new_data
    val biz_date = change_extract.biz_date
    val rowkey = change_extract.rowkey
    val update_time = change_extract.update_time


    CompanyDynamicRecord(
      id = CompanyDynamicUtils.generateId(rowkey, biz_date, tn, update_field)
      , association_entity_info = association_entity_info
      , rowkey = rowkey
      , tn = tn
      , update_type = update_type
      , dynamic_info = dynamic_info
      // Not an aggregated record, so both agg detail fields stay empty.
      , agg_detail_text = null
      , agg_detail_rowkey = null
      , old_record = old_data
      , new_record = new_data
      , change_time = biz_date
      , update_time = update_time
      , create_time = null
    )
  }


  /**
   * Convenience overload: associates the record with the company itself
   * at the default Prompt risk level.
   */
  private def getCompanyDynamicRecord(change_extract: ChangeExtract, dynamic_info: Map[String, String], update_field: String): CompanyDynamicRecord = {
    val company_id = change_extract.company_id
    val company_name = change_extract.company_name
    getCompanyDynamicRecord(change_extract, dynamic_info, update_field, Seq(AssociationEntityInfo(keyno = company_id, name = company_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null)))
  }

  /** Maps one change event to its dynamic records (possibly none). */
  def flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
    val change_fields = change_extract.change_fields
    val tn = change_extract.tn
    val company_id = change_extract.company_id
    val company_name = change_extract.company_name
    val update_type = change_extract.update_type
    val old_data = change_extract.old_data
    val new_data = change_extract.new_data
    val biz_date = change_extract.biz_date
    val rowkey = change_extract.rowkey
    val update_time = change_extract.update_time
    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty

    update_type match {
      case "insert" => {
        // Newly registered company: associate the legal representative.
        // NOTE(review): new_data(...) throws NoSuchElementException if the key
        // is absent — confirm inserts always carry legal_entity_id/name.
        val legal_entity_id = new_data("legal_entity_id")
        val legal_entity_name = new_data("legal_entity_name")
        val dynamic_info = Map(
          "code" -> "100100",
          "description" -> "公司新成立"
        )
        list = list :+ getCompanyDynamicRecord(change_extract = change_extract
          , dynamic_info = dynamic_info
          , update_field = null
          , association_entity_info = Seq(AssociationEntityInfo(keyno = legal_entity_id, name = legal_entity_name, risk_level = NgCompanyRiskLevelType.Prompt, rta_info = null))
        )
      }
      case "update" => {
        // Only the watched registration fields produce dynamics; iteration
        // order follows change_fields (intersect preserves it).
        for (elem <- change_fields.intersect("name,reg_capital,legal_entity_name,reg_location,business_scope,reg_status_std".split(","))) {
          elem match {
            case "name" => {
              // Company name changed.
              val dynamic_info = Map(
                "code" -> "100101",
                "description" -> "公司名称发生变化"
              )
              list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "name")
            }
            case "reg_capital" => { // registered capital must change substantively
              // Emit only when the parsed amounts are both available and differ,
              // so pure formatting changes are ignored.
              val old_reg_capital: String = RegCapitalAmount.getAmount(old_data("reg_capital"))
              val new_reg_capital: String = RegCapitalAmount.getAmount(new_data("reg_capital"))
              if (old_reg_capital != null && new_reg_capital != null) {
                if (!old_reg_capital.equals(new_reg_capital)) {
                  val dynamic_info = Map(
                    "code" -> "100102",
                    "description" -> "注册资本发生变化"
                  )
                  list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "reg_capital")
                }
              }
            }

            case "legal_entity_name" => {
              // Legal representative changed.
              val dynamic_info = Map(
                "code" -> "100103",
                "description" -> "法定代表人发生变化"
              )
              list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "legal_entity_name")
            }

            case "reg_location" => {
              // Registered address changed.
              val dynamic_info = Map(
                "code" -> "100104",
                "description" -> "公司注册地址发生变化"
              )
              list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "reg_location")
            }

            case "business_scope" => {
              // Business scope changed.
              val dynamic_info = Map(
                "code" -> "100105",
                "description" -> "公司经营范围发生变化"
              )
              list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "business_scope")
            }

            case "reg_status_std" => {
              // Registration status changed; a status containing "销"
              // (cancelled/revoked) is escalated to Caution risk level.
              val dynamic_info = Map(
                "code" -> "100106",
                "description" -> "公司状态发生变化"
              )
              // NOTE(review): new_data("reg_status_std") throws if the key is
              // absent and .contains NPEs on a null value — confirm upstream
              // always populates it for this change field.
              val reg_status_std = new_data("reg_status_std")
              if (reg_status_std.contains("销")) {
                list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "reg_status_std",
                  association_entity_info = Seq(AssociationEntityInfo(keyno = company_id, name = company_name, risk_level = NgCompanyRiskLevelType.Caution, rta_info = null))
                )
              } else {
                list = list :+ getCompanyDynamicRecord(change_extract, dynamic_info, "reg_status_std")
              }
            }
            case _ => null
          }
        }
      }

      // deleted/remove/other: no dynamic emitted.
      case _ => null
    }

    list
  }

}

+ 18 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -0,0 +1,18 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.ng.dynamic.{ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/23 10:43
+ */
/**
 * Dynamic handler for the company_holder table. flat_map is not implemented
 * yet: it yields null, which the pipeline filters out downstream.
 */
case class company_holder() extends NgCompanyDynamicHandle {

  override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (_: ChangeExtract) => null

  // Kept from the original as a template for enabling in-table aggregation:
  // override def group_by_key: (CompanyDynamicRecord) => String = (c: CompanyDynamicRecord) => {
  //   println("group_by_key")
  //   "xjk"
  // }
}

+ 31 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/CompanyDynamicUtils.scala

@@ -0,0 +1,31 @@
+package com.winhc.bigdata.spark.ng.dynamic.utils
+
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, NgCompanyRiskLevelType}
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/22 15:46
+ */
/** Shared helpers for the company-dynamic handlers. */
object CompanyDynamicUtils {

  /** Wraps a single associated entity into the Seq shape CompanyDynamicRecord expects. */
  def getAssociationEntityInfo(keyno: String, name: String, risk_level: NgCompanyRiskLevelType.RiskLevelType, rta_info: String): Seq[AssociationEntityInfo] =
    Seq(AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = rta_info))


  /**
   * Default change-event filter: drops events with no business date,
   * "remove"/"other" events, and updates that list no changed fields.
   */
  def default_filter(update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]): Boolean =
    biz_date != null &&
      !update_type.equals("remove") &&
      !update_type.equals("other") &&
      !(update_type.equals("update") && change_fields.isEmpty)

  /**
   * Builds a dynamic-record id of the form "rowkey@biz_date@tn",
   * suffixed with "@random_num" when one is supplied.
   */
  def generateId(rowkey: String, biz_date: String, tn: String, random_num: String = null): String =
    if (StringUtils.isEmpty(random_num)) s"$rowkey@$biz_date@$tn"
    else s"$rowkey@$biz_date@$tn@$random_num"
}