Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
25933c280b

+ 8 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs.dynamic
 import java.util.Date
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.message.IntellectualMessage
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
 import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils}
@@ -184,7 +185,7 @@ object CompanyDynamic {
     , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
-    , Args(tableName = "company_employment", bName = 1, isAgg = true)
+    , Args(tableName = "company_employment", bName = 1, aggs = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_land_transfer", bName = 1)
@@ -218,12 +219,13 @@ object CompanyDynamic {
     , Args(tableName = "company_holder", bName = 1) //股东
     , Args(tableName = "company_annual_report_out_investment", bName = 1) //裁判文书
     , Args(tableName = "company_own_tax", bName = 1) //欠税公告
+    , Args(tableName = "company_employment", bName = 1, aggs = 2)//知识产权
   )
 
   private case class Args(project: String = "winhc_eci_dev"
                           , tableName: String
                           , bName: Int = 1
-                          , isAgg: Boolean = false)
+                          , aggs: Int = 0)
 
   def main(args: Array[String]): Unit = {
 
@@ -261,10 +263,10 @@ object CompanyDynamic {
     }
 
     val a = start.map(e => (e.tableName, () => {
-      if (e.isAgg) {
-        CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName)
-      } else {
-        cd.calc(e.tableName, e.bName)
+      e.aggs match {
+        case 1 => CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName)//招聘
+        case 2 => IntellectualMessage(spark, project).calc()//知识产权
+        case _ => cd.calc(e.tableName, e.bName)//通用处理
       }
       true
     }))

+ 5 - 2
src/main/scala/com/winhc/bigdata/spark/test/Justicase.scala

@@ -25,11 +25,14 @@ object Justicase {
         ,"""{"case_no":"4","from":"3"}"""
 
         ,"""{"case_no":"8","from":"7"}"""
+        ,"""{"case_no":"7","from":"8"}"""
+
+        ,"""{"case_no":"101","from":"150"}"""
 
         ,"""{"case_no":"14","from":"13"}"""
         ,"""{"case_no":"13","from":"12"}"""
 
-        ,"""{"case_no":"19"}"""
+        ,"""{"case_no":"19","from":"12"}"""
 
         ,"""{"case_no":"28","from":"27"}"""
         ,"""{"case_no":"27","from":"26"}"""
@@ -57,7 +60,7 @@ object Justicase {
     val connetedGraph: Graph[VertexId, String] = graph.connectedComponents()
 
     // 将同一连通分量中各个边聚合
-    val tripleGroup: RDD[(VertexId, Set[VertexId])] = connetedGraph.triplets.map(t => (t.srcAttr, Set(t.dstId)))
+    val tripleGroup: RDD[(VertexId, Set[VertexId])] = connetedGraph.triplets.map(t => (t.srcId, Set(t.dstId)))
       .reduceByKey(_ ++ _)
 
     //逐一输出所有极大连通子图结果:起点Vertex,与其连通的Vertex集合。Vertex可以是案号形成的Long