Browse Source

feat: 企业动态

许家凯 3 years ago
parent
commit
689dc1a378

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.ng.dynamic
 
 import org.apache.spark.sql.Row
 import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+import org.apache.commons.lang3.StringUtils
 
 /**
  * @author: XuJiakai
@@ -53,7 +54,7 @@ case class CompanyDynamicRecord(id: String,
     if (association_entity_info == null || association_entity_info.isEmpty) {
       return null
     }
-    val rec = association_entity_info.filter(_.keyno != null)
+    val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
     if (rec.isEmpty) return null
 
     if (rec.length != association_entity_info.length)

+ 19 - 8
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamic.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.ng.dynamic
 
-import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.BaseUtil.{getYesterday, isWindows}
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
@@ -60,12 +60,16 @@ case class NgCompanyDynamic(s: SparkSession,
   }
 
   def calc(): Unit = {
+    val ds = getYesterday()
+    val where = args_map.keys.map(r => s""" "$r" """).mkString("(", ",", ")")
+
     val rdd: RDD[CompanyDynamicRecord] = sql(
       s"""
          |SELECT  *
-         |--- FROM    winhc_ng.bds_change_extract
-         |FROM    winhc_ng.bds_change_extract_test
-         |WHERE   ds > 0 and tn in ('company','company_holder')
+         |FROM    winhc_ng.bds_change_extract
+         |--- FROM    winhc_ng.bds_change_extract_test
+         |WHERE   ds = $ds
+         |AND    tn in $where
          |""".stripMargin).rdd.map(r => {
       val value = r.getAs[String]("change_fields")
       val change_fields: Seq[String] = if (StringUtils.isEmpty(value)) Seq.empty else value.split(",")
@@ -102,10 +106,17 @@ case class NgCompanyDynamic(s: SparkSession,
       if (elem.group_by_key == null) {
         tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
       } else {
-
-        tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
-          .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
-          .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
+        if (elem.group_by_pre == null) {
+          tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
+            .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
+            .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
+        } else {
+          tmp_rdd = rdd.filter(r => elem.tn.equals(r.tn))
+            .flatMap(r => args_map(elem.tn).group_by_pre.apply(r))
+            .filter(_ != null)
+            .groupBy(r => args_map(elem.tn).group_by_key.apply(r))
+            .flatMap(r => args_map(elem.tn).group_by_flat_map(r._2.toSeq))
+        }
       }
       rdd_map = rdd_map + (elem.tn -> tmp_rdd)
     }

+ 6 - 1
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicArgs.scala

@@ -11,6 +11,7 @@ case class NgCompanyDynamicArgs(
                                  tn: String
                                  , flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord]
                                  , filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = CompanyDynamicUtils.default_filter
+                                 , group_by_pre: (CompanyDynamicRecord) => Seq[CompanyDynamicRecord]
                                  , group_by_key: (CompanyDynamicRecord) => String = null
                                  , group_by_flat_map: (Seq[CompanyDynamicRecord]) => Seq[CompanyDynamicRecord] = null //group by key:
                                ) {
@@ -30,7 +31,7 @@ object NgCompanyDynamicArgs {
     val handles = ReflectUtils.subObject[NgCompanyDynamicHandle](classOf[NgCompanyDynamicHandle], this.getClass.getPackage.getName)
     handles.map(ch => {
       val tn: String = ch.getClass.getSimpleName
-      NgCompanyDynamicArgs(tn = tn, flat_map = ch.flat_map, group_by_key = ch.group_by_key, group_by_flat_map = ch.group_by_flat_map, filter = ch.filter)
+      NgCompanyDynamicArgs(tn = tn, flat_map = ch.flat_map, group_by_pre = ch.group_by_pre, group_by_key = ch.group_by_key, group_by_flat_map = ch.group_by_flat_map, filter = ch.filter)
     })
   }
 
@@ -42,7 +43,11 @@ object NgCompanyDynamicArgs {
   }
 
   def main(args: Array[String]): Unit = {
+//    val handles = ReflectUtils.subObject[NgCompanyDynamicHandle](classOf[NgCompanyDynamicHandle], this.getClass.getPackage.getName)
+//    val orNull1 = handles.find(_.getClass.getSimpleName.contains("auction_tracking")).get
     val args1 = getStartArgs
+    val orNull = args1.find(_.tn.equals("auction_tracking")).orNull
+    orNull.flat_map.apply(null)
     println(args1)
   }
 }

+ 8 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -13,6 +13,14 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
 
   def flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord]
 
+  /**
+   * group_by前置处理程。flat_map
+   *
+   * @return
+   */
+  def group_by_pre: (CompanyDynamicRecord) => Seq[CompanyDynamicRecord] = null
+
+
   def group_by_key: (CompanyDynamicRecord) => String = null
 
   def group_by_flat_map: (Seq[CompanyDynamicRecord]) => Seq[CompanyDynamicRecord] = null

+ 34 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/auction_tracking.scala

