Browse Source

fix: 知识产权企业动态支持补数据

许家凯 4 năm trước cách đây
mục cha
commit
50acf614c6

+ 5 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -208,7 +208,7 @@ object CompanyDynamic {
     , Args(tableName = "company_liquidating_info", bName = 1)
 
 
-    , Args(tableName = "company_zxr_list", bName = 0)   //被执行人
+    , Args(tableName = "company_zxr_list", bName = 0) //被执行人
     , Args(tableName = "company_zxr_final_case", bName = 1) //终本案件
     , Args(tableName = "company_license_creditchina", bName = 1) //行政许可-信用中国
     , Args(tableName = "company_license_entpub", bName = 1) //行政许可-企业公示
@@ -224,7 +224,7 @@ object CompanyDynamic {
     , Args(tableName = "company_holder", bName = 1) //股东
     , Args(tableName = "company_annual_report_out_investment", bName = 1) //裁判文书
     , Args(tableName = "company_own_tax", bName = 1) //欠税公告
-    , Args(tableName = "intellectual", bName = 1, aggs = 2)//知识产权
+    , Args(tableName = "intellectual", bName = 1, aggs = 2) //知识产权
   )
 
   private case class Args(project: String = "winhc_eci_dev"
@@ -269,9 +269,9 @@ object CompanyDynamic {
 
     val a = start.map(e => (e.tableName, () => {
       e.aggs match {
-        case 1 => CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName)//招聘
-        case 2 => IntellectualMessage(spark, project).calc()//知识产权
-        case _ => cd.calc(e.tableName, e.bName)//通用处理
+        case 1 => CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName) //招聘
+        case 2 => IntellectualMessage(spark, ds, project).calc() //知识产权
+        case _ => cd.calc(e.tableName, e.bName) //通用处理
       }
       true
     }))

+ 14 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -31,8 +31,8 @@ object IntellectualMessage {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv("IntellectualMessage", config)
-    IntellectualMessage(spark, project).calc()
-//    IntellectualMessage(spark, project).transForm()
+    IntellectualMessage(spark, null, project).calc()
+    //    IntellectualMessage(spark, project).transForm()
     spark.stop()
   }
 
@@ -40,6 +40,7 @@ object IntellectualMessage {
 
 
 case class IntellectualMessage(s: SparkSession, project: String,
+                               ds: String,
                                runOld: Boolean = false) extends LoggingUtils {
 
   @(transient@getter) val spark: SparkSession = s
@@ -58,7 +59,7 @@ case class IntellectualMessage(s: SparkSession, project: String,
 
   val tn = "intellectual"
 
-  var ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
+  //  var ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
   val remainDs = BaseUtil.getPartion(s"$project.ads_$t1", spark)
   val mapDs = BaseUtil.getPartion(s"$project.base_company_mapping", spark)
 
@@ -74,13 +75,13 @@ case class IntellectualMessage(s: SparkSession, project: String,
 
     sql(
       s"""
-        |SELECT  cid as new_cid
-        |FROM    $project.ads_change_extract
-        |WHERE   ds = $ds
-        |AND     tn in ('$t1','$t2','$t3','$t4','$t5','$t6')
-        |AND     type = 'insert'
-        |GROUP by cid
-        |""".stripMargin).cache().createOrReplaceTempView("mapping")
+         |SELECT  cid as new_cid
+         |FROM    $project.ads_change_extract
+         |WHERE   ds = $ds
+         |AND     tn in ('$t1','$t2','$t3','$t4','$t5','$t6')
+         |AND     type = 'insert'
+         |GROUP by cid
+         |""".stripMargin).cache().createOrReplaceTempView("mapping")
 
     sql(
       s"""
@@ -157,11 +158,11 @@ case class IntellectualMessage(s: SparkSession, project: String,
       val m = r.getAs[Map[String, String]](2)
       val rta_desc = descStr(m)
       var id = "-1"
-      try{
+      try {
         id = CompanyDynamicHandleUtils.getDynamicId(cid, "", biz_id, change_time)
 
-      }catch {
-        case ex:Exception => {
+      } catch {
+        case ex: Exception => {
           logError(ex.getMessage)
         }
       }