Browse Source

feat: 知产动态

许家凯 3 years ago
parent
commit
ddaa68238f

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -58,7 +58,8 @@ case class NgChangeExtract(s: SparkSession,
 
   val updateTimeMapping = Map(
     "wenshu_detail_combine" -> "update_date", //文书排序时间
-    "company_equity_info_list" -> "reg_date" //文书排序时间
+    "company_equity_info_list" -> "reg_date", //文书排序时间
+    "company_lawsuit" -> "pub_date" //文书排序时间
   )
   //不同name映射table
   val tabMapping =

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtractArgs.scala

@@ -68,5 +68,11 @@ object NgChangeExtractArgs {
       , "company_mortgage_people"
 
 
+      //intellectual
+      , "company_patent"
+      , "company_copyright_reg"
+      , "company_copyright_works"
+      , "company_certificate"
+
     ).map(r => NgChangeExtractArgs(tableName = r))
 }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/CompanyDynamicRecord.scala

@@ -3,7 +3,6 @@ package com.winhc.bigdata.spark.ng.dynamic
 import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic.utils.CollapseKeyArgs
-import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.Row
 
 /**
@@ -56,7 +55,8 @@ case class CompanyDynamicRecord(id: String,
     if (association_entity_info == null || association_entity_info.isEmpty) {
       return null
     }
-    val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
+    // todo 留下人名,为后期补id
+   /* val rec = association_entity_info.filter(r => StringUtils.isNotEmpty(r.keyno))
     if (rec.isEmpty) return null
 
     if (rec.length != association_entity_info.length)
@@ -73,7 +73,7 @@ case class CompanyDynamicRecord(id: String,
         dynamic_time,
         update_time,
         create_time
-      )
+      )*/
     this
   }
 

+ 5 - 4
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicArgs.scala

@@ -45,9 +45,10 @@ object NgCompanyDynamicArgs {
   def main(args: Array[String]): Unit = {
 //    val handles = ReflectUtils.subObject[NgCompanyDynamicHandle](classOf[NgCompanyDynamicHandle], this.getClass.getPackage.getName)
 //    val orNull1 = handles.find(_.getClass.getSimpleName.contains("auction_tracking")).get
-    val args1 = getStartArgs()
-    val orNull = args1.find(_.tn.equals("auction_tracking")).orNull
-    orNull.flat_map.apply(null)
-    println(args1)
+//    val args1 = getStartArgs()
+//    val orNull = args1.find(_.tn.equals("auction_tracking")).orNull
+//    orNull.flat_map.apply(null)
+//    val args2 = getAggArgs
+//    println(args2(0).group_by_key.apply(null))
   }
 }

+ 23 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/NgCompanyDynamicHandle.scala

@@ -1,9 +1,13 @@
 package com.winhc.bigdata.spark.ng.dynamic
 
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic.utils.CompanyDynamicUtils
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 
 import scala.annotation.meta.{getter, setter}
+import scala.collection.mutable
 
 /**
  * @author: XuJiakai
@@ -68,4 +72,23 @@ trait NgCompanyDynamicHandle extends Serializable with Logging {
     )
   }
 
+  protected def getEntity(json: String, id_key: String, name_key: String
+                          , risk_level: NgCompanyRiskLevelType //变更风险等级
+                          , rta_info: String //描述
+                         ): Seq[AssociationEntityInfo] = {
+    if (StringUtils.isEmpty(json)) {
+      Seq.empty
+    } else {
+      val array = JSON.parseArray(json)
+      var list: mutable.Seq[AssociationEntityInfo] = mutable.Seq.empty
+      for (i <- 0 until array.size()) {
+        val jSONObject = array.getJSONObject(i)
+        val keyno = jSONObject.getString(id_key)
+        val name = jSONObject.getString(name_key)
+        list = list :+ AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = rta_info)
+      }
+      list
+    }
+  }
+
 }

+ 29 - 0
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/handle/company_tm.scala

@@ -0,0 +1,29 @@
+package com.winhc.bigdata.spark.ng.dynamic.handle
+
+import com.winhc.bigdata.spark.ng.dynamic.{AssociationEntityInfo, ChangeExtract, CompanyDynamicRecord, NgCompanyDynamicHandle, NgCompanyRiskLevelType}
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import org.apache.commons.lang3.StringUtils
+
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/7/20 16:40
+ */
+case class company_tm(is_inc: Boolean) extends NgCompanyDynamicHandle {
+  override def flat_map: ChangeExtract => Seq[CompanyDynamicRecord] = (change_extract: ChangeExtract) => {
+    val new_data = change_extract.new_data
+    var list: mutable.Seq[CompanyDynamicRecord] = mutable.Seq.empty
+    val entityInfo: Seq[AssociationEntityInfo] = Seq(
+      AssociationEntityInfo(keyno = new_data.getOrEmptyStr("keyno")
+        , name = new_data.getOrEmptyStr("applicant_cn")
+        , risk_level = NgCompanyRiskLevelType.Positive
+        , rta_info = null)
+    )
+    val info = entityInfo.filter(r => StringUtils.isNotEmpty(r.keyno))
+    if(info.nonEmpty){
+      list = list :+ getCompanyDynamicRecord(change_extract, null, null, null, info)
+    }
+    list
+  }
+}

+ 1 - 21
src/main/scala/com/winhc/bigdata/spark/ng/dynamic/utils/DailyAggHandle.scala

@@ -1,9 +1,6 @@
 package com.winhc.bigdata.spark.ng.dynamic.utils
 
-import com.alibaba.fastjson.JSON
-import com.winhc.bigdata.spark.ng.dynamic.NgCompanyRiskLevelType.NgCompanyRiskLevelType
 import com.winhc.bigdata.spark.ng.dynamic._
-import org.apache.commons.lang3.StringUtils
 
 import scala.collection.mutable
 
@@ -96,22 +93,5 @@ abstract class DailyAggHandle() extends NgCompanyDynamicHandle {
     }
 
     */
-  protected def getEntity(json: String, id_key: String, name_key: String
-                          , risk_level: NgCompanyRiskLevelType //变更风险等级
-                          , rta_info: String //描述
-                         ): Seq[AssociationEntityInfo] = {
-    if (StringUtils.isEmpty(json)) {
-      Seq.empty
-    } else {
-      val array = JSON.parseArray(json)
-      var list: mutable.Seq[AssociationEntityInfo] = mutable.Seq.empty
-      for (i <- 0 until array.size()) {
-        val jSONObject = array.getJSONObject(i)
-        val keyno = jSONObject.getString(id_key)
-        val name = jSONObject.getString(name_key)
-        list = list :+ AssociationEntityInfo(keyno = keyno, name = name, risk_level = risk_level, rta_info = rta_info)
-      }
-      list
-    }
-  }
+
 }

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/explode_tab.scala

@@ -59,8 +59,10 @@ case class explode_tab(s: SparkSession,
   def calc(tmpView: String): Unit = {
     val explode_args = args.map(f => {
       if (f.is_filter_null) {
+        //过滤到null
         s"LATERAL VIEW explode(json_2_array(${f.org_field},'${f.json_path}')) t_${f.org_field} AS ${f.alias}"
       } else {
+        //保留null值
         s"LATERAL VIEW OUTER explode(json_2_array(${f.org_field},'${f.json_path}')) t_${f.org_field} AS ${f.alias}"
       }
     }).mkString("\n")