|
@@ -4,6 +4,7 @@ package com.winhc.bigdata.spark.ng.dynamic.handle
|
|
|
|
|
|
import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
|
|
import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
|
|
import com.winhc.bigdata.spark.implicits.MapHelper._
|
|
import com.winhc.bigdata.spark.implicits.MapHelper._
|
|
|
|
+import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
|
|
import org.apache.commons.lang3.StringUtils
|
|
import org.apache.commons.lang3.StringUtils
|
|
|
|
|
|
import scala.collection.mutable
|
|
import scala.collection.mutable
|
|
@@ -13,6 +14,18 @@ import scala.collection.mutable
|
|
* @date: 2021/7/20 16:40
|
|
* @date: 2021/7/20 16:40
|
|
*/
|
|
*/
|
|
case class company_certificate(is_inc: Boolean) extends NgCompanyDynamicHandle {
|
|
case class company_certificate(is_inc: Boolean) extends NgCompanyDynamicHandle {
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => {
|
|
|
|
+ if (CompanyDynamicUtils.default_filter(update_type, biz_date, change_fields, old_data, new_data)) {
|
|
|
|
+ if (update_type.equals("insert")) true
|
|
|
|
+ else
|
|
|
|
+ false
|
|
|
|
+ } else {
|
|
|
|
+ false
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
|
|
override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
|
|
val new_data = change_extract.new_data
|
|
val new_data = change_extract.new_data
|
|
var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
|
|
var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
|
|
@@ -23,7 +36,7 @@ case class company_certificate(is_inc: Boolean) extends NgCompanyDynamicHandle {
|
|
, rta_info = null)
|
|
, rta_info = null)
|
|
)
|
|
)
|
|
val info = entityInfo.filter(r => StringUtils.isNotEmpty(r.keyno))
|
|
val info = entityInfo.filter(r => StringUtils.isNotEmpty(r.keyno))
|
|
- if(info.nonEmpty){
|
|
|
|
|
|
+ if (info.nonEmpty) {
|
|
list = list :+ getCompanyDynamicRecord(change_extract, null, null, null, info)
|
|
list = list :+ getCompanyDynamicRecord(change_extract, null, null, null, info)
|
|
}
|
|
}
|
|
list
|
|
list
|