@@ -0,0 +1,34 @@
+
+
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.ng.dynamic.utils.DailyAggHandle
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/29 15:07
+ */
+case class auction_tracking() extends DailyAggHandle {
+  override def getDynamicInfo(): Map[String, String] = {
+    null
+  }
+
+  override def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo] = {
+    try {
+      var list: mutable.Seq[AssociationEntityInfo] = mutable.Seq.empty
+      val company_info = new_data.getOrEmptyStr("company_info")
+      list = list ++ getEntity(company_info, "company_id", "company_name", NgCompanyRiskLevelType.Caution, null)
+      list
+    } catch {
+      case e: Exception => {
+        logError(e.getMessage, e)
+        Seq.empty
+      }
+    }
+  }
+}
+

+ 109 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DailyAggHandle.scala

@@ -0,0 +1,109 @@
+package com.winhc.bigdata.spark.ng.dynamic.utils
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.ng.dynamic._
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/6/29 14:29
+ */
+abstract class DailyAggHandle() extends NgCompanyDynamicHandle {
+  override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => update_type.equals("insert")
+
+
+  def getDynamicInfo(): Map[String, String]
+
+  def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo]
+
+  override def flat_map: (ChangeExtract) => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
+    val change_fields = change_extract.change_fields
+    val tn = change_extract.tn
+    val company_id = change_extract.company_id
+    val company_name = change_extract.company_name
+    val update_type = change_extract.update_type
+    val old_data = change_extract.old_data
+    val new_data = change_extract.new_data
+    val biz_date = change_extract.biz_date
+    val rowkey = change_extract.rowkey
+    val update_time = change_extract.update_time
+    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+    list = list :+ getCompanyDynamicRecord(change_extract, getDynamicInfo(), null, getAssociationEntityInfo(new_data))
+    list
+  }
+
+/*
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    println("aaa")
+    if (cdr.association_entity_info.length != 1) {
+      throw new RuntimeException("entity error!")
+    }
+    cdr.association_entity_info(0).keyno
+  }*/
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    val ids = seq.map(_.id)
+    val tn = seq(0).tn
+    val association_entity_info = seq(0).association_entity_info
+
+    val agg_detail_rowkey: Seq[RowkeyInfo] = ids.map(r => RowkeyInfo(rowkey = r, tn = this.getClass.getSimpleName))
+    val change_time = getMax(seq.map(_.change_time))
+    val update_time = getMax(seq.map(_.update_time))
+    val create_time = getMax(seq.map(_.create_time))
+
+    val id = CompanyDynamicUtils.generateId(association_entity_info(0).keyno, change_time, tn)
+    Seq(CompanyDynamicRecord(
+      id = id, association_entity_info = association_entity_info, rowkey = null, tn = tn, update_type = "insert", dynamic_info = getDynamicInfo(), agg_detail_text = null, agg_detail_rowkey = agg_detail_rowkey, old_record = null, new_record = null, change_time = change_time, update_time = update_time, create_time = create_time
+    ))
+
+  }
+
+  private def getMax(seq:Seq[String]): String ={
+    val strings = seq.filter(_ != null)
+    if(strings.isEmpty){
+      null
+    }else{
+      strings.max
+    }
+  }
+
+
+  override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
+    val id = cdr.id
+    val association_entity_info = cdr.association_entity_info
+    val rowkey = cdr.rowkey
+    val tn = cdr.tn
+    val update_type = cdr.update_type
+    val dynamic_info = cdr.dynamic_info
+    val agg_detail_text = cdr.agg_detail_text
+    val agg_detail_rowkey = cdr.agg_detail_rowkey
+    val old_record = cdr.old_record
+    val new_record = cdr.new_record
+    val change_time = cdr.change_time
+    val update_time = cdr.update_time
+    val create_time = cdr.create_time
+    association_entity_info.filter(r=>StringUtils.isNotEmpty(r.keyno)).map(r=>{
+      CompanyDynamicRecord(id = id, association_entity_info = Seq(r), rowkey = rowkey, tn = tn, update_type = update_type, dynamic_info = dynamic_info, agg_detail_text = agg_detail_text, agg_detail_rowkey = agg_detail_rowkey, old_record = old_record, new_record = new_record, change_time = change_time, update_time = update_time, create_time = create_time)
+    })
+  }
+
+  protected def getEntity(json: String, id_key: String, name_key: String
+                        , risk_level: NgCompanyRiskLevelType.RiskLevelType //变更风险等级
+                        , rta_info: String //描述
+                       ): Seq[AssociationEntityInfo] = {
+    if (StringUtils.isEmpty(json)) {
+      Seq.empty
+    } else {
+      val array = JSON.parseArray(json)
+      var list: mutable.Seq[AssociationEntityInfo] = mutable.Seq.empty
+      for (i <- 0 until array.size()) {
+        val jSONObject = array.getJSONObject(i)
+        val keyno = jSONObject.getString(id_key)
+        val name = jSONObject.getString(name_key)
+        list = list :+ AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = rta_info)
+      }
+      list
+    }
+  }
+}