Browse source

Intellectual property updates

xufei 4 years ago
parent
commit
8ff1f6dade

+ 192 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -0,0 +1,192 @@
+package com.winhc.bigdata.spark.jobs.message
+
+import com.winhc.bigdata.spark.udf.MapAggs
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import org.json4s.DefaultFormats
+import org.json4s.jackson.Json
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * @Description: Intellectual property (IP) dynamic-change messages
+ * @author π
+ * @date 2020/7/27 19:15
+ */
+object IntellectualMessage {
+  def main(args: Array[String]): Unit = {
+
+    val project = "winhc_eci_dev"
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv("IntellectualMessage", config)
+    //    IntellectualMessage(spark, project, "").calc()
+    IntellectualMessage(spark, project, "").transForm("20200727", "tmp_xf_ads_intellectual_message")
+    spark.stop()
+  }
+
+}
+
+
+case class IntellectualMessage(s: SparkSession, project: String, sourceTable: String
+                              ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
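+  // Wrap one (type, "inc_cnt,del_cnt,total_cnt") pair into a single-entry map;
+  // the MapAggs UDAF then merges these maps per (new_cid, date) group.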
+  def col2Map(pkg: String, day: String): Map[String, String] = {
+    Map(pkg -> day)
+  }
+
+  def calc(): Unit = {
+    spark.udf.register("col2Map", col2Map _)
+    spark.udf.register("MapAggs", new MapAggs())
+
+    val t1 = "company_tm" // trademarks
+    val t2 = "company_patent_list" // patents
+    val t3 = "company_icp" // ICP-registered websites / domains
+    val t4 = "company_copyright_reg_list" // copyright registrations
+    val t5 = "company_certificate" // certificates
+    val t6 = "company_copyright_works_list" // software copyrights
+
+    val res_tb = "tmp_xf_ads_intellectual_message" // aggregated result table (written under $project)
+
+    val ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
+
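+    // Build, per (new_cid, date), a map of type ("1".."6") -> "inc_cnt,del_cnt,total_cnt",
+    // then take, per new_cid, the max of split(m[k], ',')[2] (the per-type total;
+    // Spark SQL arrays are 0-indexed) as cnt1..cnt6.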
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$res_tb PARTITION(ds=$ds)
+         |select new_cid,date,m,
+         |max(coalesce(split(m[1], ',')[2],'0')) over (partition by new_cid) cnt1,
+         |max(coalesce(split(m[2], ',')[2],'0')) over (partition by new_cid) cnt2,
+         |max(coalesce(split(m[3], ',')[2],'0')) over (partition by new_cid) cnt3,
+         |max(coalesce(split(m[4], ',')[2],'0')) over (partition by new_cid) cnt4,
+         |max(coalesce(split(m[5], ',')[2],'0')) over (partition by new_cid) cnt5,
+         |max(coalesce(split(m[6], ',')[2],'0')) over (partition by new_cid) cnt6
+         |from (
+         |select new_cid,date,MapAggs(m1) m from (
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(project, t1, "app_date", "1")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(project, t2, "pub_date", "2")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(project, t3, "examine_date", "3")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(project, t4, "reg_time", "4")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(project, t5, "start_date", "5")}
+         |union all
+         |select *,
+         |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
+         |from ${assemblySQL(project, t6, "reg_time", "6")}
+         |)a
+         |group by new_cid,date
+         |)h
+         |""".stripMargin)
+
+    transForm(ds, res_tb)
+
+  }
+
+  def transForm(ds: String, tb: String): Unit = {
+    sql(
+      s"""
+         |select *
+         |from ${project}.${tb}
+         |where ds = $ds
+         |""".stripMargin).map(r => {
+      val cid = r.getAs[String]("cid")
+      val date = r.getAs[String]("date")
+      val m = r.getAs[Map[String, String]](2)
+      val tags = descStr(m)
+      val content: String = Json(DefaultFormats).write(
+        mutable.LinkedHashMap("商标数量" -> r.getAs[String]("cnt1"), // trademark count
+          "专利数量" -> r.getAs[String]("cnt2"), // patent count
+          "域名数量" -> r.getAs[String]("cnt3"), // domain count
+          "著作权数量" -> r.getAs[String]("cnt4"), // copyright count
+          "证书数量" -> r.getAs[String]("cnt5"), // certificate count
+          "软著数量" -> r.getAs[String]("cnt6") // software-copyright count
+        )
+      )
+      (cid, date, tags, content)
+    }).toDF("cid", "date", "tags", "content").createOrReplaceTempView("res")
+
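+    // Persist the flattened (cid, date, tags, content) rows into the companion "_res" table.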
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tb}_res PARTITION (ds=$ds)
+         |select * from res
+         |""".stripMargin)
+  }
+
+  def descStr(m: Map[String, String]): String = {
+    val list = ListBuffer[String]()
+    if (m.contains("1")) getStr(m("1"), "商标", list) // trademark
+    if (m.contains("2")) getStr(m("2"), "专利", list) // patent
+    if (m.contains("3")) getStr(m("3"), "域名", list) // domain
+    if (m.contains("4")) getStr(m("4"), "著作权", list) // copyright
+    if (m.contains("5")) getStr(m("5"), "证书", list) // certificate
+    if (m.contains("6")) getStr(m("6"), "软著", list) // software copyright
+    list.toList.mkString(",")
+  }
+
+  def getStr(s: String, name: String, list: ListBuffer[String]): Unit = {
+    val Array(inc_cnt, del_cnt, _) = s.split(",")
+    if (!inc_cnt.equals("0")) {
+      list += s"新增${name}数量${inc_cnt}个" // "added <inc_cnt> <name>"
+    }
+    if (!del_cnt.equals("0")) {
+      list += s"移除${name}数量${del_cnt}个" // "removed <del_cnt> <name>"
+    }
+  }
+
+  // Full-history (stock) computation: union the incremental (inc_ads_) and full (ads_)
+  // tables, de-duplicate via GROUP BY, then window-sum added/removed counts per
+  // (new_cid, $date) and an all-time total per new_cid.
+  def assemblySQL(project: String, table: String, date: String, tp: String): String = {
+    s"""
+       |(
+       |SELECT  new_cid
+       |        ,SUBSTR(CAST($date AS STRING),1,10) AS DATE
+       |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) inc_cnt
+       |        ,sum(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) del_cnt
+       |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) total_cnt
+       |        ,$tp type
+       |FROM    (
+       |         select * from (
+       |            SELECT  rowkey
+       |                    ,new_cid
+       |                    ,$date
+       |                    ,deleted
+       |            FROM    $project.inc_ads_$table x
+       |            WHERE   x.ds > 0
+       |            union all
+       |            SELECT  rowkey
+       |                    ,new_cid
+       |                    ,$date
+       |                    ,deleted
+       |            FROM    $project.ads_$table y
+       |            WHERE   y.ds > 0
+       |         )z
+       |            GROUP BY rowkey
+       |                     ,new_cid
+       |                     ,$date
+       |                     ,deleted
+       |        ) b
+       |)
+       |""".stripMargin
+  }
+}
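For reference, a minimal sketch (illustrative values, not part of the commit; assumes an active SparkSession named spark) of how descStr renders an aggregated map produced by col2Map/MapAggs:

    val m = Map(
      "1" -> "2,0,10", // trademarks: 2 added, 0 removed, 10 total
      "3" -> "0,1,4"   // domains: 0 added, 1 removed, 4 total
    )
    IntellectualMessage(spark, "winhc_eci_dev", "").descStr(m)
    // => "新增商标数量2个,移除域名数量1个"
    //    i.e. "2 trademarks added, 1 domain removed"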

+ 48 - 0
src/main/scala/com/winhc/bigdata/spark/udf/MapAggs.scala

@@ -0,0 +1,48 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
+import scala.collection.mutable
+
+/**
+ * Created by π
+ * Desc: UDAF that aggregates many single-entry maps into one map
+ */
+
+class MapAggs(max: Int = 500) extends UserDefinedAggregateFunction {
+  override def inputSchema: StructType = StructType(
+    Array[StructField](StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)))
+  )
+
+  override def bufferSchema: StructType = StructType(
+    Array[StructField](StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)))
+  )
+
+  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = {
+    buffer.update(0, Map[String, String]())
+  }
+
+  override def deterministic: Boolean = true
+
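+  // Fold the incoming single-entry map into the buffer (right-biased on key
+  // collisions). Once the buffer already holds `max` keys (500 by default),
+  // further input maps are silently ignored.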
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val map1 = buffer.getMap[String, String](0).toMap
+    if (map1.size >= max) {
+      return
+    }
+    val map2 = input.getMap[String, String](0).toMap
+    buffer.update(0, map1 ++ map2)
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    update(buffer1, buffer2)
+  }
+
+  override def evaluate(buffer: Row): Any = {
+    buffer.getAs[Map[String, String]](0)
+  }
+}
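
A minimal usage sketch for MapAggs (the view name rows_with_maps is hypothetical, and spark is assumed to be an active SparkSession): after registration, each input row's single-entry map is folded into one map per group, capped at 500 keys.

    spark.udf.register("MapAggs", new MapAggs())
    spark.sql(
      """
        |SELECT new_cid, date, MapAggs(m1) AS m
        |FROM   rows_with_maps            -- hypothetical temp view with map column m1
        |GROUP BY new_cid, date
        |""".stripMargin)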