Explorar o código

feat: 动态细节优化

许家凯 %!s(int64=3) %!d(string=hai) anos
pai
achega
b82ee96cbc

+ 60 - 22
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.ng.dynamic
 import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic.utils.CollapseKeyArgs
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.Row
 
 /**
@@ -43,11 +44,46 @@ case class CompanyDynamicRecord(id: String,
                                 dynamic_info: Map[String, Any],
                                 agg_detail_text: String,
                                 agg_detail_rowkey: Seq[RowkeyInfo],
-                                biz_time:String,
-                                dynamic_time:String,
+                                biz_time: String,
+                                dynamic_time: String,
                                 update_time: String,
                                 create_time: String
                                ) {
+
+  import com.winhc.bigdata.spark.implicits.Bool._
+
+  def customCopy(id: String = null,
+                 association_entity_info: Seq[AssociationEntityInfo] = null,
+                 rowkey: String = null,
+                 tn: String = null,
+                 update_type: String = null,
+                 dynamic_code: String = null,
+                 dynamic_info: Map[String, Any] = null,
+                 agg_detail_text: String = null,
+                 agg_detail_rowkey: Seq[RowkeyInfo] = null,
+                 biz_time: String = null,
+                 dynamic_time: String = null,
+                 update_time: String = null,
+                 create_time: String = null
+                ): CompanyDynamicRecord = {
+
+    val _id = (id == null) ? this.id | id
+    val _association_entity_info = (association_entity_info == null) ? this.association_entity_info | association_entity_info
+    val _rowkey = (rowkey == null) ? this.rowkey | rowkey
+    val _tn = (tn == null) ? this.tn | tn
+    val _update_type = (update_type == null) ? this.update_type | update_type
+    val _dynamic_code = (dynamic_code == null) ? this.dynamic_code | dynamic_code
+    val _dynamic_info = (dynamic_info == null) ? this.dynamic_info | dynamic_info
+    val _agg_detail_text = (agg_detail_text == null) ? this.agg_detail_text | agg_detail_text
+    val _agg_detail_rowkey = (agg_detail_rowkey == null) ? this.agg_detail_rowkey | agg_detail_rowkey
+    val _biz_time = (biz_time == null) ? this.biz_time | biz_time
+    val _dynamic_time = (dynamic_time == null) ? this.dynamic_time | dynamic_time
+    val _update_time = (update_time == null) ? this.update_time | update_time
+    val _create_time = (create_time == null) ? this.create_time | create_time
+    CompanyDynamicRecord(id = _id, association_entity_info = _association_entity_info, rowkey = _rowkey, tn = _tn, update_type = _update_type, dynamic_code = _dynamic_code, dynamic_info = _dynamic_info, agg_detail_text = _agg_detail_text, agg_detail_rowkey = _agg_detail_rowkey, biz_time = _biz_time, dynamic_time = _dynamic_time, update_time = _update_time, create_time = _create_time)
+  }
+
+
   def format(): CompanyDynamicRecord = {
     if (id == null) {
       return null
@@ -56,31 +92,33 @@ case class CompanyDynamicRecord(id: String,
       return null
     }
     // todo 留下人名,为后期补id
-   /* val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
-    if (rec.isEmpty) return null
-
-    if (rec.length != association_entity_info.length)
-      return CompanyDynamicRecord(id,
-        rec,
-        rowkey,
-        tn,
-        update_type,
-        dynamic_code,
-        dynamic_info,
-        agg_detail_text,
-        agg_detail_rowkey,
-        biz_time,
-        dynamic_time,
-        update_time,
-        create_time
-      )*/
+    /* val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
+     if (rec.isEmpty) return null
+
+     if (rec.length != association_entity_info.length)
+       return CompanyDynamicRecord(id,
+         rec,
+         rowkey,
+         tn,
+         update_type,
+         dynamic_code,
+         dynamic_info,
+         agg_detail_text,
+         agg_detail_rowkey,
+         biz_time,
+         dynamic_time,
+         update_time,
+         create_time
+       )*/
     this
   }
 
