|
@@ -0,0 +1,64 @@
|
|
|
+package com.winhc.bigdata.spark.ng.dynamic.agg
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
|
|
|
+import com.winhc.bigdata.spark.ng.dynamic.{AcrossTabAggHandle, CompanyDynamicRecord, RowkeyInfo}
|
|
|
+import com.winhc.bigdata.spark.utils.BaseUtil
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author: XuJiakai
|
|
|
+ * @date: 2021/7/20 16:46
|
|
|
+ */
|
|
|
+case class intellectual() extends AcrossTabAggHandle {
|
|
|
+
|
|
|
+ /**
|
|
|
+ * group_by前置处理程。flat_map
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ override def group_by_pre: CompanyDynamicRecord => Seq[CompanyDynamicRecord] = (cdr: CompanyDynamicRecord) => {
|
|
|
+ cdr.association_entity_info.map(e => {
|
|
|
+ CompanyDynamicRecord(id = cdr.id, association_entity_info = Seq(e), rowkey = cdr.rowkey, tn = cdr.tn, update_type = cdr.update_type, dynamic_code = null, dynamic_info = null, agg_detail_text = null, agg_detail_rowkey = null, biz_time = cdr.biz_time, dynamic_time = cdr.dynamic_time, update_time = cdr.update_time, create_time = cdr.create_time)
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 需要聚合的维度
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ override def getTables: Seq[String] = Seq("company_tm"
|
|
|
+ , "company_patent"
|
|
|
+ , "company_copyright_reg"
|
|
|
+ , "company_copyright_works"
|
|
|
+ , "company_certificate"
|
|
|
+ , "company_icp")
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 聚合的key,相同的key会聚合到一起
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ override def group_by_key: CompanyDynamicRecord => String = (cdr: CompanyDynamicRecord) => {
|
|
|
+ cdr.association_entity_info(0).keyno
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 聚合处理程序 flat_map
|
|
|
+ *
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ override def group_by_flat_map: Seq[CompanyDynamicRecord] => Seq[CompanyDynamicRecord] = (seq: Seq[CompanyDynamicRecord]) => {
|
|
|
+ val keyno = seq(0).association_entity_info(0).keyno
|
|
|
+ val agg_rowkey = seq.map(r => RowkeyInfo(rowkey = r.rowkey, tn = r.tn))
|
|
|
+ val biz_time = BaseUtil.nowDate()
|
|
|
+ val dynamic_time = biz_time
|
|
|
+ val update_time = biz_time
|
|
|
+ val create_time = biz_time
|
|
|
+ val stringToInt: Map[String, Int] = seq.map(r => (r.tn, 1)).groupBy(_._1)
|
|
|
+ .mapValues(_.foldLeft(0)(_ + _._2))
|
|
|
+ Seq(
|
|
|
+ CompanyDynamicRecord(id = CompanyDynamicUtils.generateId(keyno, BaseUtil.nowDate(), "intellectual"), association_entity_info = seq(0).association_entity_info, rowkey = null, tn = "intellectual", update_type = seq(0).update_type, dynamic_code = "201801", dynamic_info = stringToInt, agg_detail_text = null, agg_detail_rowkey = agg_rowkey, biz_time = biz_time, dynamic_time = dynamic_time, update_time = update_time, create_time = create_time)
|
|
|
+ )
|
|
|
+ }
|
|
|
+
|
|
|
+}
|