|
@@ -13,8 +13,7 @@ import scala.collection.mutable
|
|
|
abstract class DailyAggHandle() extends NgCompanyDynamicHandle {
|
|
|
override def filter: (String, String, Seq[String], Map[String, String], Map[String, String]) => Boolean = (update_type: String, biz_date: String, change_fields: Seq[String], old_data: Map[String, String], new_data: Map[String, String]) => update_type.equals("insert")
|
|
|
|
|
|
-
|
|
|
- def getDynamicInfo(): Map[String, String]
|
|
|
+ def getDynamicInfo(new_data: Map[String, String]): (String, Map[String, String])
|
|
|
|
|
|
def getAssociationEntityInfo(new_data: Map[String, String]): Seq[AssociationEntityInfo]
|
|
|
|
|
@@ -30,68 +29,72 @@ abstract class DailyAggHandle() extends NgCompanyDynamicHandle {
|
|
|
val rowkey = change_extract.rowkey
|
|
|
val update_time = change_extract.update_time
|
|
|
var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
|
|
|
- list = list :+ getCompanyDynamicRecord(change_extract, getDynamicInfo(), null, getAssociationEntityInfo(new_data))
|
|
|
+ val dy = getDynamicInfo(new_data)
|
|
|
+ list = list :+ getCompanyDynamicRecord(change_extract, dy._1, dy._2, null, getAssociationEntityInfo(new_data))
|
|
|
list
|
|
|
}
|
|
|
|
|
|
-/*
|
|
|
- override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
|
|
|
- println("aaa")
|
|
|
- if (cdr.association_entity_info.length != 1) {
|
|
|
- throw new RuntimeException("entity error!")
|
|
|
- }
|
|
|
- cdr.association_entity_info(0).keyno
|
|
|
- }*/
|
|
|
- override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
|
|
|
- val ids = seq.map(_.id)
|
|
|
- val tn = seq(0).tn
|
|
|
- val association_entity_info = seq(0).association_entity_info
|
|
|
+ /*
|
|
|
+ override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
|
|
|
+ println("aaa")
|
|
|
+ if (cdr.association_entity_info.length != 1) {
|
|
|
+ throw new RuntimeException("entity error!")
|
|
|
+ }
|
|
|
+ cdr.association_entity_info(0).keyno
|
|
|
+ }*/
|
|
|
+ /*
|
|
|
+ override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
|
|
|
+ val ids = seq.map(_.id)
|
|
|
+ val tn = seq(0).tn
|
|
|
+ val association_entity_info = seq(0).association_entity_info
|
|
|
|
|
|
- val agg_detail_rowkey: Seq[RowkeyInfo] = ids.map(r => RowkeyInfo(rowkey = r, tn = this.getClass.getSimpleName))
|
|
|
- val change_time = getMax(seq.map(_.change_time))
|
|
|
- val update_time = getMax(seq.map(_.update_time))
|
|
|
- val create_time = getMax(seq.map(_.create_time))
|
|
|
+ val agg_detail_rowkey: Seq[RowkeyInfo] = ids.map(r => RowkeyInfo(rowkey = r, tn = this.getClass.getSimpleName))
|
|
|
+ val change_time = getMax(seq.map(_.change_time))
|
|
|
+ val update_time = getMax(seq.map(_.update_time))
|
|
|
+ val create_time = getMax(seq.map(_.create_time))
|
|
|
|
|
|
- val id = CompanyDynamicUtils.generateId(association_entity_info(0).keyno, change_time, tn)
|
|
|
- Seq(CompanyDynamicRecord(
|
|
|
- id = id, association_entity_info = association_entity_info, rowkey = null, tn = tn, update_type = "insert", dynamic_info = getDynamicInfo(), agg_detail_text = null, agg_detail_rowkey = agg_detail_rowkey, old_record = null, new_record = null, change_time = change_time, update_time = update_time, create_time = create_time
|
|
|
- ))
|
|
|
+ val id = CompanyDynamicUtils.generateId(association_entity_info(0).keyno, change_time, tn)
|
|
|
+ Seq(CompanyDynamicRecord(
|
|
|
+ id = id, association_entity_info = association_entity_info, rowkey = null, tn = tn, update_type = "insert", dynamic_info = getDynamicInfo(), agg_detail_text = null, agg_detail_rowkey = agg_detail_rowkey, old_record = null, new_record = null, change_time = change_time, update_time = update_time, create_time = create_time
|
|
|
+ ))
|
|
|
|
|
|
- }
|
|
|
+ }
|
|
|
+ */
|
|
|
|
|
|
- private def getMax(seq:Seq[String]): String ={
|
|
|
+ private def getMax(seq: Seq[String]): String = {
|
|
|
val strings = seq.filter(_ != null)
|
|
|
- if(strings.isEmpty){
|
|
|
+ if (strings.isEmpty) {
|
|
|
null
|
|
|
- }else{
|
|
|
+ } else {
|
|
|
strings.max
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
|
|
|
+ val id = cdr.id
|
|
|
+ val association_entity_info = cdr.association_entity_info
|
|
|
+ val rowkey = cdr.rowkey
|
|
|
+ val tn = cdr.tn
|
|
|
+ val update_type = cdr.update_type
|
|
|
+ val dynamic_info = cdr.dynamic_info
|
|
|
+ val agg_detail_text = cdr.agg_detail_text
|
|
|
+ val agg_detail_rowkey = cdr.agg_detail_rowkey
|
|
|
+ val old_record = cdr.old_record
|
|
|
+ val new_record = cdr.new_record
|
|
|
+ val change_time = cdr.change_time
|
|
|
+ val update_time = cdr.update_time
|
|
|
+ val create_time = cdr.create_time
|
|
|
+ association_entity_info.filter(r=>StringUtils.isNotEmpty(r.keyno)).map(r=>{
|
|
|
+ CompanyDynamicRecord(id = id, association_entity_info = Seq(r), rowkey = rowkey, tn = tn, update_type = update_type, dynamic_info = dynamic_info, agg_detail_text = agg_detail_text, agg_detail_rowkey = agg_detail_rowkey, old_record = old_record, new_record = new_record, change_time = change_time, update_time = update_time, create_time = create_time)
|
|
|
+ })
|
|
|
+ }
|
|
|
|
|
|
- override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
|
|
|
- val id = cdr.id
|
|
|
- val association_entity_info = cdr.association_entity_info
|
|
|
- val rowkey = cdr.rowkey
|
|
|
- val tn = cdr.tn
|
|
|
- val update_type = cdr.update_type
|
|
|
- val dynamic_info = cdr.dynamic_info
|
|
|
- val agg_detail_text = cdr.agg_detail_text
|
|
|
- val agg_detail_rowkey = cdr.agg_detail_rowkey
|
|
|
- val old_record = cdr.old_record
|
|
|
- val new_record = cdr.new_record
|
|
|
- val change_time = cdr.change_time
|
|
|
- val update_time = cdr.update_time
|
|
|
- val create_time = cdr.create_time
|
|
|
- association_entity_info.filter(r=>StringUtils.isNotEmpty(r.keyno)).map(r=>{
|
|
|
- CompanyDynamicRecord(id = id, association_entity_info = Seq(r), rowkey = rowkey, tn = tn, update_type = update_type, dynamic_info = dynamic_info, agg_detail_text = agg_detail_text, agg_detail_rowkey = agg_detail_rowkey, old_record = old_record, new_record = new_record, change_time = change_time, update_time = update_time, create_time = create_time)
|
|
|
- })
|
|
|
- }
|
|
|
-
|
|
|
+ */
|
|
|
protected def getEntity(json: String, id_key: String, name_key: String
|
|
|
- , risk_level: NgCompanyRiskLevelType.RiskLevelType //变更风险等级
|
|
|
- , rta_info: String //描述
|
|
|
- ): Seq[AssociationEntityInfo] = {
|
|
|
+ , risk_level: NgCompanyRiskLevelType.RiskLevelType //变更风险等级
|
|
|
+ , rta_info: String //描述
|
|
|
+ ): Seq[AssociationEntityInfo] = {
|
|
|
if (StringUtils.isEmpty(json)) {
|
|
|
Seq.empty
|
|
|
} else {
|