+  import com.winhc.bigdata.spark.implicits.BaseHelper._
+
   def to_row(): Row = {
     val risk_level_str = association_entity_info.map(_.risk_level).distinct.mkString(",")
-    val risk_level_detail = association_entity_info.map(r => s"${r.keyno}@@${r.risk_level}").distinct.mkString(",")
-    val agg_detail_rowkey_str: String = if (agg_detail_rowkey == null) null else agg_detail_rowkey.map(_.toStr).mkString(",")
+    val risk_level_detail = association_entity_info.filter(r => StringUtils.isNotBlank(r.keyno)).map(r => s"${r.keyno}@@${r.risk_level}").distinct.mkStringOrNull(",")
+    val agg_detail_rowkey_str: String = if (agg_detail_rowkey == null || agg_detail_rowkey.isEmpty) null else agg_detail_rowkey.map(_.toStr).mkString(",")
     Row(id
       , association_entity_info.toJson()
       , rowkey

+ 47 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/BusinessInfo.scala

@@ -0,0 +1,47 @@
+package com.winhc.bigdata.spark.ng.dynamic.agg
+
+import com.winhc.bigdata.spark.implicits.BaseHelper._
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
+import com.winhc.bigdata.spark.ng.dynamic.utils.{BusinessInfoDynamicInfoMap, ChangeContent}
+import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/28 17:36
+ */
+case class BusinessInfo() extends AcrossTabAggHandle {
+  /**
+   * 需要聚合的维度
+   *
+   * @return
+   */
+  override def getTables: Seq[String] = Seq("company", "company_staff")
+
+  /**
+   * 聚合的key,相同的key会聚合到一起
+   *
+   * @return
+   */
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    if (info == null) {
+      s"${cdr.id}_${cdr.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label}"
+    } else {
+      s"${info.keyno}_${cdr.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label}"
+    }
+  }
+
+  /**
+   * 聚合处理程序  flat_map
+   *
+   * @return
+   */
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    val contents: Seq[ChangeContent] = seq.flatMap(r => {
+      r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].content
+    })
+    val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
+    val str = BusinessInfoDynamicInfoMap(label = label, content = contents).toJson()
+    Seq(seq(0).customCopy(dynamic_info = str.toAnyMap()))
+  }
+}

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company.scala

@@ -248,4 +248,5 @@ case class company(is_inc: Boolean) extends NgCompanyDynamicHandle {
     list
   }
 
+
 }

+ 21 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_equity_pledge_holder.scala

@@ -0,0 +1,21 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.ng.dynamic.utils.{DailyAggHandle, SimpleDailyDynamic}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/29 14:25
+ */
+case class company_equity_pledge_holder(is_inc: Boolean) extends DailyAggHandle with SimpleDailyDynamic {
+  override def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo] = {
+    Seq(
+      AssociationEntityInfo(keyno = new_data.getOrEmptyStr("holder_id")
+        , name = new_data.getOrEmptyStr("holder_name")
+        , risk_level = NgCompanyRiskLevelType.Caution, "质押人")
+      ,AssociationEntityInfo(keyno = new_data.getOrEmptyStr("company_id")
+        , name = new_data.getOrEmptyStr("company_name")
+        , risk_level = NgCompanyRiskLevelType.Prompt, "质押人参股企业")
+    )
+  }
+}

+ 41 - 12
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_holder.scala

@@ -1,6 +1,8 @@
 package com.winhc.bigdata.spark.ng.dynamic.handle
 
+import com.winhc.bigdata.spark.implicits.BaseHelper._
 import com.winhc.bigdata.spark.implicits.Bool._
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic._
 import com.winhc.bigdata.spark.ng.dynamic.utils._
@@ -220,21 +222,48 @@ case class company_holder(is_inc: Boolean) extends NgCompanyDynamicHandle {
     list
   }
 
-  /*
+  /**
+   * group_by前置处理程。flat_map
+   *
+   * @return
+   */
+  override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
+    val _id = cdr.id
+    val _association_entity_info = cdr.association_entity_info
+    val _rowkey = cdr.rowkey
+    val _tn = cdr.tn
+    val _update_type = cdr.update_type
+    val _dynamic_code = cdr.dynamic_code
+    val _dynamic_info = cdr.dynamic_info
+    val _agg_detail_text = cdr.agg_detail_text
+    val _agg_detail_rowkey = cdr.agg_detail_rowkey
+    val _biz_time = cdr.biz_time
+    val _dynamic_time = cdr.dynamic_time
+    val _update_time = cdr.update_time
+    val _create_time = cdr.create_time
+    _association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno)).map(r => {
+      CompanyDynamicRecord(id = _id, association_entity_info = Seq(r), rowkey = _rowkey, tn = _tn, update_type = _update_type, dynamic_code = _dynamic_code, dynamic_info = _dynamic_info, agg_detail_text = _agg_detail_text, agg_detail_rowkey = _agg_detail_rowkey, biz_time = _biz_time, dynamic_time = _dynamic_time, update_time = _update_time, create_time = _create_time)
+    })
+  }
 
-    override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-      val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
-      if (info == null) {
-        cdr.id
-      } else {
-        info.keyno
-      }
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    if (info == null) {
+      cdr.id
+    } else {
+      info.keyno
     }
+  }
 
-    override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
-      seq
-    }
-  */
+
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    val contents: Seq[ChangeContent] = seq.flatMap(r => {
+      r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].content
+    })
+    val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
+    val str = BusinessInfoDynamicInfoMap(label = label, content = contents).toJson()
+    Seq(seq(0).customCopy(dynamic_info = str.toAnyMap()))
+  }
 
 
   def getRealCapital(str: String): String = {

+ 40 - 30
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_staff.scala

@@ -206,39 +206,49 @@ case class company_staff(is_inc: Boolean) extends NgCompanyDynamicHandle {
     }
     list
   }
