소스 검색

fix: 移除知产动态聚合rowkey

许家凯 3 년 전
부모
커밋
d51f6fefb5
1개의 변경된 파일, 39개의 추가 그리고 10개의 삭제
  1. 39 10
      src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/intellectual.scala

+ 39 - 10
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/agg/intellectual.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.ng.dynamic.agg
 import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
 import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord, RowkeyInfo}
 import com.winhc.bigdata.spark.utils.BaseUtil
+import org.apache.commons.lang3.StringUtils
 
 /**
  * @author: XuJiakai
@@ -16,9 +17,25 @@ case class intellectual() extends AcrossTabAggHandle {
    * @return
    */
   override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
-    cdr.association_entity_info.map(e => {
-      CompanyDynamicRecord(id = cdr.id, association_entity_info = Seq(e), rowkey = cdr.rowkey, tn = cdr.tn, update_type = cdr.update_type, dynamic_code = null, dynamic_info = null, agg_detail_text = null, agg_detail_rowkey = null, biz_time = cdr.biz_time, dynamic_time = cdr.dynamic_time, update_time = cdr.update_time, create_time = cdr.create_time)
-    })
+    if (StringUtils.isEmpty(cdr.biz_time)) {
+      Seq.empty
+    } else {
+      cdr.association_entity_info.map(e => {
+        CompanyDynamicRecord(id = cdr.id
+          , association_entity_info = Seq(e)
+          , rowkey = cdr.rowkey
+          , tn = cdr.tn
+          , update_type = cdr.update_type
+          , dynamic_code = null
+          , dynamic_info = null
+          , agg_detail_text = null
+          , agg_detail_rowkey = null
+          , biz_time = cdr.biz_time
+          , dynamic_time = cdr.dynamic_time
+          , update_time = cdr.update_time
+          , create_time = cdr.create_time)
+      })
+    }
   }
 
   /**
@@ -39,7 +56,7 @@ case class intellectual() extends AcrossTabAggHandle {
    * @return
    */
   override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
-    cdr.association_entity_info(0).keyno
+    s"${cdr.association_entity_info(0).keyno}@@${cdr.dynamic_time}"
   }
 
   /**
@@ -49,15 +66,27 @@ case class intellectual() extends AcrossTabAggHandle {
    */
   override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
     val keyno = seq(0).association_entity_info(0).keyno
-    val agg_rowkey = seq.map(r => RowkeyInfo(rowkey = r.rowkey, tn = r.tn))
-    val biz_time = BaseUtil.nowDate()
-    val dynamic_time = biz_time
-    val update_time = biz_time
-    val create_time = biz_time
+    //    val agg_rowkey = seq.map(r => RowkeyInfo(rowkey = r.rowkey, tn = r.tn))
+    val biz_time = seq(0).biz_time
+    val dynamic_time = seq(0).dynamic_time
+    val update_time = BaseUtil.nowDate()
+    val create_time = update_time
     val stringToInt: Map[String, Int] = seq.map(r => (r.tn, 1)).groupBy(_._1)
       .mapValues(_.foldLeft(0)(_ + _._2))
     Seq(
-      CompanyDynamicRecord(id = CompanyDynamicUtils.generateId(keyno, BaseUtil.nowDate(), "intellectual"), association_entity_info = seq(0).association_entity_info, rowkey = null, tn = "intellectual", update_type = seq(0).update_type, dynamic_code = "201801", dynamic_info = stringToInt, agg_detail_text = null, agg_detail_rowkey = agg_rowkey, biz_time = biz_time, dynamic_time = dynamic_time, update_time = update_time, create_time = create_time)
+      CompanyDynamicRecord(id = CompanyDynamicUtils.generateId(keyno, biz_time, "intellectual")
+        , association_entity_info = seq(0).association_entity_info
+        , rowkey = null
+        , tn = "intellectual"
+        , update_type = seq(0).update_type
+        , dynamic_code = "201801"
+        , dynamic_info = stringToInt
+        , agg_detail_text = null
+        , agg_detail_rowkey = Seq.empty
+        , biz_time = biz_time
+        , dynamic_time = dynamic_time
+        , update_time = update_time
+        , create_time = create_time)
     )
   }