Browse Source

知识产权动态bugfix

xufei 4 years ago
parent
commit
7f471b11f7

+ 69 - 25
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -1,8 +1,12 @@
 package com.winhc.bigdata.spark.jobs.message
 
+import java.util.Date
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandleUtils
 import com.winhc.bigdata.spark.udf.MapAggs
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Json
@@ -23,18 +27,18 @@ object IntellectualMessage {
 
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
-    val spark = SparkUtils.InitEnv("CompanySummaryInc", config)
-        IntellectualMessage(spark, project, "").calc()
-//    IntellectualMessage(spark, project, "").transForm()
+    val spark = SparkUtils.InitEnv("IntellectualMessage", config)
+    IntellectualMessage(spark, project).calc()
+//    IntellectualMessage(spark, project).transForm()
     spark.stop()
   }
 
 }
 
 
-case class IntellectualMessage(s: SparkSession, project: String, sourceTable: String,
+case class IntellectualMessage(s: SparkSession, project: String,
                                runOld: Boolean = false) extends LoggingUtils {
 
   @(transient@getter) val spark: SparkSession = s
@@ -48,21 +52,36 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
   val t5 = s"company_certificate" //证书
   val t6 = s"company_copyright_works_list" //软著作权
   val res_tb = s"$project.tmp_xf_ads_intellectual_message" //聚合结果表
-  val res_tb_res = s"$project.tmp_xf_ads_intellectual_message_res" //转换输出结果表
+  //  val res_tb_res = s"$project.tmp_xf_ads_intellectual_message_res" //转换输出结果表
+  val res_tb_res = s"$project.ads_company_dynamic" //转换输出结果表
+
+  val tn = "intellectual"
 
-  val ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
+  var ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
   val remainDs = BaseUtil.getPartion(s"$project.ads_$t1", spark)
+  val mapDs = BaseUtil.getPartion(s"$project.base_company_mapping", spark)
 
   /**
    * Wraps a key/value pair into a single-entry immutable map.
    *
    * Registered as the Spark SQL UDF `col2Map` inside `calc()`.
    *
    * @param pkg the key of the resulting map
    * @param day the value stored under `pkg`
    * @return a map containing exactly one entry, `pkg -> day`
    */
   def col2Map(pkg: String, day: String): Map[String, String] =
     Map((pkg, day))
 
   def calc(): Unit = {
+    println("start calc" + new Date())
     spark.udf.register("col2Map", col2Map _)
     spark.udf.register("MapAggs", new MapAggs())
 
     sql(
       s"""
+        |SELECT  cid as new_cid
+        |FROM    $project.ads_change_extract
+        |WHERE   ds = $ds
+        |AND     tn in ('$t1','$t2','$t3','$t4','$t5','$t6')
+        |AND     type = 'insert'
+        |GROUP by cid
+        |""".stripMargin).cache().createOrReplaceTempView("mapping")
+
+    sql(
+      s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $res_tb PARTITION(ds=$ds)
          |select new_cid,date,m,
          |max(coalesce(split(m[1], ',')[2],'0')) over (partition by new_cid) cnt1,
@@ -103,21 +122,48 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
 
     //转换输出格式
     transForm()
-
+    println("end calc" + new Date())
   }
 
   def transForm(): Unit = {
+    println("start transForm" + new Date())
     sql(
       s"""
+         |select a.*,b.cname from
+         |(
          |select *
          |from ${res_tb}
          |where ds = $ds
+         |)a
+         |left join
+         |(
+         |select cid,cname
+         |from
+         |$project.base_company_mapping
+         |where ds =$mapDs
+         |)b on a.cid = b.cid
          |""".stripMargin).map(r => {
       val cid = r.getAs[String]("cid")
-      val date = r.getAs[String]("date")
+      val cname = r.getAs[String]("cname")
+      val info_type = tn
+      val change_time = r.getAs[String]("date")
+      val biz_id = s"${cid}_$change_time"
+      val sub_info_type = ""
+      val info_risk_level = "2"
+      val winhc_suggest = ""
+      val create_time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")
       val m = r.getAs[Map[String, String]](2)
-      val tags = descStr(m)
-      val content: String = Json(DefaultFormats).write(
+      val rta_desc = descStr(m)
+      var id = "-1"
+      try{
+        id = CompanyDynamicHandleUtils.getDynamicId(cid, "", biz_id, change_time)
+
+      }catch {
+        case ex:Exception => {
+          logError(ex.getMessage)
+        }
+      }
+      val change_content: String = Json(DefaultFormats).write(
         mutable.LinkedHashMap("商标数量" -> r.getAs[String]("cnt1"),
           "专利数量" -> r.getAs[String]("cnt2"),
           "域名数量" -> r.getAs[String]("cnt3"),
@@ -126,14 +172,20 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
           "软著数量" -> r.getAs[String]("cnt6")
         )
       )
-      (cid, date, tags, content)
-    }).toDF("cid", "date", "tags", "content").createOrReplaceTempView("res")
+
+      (id, cid, cname, info_type, rta_desc, change_content, change_time, biz_id,
+        sub_info_type, info_risk_level, winhc_suggest, create_time)
+    }).toDF("id", "cid", "cname", "info_type", "rta_desc", "change_content", "change_time", "biz_id",
+      "sub_info_type", "info_risk_level", "winhc_suggest", "create_time").createOrReplaceTempView("res")
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${res_tb_res} PARTITION (ds=$ds)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${res_tb_res} PARTITION (ds='$ds',tn='$tn')
          |select * from res
+         |where id <> '-1'
          |""".stripMargin)
+
+    println("end transForm" + new Date())
   }
 
   def descStr(m: Map[String, String]) = {
@@ -205,15 +257,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |        ,f
          |FROM    (
          |            SELECT  b.*,"1" f
-         |            FROM    (
-         |                        SELECT  rowkey
-         |                                ,cid as new_cid
-         |                                ,biz_date as date
-         |                        FROM    $project.ads_change_extract
-         |                        WHERE   ds = $ds
-         |                        AND     tn = '$table'
-         |                        AND     type = 'insert'
-         |                    ) a
+         |            FROM    mapping a
          |            JOIN    (
          |                        SELECT  rowkey
          |                                ,new_cid
@@ -242,7 +286,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |                                ) d
          |                        WHERE   num = 1
          |                    ) b
-         |            ON      a.rowkey = b.rowkey
+         |            ON      a.new_cid = b.new_cid
          |            UNION ALL
          |            SELECT  rowkey
          |                    ,cid new_cid
@@ -254,7 +298,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |            AND     type = 'insert'
          |        ) e
          |)
-         |where f = "0"
+         |--where f = "0"
          |)
          |""".stripMargin
     }