-  /*
-    override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-      val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
-      if (info == null) {
-        cdr.id
-      } else {
-        info.keyno
-      }
-    }
 
 
-    override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
-      seq
-    }*/
-}
-
-/*
-object company_staff {
-
-  private val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z ]".r
-
-  def main(args: Array[String]): Unit = {
-    val beforeType = "总经理,执行董事"
-    val afterType = "执行董事,总经理"
-
-    val v1 = pattern.split(beforeType).sorted.mkString(",")
-    val v2 = pattern.split(afterType).sorted.mkString(",")
+  /**
+   * group_by前置处理程。flat_map
+   *
+   * @return
+   */
+  override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
+    val _id = cdr.id
+    val _association_entity_info = cdr.association_entity_info
+    val _rowkey = cdr.rowkey
+    val _tn = cdr.tn
+    val _update_type = cdr.update_type
+    val _dynamic_code = cdr.dynamic_code
+    val _dynamic_info = cdr.dynamic_info
+    val _agg_detail_text = cdr.agg_detail_text
+    val _agg_detail_rowkey = cdr.agg_detail_rowkey
+    val _biz_time = cdr.biz_time
+    val _dynamic_time = cdr.dynamic_time
+    val _update_time = cdr.update_time
+    val _create_time = cdr.create_time
+    _association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno)).map(r => {
+      CompanyDynamicRecord(id = _id, association_entity_info = Seq(r), rowkey = _rowkey, tn = _tn, update_type = _update_type, dynamic_code = _dynamic_code, dynamic_info = _dynamic_info, agg_detail_text = _agg_detail_text, agg_detail_rowkey = _agg_detail_rowkey, biz_time = _biz_time, dynamic_time = _dynamic_time, update_time = _update_time, create_time = _create_time)
+    })
+  }
 
-    println(v1)
-    println(v2)
-    if (v1.equals(v2))
-      println("aa")
+  override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
+    val info = cdr.association_entity_info.filter(_.keyno != null).find(r => r.keyno.length == 32).orNull
+    if (info == null) {
+      cdr.id
+    } else {
+      info.keyno
+    }
+  }
 
 
+  override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
+    val contents: Seq[ChangeContent] = seq.flatMap(r => {
+      r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].content
+    })
+    val label = seq.map(r => r.dynamic_info("base").asInstanceOf[BusinessInfoDynamicInfoMap].label).distinct.mkString(",")
+    val str = BusinessInfoDynamicInfoMap(label = label, content = contents)
+    Seq(seq(0).customCopy(dynamic_info = str.toMap()))
   }
-}*/
+}
+

+ 2 - 9
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/zxr_evaluate_results.scala

@@ -3,21 +3,14 @@
 package com.winhc.bigdata.spark.ng.dynamic.handle
 
 import com.winhc.bigdata.spark.implicits.MapHelper._
-import com.winhc.bigdata.spark.ng.dynamic.utils.DailyAggHandle
+import com.winhc.bigdata.spark.ng.dynamic.utils.{DailyAggHandle, SimpleDailyDynamic}
 import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, NgCompanyRiskLevelType}
 
 /**
  * @author: XuJiakai
  * @date: 2021/6/29 15:07
  */
-case class zxr_evaluate_results(is_inc:Boolean) extends DailyAggHandle {
-  override def getDynamicInfo(new_data: Map[String, String]): (String, Map[String, Any]) = {
-    ("",Map(
-      "code" -> "301901"
-      , "description" -> "新增询价评估"
-    ))
-  }
-
+case class zxr_evaluate_results(is_inc:Boolean) extends DailyAggHandle   with SimpleDailyDynamic{
   override def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo] = {
     val keyno = new_data.getOrEmptyStr("keyno")
     val name = new_data.getOrEmptyStr("name")

+ 18 - 3
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DynamicDimConfiguration.scala

@@ -261,6 +261,24 @@ object DynamicDimConfiguration {
       )
     )
 
+    , "zxr_evaluate_results" -> Map(
+      "code" -> Map(
+        "insert" -> "304501"
+      ),
+      "sample" -> true,
+      "name" -> "询价评估",
+      "list_field" -> Map(
+        "keyno" -> Map()
+        , "name" -> Map()
+        , "money" -> Map()
+        , "asset_type" -> Map()
+        , "reference_price" -> Map()
+        , "case_no" -> Map()
+        , "court_name" -> Map()
+        , "publish_time" -> Map()
+      )
+    )
+
 
     , "company_brief_cancel_announcement" -> Map(
       "code" -> Map(
@@ -462,7 +480,6 @@ object DynamicDimConfiguration {
     )
 
 
-
     , "company_judicial_assistance" -> Map(
       "code" -> Map(
         "insert" -> "304101"
@@ -511,7 +528,5 @@ object DynamicDimConfiguration {
     )
 
 
-
-
   )
 }